Rewrite and fix heartbeat code.

The old logic had a number of problems, including:

 * sending heartbeats on all service types if any type needed
   a heartbeat
 * sending a heartbeat multiple times a second if there was a
   service type that was idle but didn't generate traffic when
   an empty message was sent (e.g. FATSV)

Rewrite it all so that heartbeats are explicitly tracked and handled
per service type, rather than by sending a dummy message.

Also switch to mode A/C messages for the beast/raw heartbeat, as
it's a bit more compact and less likely to mess with Mode S state
(an all-zeros Mode S message actually looks valid)
This commit is contained in:
Oliver Jowett 2015-06-29 10:06:13 +01:00
parent 803760ca80
commit b793f83a29
2 changed files with 76 additions and 44 deletions

111
net_io.c
View file

@ -71,6 +71,10 @@ static int decodeBinMessage(struct client *c, char *p);
static int decodeHexMessage(struct client *c, char *hex); static int decodeHexMessage(struct client *c, char *hex);
static int handleHTTPRequest(struct client *c, char *p); static int handleHTTPRequest(struct client *c, char *p);
static void send_raw_heartbeat(struct net_service *service);
static void send_beast_heartbeat(struct net_service *service);
static void send_sbs_heartbeat(struct net_service *service);
// //
//========================================================================= //=========================================================================
// //
@ -79,7 +83,7 @@ static int handleHTTPRequest(struct client *c, char *p);
// Init a service with the given read/write characteristics, return the new service. // Init a service with the given read/write characteristics, return the new service.
// Doesn't arrange for the service to listen or connect // Doesn't arrange for the service to listen or connect
struct net_service *serviceInit(const char *descr, struct net_writer *writer, const char *sep, read_handler handler) struct net_service *serviceInit(const char *descr, struct net_writer *writer, heartbeat_fn hb, const char *sep, read_fn handler)
{ {
struct net_service *service; struct net_service *service;
@ -107,6 +111,7 @@ struct net_service *serviceInit(const char *descr, struct net_writer *writer, co
service->writer->service = service; service->writer->service = service;
service->writer->dataUsed = 0; service->writer->dataUsed = 0;
service->writer->lastWrite = mstime(); service->writer->lastWrite = mstime();
service->writer->send_heartbeat = hb;
} }
return service; return service;
@ -180,12 +185,12 @@ void serviceListen(struct net_service *service, char *bind_addr, int bind_port)
struct net_service *makeBeastInputService(void) struct net_service *makeBeastInputService(void)
{ {
return serviceInit("Beast TCP input", NULL, NULL, decodeBinMessage); return serviceInit("Beast TCP input", NULL, NULL, NULL, decodeBinMessage);
} }
struct net_service *makeFatsvOutputService(void) struct net_service *makeFatsvOutputService(void)
{ {
return serviceInit("FATSV TCP output", &Modes.fatsv_out, NULL, NULL); return serviceInit("FATSV TCP output", &Modes.fatsv_out, NULL, NULL, NULL);
} }
void modesInitNet(void) { void modesInitNet(void) {
@ -198,17 +203,17 @@ void modesInitNet(void) {
// set up listeners // set up listeners
if (Modes.net_output_raw_port) { if (Modes.net_output_raw_port) {
s = serviceInit("Raw TCP output", &Modes.raw_out, NULL, NULL); s = serviceInit("Raw TCP output", &Modes.raw_out, send_raw_heartbeat, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_raw_port); serviceListen(s, Modes.net_bind_address, Modes.net_output_raw_port);
} }
if (Modes.net_output_beast_port) { if (Modes.net_output_beast_port) {
s = serviceInit("Beast TCP output", &Modes.beast_out, NULL, NULL); s = serviceInit("Beast TCP output", &Modes.beast_out, send_beast_heartbeat, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_beast_port); serviceListen(s, Modes.net_bind_address, Modes.net_output_beast_port);
} }
if (Modes.net_output_sbs_port) { if (Modes.net_output_sbs_port) {
s = serviceInit("Basestation TCP output", &Modes.sbs_out, NULL, NULL); s = serviceInit("Basestation TCP output", &Modes.sbs_out, send_sbs_heartbeat, NULL, NULL);
serviceListen(s, Modes.net_bind_address, Modes.net_output_sbs_port); serviceListen(s, Modes.net_bind_address, Modes.net_output_sbs_port);
} }
@ -218,7 +223,7 @@ void modesInitNet(void) {
} }
if (Modes.net_input_raw_port) { if (Modes.net_input_raw_port) {
s = serviceInit("Raw TCP input", NULL, "\n", decodeHexMessage); s = serviceInit("Raw TCP input", NULL, NULL, "\n", decodeHexMessage);
serviceListen(s, Modes.net_bind_address, Modes.net_input_raw_port); serviceListen(s, Modes.net_bind_address, Modes.net_input_raw_port);
} }
@ -228,7 +233,7 @@ void modesInitNet(void) {
} }
if (Modes.net_http_port) { if (Modes.net_http_port) {
s = serviceInit("HTTP server", NULL, "\r\n\r\n", handleHTTPRequest); s = serviceInit("HTTP server", NULL, NULL, "\r\n\r\n", handleHTTPRequest);
serviceListen(s, Modes.net_bind_address, Modes.net_http_port); serviceListen(s, Modes.net_bind_address, Modes.net_http_port);
} }
} }
@ -383,6 +388,22 @@ static void modesSendBeastOutput(struct modesMessage *mm) {
completeWrite(&Modes.beast_out, p); completeWrite(&Modes.beast_out, p);
} }
static void send_beast_heartbeat(struct net_service *service)
{
static char heartbeat_message[] = { 0x1a, '1', 0, 0, 0, 0, 0, 0, 0, 0, 0 };
char *data;
if (!service->writer)
return;
data = prepareWrite(service->writer, sizeof(heartbeat_message));
if (!data)
return;
memcpy(data, heartbeat_message, sizeof(heartbeat_message));
completeWrite(service->writer, data + sizeof(heartbeat_message));
}
// //
//========================================================================= //=========================================================================
// //
@ -415,6 +436,24 @@ static void modesSendRawOutput(struct modesMessage *mm) {
completeWrite(&Modes.raw_out, p); completeWrite(&Modes.raw_out, p);
} }
static void send_raw_heartbeat(struct net_service *service)
{
static char *heartbeat_message = "*0000;\n";
char *data;
int len = strlen(heartbeat_message);
if (!service->writer)
return;
data = prepareWrite(service->writer, len);
if (!data)
return;
memcpy(data, heartbeat_message, len);
completeWrite(service->writer, data + len);
}
// //
//========================================================================= //=========================================================================
// //
@ -440,13 +479,6 @@ static void modesSendSBSOutput(struct modesMessage *mm) {
// http://www.homepages.mcb.net/bones/SBS/Article/Barebones42_Socket_Data.htm - seems comprehensive // http://www.homepages.mcb.net/bones/SBS/Article/Barebones42_Socket_Data.htm - seems comprehensive
// //
if (mm->msgtype == -1) {
// heartbeat
p += sprintf(p, "\r\n");
completeWrite(&Modes.sbs_out, p);
return;
}
// Decide on the basic SBS Message Type // Decide on the basic SBS Message Type
if ((mm->msgtype == 4) || (mm->msgtype == 20)) { if ((mm->msgtype == 4) || (mm->msgtype == 20)) {
msgType = 5; msgType = 5;
@ -583,6 +615,24 @@ static void modesSendSBSOutput(struct modesMessage *mm) {
completeWrite(&Modes.sbs_out, p); completeWrite(&Modes.sbs_out, p);
} }
static void send_sbs_heartbeat(struct net_service *service)
{
static char *heartbeat_message = "\r\n"; // is there a better one?
char *data;
int len = strlen(heartbeat_message);
if (!service->writer)
return;
data = prepareWrite(service->writer, len);
if (!data)
return;
memcpy(data, heartbeat_message, len);
completeWrite(service->writer, data + len);
}
// //
//========================================================================= //=========================================================================
// //
@ -1673,7 +1723,7 @@ void modesNetPeriodicWork(void) {
struct client *c, **prev; struct client *c, **prev;
struct net_service *s; struct net_service *s;
uint64_t now = mstime(); uint64_t now = mstime();
int need_heartbeat = 0, need_flush = 0; int need_flush = 0;
// Accept new connetions // Accept new connetions
modesAcceptClients(); modesAcceptClients();
@ -1689,40 +1739,19 @@ void modesNetPeriodicWork(void) {
// Generate FATSV output // Generate FATSV output
writeFATSV(); writeFATSV();
// If we have generated no messages for a while, generate // If we have generated no messages for a while, send
// a dummy heartbeat message. // a heartbeat
if (Modes.net_heartbeat_interval) { if (Modes.net_heartbeat_interval) {
for (s = Modes.services; s; s = s->next) { for (s = Modes.services; s; s = s->next) {
if (s->writer && if (s->writer &&
s->connections && s->connections &&
s->writer->send_heartbeat &&
(s->writer->lastWrite + Modes.net_heartbeat_interval) <= now) { (s->writer->lastWrite + Modes.net_heartbeat_interval) <= now) {
need_flush = 1; s->writer->send_heartbeat(s);
if (s->writer->dataUsed == 0) {
need_heartbeat = 1;
break;
}
} }
} }
} }
if (need_heartbeat) {
//
// We haven't sent any traffic for some time. To try and keep any TCP
// links alive, send a null frame. This will help stop any routers discarding our TCP
// link which will cause an un-recoverable link error if/when a real frame arrives.
//
// Fudge up a null message
struct modesMessage mm;
memset(&mm, 0, sizeof(mm));
mm.msgbits = MODES_SHORT_MSG_BITS;
mm.timestampMsg = 0;
mm.msgtype = -1;
// Feed output clients
modesQueueOutput(&mm);
}
// If we have data that has been waiting to be written for a while, // If we have data that has been waiting to be written for a while,
// write it now. // write it now.
for (s = Modes.services; s; s = s->next) { for (s = Modes.services; s; s = s->next) {

View file

@ -24,7 +24,9 @@
struct modesMessage; struct modesMessage;
struct client; struct client;
typedef int (*read_handler)(struct client *, char *); struct net_service;
typedef int (*read_fn)(struct client *, char *);
typedef void (*heartbeat_fn)(struct net_service *);
// Describes one network service (a group of clients with common behaviour) // Describes one network service (a group of clients with common behaviour)
struct net_service { struct net_service {
@ -37,7 +39,7 @@ struct net_service {
struct net_writer *writer; // shared writer state struct net_writer *writer; // shared writer state
const char *read_sep; // hander details for input data const char *read_sep; // hander details for input data
read_handler read_handler; read_fn read_handler;
}; };
// Structure used to describe a networking client // Structure used to describe a networking client
@ -55,9 +57,10 @@ struct net_writer {
void *data; // shared write buffer, sized MODES_OUT_BUF_SIZE void *data; // shared write buffer, sized MODES_OUT_BUF_SIZE
int dataUsed; // number of bytes of write buffer currently used int dataUsed; // number of bytes of write buffer currently used
uint64_t lastWrite; // time of last write to clients uint64_t lastWrite; // time of last write to clients
heartbeat_fn send_heartbeat; // function that queues a heartbeat if needed
}; };
struct net_service *serviceInit(const char *descr, struct net_writer *writer, const char *sep, read_handler handler); struct net_service *serviceInit(const char *descr, struct net_writer *writer, heartbeat_fn hb_handler, const char *sep, read_fn read_handler);
struct client *serviceConnect(struct net_service *service, char *addr, int port); struct client *serviceConnect(struct net_service *service, char *addr, int port);
void serviceListen(struct net_service *service, char *bind_addr, int bind_port); void serviceListen(struct net_service *service, char *bind_addr, int bind_port);
struct client *createSocketClient(struct net_service *service, int fd); struct client *createSocketClient(struct net_service *service, int fd);