Factor out net services so they're not tied to a static array.

This lets different things dynamically create the services they need,
and sorts out the horrible hacks that view1090 used to make outgoing
connections. Now you can explicitly create a service and tell it to make
an outgoing connection.

This means that view1090 can now just set all the ports to zero (to disable
the listeners), do a normal net init, then explicitly construct the beast
input service without a listener and tell it to make a connection as needed.
This commit is contained in:
Oliver Jowett 2015-06-26 17:50:51 +01:00
parent 5fa039a2d4
commit 278448179d
5 changed files with 360 additions and 379 deletions

460
net_io.c
View file

@ -66,80 +66,160 @@
// function gets called and we accept new connections. All the rest is
// handled via non-blocking I/O and manually polling clients to see if
// they have something new to share with us when reading is needed.
static int decodeBinMessage(struct client *c, char *p);
static int decodeHexMessage(struct client *c, char *hex);
static int handleHTTPRequest(struct client *c, char *p);
//
//=========================================================================
//
// Networking "stack" initialization
//
struct service {
char *descr;
int *socket;
struct net_writer *writer;
int port;
int enabled;
};
struct service services[MODES_NET_SERVICES_NUM];
// Init a service with the given read/write characteristics, return the new service.
// Doesn't arrange for the service to listen or connect
struct net_service *serviceInit(const char *descr, struct net_writer *writer, const char *sep, read_handler handler)
{
struct net_service *service;
void modesInitNet(void) {
int j;
struct service svc[MODES_NET_SERVICES_NUM] = {
{"Raw TCP output", &Modes.raw_out.socket, &Modes.raw_out, Modes.net_output_raw_port, 1},
{"Raw TCP input", &Modes.ris, NULL, Modes.net_input_raw_port, 1},
{"Beast TCP output", &Modes.beast_out.socket, &Modes.beast_out, Modes.net_output_beast_port, 1},
{"Beast TCP input", &Modes.bis, NULL, Modes.net_input_beast_port, 1},
{"HTTP server", &Modes.https, NULL, Modes.net_http_port, 1},
{"Basestation TCP output", &Modes.sbs_out.socket, &Modes.sbs_out, Modes.net_output_sbs_port, 1},
{"FlightAware TSV output", &Modes.fatsv_out.socket, &Modes.fatsv_out, Modes.net_fatsv_port, 1}
};
memcpy(&services, &svc, sizeof(svc));//services = svc;
Modes.clients = NULL;
#ifdef _WIN32
if ( (!Modes.wsaData.wVersion)
&& (!Modes.wsaData.wHighVersion) ) {
// Try to start the windows socket support
if (WSAStartup(MAKEWORD(2,1),&Modes.wsaData) != 0)
{
fprintf(stderr, "WSAStartup returned Error\n");
}
}
#endif
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
services[j].enabled = (services[j].port != 0);
if (services[j].enabled) {
int s = anetTcpServer(Modes.aneterr, services[j].port, Modes.net_bind_address);
if (s == -1) {
fprintf(stderr, "Error opening the listening port %d (%s): %s\n",
services[j].port, services[j].descr, Modes.aneterr);
exit(1);
}
anetNonBlock(Modes.aneterr, s);
*services[j].socket = s;
if (services[j].writer) {
if (! (services[j].writer->data = malloc(MODES_OUT_BUF_SIZE)) ) {
fprintf(stderr, "Out of memory allocating output buffer for service %s\n", services[j].descr);
exit(1);
}
services[j].writer->socket = s;
services[j].writer->connections = 0;
services[j].writer->dataUsed = 0;
services[j].writer->lastWrite = mstime();
}
} else {
if (Modes.debug & MODES_DEBUG_NET) printf("%s port is disabled\n", services[j].descr);
}
if (!(service = calloc(sizeof(*service), 1))) {
fprintf(stderr, "Out of memory allocating service %s\n", descr);
exit(1);
}
#ifndef _WIN32
service->next = Modes.services;
Modes.services = service;
service->descr = descr;
service->listen_fd = -1;
service->connections = 0;
service->writer = writer;
service->read_sep = sep;
service->read_handler = handler;
if (service->writer) {
if (! (service->writer->data = malloc(MODES_OUT_BUF_SIZE)) ) {
fprintf(stderr, "Out of memory allocating output buffer for service %s\n", descr);
exit(1);
}
service->writer->service = service;
service->writer->dataUsed = 0;
service->writer->lastWrite = mstime();
}
return service;
}
// Create a client attached to the given service using the provided socket FD
static struct client *createClient(struct net_service *service, int fd)
{
struct client *c;
anetNonBlock(Modes.aneterr, fd);
if (!(c = (struct client *) malloc(sizeof(*c)))) {
fprintf(stderr, "Out of memory allocating a new %s network client\n", service->descr);
exit(1);
}
c->service = service;
c->next = Modes.clients;
c->fd = fd;
c->buflen = 0;
Modes.clients = c;
anetSetSendBuffer(Modes.aneterr,fd, (MODES_NET_SNDBUF_SIZE << Modes.net_sndbuf_size));
++service->connections;
if (service->writer && service->connections == 1) {
service->writer->lastWrite = mstime(); // suppress heartbeat initially
}
return c;
}
// Initiate an outgoing connection which will use the given service.
// Return the new client or NULL if the connection failed
struct client *serviceConnect(struct net_service *service, char *addr, int port)
{
int s = anetTcpConnect(Modes.aneterr, addr, port);
if (s == ANET_ERR)
return NULL;
return createClient(service, s);
}
// Set up the given service to listen on an address/port.
// _exits_ on failure!
void serviceListen(struct net_service *service, char *bind_addr, int bind_port)
{
int s;
if (service->listen_fd >= 0) {
fprintf(stderr, "Tried to set up the service %s twice!\n", service->descr);
exit(1);
}
s = anetTcpServer(Modes.aneterr, bind_port, bind_addr);
if (s == ANET_ERR) {
fprintf(stderr, "Error opening the listening port %d (%s): %s\n",
bind_port, service->descr, Modes.aneterr);
exit(1);
}
anetNonBlock(Modes.aneterr, s);
service->listen_fd = s;
}
struct net_service *makeBeastInputService(void)
{
return serviceInit("Beast TCP input", NULL, NULL, decodeBinMessage);
}
void modesInitNet(void) {
struct net_service *s;
signal(SIGPIPE, SIG_IGN);
#endif
Modes.clients = NULL;
Modes.services = NULL;
// set up listeners
if (Modes.net_output_raw_port) {
s = serviceInit("Raw TCP output", &Modes.raw_out, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_raw_port);
}
if (Modes.net_output_beast_port) {
s = serviceInit("Beast TCP output", &Modes.beast_out, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_beast_port);
}
if (Modes.net_output_sbs_port) {
s = serviceInit("Basestation TCP output", &Modes.sbs_out, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_sbs_port);
}
if (Modes.net_fatsv_port) {
s = serviceInit("FATSV TCP output", &Modes.fatsv_out, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_fatsv_port);
}
if (Modes.net_input_raw_port) {
s = serviceInit("Raw TCP input", NULL, "\n", decodeHexMessage);
serviceListen(s, Modes.net_bind_address, Modes.net_input_raw_port);
}
if (Modes.net_input_beast_port) {
s = makeBeastInputService();
serviceListen(s, Modes.net_bind_address, Modes.net_input_beast_port);
}
if (Modes.net_http_port) {
s = serviceInit("HTTP server", NULL, "\r\n\r\n", handleHTTPRequest);
serviceListen(s, Modes.net_bind_address, Modes.net_http_port);
}
}
//
//=========================================================================
@ -147,37 +227,18 @@ void modesInitNet(void) {
// This function gets called from time to time when the decoding thread is
// awakened by new data arriving. This usually happens a few times every second
//
struct client * modesAcceptClients(void) {
static struct client * modesAcceptClients(void) {
int fd, port;
unsigned int j;
struct client *c;
struct net_service *s;
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (services[j].enabled) {
fd = anetTcpAccept(Modes.aneterr, *services[j].socket, NULL, &port);
if (fd == -1) continue;
anetNonBlock(Modes.aneterr, fd);
c = (struct client *) malloc(sizeof(*c));
c->service = *services[j].socket;
c->next = Modes.clients;
c->fd = fd;
c->buflen = 0;
Modes.clients = c;
anetSetSendBuffer(Modes.aneterr,fd, (MODES_NET_SNDBUF_SIZE << Modes.net_sndbuf_size));
if (services[j].writer) {
if (++ services[j].writer->connections == 1) {
services[j].writer->lastWrite = mstime(); // suppress heartbeat initially
}
}
j--; // Try again with the same listening port
if (Modes.debug & MODES_DEBUG_NET)
printf("Created new client %d\n", fd);
}
for (s = Modes.services; s; s = s->next) {
if (s->listen_fd >= 0) {
while ((fd = anetTcpAccept(Modes.aneterr, s->listen_fd, NULL, &port)) >= 0) {
createClient(s, fd);
}
}
}
return Modes.clients;
}
//
@ -185,8 +246,11 @@ struct client * modesAcceptClients(void) {
//
// On error free the client, collect the structure, adjust maxfd if needed.
//
void modesCloseClient(struct client *c) {
int j;
static void modesCloseClient(struct client *c) {
if (!c->service) {
fprintf(stderr, "warning: double close of net client\n");
return;
}
// Clean up, but defer removing from the list until modesNetCleanup().
// This is because there may be stackframes still pointing at this
@ -194,21 +258,11 @@ void modesCloseClient(struct client *c) {
// be freed)
close(c->fd);
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (c->service == *services[j].socket) {
if (services[j].writer)
services[j].writer->connections--;
break;
}
}
if (Modes.debug & MODES_DEBUG_NET)
printf("Closing client %d\n", c->fd);
c->service->connections--;
// mark it as inactive and ready to be freed
c->fd = -1;
c->service = -1;
c->service = NULL;
}
//
//=========================================================================
@ -219,7 +273,7 @@ static void flushWrites(struct net_writer *writer) {
struct client *c;
for (c = Modes.clients; c; c = c->next) {
if (c->service == writer->socket) {
if (c->service == writer->service) {
#ifndef _WIN32
int nwritten = write(c->fd, writer->data, writer->dataUsed);
#else
@ -238,31 +292,32 @@ static void flushWrites(struct net_writer *writer) {
// Prepare to write up to 'len' bytes to the given net_writer.
// Returns a pointer to write to, or NULL to skip this write.
static void *prepareWrite(struct net_writer *writer, int len) {
if (!writer ||
!writer->connections ||
!writer->data)
return NULL;
if (!writer ||
!writer->service ||
!writer->service->connections ||
!writer->data)
return NULL;
if (len > MODES_OUT_BUF_SIZE)
return NULL;
if (len > MODES_OUT_BUF_SIZE)
return NULL;
if (writer->dataUsed + len >= MODES_OUT_BUF_SIZE) {
// Flush now to free some space
flushWrites(writer);
}
if (writer->dataUsed + len >= MODES_OUT_BUF_SIZE) {
// Flush now to free some space
flushWrites(writer);
}
return writer->data + writer->dataUsed;
return writer->data + writer->dataUsed;
}
// Complete a write previously begun by prepareWrite.
// endptr should point one byte past the last byte written
// to the buffer returned from prepareWrite.
static void completeWrite(struct net_writer *writer, void *endptr) {
writer->dataUsed = endptr - writer->data;
writer->dataUsed = endptr - writer->data;
if (writer->dataUsed >= Modes.net_output_flush_size) {
flushWrites(writer);
}
if (writer->dataUsed >= Modes.net_output_flush_size) {
flushWrites(writer);
}
}
//
@ -270,7 +325,7 @@ static void completeWrite(struct net_writer *writer, void *endptr) {
//
// Write raw output in Beast Binary format with Timestamp to TCP clients
//
void modesSendBeastOutput(struct modesMessage *mm) {
static void modesSendBeastOutput(struct modesMessage *mm) {
int msgLen = mm->msgbits / 8;
char *p = prepareWrite(&Modes.beast_out, 2 + 2 * (7 + msgLen));
char ch;
@ -320,7 +375,7 @@ void modesSendBeastOutput(struct modesMessage *mm) {
//
// Write raw output to TCP clients
//
void modesSendRawOutput(struct modesMessage *mm) {
static void modesSendRawOutput(struct modesMessage *mm) {
int msgLen = mm->msgbits / 8;
char *p = prepareWrite(&Modes.raw_out, msgLen*2 + 15);
int j;
@ -353,7 +408,7 @@ void modesSendRawOutput(struct modesMessage *mm) {
// Write SBS output to TCP clients
// The message structure mm->bFlags tells us what has been updated by this message
//
void modesSendSBSOutput(struct modesMessage *mm) {
static void modesSendSBSOutput(struct modesMessage *mm) {
char *p;
struct timespec now;
struct tm stTime_receive, stTime_now;
@ -536,7 +591,7 @@ void modesQueueOutput(struct modesMessage *mm) {
// The function always returns 0 (success) to the caller as there is no
// case where we want broken messages here to close the client connection.
//
int decodeBinMessage(struct client *c, char *p) {
static int decodeBinMessage(struct client *c, char *p) {
int msgLen = 0;
int j;
char ch;
@ -611,7 +666,7 @@ int decodeBinMessage(struct client *c, char *p) {
// Turn an hex digit into its 4 bit decimal value.
// Returns -1 if the digit is not in the 0-F range.
//
int hexDigitVal(int c) {
static int hexDigitVal(int c) {
c = tolower(c);
if (c >= '0' && c <= '9') return c-'0';
else if (c >= 'a' && c <= 'f') return c-'a'+10;
@ -631,7 +686,7 @@ int hexDigitVal(int c) {
// The function always returns 0 (success) to the caller as there is no
// case where we want broken messages here to close the client connection.
//
int decodeHexMessage(struct client *c, char *hex) {
static int decodeHexMessage(struct client *c, char *hex) {
int l = strlen(hex), j;
unsigned char msg[MODES_LONG_MSG_BYTES];
struct modesMessage mm;
@ -1102,7 +1157,7 @@ static struct {
// Returns 1 on error to signal the caller the client connection should
// be closed.
//
int handleHTTPRequest(struct client *c, char *p) {
static int handleHTTPRequest(struct client *c, char *p) {
char hdr[512];
int clen, hdrlen;
int httpver, keepalive;
@ -1277,8 +1332,7 @@ int handleHTTPRequest(struct client *c, char *p) {
// The handler returns 0 on success, or 1 to signal this function we should
// close the connection with the client in case of non-recoverable errors.
//
void modesReadFromClient(struct client *c, char *sep,
int(*handler)(struct client *, char *)) {
static void modesReadFromClient(struct client *c) {
int left;
int nread;
int fullmsg;
@ -1332,7 +1386,7 @@ void modesReadFromClient(struct client *c, char *sep,
e = s = c->buf; // Start with the start of buffer, first message
if (c->service == Modes.bis) {
if (c->service->read_sep == NULL) {
// This is the Beast Binary scanning case.
// If there is a complete message still in the buffer, there must be the separator 'sep'
// in the buffer, note that we full-scan the buffer at every read for simplicity.
@ -1366,7 +1420,7 @@ void modesReadFromClient(struct client *c, char *sep,
break;
}
// Have a 0x1a followed by 1, 2 or 3 - pass message less 0x1a to handler.
if (handler(c, s)) {
if (c->service->read_handler(c, s)) {
modesCloseClient(c);
return;
}
@ -1380,13 +1434,13 @@ void modesReadFromClient(struct client *c, char *sep,
// If there is a complete message still in the buffer, there must be the separator 'sep'
// in the buffer, note that we full-scan the buffer at every read for simplicity.
//
while ((e = strstr(s, sep)) != NULL) { // end of first message if found
while ((e = strstr(s, c->service->read_sep)) != NULL) { // end of first message if found
*e = '\0'; // The handler expects null terminated strings
if (handler(c, s)) { // Pass message to handler.
if (c->service->read_handler(c, s)) { // Pass message to handler.
modesCloseClient(c); // Handler returns 1 on error to signal we .
return; // should close the client connection
}
s = e + strlen(sep); // Move to start of next message
s = e + strlen(c->service->read_sep); // Move to start of next message
fullmsg = 1;
}
}
@ -1402,13 +1456,14 @@ void modesReadFromClient(struct client *c, char *sep,
#define TSV_MAX_PACKET_SIZE 160
static void writeFATSV() {
static void writeFATSV()
{
struct aircraft *a;
uint64_t now;
static uint64_t next_update;
if (!Modes.fatsv_out.connections) {
return; // no active connections
if (!Modes.fatsv_out.service || !Modes.fatsv_out.service->connections) {
return; // not enabled or no active connections
}
now = mstime();
@ -1595,82 +1650,77 @@ static void writeFATSV() {
// Perform periodic network work
//
void modesNetPeriodicWork(void) {
struct client *c, **prev;
uint64_t now = mstime();
int j;
int need_heartbeat = 0, need_flush = 0;
struct client *c, **prev;
struct net_service *s;
uint64_t now = mstime();
int need_heartbeat = 0, need_flush = 0;
// Accept new connetions
modesAcceptClients();
// Accept new connetions
modesAcceptClients();
// Read from clients
for (c = Modes.clients; c; c = c->next) {
if (c->service == Modes.ris) {
modesReadFromClient(c,"\n",decodeHexMessage);
} else if (c->service == Modes.bis) {
modesReadFromClient(c,"",decodeBinMessage);
} else if (c->service == Modes.https) {
modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest);
}
}
// Read from clients
for (c = Modes.clients; c; c = c->next) {
if (c->service->read_handler)
modesReadFromClient(c);
}
// Generate FATSV output
writeFATSV();
// Generate FATSV output
writeFATSV();
// If we have generated no messages for a while, generate
// a dummy heartbeat message.
if (Modes.net_heartbeat_interval) {
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (services[j].writer &&
services[j].writer->connections &&
(services[j].writer->lastWrite + Modes.net_heartbeat_interval) <= now) {
need_flush = 1;
if (services[j].writer->dataUsed == 0) {
need_heartbeat = 1;
break;
}
}
}
// If we have generated no messages for a while, generate
// a dummy heartbeat message.
if (Modes.net_heartbeat_interval) {
for (s = Modes.services; s; s = s->next) {
if (s->writer &&
s->connections &&
(s->writer->lastWrite + Modes.net_heartbeat_interval) <= now) {
need_flush = 1;
if (s->writer->dataUsed == 0) {
need_heartbeat = 1;
break;
}
}
}
}
if (need_heartbeat) {
//
// We haven't sent any traffic for some time. To try and keep any TCP
// links alive, send a null frame. This will help stop any routers discarding our TCP
// link which will cause an un-recoverable link error if/when a real frame arrives.
//
// Fudge up a null message
struct modesMessage mm;
if (need_heartbeat) {
//
// We haven't sent any traffic for some time. To try and keep any TCP
// links alive, send a null frame. This will help stop any routers discarding our TCP
// link which will cause an un-recoverable link error if/when a real frame arrives.
//
// Fudge up a null message
struct modesMessage mm;
memset(&mm, 0, sizeof(mm));
mm.msgbits = MODES_SHORT_MSG_BITS;
mm.timestampMsg = 0;
mm.msgtype = -1;
memset(&mm, 0, sizeof(mm));
mm.msgbits = MODES_SHORT_MSG_BITS;
mm.timestampMsg = 0;
mm.msgtype = -1;
// Feed output clients
modesQueueOutput(&mm);
// Feed output clients
modesQueueOutput(&mm);
}
// If we have data that has been waiting to be written for a while,
// write it now.
for (s = Modes.services; s; s = s->next) {
if (s->writer &&
s->writer->dataUsed &&
(need_flush || (s->writer->lastWrite + Modes.net_output_flush_interval) <= now)) {
flushWrites(s->writer);
}
}
// If we have data that has been waiting to be written for a while,
// write it now.
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (services[j].writer &&
services[j].writer->dataUsed &&
(need_flush || (services[j].writer->lastWrite + Modes.net_output_flush_interval) <= now)) {
flushWrites(services[j].writer);
}
}
// Unlink and free closed clients
for (prev = &Modes.clients, c = *prev; c; c = *prev) {
if (c->fd == -1) {
// Recently closed, prune from list
*prev = c->next;
free(c);
} else {
prev = &c->next;
}
}
// Unlink and free closed clients
for (prev = &Modes.clients, c = *prev; c; c = *prev) {
if (c->fd == -1) {
// Recently closed, prune from list
*prev = c->next;
free(c);
} else {
prev = &c->next;
}
}
}
//