From 4f449a8d44b41823a463550db90d10e897ce3257 Mon Sep 17 00:00:00 2001 From: Malcolm Robb Date: Thu, 2 Oct 2014 21:08:57 +0100 Subject: [PATCH 1/2] Make Pull # compatible with Pull #42 --- mode_s.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/mode_s.c b/mode_s.c index 4d470bd..03aa58f 100644 --- a/mode_s.c +++ b/mode_s.c @@ -2052,7 +2052,8 @@ int decodeCPR(struct aircraft *a, int fflag, int surface) { } // Check to see that the latitude is in range: -90 .. +90 - if (rlat0 < -90 || rlat0 > 90 || rlat1 < -90 || rlat1 > 90) return; + if (rlat0 < -90 || rlat0 > 90 || rlat1 < -90 || rlat1 > 90) + return (-1); // Check that both are in the same latitude zone, or abort. if (cprNLFunction(rlat0) != cprNLFunction(rlat1)) From 1769ac9006df910811d8cf2ce556169035ff6610 Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Fri, 3 Oct 2014 22:55:21 +0100 Subject: [PATCH 2/2] Restructuring of network output side. Mostly refactoring the common code that was duplicated between the different output types so that there aren't many copies floating around. This introduces "struct net_writer" to store the state of a particular type of output service - buffers, time of last write, connection count etc. prepareWrite() / completeWrite() give access to the buffer and handle the actual writes and flushing when needed. Heartbeat and time-based flushing move into a generic periodic-work function. Update the SBS output code to use the new infrastructure. This makes a big different to CPU use when under load. --- dump1090.c | 30 +++--- dump1090.h | 44 ++++----- mode_s.c | 40 -------- net_io.c | 262 ++++++++++++++++++++++++++++++++++++----------------- 4 files changed, 216 insertions(+), 160 deletions(-) diff --git a/dump1090.c b/dump1090.c index 3686352..cc46f3d 100644 --- a/dump1090.c +++ b/dump1090.c @@ -70,7 +70,7 @@ void modesInitConfig(void) { Modes.freq = MODES_DEFAULT_FREQ; Modes.ppm_error = MODES_DEFAULT_PPM; Modes.check_crc = 1; - Modes.net_heartbeat_rate = MODES_NET_HEARTBEAT_RATE; + Modes.net_heartbeat_interval = MODES_NET_HEARTBEAT_INTERVAL; Modes.net_output_sbs_port = MODES_NET_OUTPUT_SBS_PORT; Modes.net_output_raw_port = MODES_NET_OUTPUT_RAW_PORT; Modes.net_input_raw_port = MODES_NET_INPUT_RAW_PORT; @@ -97,9 +97,7 @@ void modesInit(void) { if ( ((Modes.icao_cache = (uint32_t *) malloc(sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2) ) == NULL) || ((Modes.pFileData = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) || ((Modes.magnitude = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE) ) == NULL) || - ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) || - ((Modes.beastOut = (char *) malloc(MODES_RAWOUT_BUF_SIZE) ) == NULL) || - ((Modes.rawOut = (char *) malloc(MODES_RAWOUT_BUF_SIZE) ) == NULL) ) + ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) ) { fprintf(stderr, "Out of memory allocating data buffer.\n"); exit(1); @@ -130,10 +128,10 @@ void modesInit(void) { } // Limit the maximum requested raw output size to less than one Ethernet Block - if (Modes.net_output_raw_size > (MODES_RAWOUT_BUF_FLUSH)) - {Modes.net_output_raw_size = MODES_RAWOUT_BUF_FLUSH;} - if (Modes.net_output_raw_rate > (MODES_RAWOUT_BUF_RATE)) - {Modes.net_output_raw_rate = MODES_RAWOUT_BUF_RATE;} + if (Modes.net_output_flush_size > (MODES_OUT_FLUSH_SIZE)) + {Modes.net_output_flush_size = MODES_OUT_FLUSH_SIZE;} + if (Modes.net_output_flush_interval > (MODES_OUT_FLUSH_INTERVAL)) + {Modes.net_output_flush_interval = MODES_OUT_FLUSH_INTERVAL;} if (Modes.net_sndbuf_size > (MODES_NET_SNDBUF_MAX)) {Modes.net_sndbuf_size = MODES_NET_SNDBUF_MAX;} @@ -407,7 +405,6 @@ void showHelp(void) { "--raw Show only messages hex values\n" "--net Enable networking\n" "--modeac Enable decoding of SSR Modes 3/A & 3/C\n" -"--net-beast TCP raw output in Beast binary format\n" "--net-only Enable just networking, no RTL device or file used\n" "--net-http-port HTTP server port (default: 8080)\n" "--net-ri-port TCP raw input listen port (default: 30001)\n" @@ -415,8 +412,8 @@ void showHelp(void) { "--net-sbs-port TCP BaseStation output listen port (default: 30003)\n" "--net-bi-port TCP Beast input listen port (default: 30004)\n" "--net-bo-port TCP Beast output listen port (default: 30005)\n" -"--net-ro-size TCP raw output minimum size (default: 0)\n" -"--net-ro-rate TCP raw output memory flush rate (default: 0)\n" +"--net-ro-size TCP output minimum size (default: 0)\n" +"--net-ro-interval TCP output memory flush rate in seconds (default: 0)\n" "--net-heartbeat TCP heartbeat rate in seconds (default: 60 sec; 0 to disable)\n" "--net-buffer TCP buffer size 64Kb * (2^n) (default: n=0, 64Kb)\n" "--lat Reference/receiver latitude for surface posn (opt)\n" @@ -572,8 +569,7 @@ void backgroundTasks(void) { static time_t next_stats; if (Modes.net) { - modesReadFromClients(); - modesNetCleanup(); + modesNetPeriodicWork(); } // If Modes.aircrafts is not NULL, remove any stale aircraft @@ -701,11 +697,13 @@ int main(int argc, char **argv) { Modes.net = 1; Modes.net_only = 1; } else if (!strcmp(argv[j],"--net-heartbeat") && more) { - Modes.net_heartbeat_rate = atoi(argv[++j]) * 15; + Modes.net_heartbeat_interval = atoi(argv[++j]); } else if (!strcmp(argv[j],"--net-ro-size") && more) { - Modes.net_output_raw_size = atoi(argv[++j]); + Modes.net_output_flush_size = atoi(argv[++j]); } else if (!strcmp(argv[j],"--net-ro-rate") && more) { - Modes.net_output_raw_rate = atoi(argv[++j]); + Modes.net_output_flush_interval = atoi(argv[++j]) / 15; // backwards compatibility + } else if (!strcmp(argv[j],"--net-ro-interval") && more) { + Modes.net_output_flush_interval = atoi(argv[++j]); } else if (!strcmp(argv[j],"--net-ro-port") && more) { if (Modes.beast) // Required for legacy backward compatibility {Modes.net_output_beast_port = atoi(argv[++j]);;} diff --git a/dump1090.h b/dump1090.h index 5f45b08..fe1f8bf 100644 --- a/dump1090.h +++ b/dump1090.h @@ -116,9 +116,9 @@ #define MODES_LONG_MSG_SIZE (MODES_LONG_MSG_SAMPLES * sizeof(uint16_t)) #define MODES_SHORT_MSG_SIZE (MODES_SHORT_MSG_SAMPLES * sizeof(uint16_t)) -#define MODES_RAWOUT_BUF_SIZE (1500) -#define MODES_RAWOUT_BUF_FLUSH (MODES_RAWOUT_BUF_SIZE - 200) -#define MODES_RAWOUT_BUF_RATE (1000) // 1000 * 64mS = 1 Min approx +#define MODES_OUT_BUF_SIZE (1500) +#define MODES_OUT_FLUSH_SIZE (MODES_OUT_BUF_SIZE - 256) +#define MODES_OUT_FLUSH_INTERVAL (60) #define MODES_ICAO_CACHE_LEN 1024 // Power of two required #define MODES_ICAO_CACHE_TTL 60 // Time to live of cached addresses @@ -165,7 +165,7 @@ #define MODES_INTERACTIVE_DELETE_TTL 300 // Delete from the list after 300 seconds #define MODES_INTERACTIVE_DISPLAY_TTL 60 // Delete from display after 60 seconds -#define MODES_NET_HEARTBEAT_RATE 900 // Each block is approx 65mS - default is > 1 min +#define MODES_NET_HEARTBEAT_INTERVAL 60 // seconds #define MODES_NET_SERVICES_NUM 6 #define MODES_NET_INPUT_RAW_PORT 30001 @@ -237,6 +237,15 @@ struct stDF { unsigned char msg[MODES_LONG_MSG_BYTES]; // the binary } tDF; +// Common writer state for all output sockets of one type +struct net_writer { + int socket; // listening socket FD, used to identify the owning service + int connections; // number of active clients + void *data; // shared write buffer, sized MODES_OUT_BUF_SIZE + int dataUsed; // number of bytes of write buffer currently used + time_t lastWrite; // time of last write to clients +}; + // Program global state struct { // Internal state pthread_t reader_thread; @@ -270,16 +279,14 @@ struct { // Internal state // Networking char aneterr[ANET_ERR_LEN]; struct client *clients; // Our clients - int sbsos; // SBS output listening socket - int ros; // Raw output listening socket int ris; // Raw input listening socket - int bos; // Beast output listening socket int bis; // Beast input listening socket int https; // HTTP listening socket - char *rawOut; // Buffer for building raw output data - int rawOutUsed; // How much of the buffer is currently used - char *beastOut; // Buffer for building beast output data - int beastOutUsed; // How much if the buffer is currently used + + struct net_writer raw_out; // Raw output + struct net_writer beast_out; // Beast-format output + struct net_writer sbs_out; // SBS-format output + #ifdef _WIN32 WSADATA wsaData; // Windows socket initialisation #endif @@ -295,12 +302,10 @@ struct { // Internal state int debug; // Debugging mode int net; // Enable networking int net_only; // Enable just networking - int net_heartbeat_count; // TCP heartbeat counter - int net_heartbeat_rate; // TCP heartbeat rate + int net_heartbeat_interval; // TCP heartbeat interval (seconds) int net_output_sbs_port; // SBS output TCP port - int net_output_raw_size; // Minimum Size of the output raw data - int net_output_raw_rate; // Rate (in 64mS increments) of output raw data - int net_output_raw_rate_count; // Rate (in 64mS increments) of output raw data + int net_output_flush_size; // Minimum Size of output data + int net_output_flush_interval; // Maximum interval (in seconds) between outputwrites int net_output_raw_port; // Raw output TCP port int net_input_raw_port; // Raw input TCP port int net_output_beast_port; // Beast output TCP port @@ -348,9 +353,6 @@ struct { // Internal state unsigned int stat_bit_fix[MODES_MAX_BITERRORS]; unsigned int stat_http_requests; - unsigned int stat_sbs_connections; - unsigned int stat_raw_connections; - unsigned int stat_beast_connections; unsigned int stat_out_of_phase; unsigned int stat_ph_demodulated0; unsigned int stat_ph_demodulated1; @@ -453,11 +455,9 @@ struct stDF *interactiveFindDF (uint32_t addr); // Functions exported from net_io.c // void modesInitNet (void); -void modesReadFromClients (void); -void modesSendAllClients (int service, void *msg, int len); void modesQueueOutput (struct modesMessage *mm); void modesReadFromClient(struct client *c, char *sep, int(*handler)(struct client *, char *)); -void modesNetCleanup (void); +void modesNetPeriodicWork (void); #ifdef __cplusplus } diff --git a/mode_s.c b/mode_s.c index bb0da2c..a1014c6 100644 --- a/mode_s.c +++ b/mode_s.c @@ -1849,43 +1849,6 @@ void detectModeS(uint16_t *m, uint32_t mlen) { use_correction = 0; } } - - //Send any remaining partial raw buffers now - if (Modes.rawOutUsed || Modes.beastOutUsed) - { - Modes.net_output_raw_rate_count++; - if (Modes.net_output_raw_rate_count > Modes.net_output_raw_rate) - { - if (Modes.rawOutUsed) { - modesSendAllClients(Modes.ros, Modes.rawOut, Modes.rawOutUsed); - Modes.rawOutUsed = 0; - } - if (Modes.beastOutUsed) { - modesSendAllClients(Modes.bos, Modes.beastOut, Modes.beastOutUsed); - Modes.beastOutUsed = 0; - } - Modes.net_output_raw_rate_count = 0; - } - } - else if ( (Modes.net) - && (Modes.net_heartbeat_rate) - && ((++Modes.net_heartbeat_count) > Modes.net_heartbeat_rate) ) { - // - // We haven't received any Mode A/C/S messages for some time. To try and keep any TCP - // links alive, send a null frame. This will help stop any routers discarding our TCP - // link which will cause an un-recoverable link error if/when a real frame arrives. - // - // Fudge up a null message - memset(&mm, 0, sizeof(mm)); - mm.msgbits = MODES_SHORT_MSG_BITS; - mm.timestampMsg = Modes.timestampBlk; - - // Feed output clients - modesQueueOutput(&mm); - - // Reset the heartbeat counter - Modes.net_heartbeat_count = 0; - } } // //========================================================================= @@ -1910,9 +1873,6 @@ void useModesMessage(struct modesMessage *mm) { // Feed output clients if (Modes.net) {modesQueueOutput(mm);} - - // Heartbeat not required whilst we're seeing real messages - Modes.net_heartbeat_count = 0; } } // diff --git a/net_io.c b/net_io.c index b092c33..294a37d 100644 --- a/net_io.c +++ b/net_io.c @@ -49,6 +49,7 @@ struct service { char *descr; int *socket; + struct net_writer *writer; int port; int enabled; }; @@ -58,14 +59,14 @@ struct service services[MODES_NET_SERVICES_NUM]; void modesInitNet(void) { int j; - struct service svc[MODES_NET_SERVICES_NUM] = { - {"Raw TCP output", &Modes.ros, Modes.net_output_raw_port, 1}, - {"Raw TCP input", &Modes.ris, Modes.net_input_raw_port, 1}, - {"Beast TCP output", &Modes.bos, Modes.net_output_beast_port, 1}, - {"Beast TCP input", &Modes.bis, Modes.net_input_beast_port, 1}, - {"HTTP server", &Modes.https, Modes.net_http_port, 1}, - {"Basestation TCP output", &Modes.sbsos, Modes.net_output_sbs_port, 1} - }; + struct service svc[MODES_NET_SERVICES_NUM] = { + {"Raw TCP output", &Modes.raw_out.socket, &Modes.raw_out, Modes.net_output_raw_port, 1}, + {"Raw TCP input", &Modes.ris, NULL, Modes.net_input_raw_port, 1}, + {"Beast TCP output", &Modes.beast_out.socket, &Modes.beast_out, Modes.net_output_beast_port, 1}, + {"Beast TCP input", &Modes.bis, NULL, Modes.net_input_beast_port, 1}, + {"HTTP server", &Modes.https, NULL, Modes.net_http_port, 1}, + {"Basestation TCP output", &Modes.sbs_out.socket, &Modes.sbs_out, Modes.net_output_sbs_port, 1} + }; memcpy(&services, &svc, sizeof(svc));//services = svc; @@ -93,6 +94,18 @@ void modesInitNet(void) { } anetNonBlock(Modes.aneterr, s); *services[j].socket = s; + + if (services[j].writer) { + if (! (services[j].writer->data = malloc(MODES_OUT_BUF_SIZE)) ) { + fprintf(stderr, "Out of memory allocating output buffer for service %s\n", services[j].descr); + exit(1); + } + + services[j].writer->socket = s; + services[j].writer->connections = 0; + services[j].writer->dataUsed = 0; + services[j].writer->lastWrite = time(NULL); + } } else { if (Modes.debug & MODES_DEBUG_NET) printf("%s port is disabled\n", services[j].descr); } @@ -127,9 +140,11 @@ struct client * modesAcceptClients(void) { Modes.clients = c; anetSetSendBuffer(Modes.aneterr,fd, (MODES_NET_SNDBUF_SIZE << Modes.net_sndbuf_size)); - if (*services[j].socket == Modes.sbsos) Modes.stat_sbs_connections++; - if (*services[j].socket == Modes.ros) Modes.stat_raw_connections++; - if (*services[j].socket == Modes.bos) Modes.stat_beast_connections++; + if (services[j].writer) { + if (++ services[j].writer->connections == 1) { + services[j].writer->lastWrite = time(NULL); // suppress heartbeat initially + } + } j--; // Try again with the same listening port @@ -145,18 +160,21 @@ struct client * modesAcceptClients(void) { // On error free the client, collect the structure, adjust maxfd if needed. // void modesFreeClient(struct client *c) { + int j; + // Clean up, but defer removing from the list until modesNetCleanup(). // This is because there may be stackframes still pointing at this // client (unpredictably: reading from client A may cause client B to // be freed) close(c->fd); - if (c->service == Modes.sbsos) { - if (Modes.stat_sbs_connections) Modes.stat_sbs_connections--; - } else if (c->service == Modes.ros) { - if (Modes.stat_raw_connections) Modes.stat_raw_connections--; - } else if (c->service == Modes.bos) { - if (Modes.stat_beast_connections) Modes.stat_beast_connections--; + + for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if (c->service == *services[j].socket) { + if (services[j].writer) + services[j].writer->connections--; + break; + } } if (Modes.debug & MODES_DEBUG_NET) @@ -169,36 +187,72 @@ void modesFreeClient(struct client *c) { // //========================================================================= // -// Send the specified message to all clients listening for a given service +// Send the write buffer for the specified writer to all connected clients // -void modesSendAllClients(int service, void *msg, int len) { +static void flushWrites(struct net_writer *writer) { struct client *c; for (c = Modes.clients; c; c = c->next) { - if (c->service == service) { + if (c->service == writer->socket) { #ifndef _WIN32 - int nwritten = write(c->fd, msg, len); + int nwritten = write(c->fd, writer->data, writer->dataUsed); #else - int nwritten = send(c->fd, msg, len, 0 ); + int nwritten = send(c->fd, writer->data, writer->dataUsed, 0 ); #endif - if (nwritten != len) { + if (nwritten != writer->dataUsed) { modesFreeClient(c); } } } + + writer->dataUsed = 0; + writer->lastWrite = time(NULL); } + +// Prepare to write up to 'len' bytes to the given net_writer. +// Returns a pointer to write to, or NULL to skip this write. +static void *prepareWrite(struct net_writer *writer, int len) { + if (!writer || + !writer->connections || + !writer->data) + return NULL; + + if (len >= MODES_OUT_BUF_SIZE) + return NULL; + + if (writer->dataUsed + len >= MODES_OUT_BUF_SIZE) { + // Flush now to free some space + flushWrites(writer); + } + + return writer->data + writer->dataUsed; +} + +// Complete a write previously begun by prepareWrite. +// endptr should point one byte past the last byte written +// to the buffer returned from prepareWrite. +static void completeWrite(struct net_writer *writer, void *endptr) { + writer->dataUsed = endptr - writer->data; + + if (writer->dataUsed >= Modes.net_output_flush_size) { + flushWrites(writer); + } +} + // //========================================================================= // // Write raw output in Beast Binary format with Timestamp to TCP clients // void modesSendBeastOutput(struct modesMessage *mm) { - char *p = &Modes.beastOut[Modes.beastOutUsed]; int msgLen = mm->msgbits / 8; + char *p = prepareWrite(&Modes.beast_out, 2 + 2 * (7 + msgLen)); char * pTimeStamp; char ch; int j; - int iOutLen = msgLen + 9; // Escape, msgtype, timestamp, sigLevel, msg + + if (!p) + return; *p++ = 0x1a; if (msgLen == MODES_SHORT_MSG_BYTES) @@ -213,36 +267,34 @@ void modesSendBeastOutput(struct modesMessage *mm) { pTimeStamp = (char *) &mm->timestampMsg; for (j = 5; j >= 0; j--) { *p++ = (ch = pTimeStamp[j]); - if (0x1A == ch) {*p++ = ch; iOutLen++;} + if (0x1A == ch) {*p++ = ch; } } *p++ = (ch = mm->signalLevel); - if (0x1A == ch) {*p++ = ch; iOutLen++;} + if (0x1A == ch) {*p++ = ch; } for (j = 0; j < msgLen; j++) { *p++ = (ch = mm->msg[j]); - if (0x1A == ch) {*p++ = ch; iOutLen++;} + if (0x1A == ch) {*p++ = ch; } } - Modes.beastOutUsed += iOutLen; - if (Modes.beastOutUsed >= Modes.net_output_raw_size) - { - modesSendAllClients(Modes.bos, Modes.beastOut, Modes.beastOutUsed); - Modes.beastOutUsed = 0; - Modes.net_output_raw_rate_count = 0; - } + completeWrite(&Modes.beast_out, p); } + // //========================================================================= // // Write raw output to TCP clients // void modesSendRawOutput(struct modesMessage *mm) { - char *p = &Modes.rawOut[Modes.rawOutUsed]; int msgLen = mm->msgbits / 8; + char *p = prepareWrite(&Modes.raw_out, msgLen*2 + 15); int j; unsigned char * pTimeStamp; + if (!p) + return; + if (Modes.mlat && mm->timestampMsg) { *p++ = '@'; pTimeStamp = (unsigned char *) &mm->timestampMsg; @@ -250,7 +302,6 @@ void modesSendRawOutput(struct modesMessage *mm) { sprintf(p, "%02X", pTimeStamp[j]); p += 2; } - Modes.rawOutUsed += 12; // additional 12 characters for timestamp } else *p++ = '*'; @@ -262,13 +313,7 @@ void modesSendRawOutput(struct modesMessage *mm) { *p++ = ';'; *p++ = '\n'; - Modes.rawOutUsed += ((msgLen*2) + 3); - if (Modes.rawOutUsed >= Modes.net_output_raw_size) - { - modesSendAllClients(Modes.ros, Modes.rawOut, Modes.rawOutUsed); - Modes.rawOutUsed = 0; - Modes.net_output_raw_rate_count = 0; - } + completeWrite(&Modes.raw_out, p); } // //========================================================================= @@ -277,17 +322,27 @@ void modesSendRawOutput(struct modesMessage *mm) { // The message structure mm->bFlags tells us what has been updated by this message // void modesSendSBSOutput(struct modesMessage *mm) { - char msg[256], *p = msg; + char *p = prepareWrite(&Modes.sbs_out, 200); uint32_t offset; struct timeb epocTime_receive, epocTime_now; struct tm stTime_receive, stTime_now; int msgType; + if (!p) + return; + // // SBS BS style output checked against the following reference // http://www.homepages.mcb.net/bones/SBS/Article/Barebones42_Socket_Data.htm - seems comprehensive // + if (mm->msgtype == -1) { + // heartbeat + p += sprintf(p, "\r\n"); + completeWrite(&Modes.sbs_out, p); + return; + } + // Decide on the basic SBS Message Type if ((mm->msgtype == 4) || (mm->msgtype == 20)) { msgType = 5; @@ -434,15 +489,16 @@ void modesSendSBSOutput(struct modesMessage *mm) { } p += sprintf(p, "\r\n"); - modesSendAllClients(Modes.sbsos, msg, p-msg); + + completeWrite(&Modes.sbs_out, p); } // //========================================================================= // void modesQueueOutput(struct modesMessage *mm) { - if (Modes.stat_sbs_connections) {modesSendSBSOutput(mm);} - if (Modes.stat_beast_connections) {modesSendBeastOutput(mm);} - if (Modes.stat_raw_connections) {modesSendRawOutput(mm);} + modesSendSBSOutput(mm); + modesSendBeastOutput(mm); + modesSendRawOutput(mm); } // //========================================================================= @@ -928,44 +984,86 @@ void modesReadFromClient(struct client *c, char *sep, } } } -// -//========================================================================= -// -// Read data from clients. This function actually delegates a lower-level -// function that depends on the kind of service (raw, http, ...). -// -void modesReadFromClients(void) { - struct client *c = modesAcceptClients(); +// +// Perform periodic network work +// +void modesNetPeriodicWork(void) { + struct client *c, **prev; + time_t now = time(NULL); + int j; + int need_heartbeat = 0, need_flush = 0; - for (; c; c = c->next) { - if (c->service == Modes.ris) { - modesReadFromClient(c,"\n",decodeHexMessage); - } else if (c->service == Modes.bis) { - modesReadFromClient(c,"",decodeBinMessage); - } else if (c->service == Modes.https) { - modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest); + // Accept new connetions + modesAcceptClients(); + + // Read from clients + for (c = Modes.clients; c; c = c->next) { + if (c->service == Modes.ris) { + modesReadFromClient(c,"\n",decodeHexMessage); + } else if (c->service == Modes.bis) { + modesReadFromClient(c,"",decodeBinMessage); + } else if (c->service == Modes.https) { + modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest); + } + } + + // If we have generated no messages for a while, generate + // a dummy heartbeat message. + if (Modes.net_heartbeat_interval) { + for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if (services[j].writer && + services[j].writer->connections && + (services[j].writer->lastWrite + Modes.net_heartbeat_interval) <= now) { + need_flush = 1; + if (services[j].writer->dataUsed == 0) { + need_heartbeat = 1; + break; + } + } + } } - } + + if (need_heartbeat) { + // + // We haven't sent any traffic for some time. To try and keep any TCP + // links alive, send a null frame. This will help stop any routers discarding our TCP + // link which will cause an un-recoverable link error if/when a real frame arrives. + // + // Fudge up a null message + struct modesMessage mm; + + memset(&mm, 0, sizeof(mm)); + mm.msgbits = MODES_SHORT_MSG_BITS; + mm.timestampMsg = 0; + mm.msgtype = -1; + + // Feed output clients + modesQueueOutput(&mm); + } + + // If we have data that has been waiting to be written for a while, + // write it now. + for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { + if (services[j].writer && + services[j].writer->dataUsed && + (need_flush || (services[j].writer->lastWrite + Modes.net_output_flush_interval) <= now)) { + flushWrites(services[j].writer); + } + } + + // Unlink and free closed clients + for (prev = &Modes.clients, c = *prev; c; c = *prev) { + if (c->fd == -1) { + // Recently closed, prune from list + *prev = c->next; + free(c); + } else { + prev = &c->next; + } + } } -// -// Perform cleanup of any recently-closed clients. -// -void modesNetCleanup(void) { - struct client *c, **prev; - - for (prev = &Modes.clients, c = *prev; c; c = *prev) { - if (c->fd == -1) { - // Recently closed, prune from list - *prev = c->next; - free(c); - } else { - prev = &c->next; - } - } - } - // // =============================== Network IO =========================== //