3 #include <event2/event.h>
11 #include <trp_internal.h>
12 #include <tr_config.h>
18 /* hold a trps instance and a config manager */
19 struct tr_trps_event_cookie {
25 /* callback to schedule event to process messages */
26 static void tr_trps_mq_cb(TR_MQ *mq, void *arg)
28 struct event *mq_ev=(struct event *)arg;
29 event_active(mq_ev, 0, 0);
32 static void msg_free_helper(void *p)
34 tr_msg_free_decoded((TR_MSG *)p);
36 /* takes a TR_MSG and puts it in a TR_MQ_MSG for processing by the main thread */
37 static TRP_RC tr_trps_msg_handler(TRPS_INSTANCE *trps,
41 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
42 TR_MQ_MSG *mq_msg=NULL;
44 /* n.b., conn is available here, but do not hold onto the reference
45 * because it may be cleaned up if the originating connection goes
46 * down before the message is processed */
47 mq_msg=tr_mq_msg_new(tmp_ctx, "tr_msg");
51 tr_mq_msg_set_payload(mq_msg, (void *)tr_msg, msg_free_helper);
52 trps_mq_append(trps, mq_msg);
53 talloc_free(tmp_ctx); /* cleans up the message if it did not get appended correctly */
58 static int tr_trps_gss_handler(gss_name_t client_name, gss_buffer_t gss_name,
62 struct tr_trps_event_cookie *cookie=(struct tr_trps_event_cookie *)cookie_in;
63 TRPS_INSTANCE *trps = cookie->trps;
64 TR_CFG_MGR *cfg_mgr = cookie->cfg_mgr;
65 TR_NAME name={gss_name->value, gss_name->length};
67 tr_debug("tr_trps_gss_handler()");
69 if ((!client_name) || (!gss_name) || (!trps) || (!cfg_mgr)) {
70 tr_debug("tr_trps_gss_handler: Bad parameters.");
74 /* look up the RP client matching the GSS name */
75 if ((NULL == (rp = tr_rp_client_lookup(cfg_mgr->active->rp_clients, &name)))) {
76 tr_debug("tr_trps_gss_handler: Unknown GSS name %.*s", name.len, name.buf);
80 /*trps->rp_gss = rp;*/
81 tr_debug("Client's GSS Name: %.*s", name.len, name.buf);
86 /* data passed to thread */
91 /* thread to handle GSS connections to peers */
92 static void *tr_trps_conn_thread(void *arg)
94 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
95 struct thread_data *thread_data=talloc_get_type_abort(arg, struct thread_data);
96 TRP_CONNECTION *conn=thread_data->conn;
97 TRPS_INSTANCE *trps=thread_data->trps;
100 tr_debug("tr_trps_conn_thread: started");
101 trps_handle_connection(trps, conn);
103 msg=tr_mq_msg_new(tmp_ctx, "thread_exit");
104 tr_mq_msg_set_payload(msg, (void *)conn, NULL); /* do not pass a free routine */
106 tr_err("tr_trps_conn_thread: error allocating TR_MQ_MSG");
108 trps_mq_append(trps, msg);
110 tr_debug("tr_trps_conn_thread: exit");
111 talloc_free(tmp_ctx);
115 /* called when a connection to the TRPS port is received */
116 static void tr_trps_event_cb(int listener, short event, void *arg)
118 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
119 TRPS_INSTANCE *trps = talloc_get_type_abort(arg, TRPS_INSTANCE); /* aborts on wrong type */
120 TRP_CONNECTION *conn=NULL;
121 TR_NAME *gssname=NULL;
123 struct thread_data *thread_data;
125 if (0==(event & EV_READ)) {
126 tr_debug("tr_trps_event_cb: unexpected event on TRPS socket (event=0x%X)", event);
128 /* create a thread to handle this connection */
129 asprintf(&name, "trustrouter@%s", trps->hostname);
130 gssname=tr_new_name(name);
131 free(name); name=NULL;
132 conn=trp_connection_accept(tmp_ctx, listener, gssname);
134 /* need to monitor this fd and trigger events when read becomes possible */
135 thread_data=talloc(conn, struct thread_data);
136 if (thread_data==NULL) {
137 tr_err("tr_trps_event_cb: unable to allocate thread_data");
138 talloc_free(tmp_ctx);
141 thread_data->conn=conn;
142 thread_data->trps=trps;
143 pthread_create(conn->thread, NULL, tr_trps_conn_thread, thread_data);
144 pthread_detach(*(conn->thread)); /* we will not rejoin the thread */
145 trps_add_connection(trps, conn); /* remember the connection */
148 talloc_free(tmp_ctx);
151 static void tr_trps_cleanup_thread(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
153 /* everything belonging to the thread is in the TRP_CONNECTION
154 * associated with it */
155 trps_remove_connection(trps, conn);
156 tr_debug("Deleted connection");
159 static void tr_trps_process_mq(int socket, short event, void *arg)
161 TRPS_INSTANCE *trps=talloc_get_type_abort(arg, TRPS_INSTANCE);
166 tr_debug("tr_trps_process_mw: starting");
167 msg=trps_mq_pop(trps);
169 s=tr_mq_msg_get_message(msg);
170 if (0==strcmp(s, "thread_exit")) {
171 tr_trps_cleanup_thread(trps,
172 talloc_get_type_abort(tr_mq_msg_get_payload(msg),
175 else if (0==strcmp(s, "tr_msg")) {
176 tmp=tr_msg_encode(tr_mq_msg_get_payload(msg));
177 tr_debug("tr_msg: %s", tmp);
178 tr_msg_free_encoded(tmp);
181 tr_notice("tr_trps_process_mq: unknown message '%s' received.", tr_mq_msg_get_message(msg));
184 msg=trps_mq_pop(trps);
186 tr_debug("tr_trps_process_mw: ending");
189 static int tr_trps_events_destructor(void *obj)
191 TR_TRPS_EVENTS *ev=talloc_get_type_abort(obj, TR_TRPS_EVENTS);
193 event_free(ev->mq_ev);
196 TR_TRPS_EVENTS *tr_trps_events_new(TALLOC_CTX *mem_ctx)
198 TR_TRPS_EVENTS *ev=talloc(mem_ctx, TR_TRPS_EVENTS);
200 ev->listen_ev=talloc(ev, struct tr_socket_event);
202 if (ev->listen_ev==NULL) {
206 talloc_set_destructor((void *)ev, tr_trps_events_destructor);
211 /* Configure the trps instance and set up its event handler.
212 * Returns 0 on success, nonzero on failure. Results in
213 * trps_ev, which should be allocated by caller. */
214 int tr_trps_event_init(struct event_base *base,
217 TR_TRPS_EVENTS *trps_ev)
219 TALLOC_CTX *tmp_ctx=talloc_new(NULL);
220 struct tr_socket_event *listen_ev=NULL;
221 struct tr_trps_event_cookie *cookie;
224 if (trps_ev == NULL) {
225 tr_debug("tr_trps_event_init: Null trps_ev.");
230 /* get convenient handles */
231 listen_ev=trps_ev->listen_ev;
233 /* Create the cookie for callbacks. It is part of the trps context, so it will
234 * be cleaned up when trps is freed by talloc_free. */
235 cookie=talloc(tmp_ctx, struct tr_trps_event_cookie);
236 if (cookie == NULL) {
237 tr_debug("tr_trps_event_init: Unable to allocate cookie.");
242 cookie->cfg_mgr=cfg_mgr;
243 talloc_steal(trps, cookie);
245 /* get a trps listener */
246 listen_ev->sock_fd=trps_get_listener(trps,
249 cfg_mgr->active->internal->hostname,
250 cfg_mgr->active->internal->trps_port,
252 if (listen_ev->sock_fd < 0) {
253 tr_crit("Error opening TRP server socket.");
259 listen_ev->ev=event_new(base,
264 event_add(listen_ev->ev, NULL);
266 /* now set up message queue processing event, only triggered by
268 trps_ev->mq_ev=event_new(base,
273 tr_mq_set_notify_cb(trps->mq, tr_trps_mq_cb, trps_ev->mq_ev);
276 talloc_free(tmp_ctx);