Generate scheduled updates. Untested, but builds.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <sys/time.h>
9
10 #include <gsscon.h>
11 #include <tr_rp.h>
12 #include <trp_internal.h>
13 #include <tr_config.h>
14 #include <tr_event.h>
15 #include <tr_msg.h>
16 #include <tr_trp.h>
17 #include <tr_debug.h>
18
19 /* data for event callbacks */
20 struct tr_trps_event_cookie {
21   TRPS_INSTANCE *trps;
22   TR_CFG_MGR *cfg_mgr;
23   struct event *ev;
24 };
25
26 /* callback to schedule event to process messages */
27 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
28 {
29   struct event *mq_ev=(struct event *)arg;
30   event_active(mq_ev, 0, 0);
31 }
32
33 static void msg_free_helper(void *p)
34 {
35   tr_msg_free_decoded((TR_MSG *)p);
36 }
37 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
38 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
39                                   TRP_CONNECTION *conn,
40                                   TR_MSG *tr_msg)
41 {
42   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
43   TR_MQ_MSG *mq_msg=NULL;
44
45   /* n.b., conn is available here, but do not hold onto the reference
46    * because it may be cleaned up if the originating connection goes
47    * down before the message is processed */
48   mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
49   if (mq_msg==NULL) {
50     return TRP_NOMEM;
51   }
52   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
53   trps_mq_append(trps, mq_msg);
54   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
55   return TRP_SUCCESS;
56 }
57
58
59 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
60                                void *cookie_in)
61 {
62   TR_RP_CLIENT *rp;
63   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
64   TRPS_INSTANCE *trps = cookie->trps;
65   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
66   TR_NAME name={gss_name->value, gss_name->length};
67
68   tr_debug("tr_trps_gss_handler()");
69
70   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
71     tr_debug("tr_trps_gss_handler: Bad parameters.");
72     return -1;
73   }
74   
75   /* look up the RP client matching the GSS name */
76   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
77     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
78     return -1;
79   }
80
81   /*trps->rp_gss = rp;*/
82   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
83
84   return 0;
85 }
86
87 /* data passed to thread */
88 struct trps_thread_data {
89   TRP_CONNECTION *conn;
90   TRPS_INSTANCE *trps;
91 };
92 /* thread to handle GSS connections to peers */
93 static void *tr_trps_thread(void *arg)
94 {
95   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
96   struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
97   TRP_CONNECTION *conn=thread_data->conn;
98   TRPS_INSTANCE *trps=thread_data->trps;
99   TR_MQ_MSG *msg=NULL;
100
101   tr_debug("tr_trps_thread: started");
102   trps_handle_connection(trps, conn);
103
104   msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
105   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
106   if (msg==NULL)
107     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
108   else
109     trps_mq_append(trps, msg);
110
111   tr_debug("tr_trps_thread: exit");
112   talloc_free(tmp_ctx);
113   return NULL;
114 }
115
116 /* called when a connection to the TRPS port is received */
117 static void tr_trps_event_cb(int listener, short event, void *arg)
118 {
119   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
120   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
121   TRP_CONNECTION *conn=NULL;
122   TR_NAME *gssname=NULL;
123   char *name=NULL;
124   struct trps_thread_data *thread_data=NULL;
125
126   if (0==(event & EV_READ)) {
127     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
128   } else {
129     /* create a thread to handle this connection */
130     asprintf(&name, "trustrouter@%s", trps->hostname);
131     gssname=tr_new_name(name);
132     free(name); name=NULL;
133     conn=trp_connection_accept(tmp_ctx, listener, gssname);
134     if (conn!=NULL) {
135       /* need to monitor this fd and trigger events when read becomes possible */
136       thread_data=talloc(conn, struct trps_thread_data);
137       if (thread_data==NULL) {
138         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
139         talloc_free(tmp_ctx);
140         return;
141       }
142       thread_data->conn=conn;
143       thread_data->trps=trps;
144       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
145       pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
146       trps_add_connection(trps, conn); /* remember the connection */
147     }
148   }
149   talloc_free(tmp_ctx);
150 }
151
152 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
153 {
154   /* everything belonging to the thread is in the TRP_CONNECTION
155    * associated with it */
156   trps_remove_connection(trps, conn);
157   tr_debug("Deleted connection");
158 }
159
160 static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
161 {
162   char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
163   if (table==NULL)
164     fprintf(f, "Unable to print route table.\n");
165   else {
166     fprintf(f, "%s\n", table);
167     talloc_free(table);
168   }
169 }
170
171 static void tr_trps_process_mq(int socket, short event, void *arg)
172 {
173   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
174   TR_MQ_MSG *msg=NULL;
175   const char *s=NULL;
176
177   msg=trps_mq_pop(trps);
178   while (msg!=NULL) {
179     s=tr_mq_msg_get_message(msg);
180     if (0==strcmp(s, "trps_thread_exit")) {
181       tr_trps_cleanup_thread(trps,
182                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
183                                                    TRP_CONNECTION));
184     }
185     else if (0==strcmp(s, "tr_msg")) {
186       if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
187         tr_notice("tr_trps_process_mq: error handling message.");
188       else {
189         tr_trps_print_route_table(trps, stderr);
190       }
191     }
192     else
193       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
194
195     tr_mq_msg_free(msg);
196     msg=trps_mq_pop(trps);
197   }
198 }
199
200 static void tr_trps_update(int listener, short event, void *arg)
201 {
202   struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie);
203   TRPS_INSTANCE *trps=cookie->trps;
204   struct event *ev=cookie->ev;
205
206   tr_debug("tr_trps_update: sending scheduled route updates.");
207   trps_scheduled_update(trps);
208   event_add(ev, &(trps->update_interval));
209 }
210
211 static void tr_trps_sweep(int listener, short event, void *arg)
212 {
213   struct tr_trps_event_cookie *cookie=talloc_get_type_abort(arg, struct tr_trps_event_cookie);
214   TRPS_INSTANCE *trps=cookie->trps;
215   struct event *ev=cookie->ev;
216
217   tr_debug("tr_trps_sweep: sweeping routes");
218   trps_sweep_routes(trps);
219   /* schedule the event to run again */
220   event_add(ev, &(trps->sweep_interval));
221 }
222
223 static int tr_trps_events_destructor(void *obj)
224 {
225   TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
226   if (ev->mq_ev!=NULL)
227     event_free(ev->mq_ev);
228   if (ev->sweep_ev!=NULL)
229     event_free(ev->sweep_ev);
230   return 0;
231 }
232 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
233 {
234   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
235   if (ev!=NULL) {
236     ev->listen_ev=talloc(ev, struct tr_socket_event);
237     ev->mq_ev=NULL;
238     ev->update_ev=NULL;
239     ev->sweep_ev=NULL;
240     if (ev->listen_ev==NULL) {
241       talloc_free(ev);
242       ev=NULL;
243     }
244     talloc_set_destructor((void *)ev, tr_trps_events_destructor);
245   }
246   return ev;
247 }
248
249 /* Configure the trps instance and set up its event handler.
250  * Fills in trps_ev, which should be allocated by caller. */
251 TRP_RC tr_trps_event_init(struct event_base *base,
252                        TRPS_INSTANCE *trps,
253                        TR_CFG_MGR *cfg_mgr,
254                        TR_TRPS_EVENTS *trps_ev)
255 {
256   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
257   struct tr_socket_event *listen_ev=NULL;
258   struct tr_trps_event_cookie *trps_cookie=NULL;
259   struct tr_trps_event_cookie *update_cookie=NULL;
260   struct tr_trps_event_cookie *sweep_cookie=NULL;
261   TRP_RC retval=TRP_ERROR;
262
263   if (trps_ev == NULL) {
264     tr_debug("tr_trps_event_init: Null trps_ev.");
265     retval=TRP_BADARG;
266     goto cleanup;
267   }
268
269   /* get convenient handles */
270   listen_ev=trps_ev->listen_ev;
271
272   /* Create the cookie for callbacks. It is part of the trps context, so it will
273    * be cleaned up when trps is freed by talloc_free. */
274   trps_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
275   if (trps_cookie == NULL) {
276     tr_debug("tr_trps_event_init: Unable to allocate trps_cookie.");
277     retval=TRP_NOMEM;
278     goto cleanup;
279   }
280   trps_cookie->trps=trps;
281   trps_cookie->cfg_mgr=cfg_mgr;
282   talloc_steal(trps, trps_cookie);
283
284   /* get a trps listener */
285   listen_ev->sock_fd=trps_get_listener(trps,
286                                        tr_trps_msg_handler,
287                                        tr_trps_gss_handler,
288                                        cfg_mgr->active->internal->hostname,
289                                        cfg_mgr->active->internal->trps_port,
290                                        (void *)trps_cookie);
291   if (listen_ev->sock_fd < 0) {
292     tr_crit("Error opening TRP server socket.");
293     retval=TRP_ERROR;
294     goto cleanup;
295   }
296   trps_cookie->ev=listen_ev->ev; /* in case it needs to frob the event */
297   
298   /* and its event */
299   listen_ev->ev=event_new(base,
300                           listen_ev->sock_fd,
301                           EV_READ|EV_PERSIST,
302                           tr_trps_event_cb,
303                           (void *)trps);
304   event_add(listen_ev->ev, NULL);
305   
306   /* now set up message queue processing event, only triggered by
307    * tr_trps_mq_cb() */
308   trps_ev->mq_ev=event_new(base,
309                            0,
310                            EV_PERSIST,
311                            tr_trps_process_mq,
312                            (void *)trps);
313   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
314
315   /* now set up the route update timer event */
316   update_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
317   if (update_cookie == NULL) {
318     tr_debug("tr_trps_event_init: Unable to allocate update_cookie.");
319     retval=TRP_NOMEM;
320     goto cleanup;
321   }
322   update_cookie->trps=trps;
323   update_cookie->cfg_mgr=cfg_mgr;
324   talloc_steal(trps, update_cookie);
325   trps_ev->update_ev=event_new(base, -1, EV_TIMEOUT, tr_trps_update, (void *)update_cookie);
326   update_cookie->ev=trps_ev->update_ev; /* in case it needs to frob the event */
327   event_add(trps_ev->update_ev, &(trps->update_interval));
328
329   /* now set up the route table sweep timer event */
330   sweep_cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
331   if (sweep_cookie == NULL) {
332     tr_debug("tr_trps_event_init: Unable to allocate sweep_cookie.");
333     retval=TRP_NOMEM;
334     goto cleanup;
335   }
336   sweep_cookie->trps=trps;
337   sweep_cookie->cfg_mgr=cfg_mgr;
338   talloc_steal(trps, sweep_cookie);
339   trps_ev->sweep_ev=event_new(base, -1, EV_TIMEOUT, tr_trps_sweep, (void *)sweep_cookie);
340   sweep_cookie->ev=trps_ev->sweep_ev; /* in case it needs to frob the event */
341   event_add(trps_ev->sweep_ev, &(trps->sweep_interval));
342
343   retval=TRP_SUCCESS;
344
345 cleanup:
346   talloc_free(tmp_ctx);
347   return retval;
348 }
349
350
351 struct trpc_notify_cb_data {
352   int msg_ready;
353   pthread_cond_t cond;
354   pthread_mutex_t mutex;
355 };
356
357 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
358 {
359   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
360   pthread_mutex_lock(&(cb_data->mutex));
361   if (!cb_data->msg_ready) {
362     cb_data->msg_ready=1;
363     pthread_cond_signal(&(cb_data->cond));
364   }
365   pthread_mutex_unlock(&(cb_data->mutex));
366 }
367
368 /* data passed to thread */
369 struct trpc_thread_data {
370   TRPC_INSTANCE *trpc;
371   TRPS_INSTANCE *trps;
372 };
373 static void *tr_trpc_thread(void *arg)
374 {
375   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
376   struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
377   TRPC_INSTANCE *trpc=thread_data->trpc;
378   TRPS_INSTANCE *trps=thread_data->trps;
379   TRP_RC rc=TRP_ERROR;
380   TR_MQ_MSG *msg=NULL;
381   const char *msg_type=NULL;
382   char *encoded_msg=NULL;
383
384   struct trpc_notify_cb_data cb_data={0,
385                                       PTHREAD_COND_INITIALIZER,
386                                       PTHREAD_MUTEX_INITIALIZER};
387
388   tr_debug("tr_trpc_thread: started");
389
390   /* set up the mq for receiving */
391   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
392
393   tr_mq_lock(trpc->mq);
394   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
395   tr_mq_unlock(trpc->mq);
396
397   rc=trpc_connect(trpc);
398   if (rc!=TRP_SUCCESS) {
399     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
400               trpc_get_server(trpc),
401               trpc_get_port(trpc));
402   } else {
403     while (1) {
404       cb_data.msg_ready=0;
405       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
406       /* verify the condition */
407       if (cb_data.msg_ready) {
408         msg=trpc_mq_pop(trpc);
409         if (msg==NULL) {
410           /* no message in the queue */
411           tr_err("tr_trpc_thread: notified of msg, but queue empty");
412           break;
413         }
414
415         msg_type=tr_mq_msg_get_message(msg);
416
417         if (0==strcmp(msg_type, "trpc_abort")) {
418           tr_mq_msg_free(msg);
419           break; /* exit loop */
420         }
421         else if (0==strcmp(msg_type, "trpc_send")) {
422           encoded_msg=tr_mq_msg_get_payload(msg);
423           if (encoded_msg==NULL)
424             tr_notice("tr_trpc_thread: null outgoing TRP message.");
425           else {
426             rc = trpc_send_msg(trpc, encoded_msg);
427             if (rc!=TRP_SUCCESS) {
428               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
429               tr_mq_msg_free(msg);
430               break;
431             }
432           }
433         }
434         else
435           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
436
437         tr_mq_msg_free(msg);
438       }
439     }
440   }
441
442   msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
443   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
444   if (msg==NULL)
445     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
446   else
447     trps_mq_append(trps, msg);
448
449   talloc_free(tmp_ctx);
450   return NULL;
451 }
452
453 /* starts a trpc thread to connect to server:port */
454 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
455 {
456   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
457   TRPC_INSTANCE *trpc=NULL;
458   TRP_CONNECTION *conn=NULL;
459   struct trpc_thread_data *thread_data=NULL;
460
461   tr_debug("tr_trpc_initiate entered");
462   trpc=trpc_new(tmp_ctx);
463   if (trpc==NULL) {
464     tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
465     goto cleanup;
466   }
467   tr_debug("tr_trpc_initiate: allocated trpc");
468
469   conn=trp_connection_new(trpc);
470   if (conn==NULL) {
471     tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
472     goto cleanup;
473   }
474   trpc_set_conn(trpc, conn);
475   trpc_set_server(trpc, talloc_strdup(trpc, server));
476   trpc_set_port(trpc, port);
477   tr_debug("tr_trpc_initiate: allocated connection");
478   
479   /* start thread */
480   thread_data=talloc(trpc, struct trpc_thread_data);
481   if (thread_data==NULL) {
482     tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
483     goto cleanup;
484   }
485   thread_data->trpc=trpc;
486   thread_data->trps=trps;
487
488   pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
489   pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
490
491   tr_debug("tr_trpc_initiate: started trpc thread");
492   trps_add_trpc(trps, trpc);
493
494   talloc_report_full(trps, stderr);
495   talloc_report_full(tmp_ctx, stderr);
496
497  cleanup:
498   talloc_free(tmp_ctx);
499   return trpc;
500 }
501
502 /* Called by the config manager after a change to the active configuration.
503  * Updates configuration of objects that do not know about the config manager. */
504 void tr_config_changed(TR_CFG *new_cfg, void *cookie)
505 {
506   TRPS_INSTANCE *trps=talloc_get_type_abort(cookie, TRPS_INSTANCE);
507   trps_set_update_interval(trps, new_cfg->internal->route_update_interval);
508   trps_set_sweep_interval(trps, new_cfg->internal->route_sweep_interval);
509 }