Successful messages via mq to main thread.
[trust_router.git] / tr / tr_trp.c
1 #include <pthread.h>
2 #include <fcntl.h>
3 #include <event2/event.h>
4 #include <talloc.h>
5 #include <errno.h>
6 #include <unistd.h>
7
8 #include <gsscon.h>
9 #include <tr_rp.h>
10 #include <trp_internal.h>
11 #include <tr_config.h>
12 #include <tr_event.h>
13 #include <tr_debug.h>
14 #include <tr_trp.h>
15
16 /* hold a trps instance and a config manager */
17 struct tr_trps_event_cookie {
18   TRPS_INSTANCE *trps;
19   TR_CFG_MGR *cfg_mgr;
20 };
21
22
23 /* callback to schedule event to process messages */
24 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
25 {
26   struct event *mq_ev=(struct event *)arg;
27   event_active(mq_ev, 0, 0);
28 }
29
30 static int tr_trps_req_handler (TRPS_INSTANCE *trps,
31                                 TRP_REQ *orig_req, 
32                                 void *tr_in)
33 {
34   if (orig_req != NULL) 
35     free(orig_req);
36   return -1; /* not handling anything right now */
37 }
38
39
40 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
41                                void *cookie_in)
42 {
43   TR_RP_CLIENT *rp;
44   struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
45   TRPS_INSTANCE *trps = cookie->trps;
46   TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
47   TR_NAME name={gss_name->value, gss_name->length};
48
49   tr_debug("tr_trps_gss_handler()");
50
51   if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
52     tr_debug("tr_trps_gss_handler: Bad parameters.");
53     return -1;
54   }
55   
56   /* look up the RP client matching the GSS name */
57   if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
58     tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
59     return -1;
60   }
61
62   /*trps->rp_gss = rp;*/
63   tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
64
65   return 0;
66 }
67
68 /* data passed to thread */
69 struct thread_data {
70   TRP_CONNECTION *conn;
71   TRPS_INSTANCE *trps;
72 };
73 /* thread to handle GSS connections to peers */
74 static void *tr_trps_conn_thread(void *arg)
75 {
76   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
77   struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
78   TRP_CONNECTION *conn=thread_data->conn;
79   TRPS_INSTANCE *trps=thread_data->trps;
80   TR_MQ_MSG *msg=NULL;
81
82   tr_debug("tr_trps_conn_thread: started");
83   /* try to establish a GSS context */
84   if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
85     tr_notice("tr_trps_conn_thread: failed to authorize connection");
86     pthread_exit(NULL);
87   }
88   tr_notice("tr_trps_conn_thread: authorized connection");
89   
90   msg=tr_mq_msg_new(tmp_ctx);
91   if (msg==NULL)
92     tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
93   else
94     trps_mq_append(trps, msg);
95
96   tr_debug("tr_trps_conn_thread: exit");
97   talloc_free(tmp_ctx);
98   return NULL;
99 }
100
101 /* called when a connection to the TRPS port is received */
102 static void tr_trps_event_cb(int listener, short event, void *arg)
103 {
104   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
105   TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
106   TRP_CONNECTION *conn=NULL;
107   TR_NAME *gssname=NULL;
108   char *name=NULL;
109   struct thread_data *thread_data;
110
111   if (0==(event & EV_READ)) {
112     tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
113   } else {
114     /* create a thread to handle this connection */
115     asprintf(&name, "trustrouter@%s", trps->hostname);
116     gssname=tr_new_name(name);
117     free(name); name=NULL;
118     conn=trp_connection_accept(tmp_ctx, listener, gssname, trps_auth_cb, NULL, trps);
119     if (conn!=NULL) {
120       /* need to monitor this fd and trigger events when read becomes possible */
121       thread_data=talloc(conn, struct thread_data);
122       if (thread_data==NULL) {
123         tr_err("tr_trps_event_cb: unable to allocate thread_data");
124         talloc_free(tmp_ctx);
125         return;
126       }
127       thread_data->conn=conn;
128       thread_data->trps=trps;
129       pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
130       pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
131       trps_add_connection(trps, conn); /* remember the connection */
132     }
133   }
134   talloc_free(tmp_ctx);
135 }
136
137 static void tr_trps_process_mq(int socket, short event, void *arg)
138 {
139   TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
140   TR_MQ_MSG *msg=NULL;
141
142   tr_debug("tr_trps_process_mw: starting");
143   msg=trps_mq_pop(trps);
144   while (msg!=NULL) {
145     tr_debug("tr_trps_process_mq: received message");
146     tr_mq_msg_free(msg);
147     msg=trps_mq_pop(trps);
148   }
149   tr_debug("tr_trps_process_mw: ending");
150 }
151
152 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
153 {
154   TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
155   if (ev!=NULL) {
156     ev->listen_ev=talloc(ev, struct tr_socket_event);
157     ev->mq_ev=NULL;
158     if (ev->listen_ev==NULL) {
159       talloc_free(ev);
160       ev=NULL;
161     }
162   }
163   return ev;
164 }
165
166 /* Configure the trps instance and set up its event handler.
167  * Returns 0 on success, nonzero on failure. Results in 
168  * trps_ev, which should be allocated by caller. */
169 int tr_trps_event_init(struct event_base *base,
170                        TRPS_INSTANCE *trps,
171                        TR_CFG_MGR *cfg_mgr,
172                        TR_TRPS_EVENTS *trps_ev)
173 {
174   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
175   struct tr_socket_event *listen_ev=NULL;
176   struct tr_trps_event_cookie *cookie;
177   int retval=0;
178
179   if (trps_ev == NULL) {
180     tr_debug("tr_trps_event_init: Null trps_ev.");
181     retval=1;
182     goto cleanup;
183   }
184
185   /* get convenient handles */
186   listen_ev=trps_ev->listen_ev;
187
188   /* Create the cookie for callbacks. It is part of the trps context, so it will
189    * be cleaned up when trps is freed by talloc_free. */
190   cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
191   if (cookie == NULL) {
192     tr_debug("tr_trps_event_init: Unable to allocate cookie.");
193     retval=1;
194     goto cleanup;
195   }
196   cookie->trps=trps;
197   cookie->cfg_mgr=cfg_mgr;
198   talloc_steal(trps, cookie);
199
200   /* get a trps listener */
201   listen_ev->sock_fd=trps_get_listener(trps,
202                                      tr_trps_req_handler,
203                                      tr_trps_gss_handler,
204                                      cfg_mgr->active->internal->hostname,
205                                      cfg_mgr->active->internal->trps_port,
206                                      (void *)cookie);
207   if (listen_ev->sock_fd < 0) {
208     tr_crit("Error opening TRP server socket.");
209     retval=1;
210     goto cleanup;
211   }
212   
213   /* and its event */
214   listen_ev->ev=event_new(base,
215                           listen_ev->sock_fd,
216                           EV_READ|EV_PERSIST,
217                           tr_trps_event_cb,
218                           (void *)trps);
219   event_add(listen_ev->ev, NULL);
220   
221   /* now set up message queue processing event, only triggered by
222    * tr_trps_mq_cb() */
223   trps_ev->mq_ev=event_new(base,
224                            0,
225                            EV_PERSIST,
226                            tr_trps_process_mq,
227                            (void *)trps);
228   tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
229
230 cleanup:
231   talloc_free(tmp_ctx);
232   return retval;
233 }
234