Trust router: open TRP connection to self, send multiple msgs.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <string.h>
8
9 #include <gsscon.h>
10 #include <tr_rp.h>
11 #include <trp_internal.h>
12 #include <tr_config.h>
13 #include <tr_event.h>
14 #include <tr_msg.h>
15 #include <tr_trp.h>
16 #include <tr_debug.h>
17
18 /* hold a trps instance and a config manager */
19 struct tr_trps_event_cookie {
20   TRPS_INSTANCE *trps;
21   TR_CFG_MGR *cfg_mgr;
22 };
23
24 /* callback to schedule event to process messages */
25 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
26 {
27   struct event *mq_ev=(struct event *)arg;
28   event_active(mq_ev, 0, 0);
29 }
30
31 static void msg_free_helper(void *p)
32 {
33   tr_msg_free_decoded((TR_MSG *)p);
34 }
35 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
36 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
37                                   TRP_CONNECTION *conn,
38                                   TR_MSG *tr_msg)
39 {
40   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
41   TR_MQ_MSG *mq_msg=NULL;
42
43   /* n.b., conn is available here, but do not hold onto the reference
44    * because it may be cleaned up if the originating connection goes
45    * down before the message is processed */
46   mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
47   if (mq_msg==NULL) {
48     return TRP_NOMEM;
49   }
50   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
51   trps_mq_append(trps, mq_msg);
52   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
53   return TRP_SUCCESS;
54 }
55
56
57 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
58                                void *cookie_in)
59 {
60   TR_RP_CLIENT *rp;
61   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
62   TRPS_INSTANCE *trps = cookie->trps;
63   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
64   TR_NAME name={gss_name->value, gss_name->length};
65
66   tr_debug("tr_trps_gss_handler()");
67
68   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
69     tr_debug("tr_trps_gss_handler: Bad parameters.");
70     return -1;
71   }
72   
73   /* look up the RP client matching the GSS name */
74   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
75     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
76     return -1;
77   }
78
79   /*trps->rp_gss = rp;*/
80   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
81
82   return 0;
83 }
84
85 /* data passed to thread */
86 struct trps_thread_data {
87   TRP_CONNECTION *conn;
88   TRPS_INSTANCE *trps;
89 };
90 /* thread to handle GSS connections to peers */
91 static void *tr_trps_thread(void *arg)
92 {
93   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
94   struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
95   TRP_CONNECTION *conn=thread_data->conn;
96   TRPS_INSTANCE *trps=thread_data->trps;
97   TR_MQ_MSG *msg=NULL;
98
99   tr_debug("tr_trps_thread: started");
100   trps_handle_connection(trps, conn);
101
102   msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
103   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
104   if (msg==NULL)
105     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
106   else
107     trps_mq_append(trps, msg);
108
109   tr_debug("tr_trps_thread: exit");
110   talloc_free(tmp_ctx);
111   return NULL;
112 }
113
114 /* called when a connection to the TRPS port is received */
115 static void tr_trps_event_cb(int listener, short event, void *arg)
116 {
117   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
118   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
119   TRP_CONNECTION *conn=NULL;
120   TR_NAME *gssname=NULL;
121   char *name=NULL;
122   struct trps_thread_data *thread_data=NULL;
123
124   if (0==(event & EV_READ)) {
125     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
126   } else {
127     /* create a thread to handle this connection */
128     asprintf(&name, "trustrouter@%s", trps->hostname);
129     gssname=tr_new_name(name);
130     free(name); name=NULL;
131     conn=trp_connection_accept(tmp_ctx, listener, gssname);
132     if (conn!=NULL) {
133       /* need to monitor this fd and trigger events when read becomes possible */
134       thread_data=talloc(conn, struct trps_thread_data);
135       if (thread_data==NULL) {
136         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
137         talloc_free(tmp_ctx);
138         return;
139       }
140       thread_data->conn=conn;
141       thread_data->trps=trps;
142       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
143       pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
144       trps_add_connection(trps, conn); /* remember the connection */
145     }
146   }
147   talloc_free(tmp_ctx);
148 }
149
150 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
151 {
152   /* everything belonging to the thread is in the TRP_CONNECTION
153    * associated with it */
154   trps_remove_connection(trps, conn);
155   tr_debug("Deleted connection");
156 }
157
158 static void tr_trps_process_mq(int socket, short event, void *arg)
159 {
160   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
161   TR_MQ_MSG *msg=NULL;
162   const char *s=NULL;
163   char *tmp=NULL;
164
165   msg=trps_mq_pop(trps);
166   while (msg!=NULL) {
167     s=tr_mq_msg_get_message(msg);
168     if (0==strcmp(s, "trps_thread_exit")) {
169       tr_trps_cleanup_thread(trps,
170                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
171                                                    TRP_CONNECTION));
172     }
173     else if (0==strcmp(s, "tr_msg")) {
174       tmp=tr_msg_encode(tr_mq_msg_get_payload(msg));
175       tr_debug("tr_msg: %s", tmp);
176       tr_msg_free_encoded(tmp);
177     }
178     else
179       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
180
181     tr_mq_msg_free(msg);
182     msg=trps_mq_pop(trps);
183   }
184 }
185
186 static int tr_trps_events_destructor(void *obj)
187 {
188   TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
189   if (ev->mq_ev!=NULL)
190     event_free(ev->mq_ev);
191   return 0;
192 }
193 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
194 {
195   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
196   if (ev!=NULL) {
197     ev->listen_ev=talloc(ev, struct tr_socket_event);
198     ev->mq_ev=NULL;
199     if (ev->listen_ev==NULL) {
200       talloc_free(ev);
201       ev=NULL;
202     }
203     talloc_set_destructor((void *)ev, tr_trps_events_destructor);
204   }
205   return ev;
206 }
207
208 /* Configure the trps instance and set up its event handler.
209  * Fills in trps_ev, which should be allocated by caller. */
210 TRP_RC tr_trps_event_init(struct event_base *base,
211                        TRPS_INSTANCE *trps,
212                        TR_CFG_MGR *cfg_mgr,
213                        TR_TRPS_EVENTS *trps_ev)
214 {
215   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
216   struct tr_socket_event *listen_ev=NULL;
217   struct tr_trps_event_cookie *cookie;
218   TRP_RC retval=TRP_ERROR;
219
220   if (trps_ev == NULL) {
221     tr_debug("tr_trps_event_init: Null trps_ev.");
222     retval=TRP_BADARG;
223     goto cleanup;
224   }
225
226   /* get convenient handles */
227   listen_ev=trps_ev->listen_ev;
228
229   /* Create the cookie for callbacks. It is part of the trps context, so it will
230    * be cleaned up when trps is freed by talloc_free. */
231   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
232   if (cookie == NULL) {
233     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
234     retval=TRP_NOMEM;
235     goto cleanup;
236   }
237   cookie->trps=trps;
238   cookie->cfg_mgr=cfg_mgr;
239   talloc_steal(trps, cookie);
240
241   /* get a trps listener */
242   listen_ev->sock_fd=trps_get_listener(trps,
243                                        tr_trps_msg_handler,
244                                        tr_trps_gss_handler,
245                                        cfg_mgr->active->internal->hostname,
246                                        cfg_mgr->active->internal->trps_port,
247                                        (void *)cookie);
248   if (listen_ev->sock_fd < 0) {
249     tr_crit("Error opening TRP server socket.");
250     retval=TRP_ERROR;
251     goto cleanup;
252   }
253   
254   /* and its event */
255   listen_ev->ev=event_new(base,
256                           listen_ev->sock_fd,
257                           EV_READ|EV_PERSIST,
258                           tr_trps_event_cb,
259                           (void *)trps);
260   event_add(listen_ev->ev, NULL);
261   
262   /* now set up message queue processing event, only triggered by
263    * tr_trps_mq_cb() */
264   trps_ev->mq_ev=event_new(base,
265                            0,
266                            EV_PERSIST,
267                            tr_trps_process_mq,
268                            (void *)trps);
269   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
270
271   retval=TRP_SUCCESS;
272
273 cleanup:
274   talloc_free(tmp_ctx);
275   return retval;
276 }
277
278
279 struct trpc_notify_cb_data {
280   int msg_ready;
281   pthread_cond_t cond;
282   pthread_mutex_t mutex;
283 };
284
285 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
286 {
287   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
288   pthread_mutex_lock(&(cb_data->mutex));
289   if (!cb_data->msg_ready) {
290     cb_data->msg_ready=1;
291     pthread_cond_signal(&(cb_data->cond));
292   }
293   pthread_mutex_unlock(&(cb_data->mutex));
294 }
295
296 /* data passed to thread */
297 struct trpc_thread_data {
298   TRPC_INSTANCE *trpc;
299   TRPS_INSTANCE *trps;
300 };
301 static void *tr_trpc_thread(void *arg)
302 {
303   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
304   struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
305   TRPC_INSTANCE *trpc=thread_data->trpc;
306   TRPS_INSTANCE *trps=thread_data->trps;
307   TRP_RC rc=TRP_ERROR;
308   TR_MQ_MSG *msg=NULL;
309   const char *msg_type=NULL;
310   char *encoded_msg=NULL;
311
312   struct trpc_notify_cb_data cb_data={0,
313                                       PTHREAD_COND_INITIALIZER,
314                                       PTHREAD_MUTEX_INITIALIZER};
315
316   tr_debug("tr_trpc_thread: started");
317
318   /* set up the mq for receiving */
319   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
320
321   tr_mq_lock(trpc->mq);
322   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
323   tr_mq_unlock(trpc->mq);
324
325   rc=trpc_connect(trpc);
326   if (rc!=TRP_SUCCESS) {
327     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
328               trpc_get_server(trpc),
329               trpc_get_port(trpc));
330   } else {
331     while (1) {
332       cb_data.msg_ready=0;
333       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
334       /* verify the condition */
335       if (cb_data.msg_ready) {
336         msg=trpc_mq_pop(trpc);
337         if (msg==NULL) {
338           /* no message in the queue */
339           tr_err("tr_trpc_thread: notified of msg, but queue empty");
340           break;
341         }
342
343         msg_type=tr_mq_msg_get_message(msg);
344
345         if (0==strcmp(msg_type, "trpc_abort")) {
346           tr_mq_msg_free(msg);
347           break; /* exit loop */
348         }
349         else if (0==strcmp(msg_type, "trpc_send")) {
350           encoded_msg=tr_mq_msg_get_payload(msg);
351           if (encoded_msg==NULL)
352             tr_notice("tr_trpc_thread: null outgoing TRP message.");
353           else {
354             rc = trpc_send_msg(trpc, encoded_msg);
355             if (rc!=TRP_SUCCESS) {
356               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
357               tr_mq_msg_free(msg);
358               break;
359             }
360           }
361         }
362         else
363           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
364
365         tr_mq_msg_free(msg);
366       }
367     }
368   }
369
370   msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
371   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
372   if (msg==NULL)
373     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
374   else
375     trps_mq_append(trps, msg);
376
377   talloc_free(tmp_ctx);
378   return NULL;
379 }
380
381 /* starts a trpc thread to connect to server:port */
382 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
383 {
384   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
385   TRPC_INSTANCE *trpc=NULL;
386   TRP_CONNECTION *conn=NULL;
387   struct trpc_thread_data *thread_data=NULL;
388
389   tr_debug("tr_trpc_initiate entered");
390   trpc=trpc_new(tmp_ctx);
391   if (trpc==NULL) {
392     tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
393     goto cleanup;
394   }
395   tr_debug("tr_trpc_initiate: allocated trpc");
396
397   conn=trp_connection_new(trpc);
398   if (conn==NULL) {
399     tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
400     goto cleanup;
401   }
402   trpc_set_conn(trpc, conn);
403   trpc_set_server(trpc, talloc_strdup(trpc, server));
404   trpc_set_port(trpc, port);
405   tr_debug("tr_trpc_initiate: allocated connection");
406   
407   /* start thread */
408   thread_data=talloc(trpc, struct trpc_thread_data);
409   if (thread_data==NULL) {
410     tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
411     goto cleanup;
412   }
413   thread_data->trpc=trpc;
414   thread_data->trps=trps;
415
416   pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
417   pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
418
419   tr_debug("tr_trpc_initiate: started trpc thread");
420   trps_add_trpc(trps, trpc);
421
422   talloc_report_full(trps, stderr);
423   talloc_report_full(tmp_ctx, stderr);
424
425  cleanup:
426   talloc_free(tmp_ctx);
427   return trpc;
428 }