Update the routing table when TRP updates are received.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <string.h>
8
9 #include <gsscon.h>
10 #include <tr_rp.h>
11 #include <trp_internal.h>
12 #include <tr_config.h>
13 #include <tr_event.h>
14 #include <tr_msg.h>
15 #include <tr_trp.h>
16 #include <tr_debug.h>
17
18 /* hold a trps instance and a config manager */
19 struct tr_trps_event_cookie {
20   TRPS_INSTANCE *trps;
21   TR_CFG_MGR *cfg_mgr;
22 };
23
24 /* callback to schedule event to process messages */
25 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
26 {
27   struct event *mq_ev=(struct event *)arg;
28   event_active(mq_ev, 0, 0);
29 }
30
31 static void msg_free_helper(void *p)
32 {
33   tr_msg_free_decoded((TR_MSG *)p);
34 }
35 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
36 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
37                                   TRP_CONNECTION *conn,
38                                   TR_MSG *tr_msg)
39 {
40   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
41   TR_MQ_MSG *mq_msg=NULL;
42
43   /* n.b., conn is available here, but do not hold onto the reference
44    * because it may be cleaned up if the originating connection goes
45    * down before the message is processed */
46   mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
47   if (mq_msg==NULL) {
48     return TRP_NOMEM;
49   }
50   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
51   trps_mq_append(trps, mq_msg);
52   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
53   return TRP_SUCCESS;
54 }
55
56
57 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
58                                void *cookie_in)
59 {
60   TR_RP_CLIENT *rp;
61   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
62   TRPS_INSTANCE *trps = cookie->trps;
63   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
64   TR_NAME name={gss_name->value, gss_name->length};
65
66   tr_debug("tr_trps_gss_handler()");
67
68   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
69     tr_debug("tr_trps_gss_handler: Bad parameters.");
70     return -1;
71   }
72   
73   /* look up the RP client matching the GSS name */
74   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
75     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
76     return -1;
77   }
78
79   /*trps->rp_gss = rp;*/
80   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
81
82   return 0;
83 }
84
85 /* data passed to thread */
86 struct trps_thread_data {
87   TRP_CONNECTION *conn;
88   TRPS_INSTANCE *trps;
89 };
90 /* thread to handle GSS connections to peers */
91 static void *tr_trps_thread(void *arg)
92 {
93   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
94   struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
95   TRP_CONNECTION *conn=thread_data->conn;
96   TRPS_INSTANCE *trps=thread_data->trps;
97   TR_MQ_MSG *msg=NULL;
98
99   tr_debug("tr_trps_thread: started");
100   trps_handle_connection(trps, conn);
101
102   msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
103   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
104   if (msg==NULL)
105     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
106   else
107     trps_mq_append(trps, msg);
108
109   tr_debug("tr_trps_thread: exit");
110   talloc_free(tmp_ctx);
111   return NULL;
112 }
113
114 /* called when a connection to the TRPS port is received */
115 static void tr_trps_event_cb(int listener, short event, void *arg)
116 {
117   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
118   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
119   TRP_CONNECTION *conn=NULL;
120   TR_NAME *gssname=NULL;
121   char *name=NULL;
122   struct trps_thread_data *thread_data=NULL;
123
124   if (0==(event & EV_READ)) {
125     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
126   } else {
127     /* create a thread to handle this connection */
128     asprintf(&name, "trustrouter@%s", trps->hostname);
129     gssname=tr_new_name(name);
130     free(name); name=NULL;
131     conn=trp_connection_accept(tmp_ctx, listener, gssname);
132     if (conn!=NULL) {
133       /* need to monitor this fd and trigger events when read becomes possible */
134       thread_data=talloc(conn, struct trps_thread_data);
135       if (thread_data==NULL) {
136         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
137         talloc_free(tmp_ctx);
138         return;
139       }
140       thread_data->conn=conn;
141       thread_data->trps=trps;
142       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
143       pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
144       trps_add_connection(trps, conn); /* remember the connection */
145     }
146   }
147   talloc_free(tmp_ctx);
148 }
149
150 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
151 {
152   /* everything belonging to the thread is in the TRP_CONNECTION
153    * associated with it */
154   trps_remove_connection(trps, conn);
155   tr_debug("Deleted connection");
156 }
157
158 static void tr_trps_print_route_table(TRPS_INSTANCE *trps, FILE *f)
159 {
160   char *table=trp_rtable_to_str(NULL, trps->rtable, " | ", NULL);
161   if (table==NULL)
162     fprintf(f, "Unable to print route table.\n");
163   else {
164     fprintf(f, "%s\n", table);
165     talloc_free(table);
166   }
167 }
168
169 static void tr_trps_process_mq(int socket, short event, void *arg)
170 {
171   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
172   TR_MQ_MSG *msg=NULL;
173   const char *s=NULL;
174
175   msg=trps_mq_pop(trps);
176   while (msg!=NULL) {
177     s=tr_mq_msg_get_message(msg);
178     if (0==strcmp(s, "trps_thread_exit")) {
179       tr_trps_cleanup_thread(trps,
180                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
181                                                    TRP_CONNECTION));
182     }
183     else if (0==strcmp(s, "tr_msg")) {
184       if (trps_handle_tr_msg(trps, tr_mq_msg_get_payload(msg))!=TRP_SUCCESS)
185         tr_notice("tr_trps_process_mq: error handling message.");
186       else {
187         tr_trps_print_route_table(trps, stderr);
188       }
189     }
190     else
191       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
192
193     tr_mq_msg_free(msg);
194     msg=trps_mq_pop(trps);
195   }
196 }
197
198 static int tr_trps_events_destructor(void *obj)
199 {
200   TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
201   if (ev->mq_ev!=NULL)
202     event_free(ev->mq_ev);
203   return 0;
204 }
205 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
206 {
207   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
208   if (ev!=NULL) {
209     ev->listen_ev=talloc(ev, struct tr_socket_event);
210     ev->mq_ev=NULL;
211     if (ev->listen_ev==NULL) {
212       talloc_free(ev);
213       ev=NULL;
214     }
215     talloc_set_destructor((void *)ev, tr_trps_events_destructor);
216   }
217   return ev;
218 }
219
220 /* Configure the trps instance and set up its event handler.
221  * Fills in trps_ev, which should be allocated by caller. */
222 TRP_RC tr_trps_event_init(struct event_base *base,
223                        TRPS_INSTANCE *trps,
224                        TR_CFG_MGR *cfg_mgr,
225                        TR_TRPS_EVENTS *trps_ev)
226 {
227   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
228   struct tr_socket_event *listen_ev=NULL;
229   struct tr_trps_event_cookie *cookie;
230   TRP_RC retval=TRP_ERROR;
231
232   if (trps_ev == NULL) {
233     tr_debug("tr_trps_event_init: Null trps_ev.");
234     retval=TRP_BADARG;
235     goto cleanup;
236   }
237
238   /* get convenient handles */
239   listen_ev=trps_ev->listen_ev;
240
241   /* Create the cookie for callbacks. It is part of the trps context, so it will
242    * be cleaned up when trps is freed by talloc_free. */
243   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
244   if (cookie == NULL) {
245     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
246     retval=TRP_NOMEM;
247     goto cleanup;
248   }
249   cookie->trps=trps;
250   cookie->cfg_mgr=cfg_mgr;
251   talloc_steal(trps, cookie);
252
253   /* get a trps listener */
254   listen_ev->sock_fd=trps_get_listener(trps,
255                                        tr_trps_msg_handler,
256                                        tr_trps_gss_handler,
257                                        cfg_mgr->active->internal->hostname,
258                                        cfg_mgr->active->internal->trps_port,
259                                        (void *)cookie);
260   if (listen_ev->sock_fd < 0) {
261     tr_crit("Error opening TRP server socket.");
262     retval=TRP_ERROR;
263     goto cleanup;
264   }
265   
266   /* and its event */
267   listen_ev->ev=event_new(base,
268                           listen_ev->sock_fd,
269                           EV_READ|EV_PERSIST,
270                           tr_trps_event_cb,
271                           (void *)trps);
272   event_add(listen_ev->ev, NULL);
273   
274   /* now set up message queue processing event, only triggered by
275    * tr_trps_mq_cb() */
276   trps_ev->mq_ev=event_new(base,
277                            0,
278                            EV_PERSIST,
279                            tr_trps_process_mq,
280                            (void *)trps);
281   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
282
283   retval=TRP_SUCCESS;
284
285 cleanup:
286   talloc_free(tmp_ctx);
287   return retval;
288 }
289
290
291 struct trpc_notify_cb_data {
292   int msg_ready;
293   pthread_cond_t cond;
294   pthread_mutex_t mutex;
295 };
296
297 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
298 {
299   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
300   pthread_mutex_lock(&(cb_data->mutex));
301   if (!cb_data->msg_ready) {
302     cb_data->msg_ready=1;
303     pthread_cond_signal(&(cb_data->cond));
304   }
305   pthread_mutex_unlock(&(cb_data->mutex));
306 }
307
308 /* data passed to thread */
309 struct trpc_thread_data {
310   TRPC_INSTANCE *trpc;
311   TRPS_INSTANCE *trps;
312 };
313 static void *tr_trpc_thread(void *arg)
314 {
315   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
316   struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
317   TRPC_INSTANCE *trpc=thread_data->trpc;
318   TRPS_INSTANCE *trps=thread_data->trps;
319   TRP_RC rc=TRP_ERROR;
320   TR_MQ_MSG *msg=NULL;
321   const char *msg_type=NULL;
322   char *encoded_msg=NULL;
323
324   struct trpc_notify_cb_data cb_data={0,
325                                       PTHREAD_COND_INITIALIZER,
326                                       PTHREAD_MUTEX_INITIALIZER};
327
328   tr_debug("tr_trpc_thread: started");
329
330   /* set up the mq for receiving */
331   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
332
333   tr_mq_lock(trpc->mq);
334   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
335   tr_mq_unlock(trpc->mq);
336
337   rc=trpc_connect(trpc);
338   if (rc!=TRP_SUCCESS) {
339     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
340               trpc_get_server(trpc),
341               trpc_get_port(trpc));
342   } else {
343     while (1) {
344       cb_data.msg_ready=0;
345       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
346       /* verify the condition */
347       if (cb_data.msg_ready) {
348         msg=trpc_mq_pop(trpc);
349         if (msg==NULL) {
350           /* no message in the queue */
351           tr_err("tr_trpc_thread: notified of msg, but queue empty");
352           break;
353         }
354
355         msg_type=tr_mq_msg_get_message(msg);
356
357         if (0==strcmp(msg_type, "trpc_abort")) {
358           tr_mq_msg_free(msg);
359           break; /* exit loop */
360         }
361         else if (0==strcmp(msg_type, "trpc_send")) {
362           encoded_msg=tr_mq_msg_get_payload(msg);
363           if (encoded_msg==NULL)
364             tr_notice("tr_trpc_thread: null outgoing TRP message.");
365           else {
366             rc = trpc_send_msg(trpc, encoded_msg);
367             if (rc!=TRP_SUCCESS) {
368               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
369               tr_mq_msg_free(msg);
370               break;
371             }
372           }
373         }
374         else
375           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
376
377         tr_mq_msg_free(msg);
378       }
379     }
380   }
381
382   msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
383   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
384   if (msg==NULL)
385     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
386   else
387     trps_mq_append(trps, msg);
388
389   talloc_free(tmp_ctx);
390   return NULL;
391 }
392
393 /* starts a trpc thread to connect to server:port */
394 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
395 {
396   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
397   TRPC_INSTANCE *trpc=NULL;
398   TRP_CONNECTION *conn=NULL;
399   struct trpc_thread_data *thread_data=NULL;
400
401   tr_debug("tr_trpc_initiate entered");
402   trpc=trpc_new(tmp_ctx);
403   if (trpc==NULL) {
404     tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
405     goto cleanup;
406   }
407   tr_debug("tr_trpc_initiate: allocated trpc");
408
409   conn=trp_connection_new(trpc);
410   if (conn==NULL) {
411     tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
412     goto cleanup;
413   }
414   trpc_set_conn(trpc, conn);
415   trpc_set_server(trpc, talloc_strdup(trpc, server));
416   trpc_set_port(trpc, port);
417   tr_debug("tr_trpc_initiate: allocated connection");
418   
419   /* start thread */
420   thread_data=talloc(trpc, struct trpc_thread_data);
421   if (thread_data==NULL) {
422     tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
423     goto cleanup;
424   }
425   thread_data->trpc=trpc;
426   thread_data->trps=trps;
427
428   pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
429   pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
430
431   tr_debug("tr_trpc_initiate: started trpc thread");
432   trps_add_trpc(trps, trpc);
433
434   talloc_report_full(trps, stderr);
435   talloc_report_full(tmp_ctx, stderr);
436
437  cleanup:
438   talloc_free(tmp_ctx);
439   return trpc;
440 }