From 15ea5ba3da201431e80ab7f7481901cab1da6f11 Mon Sep 17 00:00:00 2001 From: Oliver Jowett Date: Thu, 9 Apr 2015 18:51:31 +0100 Subject: [PATCH] Rearrangements to the receive thread. Magnitude conversion now happens immediately when sample data is received, so there is no risk of newly received data clobbering old data under CPU overload. --- debian/changelog | 3 + demod_2000.c | 16 +-- demod_2000.h | 4 +- demod_2400.c | 11 +- demod_2400.h | 4 +- dump1090.c | 283 ++++++++++++++++++++++++++++++----------------- dump1090.h | 35 +++--- mode_s.c | 18 --- net_io.c | 8 +- stats.c | 8 +- stats.h | 4 +- 11 files changed, 234 insertions(+), 160 deletions(-) diff --git a/debian/changelog b/debian/changelog index 1239e43..e151b31 100644 --- a/debian/changelog +++ b/debian/changelog @@ -7,6 +7,9 @@ dump1090-mutability (1.15~dev) UNRELEASED; urgency=medium the usual system user UID range. (github issue #24) * Fix timestamp correction when sample blocks are dropped. (github issue #43) + * Rearrangements to the receive thread. Magnitude conversion now happens + immediately when sample data is received, so there is no risk of newly + received data clobbering old data under CPU overload. -- Oliver Jowett Thu, 19 Feb 2015 22:39:19 +0000 diff --git a/demod_2000.c b/demod_2000.c index 4fef969..5ffb092 100644 --- a/demod_2000.c +++ b/demod_2000.c @@ -278,12 +278,14 @@ static void applyPhaseCorrection(uint16_t *pPayload) { // size 'mlen' bytes. Every detected Mode S message is convert it into a // stream of bits and passed to the function to display it. // -void demodulate2000(uint16_t *m, uint32_t mlen) { +void demodulate2000(struct mag_buf *mag) { struct modesMessage mm; unsigned char msg[MODES_LONG_MSG_BYTES], *pMsg; uint16_t aux[MODES_PREAMBLE_SAMPLES+MODES_LONG_MSG_SAMPLES+1]; uint32_t j; int use_correction = 0; + unsigned mlen = mag->length; + uint16_t *m = mag->data; memset(&mm, 0, sizeof(mm)); @@ -337,11 +339,11 @@ void demodulate2000(uint16_t *m, uint32_t mlen) { if (ModeA) // We have found a valid ModeA/C in the data { - mm.timestampMsg = Modes.timestampBlk + ((j+1) * 6); + mm.timestampMsg = mag->sampleTimestamp + ((j+1) * 6); // compute message receive time as block-start-time + difference in the 12MHz clock - mm.sysTimestampMsg = Modes.stSystemTimeBlk; // end of block time - mm.sysTimestampMsg.tv_nsec -= receiveclock_ns_elapsed(mm.timestampMsg, Modes.timestampBlk + MODES_ASYNC_BUF_SAMPLES * 6); // time until end of block + mm.sysTimestampMsg = mag->sysTimestamp; // start of block time + mm.sysTimestampMsg.tv_nsec += receiveclock_ns_elapsed(mag->sampleTimestamp, mm.timestampMsg); normalize_timespec(&mm.sysTimestampMsg); // Decode the received message @@ -543,11 +545,11 @@ void demodulate2000(uint16_t *m, uint32_t mlen) { int result; // Set initial mm structure details - mm.timestampMsg = Modes.timestampBlk + (j*6); + mm.timestampMsg = mag->sampleTimestamp + (j*6); // compute message receive time as block-start-time + difference in the 12MHz clock - mm.sysTimestampMsg = Modes.stSystemTimeBlk; // end of block time - mm.sysTimestampMsg.tv_nsec -= receiveclock_ns_elapsed(mm.timestampMsg, Modes.timestampBlk + MODES_ASYNC_BUF_SAMPLES * 6); // time until end of block + mm.sysTimestampMsg = mag->sysTimestamp; // start of block time + mm.sysTimestampMsg.tv_nsec += receiveclock_ns_elapsed(mag->sampleTimestamp, mm.timestampMsg); normalize_timespec(&mm.sysTimestampMsg); mm.signalLevel = (365.0*60 + sigLevel + noiseLevel) * (365.0*60 + sigLevel + noiseLevel) / MAX_POWER / 60 / 60; diff --git a/demod_2000.h b/demod_2000.h index b2029a5..f97e86f 100644 --- a/demod_2000.h +++ b/demod_2000.h @@ -22,6 +22,8 @@ #include -void demodulate2000(uint16_t *m, uint32_t mlen); +struct mag_buf; + +void demodulate2000(struct mag_buf *mag); #endif diff --git a/demod_2400.c b/demod_2400.c index 0cd844a..487418a 100644 --- a/demod_2400.c +++ b/demod_2400.c @@ -155,7 +155,7 @@ static int best_phase(uint16_t *m) { // Given 'mlen' magnitude samples in 'm', sampled at 2.4MHz, // try to demodulate some Mode S messages. // -void demodulate2400(uint16_t *m, uint32_t mlen) { +void demodulate2400(struct mag_buf *mag) { struct modesMessage mm; unsigned char msg1[MODES_LONG_MSG_BYTES], msg2[MODES_LONG_MSG_BYTES], *msg; uint32_t j; @@ -172,6 +172,9 @@ void demodulate2400(uint16_t *m, uint32_t mlen) { uint64_t noise_power_sum = 0; #endif + uint16_t *m = mag->data; + uint32_t mlen = mag->length; + memset(&mm, 0, sizeof(mm)); msg = msg1; @@ -424,11 +427,11 @@ void demodulate2400(uint16_t *m, uint32_t mlen) { msglen = modesMessageLenByType(bestmsg[0] >> 3); // Set initial mm structure details - mm.timestampMsg = Modes.timestampBlk + (j*5) + bestphase; + mm.timestampMsg = mag->sampleTimestamp + (j*5) + bestphase; // compute message receive time as block-start-time + difference in the 12MHz clock - mm.sysTimestampMsg = Modes.stSystemTimeBlk; // end of block time - mm.sysTimestampMsg.tv_nsec -= receiveclock_ns_elapsed(mm.timestampMsg, Modes.timestampBlk + MODES_ASYNC_BUF_SAMPLES * 5); // time until end of block + mm.sysTimestampMsg = mag->sysTimestamp; // start of block time + mm.sysTimestampMsg.tv_nsec += receiveclock_ns_elapsed(mag->sampleTimestamp, mm.timestampMsg); normalize_timespec(&mm.sysTimestampMsg); mm.score = bestscore; diff --git a/demod_2400.h b/demod_2400.h index 26cc8aa..762b250 100644 --- a/demod_2400.h +++ b/demod_2400.h @@ -22,6 +22,8 @@ #include -void demodulate2400(uint16_t *m, uint32_t mlen); +struct mag_buf; + +void demodulate2400(struct mag_buf *mag); #endif diff --git a/dump1090.c b/dump1090.c index 8d516ea..58883dc 100644 --- a/dump1090.c +++ b/dump1090.c @@ -167,17 +167,23 @@ void modesInit(void) { // Allocate the various buffers used by Modes Modes.trailing_samples = (Modes.oversample ? (MODES_OS_PREAMBLE_SAMPLES + MODES_OS_LONG_MSG_SAMPLES) : (MODES_PREAMBLE_SAMPLES + MODES_LONG_MSG_SAMPLES)) + 16; - if ( ((Modes.pFileData = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) || - ((Modes.magnitude = (uint16_t *) calloc(MODES_ASYNC_BUF_SAMPLES+Modes.trailing_samples, 2) ) == NULL) || - ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) || + if ( ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) || ((Modes.log10lut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) ) { fprintf(stderr, "Out of memory allocating data buffer.\n"); exit(1); } - // Clear the buffers that have just been allocated, just in-case - memset(Modes.pFileData,127, MODES_ASYNC_BUF_SIZE); + for (i = 0; i < MODES_MAG_BUFFERS; ++i) { + if ( (Modes.mag_buffers[i].data = calloc(MODES_MAG_BUF_SAMPLES+Modes.trailing_samples, sizeof(uint16_t))) == NULL ) { + fprintf(stderr, "Out of memory allocating magnitude buffer.\n"); + exit(1); + } + + Modes.mag_buffers[i].length = 0; + Modes.mag_buffers[i].dropped = 0; + Modes.mag_buffers[i].sampleTimestamp = 0; + } // Validate the users Lat/Lon home location inputs if ( (Modes.fUserLat > 90.0) // Latitude must be -90 to +90 @@ -206,11 +212,6 @@ void modesInit(void) { if (Modes.net_sndbuf_size > (MODES_NET_SNDBUF_MAX)) {Modes.net_sndbuf_size = MODES_NET_SNDBUF_MAX;} - // Initialise the Block Timers to something half sensible - clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeBlk); - for (i = 0; i < MODES_ASYNC_BUF_NUMBER; i++) - {Modes.stSystemTimeRTL[i] = Modes.stSystemTimeBlk;} - // Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the // unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you // a range from -127 to +128 (or -128 to +127, or -127.5 to +127.5).. @@ -382,43 +383,105 @@ int modesInitRTLSDR(void) { static struct timespec reader_thread_start; void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) { + struct mag_buf *outbuf; + struct mag_buf *lastbuf; + uint16_t *p, *q; + uint32_t slen; + unsigned next_free_buffer; + unsigned free_bufs; + unsigned block_duration; + + static int was_odd = 0; // paranoia!! + static int dropping = 0; + MODES_NOTUSED(ctx); // Lock the data buffer variables before accessing them pthread_mutex_lock(&Modes.data_mutex); - if (Modes.exit) { rtlsdr_cancel_async(Modes.dev); // ask our caller to exit } - Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!! + next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; + outbuf = &Modes.mag_buffers[Modes.first_free_buffer]; + lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS]; + free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS; - // Get the system time for this block - clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeRTL[Modes.iDataIn]); + // Paranoia! Unlikely, but let's go for belt and suspenders here - if (len > MODES_ASYNC_BUF_SIZE) {len = MODES_ASYNC_BUF_SIZE;} + if (len != MODES_RTL_BUF_SIZE) { + fprintf(stderr, "weirdness: rtlsdr gave us a block with an unusual size (got %u bytes, expected %u bytes)\n", + (unsigned)len, (unsigned)MODES_RTL_BUF_SIZE); - // Queue the new data - Modes.pData[Modes.iDataIn] = (uint16_t *) buf; - Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1); - Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); - - if (Modes.iDataReady == 0) { - // Ooooops. We've just received the MODES_ASYNC_BUF_NUMBER'th outstanding buffer - // This means that RTLSDR is currently overwriting the MODES_ASYNC_BUF_NUMBER+1 - // buffer, but we havent yet processed it, so we're going to lose it. There - // isn't much we can do to recover the lost data, but we can correct things to - // avoid any additional problems. - Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut+1); - Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1); - Modes.iDataLost++; + if (len > MODES_RTL_BUF_SIZE) { + // wat?! Discard the start. + unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / 2; + outbuf->dropped += discard; + buf += discard*2; + len -= discard*2; + } } + if (was_odd) { + // Drop a sample so we are in sync with I/Q samples again (hopefully) + ++buf; + --len; + ++outbuf->dropped; + } + + was_odd = (len & 1); + slen = len/2; + + if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) { + // FIFO is full. Drop this block. + dropping = 1; + outbuf->dropped += slen; + pthread_mutex_unlock(&Modes.data_mutex); + return; + } + + dropping = 0; + pthread_mutex_unlock(&Modes.data_mutex); + + // Compute the sample timestamp and system timestamp for the start of the block + if (Modes.oversample) { + outbuf->sampleTimestamp = lastbuf->sampleTimestamp + (lastbuf->length + outbuf->dropped) * 5; + block_duration = slen * 5000U / 12; + } else { + outbuf->sampleTimestamp = lastbuf->sampleTimestamp + (lastbuf->length + outbuf->dropped) * 6; + block_duration = slen * 6000U / 12; + } + + // Get the approx system time for the start of this block + clock_gettime(CLOCK_REALTIME, &outbuf->sysTimestamp); + outbuf->sysTimestamp.tv_nsec -= block_duration; + normalize_timespec(&outbuf->sysTimestamp); + + // Copy trailing data from last block (or reset if not valid) + if (outbuf->dropped == 0 && lastbuf->length >= Modes.trailing_samples) { + memcpy(outbuf->data, lastbuf->data + lastbuf->length - Modes.trailing_samples, Modes.trailing_samples * sizeof(uint16_t)); + } else { + memset(outbuf->data, 127, Modes.trailing_samples * sizeof(uint16_t)); + } + + // Convert the new data + outbuf->length = slen; + p = (uint16_t*)buf; + q = &outbuf->data[Modes.trailing_samples]; + while (slen-- > 0) + *q++ = Modes.maglut[*p++]; + + // Push the new data to the demodulation thread + pthread_mutex_lock(&Modes.data_mutex); + + Modes.mag_buffers[next_free_buffer].dropped = 0; + Modes.mag_buffers[next_free_buffer].length = 0; // just in case + Modes.first_free_buffer = next_free_buffer; + // accumulate CPU while holding the mutex, and restart measurement end_cpu_timing(&reader_thread_start, &Modes.reader_cpu_accumulator); start_cpu_timing(&reader_thread_start); - - // Signal to the other thread that new data is ready, and unlock + pthread_cond_signal(&Modes.data_cond); pthread_mutex_unlock(&Modes.data_mutex); } @@ -429,60 +492,93 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) { // instead of using an RTLSDR device // void readDataFromFile(void) { - pthread_mutex_lock(&Modes.data_mutex); - while(Modes.exit == 0) { - ssize_t nread, toread; - unsigned char *p; + int eof = 0; + struct timespec next_buffer_delivery; - if (Modes.iDataReady) { + clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery); + + pthread_mutex_lock(&Modes.data_mutex); + while (!Modes.exit && !eof) { + ssize_t nread, toread; + void *r; + uint16_t *p; + struct mag_buf *outbuf, *lastbuf; + unsigned next_free_buffer; + unsigned slen; + + next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS; + if (next_free_buffer == Modes.first_filled_buffer) { + // no space for output yet pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex); continue; } - if (Modes.interactive) { - // When --ifile and --interactive are used together, slow down - // playing at the natural rate of the RTLSDR received. - pthread_mutex_unlock(&Modes.data_mutex); - usleep(64000); - pthread_mutex_lock(&Modes.data_mutex); + outbuf = &Modes.mag_buffers[Modes.first_free_buffer]; + lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS]; + pthread_mutex_unlock(&Modes.data_mutex); + + // Compute the sample timestamp and system timestamp for the start of the block + if (Modes.oversample) { + outbuf->sampleTimestamp = lastbuf->sampleTimestamp + lastbuf->length * 5; + } else { + outbuf->sampleTimestamp = lastbuf->sampleTimestamp + lastbuf->length * 6; } - toread = MODES_ASYNC_BUF_SIZE; - p = (unsigned char *) Modes.pFileData; + // Copy trailing data from last block (or reset if not valid) + if (lastbuf->length >= Modes.trailing_samples) { + memcpy(outbuf->data, lastbuf->data + lastbuf->length - Modes.trailing_samples, Modes.trailing_samples * sizeof(uint16_t)); + } else { + memset(outbuf->data, 127, Modes.trailing_samples * sizeof(uint16_t)); + } + + // Get the system time for the start of this block + clock_gettime(CLOCK_REALTIME, &outbuf->sysTimestamp); + + toread = MODES_RTL_BUF_SIZE; + r = (void *) (outbuf->data + Modes.trailing_samples); while(toread) { - nread = read(Modes.fd, p, toread); + nread = read(Modes.fd, r, toread); if (nread <= 0) { // Done. - Modes.exit = 1; // Signal the other threads to exit. - goto OUT; + eof = 1; + break; } - p += nread; + r += nread; toread -= nread; } - if (toread) { - // Not enough data on file to fill the buffer? Pad with no signal. - memset(p,127,toread); + + slen = outbuf->length = (MODES_RTL_BUF_SIZE - toread) / 2; + + // Convert the new data + p = (uint16_t*) (outbuf->data + Modes.trailing_samples); + while (slen-- > 0) { + *p = Modes.maglut[*p]; + ++p; } - Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!! + if (Modes.interactive) { + // Wait until we are allowed to release this buffer to the main thread + while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_buffer_delivery, NULL) == EINTR) + ; - // Get the system time for this block - clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeRTL[Modes.iDataIn]); - - // Queue the new data - Modes.pData[Modes.iDataIn] = Modes.pFileData; - Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1); - Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); + // compute the time we can deliver the next buffer. + next_buffer_delivery.tv_nsec += (slen * (Modes.oversample ? 5000 : 6000) / 12); + normalize_timespec(&next_buffer_delivery); + } + // Push the new data to the main thread + pthread_mutex_lock(&Modes.data_mutex); + Modes.first_free_buffer = next_free_buffer; // accumulate CPU while holding the mutex, and restart measurement end_cpu_timing(&reader_thread_start, &Modes.reader_cpu_accumulator); start_cpu_timing(&reader_thread_start); - - // Signal to the other thread that new data is ready pthread_cond_signal(&Modes.data_cond); } - OUT: + // Wait for the main thread to consume all data + while (!Modes.exit && Modes.first_filled_buffer != Modes.first_free_buffer) + pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex); + pthread_mutex_unlock(&Modes.data_mutex); } // @@ -500,8 +596,8 @@ void *readerThreadEntryPoint(void *arg) { if (Modes.filename == NULL) { while (!Modes.exit) { rtlsdr_read_async(Modes.dev, rtlsdrCallback, NULL, - MODES_ASYNC_BUF_NUMBER, - MODES_ASYNC_BUF_SIZE); + MODES_RTL_BUFFERS, + MODES_RTL_BUF_SIZE); if (!Modes.exit) { log_with_timestamp("Warning: lost the connection to the RTLSDR device."); @@ -1027,7 +1123,7 @@ int main(int argc, char **argv) { while (Modes.exit == 0) { struct timespec start_time; - if (Modes.iDataReady == 0) { + if (Modes.first_free_buffer == Modes.first_filled_buffer) { /* wait for more data. * we should be getting data every 50-60ms. wait for max 100ms before we give up and do some background work. * this is fairly aggressive as all our network I/O runs out of the background work! @@ -1039,63 +1135,44 @@ int main(int argc, char **argv) { normalize_timespec(&ts); pthread_cond_timedwait(&Modes.data_cond, &Modes.data_mutex, &ts); // This unlocks Modes.data_mutex, and waits for Modes.data_cond - // Once (Modes.data_cond) occurs, it locks Modes.data_mutex } - // Modes.data_mutex is Locked, and possibly (Modes.iDataReady != 0) + // Modes.data_mutex is locked, and possibly we have data. // copy out reader CPU time and reset it add_timespecs(&Modes.reader_cpu_accumulator, &Modes.stats_current.reader_cpu, &Modes.stats_current.reader_cpu); Modes.reader_cpu_accumulator.tv_sec = 0; Modes.reader_cpu_accumulator.tv_nsec = 0; - if (Modes.iDataReady) { // Check we have new data, just in case!! + if (Modes.first_free_buffer != Modes.first_filled_buffer) { + // FIFO is not empty, process one buffer. + + struct mag_buf *buf; + start_cpu_timing(&start_time); - - Modes.iDataOut &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase - - // Translate the next lot of I/Q samples into Modes.magnitude - computeMagnitudeVector(Modes.pData[Modes.iDataOut]); - - Modes.stSystemTimeBlk = Modes.stSystemTimeRTL[Modes.iDataOut]; - - // Update the input buffer pointer queue - Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut + 1); - Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); - - // If we lost some blocks, correct the timestamp - if (Modes.iDataLost) { - if (Modes.oversample) - Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 5 * Modes.iDataLost); - else - Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 6 * Modes.iDataLost); - Modes.stats_current.blocks_dropped += Modes.iDataLost; - Modes.iDataLost = 0; - } - - // It's safe to release the lock now - pthread_cond_signal (&Modes.data_cond); - pthread_mutex_unlock(&Modes.data_mutex); + buf = &Modes.mag_buffers[Modes.first_filled_buffer]; // Process data after releasing the lock, so that the capturing // thread can read data while we perform computationally expensive // stuff at the same time. + pthread_mutex_unlock(&Modes.data_mutex); if (Modes.oversample) - demodulate2400(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES); + demodulate2400(buf); else - demodulate2000(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES); - - // Update the timestamp ready for the next block - if (Modes.oversample) - Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*5); - else - Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6); - Modes.stats_current.blocks_processed++; + demodulate2000(buf); + Modes.stats_current.samples_processed += buf->length; + Modes.stats_current.samples_dropped += buf->dropped; end_cpu_timing(&start_time, &Modes.stats_current.demod_cpu); + + // Mark the buffer we just processed as completed. + pthread_mutex_lock(&Modes.data_mutex); + Modes.first_filled_buffer = (Modes.first_filled_buffer + 1) % MODES_MAG_BUFFERS; + pthread_cond_signal(&Modes.data_cond); + pthread_mutex_unlock(&Modes.data_mutex); } else { - pthread_cond_signal (&Modes.data_cond); + // Nothing to process this time around. pthread_mutex_unlock(&Modes.data_mutex); } diff --git a/dump1090.h b/dump1090.h index 482d2d5..16b7125 100644 --- a/dump1090.h +++ b/dump1090.h @@ -92,9 +92,10 @@ #define MODES_DEFAULT_FREQ 1090000000 #define MODES_DEFAULT_WIDTH 1000 #define MODES_DEFAULT_HEIGHT 700 -#define MODES_ASYNC_BUF_NUMBER 16 -#define MODES_ASYNC_BUF_SIZE (16*16384) // 256k -#define MODES_ASYNC_BUF_SAMPLES (MODES_ASYNC_BUF_SIZE / 2) // Each sample is 2 bytes +#define MODES_RTL_BUFFERS 15 // Number of RTL buffers +#define MODES_RTL_BUF_SIZE (16*16384) // 256k +#define MODES_MAG_BUF_SAMPLES (MODES_RTL_BUF_SIZE / 2) // Each sample is 2 bytes +#define MODES_MAG_BUFFERS 12 // Number of magnitude buffers (should be smaller than RTL_BUFFERS for flowcontrol to work) #define MODES_AUTO_GAIN -100 // Use automatic gain #define MODES_MAX_GAIN 999999 // Use max available gain #define MODES_MSG_SQUELCH_DB 4.0 // Minimum SNR, in dB @@ -240,26 +241,29 @@ struct net_writer { uint64_t lastWrite; // time of last write to clients }; +// Structure representing one magnitude buffer +struct mag_buf { + uint16_t *data; // Magnitude data. Starts with Modes.trailing_samples worth of overlap from the previous block + unsigned length; // Number of valid samples _after_ overlap. Total buffer length is buf->length + Modes.trailing_samples. + uint64_t sampleTimestamp; // Clock timestamp of the start of this block, 12MHz clock + struct timespec sysTimestamp; // Estimated system time at start of block + uint32_t dropped; // Number of dropped samples preceding this buffer +}; + // Program global state struct { // Internal state pthread_t reader_thread; pthread_mutex_t data_mutex; // Mutex to synchronize buffer access pthread_cond_t data_cond; // Conditional variable associated - uint16_t *pData [MODES_ASYNC_BUF_NUMBER]; // Raw IQ sample buffers from RTL - struct timespec stSystemTimeRTL[MODES_ASYNC_BUF_NUMBER]; // System time when RTL passed us this block - int iDataIn; // Fifo input pointer - int iDataOut; // Fifo output pointer - int iDataReady; // Fifo content count - int iDataLost; // Count of missed buffers - struct timespec reader_cpu_accumulator; // CPU time used by the reader thread, copied out and reset by the main thread under the mutex - int trailing_samples;// extra trailing samples in magnitude buffer + struct mag_buf mag_buffers[MODES_MAG_BUFFERS]; // Converted magnitude buffers from RTL or file input + unsigned first_free_buffer; // Entry in mag_buffers that will next be filled with input. + unsigned first_filled_buffer; // Entry in mag_buffers that has valid data and will be demodulated next. If equal to next_free_buffer, there is no unprocessed data. + struct timespec reader_cpu_accumulator; // CPU time used by the reader thread, copied out and reset by the main thread under the mutex + + unsigned trailing_samples; // extra trailing samples in magnitude buffers - uint16_t *pFileData; // Raw IQ samples buffer (from a File) - uint16_t *magnitude; // Magnitude vector - uint64_t timestampBlk; // Timestamp of the start of the current block - struct timespec stSystemTimeBlk; // System time when RTL passed us currently processing this block int fd; // --ifile option file descriptor uint16_t *maglut; // I/Q -> Magnitude lookup table uint16_t *log10lut; // Magnitude -> log10 lookup table @@ -429,7 +433,6 @@ int scoreModesMessage(unsigned char *msg, int validbits); int decodeModesMessage (struct modesMessage *mm, unsigned char *msg); void displayModesMessage(struct modesMessage *mm); void useModesMessage (struct modesMessage *mm); -void computeMagnitudeVector(uint16_t *pData); // // Functions exported from interactive.c // diff --git a/mode_s.c b/mode_s.c index d6f4e13..617c410 100644 --- a/mode_s.c +++ b/mode_s.c @@ -1193,24 +1193,6 @@ void displayModesMessage(struct modesMessage *mm) { printf("\n"); } -// -//========================================================================= -// -// Turn I/Q samples pointed by Modes.data into the magnitude vector -// pointed by Modes.magnitude. -// -void computeMagnitudeVector(uint16_t *p) { - uint16_t *m = &Modes.magnitude[Modes.trailing_samples]; - uint32_t j; - - memcpy(Modes.magnitude,&Modes.magnitude[MODES_ASYNC_BUF_SAMPLES], Modes.trailing_samples * 2); - - // Compute the magnitudo vector. It's just SQRT(I^2 + Q^2), but - // we rescale to the 0-255 range to exploit the full resolution. - for (j = 0; j < MODES_ASYNC_BUF_SAMPLES; j ++) { - *m++ = Modes.maglut[*p++]; - } -} // //========================================================================= diff --git a/net_io.c b/net_io.c index e179ec0..0174261 100644 --- a/net_io.c +++ b/net_io.c @@ -825,14 +825,14 @@ static char * appendStatsJson(char *p, if (!Modes.net_only) { p += snprintf(p, end-p, - ",\"local\":{\"blocks_processed\":%u" - ",\"blocks_dropped\":%u" + ",\"local\":{\"samples_processed\":%llu" + ",\"samples_dropped\":%llu" ",\"modeac\":%u" ",\"modes\":%u" ",\"bad\":%u" ",\"unknown_icao\":%u", - st->blocks_processed, - st->blocks_dropped, + (unsigned long long)st->samples_processed, + (unsigned long long)st->samples_dropped, st->demod_modeac, st->demod_preambles, st->demod_rejected_bad, diff --git a/stats.c b/stats.c index 13e1b0c..26467f3 100644 --- a/stats.c +++ b/stats.c @@ -78,8 +78,8 @@ void display_stats(struct stats *st) { if (!Modes.net_only) { printf("Local receiver:\n"); - printf(" %u sample blocks processed\n", st->blocks_processed); - printf(" %u sample blocks dropped\n", st->blocks_dropped); + printf(" %llu samples processed\n", (unsigned long long)st->samples_processed); + printf(" %llu samples dropped\n", (unsigned long long)st->samples_dropped); printf(" %u Mode A/C messages received\n", st->demod_modeac); printf(" %u Mode-S message preambles received\n", st->demod_preambles); @@ -202,8 +202,8 @@ void add_stats(const struct stats *st1, const struct stats *st2, struct stats *t target->demod_accepted[i] = st1->demod_accepted[i] + st2->demod_accepted[i]; target->demod_modeac = st1->demod_modeac + st2->demod_modeac; - target->blocks_processed = st1->blocks_processed + st2->blocks_processed; - target->blocks_dropped = st1->blocks_dropped + st2->blocks_dropped; + target->samples_processed = st1->samples_processed + st2->samples_processed; + target->samples_dropped = st1->samples_dropped + st2->samples_dropped; add_timespecs(&st1->demod_cpu, &st2->demod_cpu, &target->demod_cpu); add_timespecs(&st1->reader_cpu, &st2->reader_cpu, &target->reader_cpu); diff --git a/stats.h b/stats.h index 7c9cd7d..0c30c8f 100644 --- a/stats.h +++ b/stats.h @@ -63,8 +63,8 @@ struct stats { // Mode A/C demodulator counts: uint32_t demod_modeac; - uint32_t blocks_processed; - uint32_t blocks_dropped; + uint64_t samples_processed; + uint64_t samples_dropped; // timing: struct timespec demod_cpu;