Update the routing table when TRP updates are received.
[trust_router.git] / trp / trps.c
1 #include <fcntl.h>
2 #include <talloc.h>
3 #include <errno.h>
4 #include <unistd.h>
5
6 #include <gsscon.h>
7 #include <tr_rp.h>
8 #include <trust_router/tr_name.h>
9 #include <trp_internal.h>
10 #include <trp_rtable.h>
11 #include <tr_debug.h>
12
13
14 static int trps_destructor(void *object)
15 {
16   TRPS_INSTANCE *trps=talloc_get_type_abort(object, TRPS_INSTANCE);
17   if (trps->rtable!=NULL)
18     trp_rtable_free(trps->rtable);
19   return 0;
20 }
21
22 TRPS_INSTANCE *trps_new (TALLOC_CTX *mem_ctx)
23 {
24   TRPS_INSTANCE *trps=talloc(mem_ctx, TRPS_INSTANCE);
25   if (trps!=NULL)  {
26     trps->hostname=NULL;
27     trps->port=0;
28     trps->cookie=NULL;
29     trps->conn=NULL;
30     trps->trpc=NULL;
31     trps->mq=tr_mq_new(trps);
32     if (trps->mq==NULL) {
33       /* failed to allocate mq */
34       talloc_free(trps);
35       trps=NULL;
36     } else {
37       trps->rtable=trp_rtable_new();
38       if (trps->rtable==NULL) {
39         /* failed to allocate rtable */
40         talloc_free(trps);
41         trps=NULL;
42       } else
43         talloc_set_destructor((void *)trps, trps_destructor);
44     }
45   }
46   return trps;
47 }
48
49 void trps_free (TRPS_INSTANCE *trps)
50 {
51   if (trps!=NULL)
52     talloc_free(trps);
53 }
54
55 TR_MQ_MSG *trps_mq_pop(TRPS_INSTANCE *trps)
56 {
57   return tr_mq_pop(trps->mq);
58 }
59
60 void trps_mq_append(TRPS_INSTANCE *trps, TR_MQ_MSG *msg)
61 {
62   tr_mq_append(trps->mq, msg);
63 }
64
65 /* stand-in for a function that finds the connection for a particular peer */
66 #if 0
67 static TRP_CONNECTION *trps_find_connection(TRPS_INSTANCE *trps)
68 {
69   return trps->conn;
70 }
71 #endif
72
73 void trps_add_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *new)
74 {
75   if (trps->conn==NULL)
76     trps->conn=new;
77   else
78     trp_connection_append(trps->conn, new);
79
80   talloc_steal(trps, new);
81 }
82
83 /* ok to call more than once; guarantees connection no longer in the list.
84  * Caller is responsible for freeing the removed element afterwards.  */
85 void trps_remove_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *remove)
86 {
87   trps->conn=trp_connection_remove(trps->conn, remove);
88 }
89
90 void trps_add_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *trpc)
91 {
92   if (trps->trpc==NULL)
93     trps->trpc=trpc;
94   else
95     trpc_append(trps->trpc, trpc);
96
97   talloc_steal(trps, trpc);
98 }
99
100 /* ok to call more than once; guarantees trpc no longer in the list.
101  * Caller is responsible for freeing the removed element afterwards.  */
102 void trps_remove_trpc(TRPS_INSTANCE *trps, TRPC_INSTANCE *remove)
103 {
104   trps->trpc=trpc_remove(trps->trpc, remove);
105 }
106
107 TRP_RC trps_send_msg (TRPS_INSTANCE *trps, void *peer, const char *msg)
108 {
109   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
110   TR_MQ_MSG *mq_msg=NULL;
111   char *msg_dup=NULL;
112   TRP_RC rc=TRP_ERROR;
113
114   /* Currently ignore peer and just send to an open connection.
115    * In reality, need to identify the correct peer and send via that
116    * one.  */
117   if (trps->trpc != NULL) {
118     if (trpc_get_status(trps->trpc)!=TRP_CONNECTION_UP)
119       tr_debug("trps_send_msg: skipping message sent while TRPC connection not up.");
120     else {
121       mq_msg=tr_mq_msg_new(tmp_ctx, "trpc_send");
122       msg_dup=talloc_strdup(mq_msg, msg); /* get local copy in mq_msg context */
123       tr_mq_msg_set_payload(mq_msg, msg_dup, NULL); /* no need for a free() func */
124       trpc_mq_append(trps->trpc, mq_msg);
125       rc=TRP_SUCCESS;
126     }
127   }
128   talloc_free(tmp_ctx);
129   return rc;
130 }
131
132 static int trps_listen (TRPS_INSTANCE *trps, int port) 
133 {
134   int rc = 0;
135   int conn = -1;
136   int optval = 1;
137
138   union {
139     struct sockaddr_storage storage;
140     struct sockaddr_in in4;
141   } addr;
142
143   struct sockaddr_in *saddr = (struct sockaddr_in *) &addr.in4;
144
145   saddr->sin_port = htons (port);
146   saddr->sin_family = AF_INET;
147   saddr->sin_addr.s_addr = INADDR_ANY;
148
149   if (0 > (conn = socket (AF_INET, SOCK_STREAM, 0)))
150     return conn;
151
152   setsockopt(conn, SOL_SOCKET, SO_REUSEADDR, &optval, sizeof(optval));
153
154   if (0 > (rc = bind (conn, (struct sockaddr *) saddr, sizeof(struct sockaddr_in))))
155     return rc;
156
157   if (0 > (rc = listen(conn, 512)))
158     return rc;
159
160   tr_debug("trps_listen: TRP Server listening on port %d", port);
161   return conn;
162 }
163
164 #if 0 /* remove this if I forget to do so */
165 /* returns EACCES if authorization is denied */
166 int trps_auth_cb(gss_name_t clientName, gss_buffer_t displayName, void *data)
167 {
168   TRPS_INSTANCE *trps = (TRPS_INSTANCE *)data;
169   int result=0;
170
171   if (0!=trps->auth_handler(clientName, displayName, trps->cookie)) {
172     tr_debug("trps_auth_cb: client '%.*s' denied authorization.", displayName->length, displayName->value);
173     result=EACCES; /* denied */
174   }
175
176   return result;
177 }
178 #endif 
179
180 /* get the currently selected route if available */
181 TRP_RENTRY *trps_get_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer)
182 {
183   return trp_rtable_get_entry(trps->rtable, comm, realm, peer);
184 }
185
186 TRP_RENTRY *trps_get_selected_route(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm)
187 {
188   return trp_rtable_get_selected_entry(trps->rtable, comm, realm);
189 }
190
191 /* copy the result if you want to keep it */
192 TR_NAME *trps_get_next_hop(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm)
193 {
194   TRP_RENTRY *route=trps_get_selected_route(trps, comm, realm);
195   if (route==NULL)
196     return NULL;
197
198   return trp_rentry_get_next_hop(route);
199 }
200
201
202 /* mark a route as retracted */
203 static void trps_retract_route(TRPS_INSTANCE *trps, TRP_RENTRY *entry)
204 {
205   trp_rentry_set_metric(entry, TRP_METRIC_INFINITY);
206 }
207
208 /* is this route retracted? */
209 static int trps_route_retracted(TRPS_INSTANCE *trps, TRP_RENTRY *entry)
210 {
211   return (trp_rentry_get_metric(entry)==TRP_METRIC_INFINITY);
212 }
213
214 static TRP_RC trps_read_message(TRPS_INSTANCE *trps, TRP_CONNECTION *conn, TR_MSG **msg)
215 {
216   int err=0;
217   char *buf=NULL;
218   size_t buflen = 0;
219   TR_NAME *peer=NULL;
220
221   tr_debug("trps_read_message: started");
222   if (err = gsscon_read_encrypted_token(trp_connection_get_fd(conn),
223                                        *(trp_connection_get_gssctx(conn)), 
224                                        &buf,
225                                        &buflen)) {
226     tr_debug("trps_read_message: error");
227     if (buf)
228       free(buf);
229     return TRP_ERROR;
230   }
231
232   tr_debug("trps_read_message(): Request Received, %u bytes.", (unsigned) buflen);
233   tr_debug("trps_read_message(): %.*s", buflen, buf);
234
235   *msg=tr_msg_decode(buf, buflen);
236   free(buf);
237   if (*msg==NULL)
238     return TRP_NOPARSE;
239
240   peer=trp_connection_get_peer(conn);
241   /* verify we received a message we support, otherwise drop it now */
242   switch (tr_msg_get_msg_type(*msg)) {
243   case TRP_UPDATE:
244     trp_upd_set_peer(tr_msg_get_trp_upd(*msg), tr_dup_name(peer));
245     break;
246
247   case TRP_REQUEST:
248     trp_req_set_peer(tr_msg_get_trp_req(*msg), tr_dup_name(peer));
249     break;
250
251   default:
252     tr_debug("trps_read_message: received unsupported message from %.*s", peer->len, peer->buf);
253     tr_msg_free_decoded(*msg);
254     *msg=NULL;
255     return TRP_UNSUPPORTED;
256   }
257   
258   return TRP_SUCCESS;
259 }
260
261 int trps_get_listener(TRPS_INSTANCE *trps,
262                       TRPS_MSG_FUNC msg_handler,
263                       TRP_AUTH_FUNC auth_handler,
264                       const char *hostname,
265                       unsigned int port,
266                       void *cookie)
267 {
268   int listen = -1;
269
270   if (0 > (listen = trps_listen(trps, port))) {
271     char errbuf[256];
272     if (0 == strerror_r(errno, errbuf, 256)) {
273       tr_debug("trps_get_listener: Error opening port %d: %s.", port, errbuf);
274     } else {
275       tr_debug("trps_get_listener: Unknown error openining port %d.", port);
276     }
277   } 
278
279   if (listen > 0) {
280     /* opening port succeeded */
281     tr_debug("trps_get_listener: Opened port %d.", port);
282     
283     /* make this socket non-blocking */
284     if (0 != fcntl(listen, F_SETFL, O_NONBLOCK)) {
285       tr_debug("trps_get_listener: Error setting O_NONBLOCK.");
286       close(listen);
287       listen=-1;
288     }
289   }
290
291   if (listen > 0) {
292     /* store the caller's request handler & cookie */
293     trps->msg_handler = msg_handler;
294     trps->auth_handler = auth_handler;
295     trps->hostname = talloc_strdup(trps, hostname);
296     trps->port = port;
297     trps->cookie = cookie;
298   }
299
300   return listen;
301 }
302
303 void trps_handle_connection(TRPS_INSTANCE *trps, TRP_CONNECTION *conn)
304 {
305   TALLOC_CTX *tmp_ctx=talloc_new(NULL);
306   TR_MSG *msg=NULL;
307   TRP_RC rc=TRP_ERROR;
308
309   /* try to establish a GSS context */
310   if (0!=trp_connection_auth(conn, trps->auth_handler, trps->cookie)) {
311     tr_notice("tr_trps_conn_thread: failed to authorize connection");
312     pthread_exit(NULL);
313   }
314   tr_notice("trps_handle_connection: authorized connection");
315   
316   /* loop as long as the connection exists */
317   while (trp_connection_get_status(conn)==TRP_CONNECTION_UP) {
318     rc=trps_read_message(trps, conn, &msg);
319     switch(rc) {
320     case TRP_SUCCESS:
321       trps->msg_handler(trps, conn, msg); /* send the TR_MSG off to the callback */
322       break;
323
324     case TRP_ERROR:
325       trp_connection_close(conn);
326       break;
327
328     default:
329       tr_debug("trps_handle_connection: trps_read_message failed (%d)", rc);
330     }
331   }
332
333   tr_debug("trps_handle_connection: connection closed.");
334   talloc_free(tmp_ctx);
335 }
336
337 static TRP_RC trps_validate_update(TRPS_INSTANCE *trps, TRP_UPD *upd)
338 {
339   if (trp_upd_get_inforec(upd)==NULL) {
340     tr_notice("trps_validate_update: received TRP update with no info records.");
341     return TRP_ERROR;
342   }
343
344   if (trp_upd_get_peer(upd)==NULL) {
345     tr_notice("trps_validate_update: received TRP update without origin peer information.");
346     return TRP_ERROR;
347   }
348   
349   return TRP_SUCCESS;
350 }
351
352 /* ensure that the update could be accepted if feasible */
353 static TRP_RC trps_validate_inforec(TRPS_INSTANCE *trps, TRP_INFOREC *rec)
354 {
355   switch(trp_inforec_get_type(rec)) {
356   case TRP_INFOREC_TYPE_ROUTE:
357     if ((trp_inforec_get_comm(rec)==NULL)
358        || (trp_inforec_get_realm(rec)==NULL)
359        || (trp_inforec_get_trust_router(rec)==NULL)
360        || (trp_inforec_get_next_hop(rec)==NULL)) {
361       tr_debug("trps_validate_inforec: missing record info.");
362       return TRP_ERROR;
363     }
364
365     /* check for valid metric */
366     if ((trp_inforec_get_metric(rec)==TRP_METRIC_INVALID)
367        || (trp_inforec_get_metric(rec)>TRP_METRIC_INFINITY)) {
368       tr_debug("trps_validate_inforec: invalid metric.");
369       return TRP_ERROR;
370     }
371
372     /* check for valid interval */
373     if (trp_inforec_get_interval(rec)==TRP_INTERVAL_INVALID) {
374       tr_debug("trps_validate_inforec: invalid interval.");
375       return TRP_ERROR;
376     }
377     break;
378
379   default:
380     tr_notice("trps_validate_inforec: unsupported record type.");
381     return TRP_UNSUPPORTED;
382   }
383
384   return TRP_SUCCESS;
385 }
386
387 /* link cost to a peer */
388 static unsigned int trps_cost(TRPS_INSTANCE *trps, TR_NAME *peer)
389 {
390   return 1;
391 }
392
393 static unsigned int trps_advertised_metric(TRPS_INSTANCE *trps, TR_NAME *comm, TR_NAME *realm, TR_NAME *peer)
394 {
395   TRP_RENTRY *entry=trp_rtable_get_entry(trps->rtable, comm, realm, peer);
396   if (entry==NULL)
397     return TRP_METRIC_INFINITY;
398   return trp_rentry_get_metric(entry) + trps_cost(trps, peer);
399 }
400
401 static int trps_check_feasibility(TRPS_INSTANCE *trps, TRP_INFOREC *rec)
402 {
403   unsigned int rec_metric=trp_inforec_get_metric(rec);
404   unsigned int new_metric=0;
405   unsigned int current_metric=0;
406   TR_NAME *next_hop=NULL;
407
408   /* we check these in the validation stage, but just in case... */
409   if ((rec_metric==TRP_METRIC_INVALID) || (rec_metric>TRP_METRIC_INFINITY))
410     return 0;
411
412   /* retractions (aka infinite metrics) are always feasible */
413   if (rec_metric==TRP_METRIC_INFINITY)
414     return 1;
415
416   /* updates from our current next hop are always feasible*/
417   next_hop=trps_get_next_hop(trps,
418                              trp_inforec_get_comm(rec),
419                              trp_inforec_get_realm(rec));;
420   if ((next_hop!=NULL)
421      && (0==tr_name_cmp(next_hop,trp_inforec_get_next_hop(rec)))) {
422     return 1;
423   }
424     
425
426   /* compare the existing metric we advertise to what we would advertise
427    * if we accept this update */
428   current_metric=trps_advertised_metric(trps,
429                                         trp_inforec_get_comm(rec),
430                                         trp_inforec_get_realm(rec),
431                                         trp_inforec_get_next_hop(rec));
432   new_metric=rec_metric + trps_cost(trps, trp_inforec_get_next_hop(rec));
433   if (new_metric <= current_metric)
434     return 1;
435   else
436     return 0;
437 }
438
439 /* uses memory pointed to by *ts, also returns that value. On error, its contents are {0,0} */
440 static struct timespec *trps_compute_expiry(TRPS_INSTANCE *trps, unsigned int interval, struct timespec *ts)
441 {
442   const unsigned int small_factor=3; /* how many intervals we wait before expiring */
443   if (0!=clock_gettime(CLOCK_REALTIME, ts)) {
444     tr_err("trps_compute_expiry: could not read realtime clock.");
445     ts->tv_sec=0;
446     ts->tv_nsec=0;
447   }
448   ts->tv_sec += small_factor*interval;
449   return ts;
450 }
451
452 static TRP_RC trps_accept_update(TRPS_INSTANCE *trps, TRP_INFOREC *rec)
453 {
454   TRP_RENTRY *entry=NULL;
455
456   entry=trp_rtable_get_entry(trps->rtable,
457                              trp_inforec_get_comm(rec),
458                              trp_inforec_get_realm(rec),
459                              trp_inforec_get_next_hop(rec));
460   if (entry==NULL) {
461     entry=trp_rentry_new(NULL);
462     if (entry==NULL) {
463       tr_err("trps_accept_update: unable to allocate new entry.");
464       return TRP_NOMEM;
465     }
466
467     trp_rentry_set_apc(entry, tr_dup_name(trp_inforec_get_comm(rec)));
468     trp_rentry_set_realm(entry, tr_dup_name(trp_inforec_get_realm(rec)));
469     trp_rentry_set_peer(entry, tr_dup_name(trp_inforec_get_next_hop(rec)));
470     trp_rentry_set_trust_router(entry, tr_dup_name(trp_inforec_get_trust_router(rec)));
471     trp_rentry_set_next_hop(entry, tr_dup_name(trp_inforec_get_next_hop(rec)));
472     if ((trp_rentry_get_apc(entry)==NULL)
473        ||(trp_rentry_get_realm(entry)==NULL)
474        ||(trp_rentry_get_peer(entry)==NULL)
475        ||(trp_rentry_get_trust_router(entry)==NULL)
476        ||(trp_rentry_get_next_hop(entry)==NULL)) {
477       /* at least one field could not be allocated */
478       tr_err("trps_accept_update: unable to allocate all fields for entry.");
479       trp_rentry_free(entry);
480       return TRP_NOMEM;
481     }
482     trp_rtable_add(trps->rtable, entry);
483   }
484
485   /* We now have an entry in the table, whether it's new or not. Update metric and expiry, unless
486    * the metric is infinity. An infinite metric can only occur here if we just retracted an existing
487    * route (we never accept retractions as new routes), so there is no risk of leaving the expiry
488    * time unset on a new route entry. */
489   tr_debug("trps_accept_update: accepting route update.");
490   trp_rentry_set_metric(entry, trp_inforec_get_metric(rec));
491   if (!trps_route_retracted(trps, entry)) {
492     tr_debug("trps_accept_update: route not retracted, setting expiry timer.");
493     trp_rentry_set_expiry(entry, trps_compute_expiry(trps,
494                                                      trp_inforec_get_interval(rec),
495                                                      trp_rentry_get_expiry(entry)));
496   }
497   return TRP_SUCCESS;
498 }
499
500 /* TODO: handle community updates */
501 static TRP_RC trps_handle_update(TRPS_INSTANCE *trps, TRP_UPD *upd)
502 {
503   unsigned int feas=0;
504   TRP_INFOREC *rec=NULL;
505   TRP_RENTRY *route=NULL;
506
507   if (trps_validate_update(trps, upd) != TRP_SUCCESS) {
508     tr_notice("trps_handle_update: received invalid TRP update.");
509     return TRP_ERROR;
510   }
511
512   rec=trp_upd_get_inforec(upd);
513   for (;rec!=NULL; rec=trp_inforec_get_next(rec)) {
514     /* validate/sanity check the record update */
515     if (trps_validate_inforec(trps, rec) != TRP_SUCCESS) {
516       tr_notice("trps_handle_update: invalid record in TRP update.");
517       continue;
518     }
519
520     /* determine feasibility */
521     feas=trps_check_feasibility(trps, rec);
522     tr_debug("trps_handle_update: record feasibility=%d", feas);
523
524     /* do we have an existing route? */
525     route=trps_get_route(trps, trp_inforec_get_comm(rec), trp_inforec_get_realm(rec), trp_inforec_get_next_hop(rec));
526     if (route!=NULL) {
527       /* there was a route table entry already */
528       tr_debug("trps_handle_updates: route entry already exists.");
529       if (feas) {
530         /* Update is feasible. Accept it. */
531         trps_accept_update(trps, rec);
532       } else {
533         /* Update is infeasible. Ignore it unless the trust router has changed. */
534         if (0!=tr_name_cmp(trp_rentry_get_trust_router(route),
535                            trp_inforec_get_trust_router(rec))) {
536           /* the trust router associated with the route has changed, treat update as a retraction */
537           trps_retract_route(trps, route);
538         }
539       }
540     } else {
541       /* No existing route table entry. Ignore it unless it is feasible and not a retraction. */
542       tr_debug("trps_handle_update: no route entry exists yet.");
543       if (feas && (trp_inforec_get_metric(rec) != TRP_METRIC_INFINITY))
544         trps_accept_update(trps, rec);
545     }
546   }
547   return TRP_SUCCESS;
548 }
549
550 TRP_RC trps_handle_tr_msg(TRPS_INSTANCE *trps, TR_MSG *tr_msg)
551 {
552   switch (tr_msg_get_msg_type(tr_msg)) {
553   case TRP_UPDATE:
554     return trps_handle_update(trps, tr_msg_get_trp_upd(tr_msg));
555
556   case TRP_REQUEST:
557     return TRP_UNSUPPORTED;
558
559   default:
560     /* unknown error or one we don't care about (e.g., TID messages) */
561     return TRP_ERROR;
562   }
563 }