Restructuring of network output side.

Mostly refactoring the common code that was duplicated
between the different output types so that there aren't
many copies floating around.

This introduces "struct net_writer" to store the state of a
particular type of output service - buffers, time of last write,
connection count etc. prepareWrite() / completeWrite() give access
to the buffer and handle the actual writes and flushing when needed.

Heartbeat and time-based flushing move into a generic periodic-work
function.

Update the SBS output code to use the new infrastructure. This makes
a big different to CPU use when under load.
This commit is contained in:
Oliver Jowett 2014-10-03 22:55:21 +01:00
parent 8d4f1a396c
commit 1769ac9006
4 changed files with 216 additions and 160 deletions

262
net_io.c
View file

@ -49,6 +49,7 @@
struct service {
char *descr;
int *socket;
struct net_writer *writer;
int port;
int enabled;
};
@ -58,14 +59,14 @@ struct service services[MODES_NET_SERVICES_NUM];
void modesInitNet(void) {
int j;
struct service svc[MODES_NET_SERVICES_NUM] = {
{"Raw TCP output", &Modes.ros, Modes.net_output_raw_port, 1},
{"Raw TCP input", &Modes.ris, Modes.net_input_raw_port, 1},
{"Beast TCP output", &Modes.bos, Modes.net_output_beast_port, 1},
{"Beast TCP input", &Modes.bis, Modes.net_input_beast_port, 1},
{"HTTP server", &Modes.https, Modes.net_http_port, 1},
{"Basestation TCP output", &Modes.sbsos, Modes.net_output_sbs_port, 1}
};
struct service svc[MODES_NET_SERVICES_NUM] = {
{"Raw TCP output", &Modes.raw_out.socket, &Modes.raw_out, Modes.net_output_raw_port, 1},
{"Raw TCP input", &Modes.ris, NULL, Modes.net_input_raw_port, 1},
{"Beast TCP output", &Modes.beast_out.socket, &Modes.beast_out, Modes.net_output_beast_port, 1},
{"Beast TCP input", &Modes.bis, NULL, Modes.net_input_beast_port, 1},
{"HTTP server", &Modes.https, NULL, Modes.net_http_port, 1},
{"Basestation TCP output", &Modes.sbs_out.socket, &Modes.sbs_out, Modes.net_output_sbs_port, 1}
};
memcpy(&services, &svc, sizeof(svc));//services = svc;
@ -93,6 +94,18 @@ void modesInitNet(void) {
}
anetNonBlock(Modes.aneterr, s);
*services[j].socket = s;
if (services[j].writer) {
if (! (services[j].writer->data = malloc(MODES_OUT_BUF_SIZE)) ) {
fprintf(stderr, "Out of memory allocating output buffer for service %s\n", services[j].descr);
exit(1);
}
services[j].writer->socket = s;
services[j].writer->connections = 0;
services[j].writer->dataUsed = 0;
services[j].writer->lastWrite = time(NULL);
}
} else {
if (Modes.debug & MODES_DEBUG_NET) printf("%s port is disabled\n", services[j].descr);
}
@ -127,9 +140,11 @@ struct client * modesAcceptClients(void) {
Modes.clients = c;
anetSetSendBuffer(Modes.aneterr,fd, (MODES_NET_SNDBUF_SIZE << Modes.net_sndbuf_size));
if (*services[j].socket == Modes.sbsos) Modes.stat_sbs_connections++;
if (*services[j].socket == Modes.ros) Modes.stat_raw_connections++;
if (*services[j].socket == Modes.bos) Modes.stat_beast_connections++;
if (services[j].writer) {
if (++ services[j].writer->connections == 1) {
services[j].writer->lastWrite = time(NULL); // suppress heartbeat initially
}
}
j--; // Try again with the same listening port
@ -145,18 +160,21 @@ struct client * modesAcceptClients(void) {
// On error free the client, collect the structure, adjust maxfd if needed.
//
void modesFreeClient(struct client *c) {
int j;
// Clean up, but defer removing from the list until modesNetCleanup().
// This is because there may be stackframes still pointing at this
// client (unpredictably: reading from client A may cause client B to
// be freed)
close(c->fd);
if (c->service == Modes.sbsos) {
if (Modes.stat_sbs_connections) Modes.stat_sbs_connections--;
} else if (c->service == Modes.ros) {
if (Modes.stat_raw_connections) Modes.stat_raw_connections--;
} else if (c->service == Modes.bos) {
if (Modes.stat_beast_connections) Modes.stat_beast_connections--;
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (c->service == *services[j].socket) {
if (services[j].writer)
services[j].writer->connections--;
break;
}
}
if (Modes.debug & MODES_DEBUG_NET)
@ -169,36 +187,72 @@ void modesFreeClient(struct client *c) {
//
//=========================================================================
//
// Send the specified message to all clients listening for a given service
// Send the write buffer for the specified writer to all connected clients
//
void modesSendAllClients(int service, void *msg, int len) {
static void flushWrites(struct net_writer *writer) {
struct client *c;
for (c = Modes.clients; c; c = c->next) {
if (c->service == service) {
if (c->service == writer->socket) {
#ifndef _WIN32
int nwritten = write(c->fd, msg, len);
int nwritten = write(c->fd, writer->data, writer->dataUsed);
#else
int nwritten = send(c->fd, msg, len, 0 );
int nwritten = send(c->fd, writer->data, writer->dataUsed, 0 );
#endif
if (nwritten != len) {
if (nwritten != writer->dataUsed) {
modesFreeClient(c);
}
}
}
writer->dataUsed = 0;
writer->lastWrite = time(NULL);
}
// Prepare to write up to 'len' bytes to the given net_writer.
// Returns a pointer to write to, or NULL to skip this write.
static void *prepareWrite(struct net_writer *writer, int len) {
if (!writer ||
!writer->connections ||
!writer->data)
return NULL;
if (len >= MODES_OUT_BUF_SIZE)
return NULL;
if (writer->dataUsed + len >= MODES_OUT_BUF_SIZE) {
// Flush now to free some space
flushWrites(writer);
}
return writer->data + writer->dataUsed;
}
// Complete a write previously begun by prepareWrite.
// endptr should point one byte past the last byte written
// to the buffer returned from prepareWrite.
static void completeWrite(struct net_writer *writer, void *endptr) {
writer->dataUsed = endptr - writer->data;
if (writer->dataUsed >= Modes.net_output_flush_size) {
flushWrites(writer);
}
}
//
//=========================================================================
//
// Write raw output in Beast Binary format with Timestamp to TCP clients
//
void modesSendBeastOutput(struct modesMessage *mm) {
char *p = &Modes.beastOut[Modes.beastOutUsed];
int msgLen = mm->msgbits / 8;
char *p = prepareWrite(&Modes.beast_out, 2 + 2 * (7 + msgLen));
char * pTimeStamp;
char ch;
int j;
int iOutLen = msgLen + 9; // Escape, msgtype, timestamp, sigLevel, msg
if (!p)
return;
*p++ = 0x1a;
if (msgLen == MODES_SHORT_MSG_BYTES)
@ -213,36 +267,34 @@ void modesSendBeastOutput(struct modesMessage *mm) {
pTimeStamp = (char *) &mm->timestampMsg;
for (j = 5; j >= 0; j--) {
*p++ = (ch = pTimeStamp[j]);
if (0x1A == ch) {*p++ = ch; iOutLen++;}
if (0x1A == ch) {*p++ = ch; }
}
*p++ = (ch = mm->signalLevel);
if (0x1A == ch) {*p++ = ch; iOutLen++;}
if (0x1A == ch) {*p++ = ch; }
for (j = 0; j < msgLen; j++) {
*p++ = (ch = mm->msg[j]);
if (0x1A == ch) {*p++ = ch; iOutLen++;}
if (0x1A == ch) {*p++ = ch; }
}
Modes.beastOutUsed += iOutLen;
if (Modes.beastOutUsed >= Modes.net_output_raw_size)
{
modesSendAllClients(Modes.bos, Modes.beastOut, Modes.beastOutUsed);
Modes.beastOutUsed = 0;
Modes.net_output_raw_rate_count = 0;
}
completeWrite(&Modes.beast_out, p);
}
//
//=========================================================================
//
// Write raw output to TCP clients
//
void modesSendRawOutput(struct modesMessage *mm) {
char *p = &Modes.rawOut[Modes.rawOutUsed];
int msgLen = mm->msgbits / 8;
char *p = prepareWrite(&Modes.raw_out, msgLen*2 + 15);
int j;
unsigned char * pTimeStamp;
if (!p)
return;
if (Modes.mlat && mm->timestampMsg) {
*p++ = '@';
pTimeStamp = (unsigned char *) &mm->timestampMsg;
@ -250,7 +302,6 @@ void modesSendRawOutput(struct modesMessage *mm) {
sprintf(p, "%02X", pTimeStamp[j]);
p += 2;
}
Modes.rawOutUsed += 12; // additional 12 characters for timestamp
} else
*p++ = '*';
@ -262,13 +313,7 @@ void modesSendRawOutput(struct modesMessage *mm) {
*p++ = ';';
*p++ = '\n';
Modes.rawOutUsed += ((msgLen*2) + 3);
if (Modes.rawOutUsed >= Modes.net_output_raw_size)
{
modesSendAllClients(Modes.ros, Modes.rawOut, Modes.rawOutUsed);
Modes.rawOutUsed = 0;
Modes.net_output_raw_rate_count = 0;
}
completeWrite(&Modes.raw_out, p);
}
//
//=========================================================================
@ -277,17 +322,27 @@ void modesSendRawOutput(struct modesMessage *mm) {
// The message structure mm->bFlags tells us what has been updated by this message
//
void modesSendSBSOutput(struct modesMessage *mm) {
char msg[256], *p = msg;
char *p = prepareWrite(&Modes.sbs_out, 200);
uint32_t offset;
struct timeb epocTime_receive, epocTime_now;
struct tm stTime_receive, stTime_now;
int msgType;
if (!p)
return;
//
// SBS BS style output checked against the following reference
// http://www.homepages.mcb.net/bones/SBS/Article/Barebones42_Socket_Data.htm - seems comprehensive
//
if (mm->msgtype == -1) {
// heartbeat
p += sprintf(p, "\r\n");
completeWrite(&Modes.sbs_out, p);
return;
}
// Decide on the basic SBS Message Type
if ((mm->msgtype == 4) || (mm->msgtype == 20)) {
msgType = 5;
@ -434,15 +489,16 @@ void modesSendSBSOutput(struct modesMessage *mm) {
}
p += sprintf(p, "\r\n");
modesSendAllClients(Modes.sbsos, msg, p-msg);
completeWrite(&Modes.sbs_out, p);
}
//
//=========================================================================
//
void modesQueueOutput(struct modesMessage *mm) {
if (Modes.stat_sbs_connections) {modesSendSBSOutput(mm);}
if (Modes.stat_beast_connections) {modesSendBeastOutput(mm);}
if (Modes.stat_raw_connections) {modesSendRawOutput(mm);}
modesSendSBSOutput(mm);
modesSendBeastOutput(mm);
modesSendRawOutput(mm);
}
//
//=========================================================================
@ -928,44 +984,86 @@ void modesReadFromClient(struct client *c, char *sep,
}
}
}
//
//=========================================================================
//
// Read data from clients. This function actually delegates a lower-level
// function that depends on the kind of service (raw, http, ...).
//
void modesReadFromClients(void) {
struct client *c = modesAcceptClients();
//
// Perform periodic network work
//
void modesNetPeriodicWork(void) {
struct client *c, **prev;
time_t now = time(NULL);
int j;
int need_heartbeat = 0, need_flush = 0;
for (; c; c = c->next) {
if (c->service == Modes.ris) {
modesReadFromClient(c,"\n",decodeHexMessage);
} else if (c->service == Modes.bis) {
modesReadFromClient(c,"",decodeBinMessage);
} else if (c->service == Modes.https) {
modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest);
// Accept new connetions
modesAcceptClients();
// Read from clients
for (c = Modes.clients; c; c = c->next) {
if (c->service == Modes.ris) {
modesReadFromClient(c,"\n",decodeHexMessage);
} else if (c->service == Modes.bis) {
modesReadFromClient(c,"",decodeBinMessage);
} else if (c->service == Modes.https) {
modesReadFromClient(c,"\r\n\r\n",handleHTTPRequest);
}
}
// If we have generated no messages for a while, generate
// a dummy heartbeat message.
if (Modes.net_heartbeat_interval) {
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (services[j].writer &&
services[j].writer->connections &&
(services[j].writer->lastWrite + Modes.net_heartbeat_interval) <= now) {
need_flush = 1;
if (services[j].writer->dataUsed == 0) {
need_heartbeat = 1;
break;
}
}
}
}
}
if (need_heartbeat) {
//
// We haven't sent any traffic for some time. To try and keep any TCP
// links alive, send a null frame. This will help stop any routers discarding our TCP
// link which will cause an un-recoverable link error if/when a real frame arrives.
//
// Fudge up a null message
struct modesMessage mm;
memset(&mm, 0, sizeof(mm));
mm.msgbits = MODES_SHORT_MSG_BITS;
mm.timestampMsg = 0;
mm.msgtype = -1;
// Feed output clients
modesQueueOutput(&mm);
}
// If we have data that has been waiting to be written for a while,
// write it now.
for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
if (services[j].writer &&
services[j].writer->dataUsed &&
(need_flush || (services[j].writer->lastWrite + Modes.net_output_flush_interval) <= now)) {
flushWrites(services[j].writer);
}
}
// Unlink and free closed clients
for (prev = &Modes.clients, c = *prev; c; c = *prev) {
if (c->fd == -1) {
// Recently closed, prune from list
*prev = c->next;
free(c);
} else {
prev = &c->next;
}
}
}
//
// Perform cleanup of any recently-closed clients.
//
void modesNetCleanup(void) {
struct client *c, **prev;
for (prev = &Modes.clients, c = *prev; c; c = *prev) {
if (c->fd == -1) {
// Recently closed, prune from list
*prev = c->next;
free(c);
} else {
prev = &c->next;
}
}
}
//
// =============================== Network IO ===========================
//