Sweep for expired routes. Sweeps every two seconds for now, not yet configurable.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <string.h>
8 #include <sys/time.h>
9
10 #include <gsscon.h>
11 #include <tr_rp.h>
12 #include <trp_internal.h>
13 #include <tr_config.h>
14 #include <tr_event.h>
15 #include <tr_msg.h>
16 #include <tr_trp.h>
17 #include <tr_debug.h>
18
19 /* hold a trps instance and a config manager */
20 struct tr_trps_event_cookie {
21   TRPS_INSTANCE *trps;
22   TR_CFG_MGR *cfg_mgr;
23 };
24
25 /* callback to schedule event to process messages */
26 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
27 {
28   struct event *mq_ev=(struct event *)arg;
29   event_active(mq_ev, 0, 0);
30 }
31
32 static void msg_free_helper(void *p)
33 {
34   tr_msg_free_decoded((TR_MSG *)p);
35 }
36 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
37 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
38                                   TRP_CONNECTION *conn,
39                                   TR_MSG *tr_msg)
40 {
41   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
42   TR_MQ_MSG *mq_msg=NULL;
43
44   /* n.b., conn is available here, but do not hold onto the reference
45    * because it may be cleaned up if the originating connection goes
46    * down before the message is processed */
47   mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
48   if (mq_msg==NULL) {
49     return TRP_NOMEM;
50   }
51   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
52   trps_mq_append(trps, mq_msg);
53   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
54   return TRP_SUCCESS;
55 }
56
57
58 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
59                                void *cookie_in)
60 {
61   TR_RP_CLIENT *rp;
62   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
63   TRPS_INSTANCE *trps = cookie->trps;
64   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
65   TR_NAME name={gss_name->value, gss_name->length};
66
67   tr_debug("tr_trps_gss_handler()");
68
69   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
70     tr_debug("tr_trps_gss_handler: Bad parameters.");
71     return -1;
72   }
73   
74   /* look up the RP client matching the GSS name */
75   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
76     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
77     return -1;
78   }
79
80   /*trps->rp_gss = rp;*/
81   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
82
83   return 0;
84 }
85
86 /* data passed to thread */
87 struct trps_thread_data {
88   TRP_CONNECTION *conn;
89   TRPS_INSTANCE *trps;
90 };
91 /* thread to handle GSS connections to peers */
92 static void *tr_trps_thread(void *arg)
93 {
94   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
95   struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
96   TRP_CONNECTION *conn=thread_data->conn;
97   TRPS_INSTANCE *trps=thread_data->trps;
98   TR_MQ_MSG *msg=NULL;
99
100   tr_debug("tr_trps_thread: started");
101   trps_handle_connection(trps, conn);
102
103   msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
104   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
105   if (msg==NULL)
106     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
107   else
108     trps_mq_append(trps, msg);
109
110   tr_debug("tr_trps_thread: exit");
111   talloc_free(tmp_ctx);
112   return NULL;
113 }
114
115 /* called when a connection to the TRPS port is received */
116 static void tr_trps_event_cb(int listener, short event, void *arg)
117 {
118   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
119   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
120   TRP_CONNECTION *conn=NULL;
121   TR_NAME *gssname=NULL;
122   char *name=NULL;
123   struct trps_thread_data *thread_data=NULL;
124
125   if (0==(event & EV_READ)) {
126     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
127   } else {
128     /* create a thread to handle this connection */
129     asprintf(&name, "trustrouter@%s", trps->hostname);
130     gssname=tr_new_name(name);
131     free(name); name=NULL;
132     conn=trp_connection_accept(tmp_ctx, listener, gssname);
133     if (conn!=NULL) {
134       /* need to monitor this fd and trigger events when read becomes possible */
135       thread_data=talloc(conn, struct trps_thread_data);
136       if (thread_data==NULL) {
137         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
138         talloc_free(tmp_ctx);
139         return;
140       }
141       thread_data->conn=conn;
142       thread_data->trps=trps;
143       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
144       pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
145       trps_add_connection(trps, conn); /* remember the connection */
146     }
147   }
148   talloc_free(tmp_ctx);
149 }
150
151 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
152 {
153   /* everything belonging to the thread is in the TRP_CONNECTION
154    * associated with it */
155   trps_remove_connection(trps, conn);
156   tr_debug("Deleted connection");
157 }
158
159 static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
160 {
161   char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
162   if (table==NULL)
163     fprintf(f, "Unable to print route table.\n");
164   else {
165     fprintf(f, "%s\n", table);
166     talloc_free(table);
167   }
168 }
169
170 static void tr_trps_process_mq(int socket, short event, void *arg)
171 {
172   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
173   TR_MQ_MSG *msg=NULL;
174   const char *s=NULL;
175
176   msg=trps_mq_pop(trps);
177   while (msg!=NULL) {
178     s=tr_mq_msg_get_message(msg);
179     if (0==strcmp(s, "trps_thread_exit")) {
180       tr_trps_cleanup_thread(trps,
181                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
182                                                    TRP_CONNECTION));
183     }
184     else if (0==strcmp(s, "tr_msg")) {
185       if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
186         tr_notice("tr_trps_process_mq: error handling message.");
187       else {
188         tr_trps_print_route_table(trps, stderr);
189       }
190     }
191     else
192       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
193
194     tr_mq_msg_free(msg);
195     msg=trps_mq_pop(trps);
196   }
197 }
198
199 static void tr_trps_sweep(int listener, short event, void *arg)
200 {
201   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
202   tr_debug("tr_trps_sweep: sweeping routes");
203   trps_sweep_routes(trps);
204 }
205
206 static int tr_trps_events_destructor(void *obj)
207 {
208   TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
209   if (ev->mq_ev!=NULL)
210     event_free(ev->mq_ev);
211   if (ev->sweep_ev!=NULL)
212     event_free(ev->sweep_ev);
213   return 0;
214 }
215 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
216 {
217   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
218   if (ev!=NULL) {
219     ev->listen_ev=talloc(ev, struct tr_socket_event);
220     ev->mq_ev=NULL;
221     ev->sweep_ev=NULL;
222     if (ev->listen_ev==NULL) {
223       talloc_free(ev);
224       ev=NULL;
225     }
226     talloc_set_destructor((void *)ev, tr_trps_events_destructor);
227   }
228   return ev;
229 }
230
231 /* Configure the trps instance and set up its event handler.
232  * Fills in trps_ev, which should be allocated by caller. */
233 TRP_RC tr_trps_event_init(struct event_base *base,
234                        TRPS_INSTANCE *trps,
235                        TR_CFG_MGR *cfg_mgr,
236                        TR_TRPS_EVENTS *trps_ev)
237 {
238   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
239   struct tr_socket_event *listen_ev=NULL;
240   struct tr_trps_event_cookie *cookie;
241   struct timeval two_secs={2, 0};
242   TRP_RC retval=TRP_ERROR;
243
244   if (trps_ev == NULL) {
245     tr_debug("tr_trps_event_init: Null trps_ev.");
246     retval=TRP_BADARG;
247     goto cleanup;
248   }
249
250   /* get convenient handles */
251   listen_ev=trps_ev->listen_ev;
252
253   /* Create the cookie for callbacks. It is part of the trps context, so it will
254    * be cleaned up when trps is freed by talloc_free. */
255   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
256   if (cookie == NULL) {
257     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
258     retval=TRP_NOMEM;
259     goto cleanup;
260   }
261   cookie->trps=trps;
262   cookie->cfg_mgr=cfg_mgr;
263   talloc_steal(trps, cookie);
264
265   /* get a trps listener */
266   listen_ev->sock_fd=trps_get_listener(trps,
267                                        tr_trps_msg_handler,
268                                        tr_trps_gss_handler,
269                                        cfg_mgr->active->internal->hostname,
270                                        cfg_mgr->active->internal->trps_port,
271                                        (void *)cookie);
272   if (listen_ev->sock_fd < 0) {
273     tr_crit("Error opening TRP server socket.");
274     retval=TRP_ERROR;
275     goto cleanup;
276   }
277   
278   /* and its event */
279   listen_ev->ev=event_new(base,
280                           listen_ev->sock_fd,
281                           EV_READ|EV_PERSIST,
282                           tr_trps_event_cb,
283                           (void *)trps);
284   event_add(listen_ev->ev, NULL);
285   
286   /* now set up message queue processing event, only triggered by
287    * tr_trps_mq_cb() */
288   trps_ev->mq_ev=event_new(base,
289                            0,
290                            EV_PERSIST,
291                            tr_trps_process_mq,
292                            (void *)trps);
293   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
294
295   /* now set up the route table sweep timer event */
296   trps_ev->sweep_ev=event_new(base, -1, EV_TIMEOUT|EV_PERSIST, tr_trps_sweep, (void *)trps);
297   /* todo: event_add(trps_ev->sweep_ev, &(cfg_mgr->active->internal->route_sweep_interval)); */
298   event_add(trps_ev->sweep_ev, &two_secs);
299
300   retval=TRP_SUCCESS;
301
302 cleanup:
303   talloc_free(tmp_ctx);
304   return retval;
305 }
306
307
308 struct trpc_notify_cb_data {
309   int msg_ready;
310   pthread_cond_t cond;
311   pthread_mutex_t mutex;
312 };
313
314 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
315 {
316   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
317   pthread_mutex_lock(&(cb_data->mutex));
318   if (!cb_data->msg_ready) {
319     cb_data->msg_ready=1;
320     pthread_cond_signal(&(cb_data->cond));
321   }
322   pthread_mutex_unlock(&(cb_data->mutex));
323 }
324
325 /* data passed to thread */
326 struct trpc_thread_data {
327   TRPC_INSTANCE *trpc;
328   TRPS_INSTANCE *trps;
329 };
330 static void *tr_trpc_thread(void *arg)
331 {
332   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
333   struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
334   TRPC_INSTANCE *trpc=thread_data->trpc;
335   TRPS_INSTANCE *trps=thread_data->trps;
336   TRP_RC rc=TRP_ERROR;
337   TR_MQ_MSG *msg=NULL;
338   const char *msg_type=NULL;
339   char *encoded_msg=NULL;
340
341   struct trpc_notify_cb_data cb_data={0,
342                                       PTHREAD_COND_INITIALIZER,
343                                       PTHREAD_MUTEX_INITIALIZER};
344
345   tr_debug("tr_trpc_thread: started");
346
347   /* set up the mq for receiving */
348   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
349
350   tr_mq_lock(trpc->mq);
351   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
352   tr_mq_unlock(trpc->mq);
353
354   rc=trpc_connect(trpc);
355   if (rc!=TRP_SUCCESS) {
356     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
357               trpc_get_server(trpc),
358               trpc_get_port(trpc));
359   } else {
360     while (1) {
361       cb_data.msg_ready=0;
362       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
363       /* verify the condition */
364       if (cb_data.msg_ready) {
365         msg=trpc_mq_pop(trpc);
366         if (msg==NULL) {
367           /* no message in the queue */
368           tr_err("tr_trpc_thread: notified of msg, but queue empty");
369           break;
370         }
371
372         msg_type=tr_mq_msg_get_message(msg);
373
374         if (0==strcmp(msg_type, "trpc_abort")) {
375           tr_mq_msg_free(msg);
376           break; /* exit loop */
377         }
378         else if (0==strcmp(msg_type, "trpc_send")) {
379           encoded_msg=tr_mq_msg_get_payload(msg);
380           if (encoded_msg==NULL)
381             tr_notice("tr_trpc_thread: null outgoing TRP message.");
382           else {
383             rc = trpc_send_msg(trpc, encoded_msg);
384             if (rc!=TRP_SUCCESS) {
385               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
386               tr_mq_msg_free(msg);
387               break;
388             }
389           }
390         }
391         else
392           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
393
394         tr_mq_msg_free(msg);
395       }
396     }
397   }
398
399   msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
400   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
401   if (msg==NULL)
402     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
403   else
404     trps_mq_append(trps, msg);
405
406   talloc_free(tmp_ctx);
407   return NULL;
408 }
409
410 /* starts a trpc thread to connect to server:port */
411 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
412 {
413   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
414   TRPC_INSTANCE *trpc=NULL;
415   TRP_CONNECTION *conn=NULL;
416   struct trpc_thread_data *thread_data=NULL;
417
418   tr_debug("tr_trpc_initiate entered");
419   trpc=trpc_new(tmp_ctx);
420   if (trpc==NULL) {
421     tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
422     goto cleanup;
423   }
424   tr_debug("tr_trpc_initiate: allocated trpc");
425
426   conn=trp_connection_new(trpc);
427   if (conn==NULL) {
428     tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
429     goto cleanup;
430   }
431   trpc_set_conn(trpc, conn);
432   trpc_set_server(trpc, talloc_strdup(trpc, server));
433   trpc_set_port(trpc, port);
434   tr_debug("tr_trpc_initiate: allocated connection");
435   
436   /* start thread */
437   thread_data=talloc(trpc, struct trpc_thread_data);
438   if (thread_data==NULL) {
439     tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
440     goto cleanup;
441   }
442   thread_data->trpc=trpc;
443   thread_data->trps=trps;
444
445   pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
446   pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
447
448   tr_debug("tr_trpc_initiate: started trpc thread");
449   trps_add_trpc(trps, trpc);
450
451   talloc_report_full(trps, stderr);
452   talloc_report_full(tmp_ctx, stderr);
453
454  cleanup:
455   talloc_free(tmp_ctx);
456   return trpc;
457 }