Make outgoing connections. Connect to self as a test.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7 #include <string.h>
8
9 #include <gsscon.h>
10 #include <tr_rp.h>
11 #include <trp_internal.h>
12 #include <tr_config.h>
13 #include <tr_event.h>
14 #include <tr_msg.h>
15 #include <tr_trp.h>
16 #include <tr_debug.h>
17
18 /* hold a trps instance and a config manager */
19 struct tr_trps_event_cookie {
20   TRPS_INSTANCE *trps;
21   TR_CFG_MGR *cfg_mgr;
22 };
23
24 /* callback to schedule event to process messages */
25 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
26 {
27   struct event *mq_ev=(struct event *)arg;
28   event_active(mq_ev, 0, 0);
29 }
30
31 static void msg_free_helper(void *p)
32 {
33   tr_msg_free_decoded((TR_MSG *)p);
34 }
35 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
36 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
37                                   TRP_CONNECTION *conn,
38                                   TR_MSG *tr_msg)
39 {
40   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
41   TR_MQ_MSG *mq_msg=NULL;
42
43   /* n.b., conn is available here, but do not hold onto the reference
44    * because it may be cleaned up if the originating connection goes
45    * down before the message is processed */
46   mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
47   if (mq_msg==NULL) {
48     return TRP_NOMEM;
49   }
50   tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
51   trps_mq_append(trps, mq_msg);
52   talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
53   return TRP_SUCCESS;
54 }
55
56
57 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
58                                void *cookie_in)
59 {
60   TR_RP_CLIENT *rp;
61   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
62   TRPS_INSTANCE *trps = cookie->trps;
63   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
64   TR_NAME name={gss_name->value, gss_name->length};
65
66   tr_debug("tr_trps_gss_handler()");
67
68   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
69     tr_debug("tr_trps_gss_handler: Bad parameters.");
70     return -1;
71   }
72   
73   /* look up the RP client matching the GSS name */
74   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
75     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
76     return -1;
77   }
78
79   /*trps->rp_gss = rp;*/
80   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
81
82   return 0;
83 }
84
85 /* data passed to thread */
86 struct trps_thread_data {
87   TRP_CONNECTION *conn;
88   TRPS_INSTANCE *trps;
89 };
90 /* thread to handle GSS connections to peers */
91 static void *tr_trps_thread(void *arg)
92 {
93   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
94   struct trps_thread_data *thread_data=talloc_get_type_abort(arg, struct trps_thread_data);
95   TRP_CONNECTION *conn=thread_data->conn;
96   TRPS_INSTANCE *trps=thread_data->trps;
97   TR_MQ_MSG *msg=NULL;
98
99   tr_debug("tr_trps_thread: started");
100   trps_handle_connection(trps, conn);
101
102   msg=tr_mq_msg_new(tmp_ctx, "trps_thread_exit");
103   tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
104   if (msg==NULL)
105     tr_err("tr_trps_thread: error allocating TR_MQ_MSG");
106   else
107     trps_mq_append(trps, msg);
108
109   tr_debug("tr_trps_thread: exit");
110   talloc_free(tmp_ctx);
111   return NULL;
112 }
113
114 /* called when a connection to the TRPS port is received */
115 static void tr_trps_event_cb(int listener, short event, void *arg)
116 {
117   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
118   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
119   TRP_CONNECTION *conn=NULL;
120   TR_NAME *gssname=NULL;
121   char *name=NULL;
122   struct trps_thread_data *thread_data=NULL;
123
124   if (0==(event & EV_READ)) {
125     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
126   } else {
127     /* create a thread to handle this connection */
128     asprintf(&name, "trustrouter@%s", trps->hostname);
129     gssname=tr_new_name(name);
130     free(name); name=NULL;
131     conn=trp_connection_accept(tmp_ctx, listener, gssname);
132     if (conn!=NULL) {
133       /* need to monitor this fd and trigger events when read becomes possible */
134       thread_data=talloc(conn, struct trps_thread_data);
135       if (thread_data==NULL) {
136         tr_err("tr_trps_event_cb: unable to allocate trps_thread_data");
137         talloc_free(tmp_ctx);
138         return;
139       }
140       thread_data->conn=conn;
141       thread_data->trps=trps;
142       pthread_create(trp_connection_get_thread(conn), NULL, tr_trps_thread, thread_data);
143       pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
144       trps_add_connection(trps, conn); /* remember the connection */
145     }
146   }
147   talloc_free(tmp_ctx);
148 }
149
150 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
151 {
152   /* everything belonging to the thread is in the TRP_CONNECTION
153    * associated with it */
154   trps_remove_connection(trps, conn);
155   tr_debug("Deleted connection");
156 }
157
158 static void tr_trps_process_mq(int socket, short event, void *arg)
159 {
160   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
161   TR_MQ_MSG *msg=NULL;
162   const char *s=NULL;
163   char *tmp=NULL;
164
165   tr_debug("tr_trps_process_mw: starting");
166   msg=trps_mq_pop(trps);
167   while (msg!=NULL) {
168     s=tr_mq_msg_get_message(msg);
169     if (0==strcmp(s, "trps_thread_exit")) {
170       tr_trps_cleanup_thread(trps,
171                              talloc_get_type_abort(tr_mq_msg_get_payload(msg),
172                                                    TRP_CONNECTION));
173     }
174     else if (0==strcmp(s, "tr_msg")) {
175       tmp=tr_msg_encode(tr_mq_msg_get_payload(msg));
176       tr_debug("tr_msg: %s", tmp);
177       tr_msg_free_encoded(tmp);
178     }
179     else
180       tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
181
182     tr_mq_msg_free(msg);
183     msg=trps_mq_pop(trps);
184   }
185   tr_debug("tr_trps_process_mw: ending");
186 }
187
188 static int tr_trps_events_destructor(void *obj)
189 {
190   TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
191   if (ev->mq_ev!=NULL)
192     event_free(ev->mq_ev);
193   return 0;
194 }
195 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
196 {
197   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
198   if (ev!=NULL) {
199     ev->listen_ev=talloc(ev, struct tr_socket_event);
200     ev->mq_ev=NULL;
201     if (ev->listen_ev==NULL) {
202       talloc_free(ev);
203       ev=NULL;
204     }
205     talloc_set_destructor((void *)ev, tr_trps_events_destructor);
206   }
207   return ev;
208 }
209
210 /* Configure the trps instance and set up its event handler.
211  * Fills in trps_ev, which should be allocated by caller. */
212 TRP_RC tr_trps_event_init(struct event_base *base,
213                        TRPS_INSTANCE *trps,
214                        TR_CFG_MGR *cfg_mgr,
215                        TR_TRPS_EVENTS *trps_ev)
216 {
217   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
218   struct tr_socket_event *listen_ev=NULL;
219   struct tr_trps_event_cookie *cookie;
220   TRP_RC retval=TRP_ERROR;
221
222   if (trps_ev == NULL) {
223     tr_debug("tr_trps_event_init: Null trps_ev.");
224     retval=TRP_BADARG;
225     goto cleanup;
226   }
227
228   /* get convenient handles */
229   listen_ev=trps_ev->listen_ev;
230
231   /* Create the cookie for callbacks. It is part of the trps context, so it will
232    * be cleaned up when trps is freed by talloc_free. */
233   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
234   if (cookie == NULL) {
235     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
236     retval=TRP_NOMEM;
237     goto cleanup;
238   }
239   cookie->trps=trps;
240   cookie->cfg_mgr=cfg_mgr;
241   talloc_steal(trps, cookie);
242
243   /* get a trps listener */
244   listen_ev->sock_fd=trps_get_listener(trps,
245                                        tr_trps_msg_handler,
246                                        tr_trps_gss_handler,
247                                        cfg_mgr->active->internal->hostname,
248                                        cfg_mgr->active->internal->trps_port,
249                                        (void *)cookie);
250   if (listen_ev->sock_fd < 0) {
251     tr_crit("Error opening TRP server socket.");
252     retval=TRP_ERROR;
253     goto cleanup;
254   }
255   
256   /* and its event */
257   listen_ev->ev=event_new(base,
258                           listen_ev->sock_fd,
259                           EV_READ|EV_PERSIST,
260                           tr_trps_event_cb,
261                           (void *)trps);
262   event_add(listen_ev->ev, NULL);
263   
264   /* now set up message queue processing event, only triggered by
265    * tr_trps_mq_cb() */
266   trps_ev->mq_ev=event_new(base,
267                            0,
268                            EV_PERSIST,
269                            tr_trps_process_mq,
270                            (void *)trps);
271   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
272
273   retval=TRP_SUCCESS;
274
275 cleanup:
276   talloc_free(tmp_ctx);
277   return retval;
278 }
279
280
281 struct trpc_notify_cb_data {
282   int msg_ready;
283   pthread_cond_t cond;
284   pthread_mutex_t mutex;
285 };
286
287 static void tr_trpc_mq_cb(TR_MQ *mq, void *arg)
288 {
289   struct trpc_notify_cb_data *cb_data=(struct trpc_notify_cb_data *) arg;
290   pthread_mutex_lock(&(cb_data->mutex));
291   if (!cb_data->msg_ready) {
292     cb_data->msg_ready=1;
293     pthread_cond_signal(&(cb_data->cond));
294   }
295   pthread_mutex_unlock(&(cb_data->mutex));
296 }
297
298 /* data passed to thread */
299 struct trpc_thread_data {
300   TRPC_INSTANCE *trpc;
301   TRPS_INSTANCE *trps;
302 };
303 static void *tr_trpc_thread(void *arg)
304 {
305   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
306   struct trpc_thread_data *thread_data=talloc_get_type_abort(arg, struct trpc_thread_data);
307   TRPC_INSTANCE *trpc=thread_data->trpc;
308   TRPS_INSTANCE *trps=thread_data->trps;
309   TRP_RC rc=TRP_ERROR;
310   TR_MQ_MSG *msg=NULL;
311   const char *msg_type=NULL;
312   char *encoded_msg=NULL;
313
314   struct trpc_notify_cb_data cb_data={0,
315                                       PTHREAD_COND_INITIALIZER,
316                                       PTHREAD_MUTEX_INITIALIZER};
317
318   tr_debug("tr_trpc_thread: started");
319
320   /* set up the mq for receiving */
321   pthread_mutex_lock(&(cb_data.mutex)); /* hold this lock until we enter the main loop */
322
323   tr_mq_lock(trpc->mq);
324   tr_mq_set_notify_cb(trpc->mq, tr_trpc_mq_cb, (void *) &cb_data);
325   tr_mq_unlock(trpc->mq);
326
327   rc=trpc_connect(trpc);
328   if (rc!=TRP_SUCCESS) {
329     tr_notice("tr_trpc_thread: failed to initiate connection to %s:%d.",
330               trpc_get_server(trpc),
331               trpc_get_port(trpc));
332   } else {
333     while (1) {
334       cb_data.msg_ready=0;
335       pthread_cond_wait(&(cb_data.cond), &(cb_data.mutex));
336       /* verify the condition */
337       if (cb_data.msg_ready) {
338         msg=trpc_mq_pop(trpc);
339         if (msg==NULL) {
340           /* no message in the queue */
341           tr_err("tr_trpc_thread: notified of msg, but queue empty");
342           break;
343         }
344
345         msg_type=tr_mq_msg_get_message(msg);
346
347         if (0==strcmp(msg_type, "trpc_abort")) {
348           tr_mq_msg_free(msg);
349           break; /* exit loop */
350         }
351         else if (0==strcmp(msg_type, "trpc_send")) {
352           encoded_msg=tr_mq_msg_get_payload(msg);
353           if (encoded_msg==NULL)
354             tr_notice("tr_trpc_thread: null outgoing TRP message.");
355           else {
356             rc = trpc_send_msg(trpc, encoded_msg);
357             if (rc!=TRP_SUCCESS) {
358               tr_notice("tr_trpc_thread: trpc_send_msg failed.");
359               tr_mq_msg_free(msg);
360               break;
361             }
362           }
363         }
364         else
365           tr_notice("tr_trpc_thread: unknown message '%s' received.", msg_type);
366
367         tr_mq_msg_free(msg);
368       }
369     }
370   }
371
372   msg=tr_mq_msg_new(tmp_ctx, "trpc_thread_exit");
373   tr_mq_msg_set_payload(msg, (void *)trpc, NULL); /* do not pass a free routine */
374   if (msg==NULL)
375     tr_err("tr_trpc_thread: error allocating TR_MQ_MSG");
376   else
377     trps_mq_append(trps, msg);
378
379   talloc_free(tmp_ctx);
380   return NULL;
381 }
382
383 /* starts a trpc thread to connect to server:port */
384 TRPC_INSTANCE *tr_trpc_initiate(TRPS_INSTANCE *trps, const char *server, unsigned int port)
385 {
386   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
387   TRPC_INSTANCE *trpc=NULL;
388   TRP_CONNECTION *conn=NULL;
389   struct trpc_thread_data *thread_data=NULL;
390
391   tr_debug("tr_trpc_initiate entered");
392   trpc=trpc_new(tmp_ctx);
393   if (trpc==NULL) {
394     tr_crit("tr_trpc_initiate: could not allocate TRPC_INSTANCE.");
395     goto cleanup;
396   }
397   tr_debug("tr_trpc_initiate: allocated trpc");
398
399   conn=trp_connection_new(trpc);
400   if (conn==NULL) {
401     tr_crit("tr_trpc_initiate: could not allocate TRP_CONNECTION.");
402     goto cleanup;
403   }
404   trpc_set_conn(trpc, conn);
405   trpc_set_server(trpc, talloc_strdup(trpc, server));
406   trpc_set_port(trpc, port);
407   tr_debug("tr_trpc_initiate: allocated connection");
408   
409   /* start thread */
410   thread_data=talloc(trpc, struct trpc_thread_data);
411   if (thread_data==NULL) {
412     tr_crit("tr_trpc_initiate: could not allocate struct trpc_thread_data.");
413     goto cleanup;
414   }
415   thread_data->trpc=trpc;
416   thread_data->trps=trps;
417
418   pthread_create(trp_connection_get_thread(conn), NULL, tr_trpc_thread, thread_data);
419   pthread_detach(*(trp_connection_get_thread(conn))); /* we will not rejoin the thread */
420
421   tr_debug("tr_trpc_initiate: started trpc thread");
422   trps_add_trpc(trps, trpc);
423
424   talloc_report_full(trps, stderr);
425   talloc_report_full(tmp_ctx, stderr);
426
427  cleanup:
428   talloc_free(tmp_ctx);
429   return trpc;
430 }