diff --git a/dump1090.c b/dump1090.c index 3cef7be..6a76fdf 100644 --- a/dump1090.c +++ b/dump1090.c @@ -92,7 +92,7 @@ void modesInit(void) { // Allocate the various buffers used by Modes if ( ((Modes.icao_cache = (uint32_t *) malloc(sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2) ) == NULL) || - ((Modes.data = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) || + ((Modes.pFileData = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) || ((Modes.magnitude = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE) ) == NULL) || ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) || ((Modes.beastOut = (char *) malloc(MODES_RAWOUT_BUF_SIZE) ) == NULL) || @@ -104,7 +104,7 @@ void modesInit(void) { // Clear the buffers that have just been allocated, just in-case memset(Modes.icao_cache, 0, sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2); - memset(Modes.data, 127, MODES_ASYNC_BUF_SIZE); + memset(Modes.pFileData,127, MODES_ASYNC_BUF_SIZE); memset(Modes.magnitude, 0, MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE); // Validate the users Lat/Lon home location inputs @@ -133,8 +133,9 @@ void modesInit(void) { {Modes.net_output_raw_rate = MODES_RAWOUT_BUF_RATE;} // Initialise the Block Timers to something half sensible - ftime(&Modes.stSystemTimeRTL); - Modes.stSystemTimeBlk = Modes.stSystemTimeRTL; + ftime(&Modes.stSystemTimeBlk); + for (i = 0; i < MODES_ASYNC_BUF_NUMBER; i++) + {Modes.stSystemTimeRTL[i] = Modes.stSystemTimeBlk;} // Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the // unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you @@ -250,13 +251,35 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) { MODES_NOTUSED(ctx); + // Lock the data buffer variables before accessing them pthread_mutex_lock(&Modes.data_mutex); - ftime(&Modes.stSystemTimeRTL); - if (len > MODES_ASYNC_BUF_SIZE) len = MODES_ASYNC_BUF_SIZE; - // Read the new data - memcpy(Modes.data, buf, len); - Modes.data_ready = 1; - // Signal to the other thread that new data is ready + + rtlsdrStats(buf); + + Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!! + + // Get the system time for this block + ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]); + + if (len > MODES_ASYNC_BUF_SIZE) {len = MODES_ASYNC_BUF_SIZE;} + + // Queue the new data + Modes.pData[Modes.iDataIn] = (uint16_t *) buf; + Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1); + Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); + + if (Modes.iDataReady == 0) { + // Ooooops. We've just received the MODES_ASYNC_BUF_NUMBER'th outstanding buffer + // This means that RTLSDR is currently overwriting the MODES_ASYNC_BUF_NUMBER+1 + // buffer, but we havent yet processed it, so we're going to lose it. There + // isn't much we can do to recover the lost data, but we can correct things to + // avoid any additional problems. + Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut+1); + Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1); + Modes.iDataLost++; + } + + // Signal to the other thread that new data is ready, and unlock pthread_cond_signal(&Modes.data_cond); pthread_mutex_unlock(&Modes.data_mutex); } @@ -268,13 +291,12 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) { // void readDataFromFile(void) { pthread_mutex_lock(&Modes.data_mutex); - while(1) { + while(Modes.exit == 0) { ssize_t nread, toread; unsigned char *p; - if (Modes.exit == 1) break; - if (Modes.data_ready) { - pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex); + if (Modes.iDataReady) { + pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex); continue; } @@ -287,7 +309,7 @@ void readDataFromFile(void) { } toread = MODES_ASYNC_BUF_SIZE; - p = (unsigned char *) Modes.data; + p = (unsigned char *) Modes.pFileData; while(toread) { nread = read(Modes.fd, p, toread); if (nread <= 0) { @@ -301,7 +323,17 @@ void readDataFromFile(void) { // Not enough data on file to fill the buffer? Pad with no signal. memset(p,127,toread); } - Modes.data_ready = 1; + + Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!! + + // Get the system time for this block + ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]); + + // Queue the new data + Modes.pData[Modes.iDataIn] = Modes.pFileData; + Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1); + Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); + // Signal to the other thread that new data is ready pthread_cond_signal(&Modes.data_cond); } @@ -323,7 +355,6 @@ void *readerThreadEntryPoint(void *arg) { readDataFromFile(); } // Signal to the other thread that new data is ready - dummy really so threads don't mutually lock - Modes.data_ready = 1; pthread_cond_signal(&Modes.data_cond); pthread_mutex_unlock(&Modes.data_mutex); #ifndef _WIN32 @@ -561,10 +592,6 @@ int main(int argc, char **argv) { // Initialization modesInit(); - //if (Modes.debug & MODES_DEBUG_BADCRC) { - // testAndTimeBitCorrection(); - //} - if (Modes.net_only) { fprintf(stderr,"Net-only mode, no RTL device or file open.\n"); } else if (Modes.filename == NULL) { @@ -589,31 +616,55 @@ int main(int argc, char **argv) { // Create the thread that will read the data from the device. pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL); - pthread_mutex_lock(&Modes.data_mutex); - while(1) { - if (!Modes.data_ready) { - pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex); - continue; + + while (Modes.exit == 0) { + + if (Modes.iDataReady == 0) { + pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex); // This unlocks Modes.data_mutex, and waits for Modes.data_cond + continue; // Once (Modes.data_cond) occurs, it locks Modes.data_mutex } - computeMagnitudeVector(); - Modes.stSystemTimeBlk = Modes.stSystemTimeRTL; - // Signal to the other thread that we processed the available data - // and we want more (useful for --ifile) - Modes.data_ready = 0; - pthread_cond_signal(&Modes.data_cond); + // Modes.data_mutex is Locked, and (Modes.iDataReady != 0) + if (Modes.iDataReady) { // Check we have new data, just in case!! + + Modes.iDataOut &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase + + // Translate the next lot of I/Q samples into Modes.magnitude + computeMagnitudeVector(Modes.pData[Modes.iDataOut]); + + Modes.stSystemTimeBlk = Modes.stSystemTimeRTL[Modes.iDataOut]; + + // Update the input buffer pointer queue + Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut + 1); + Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut); + + // If we lost some blocks, correct the timestamp + if (Modes.iDataLost) { + Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 6 * Modes.iDataLost); + uRtlLost+= Modes.iDataLost; + Modes.iDataLost = 0; + } + + // It's safe to release the lock now + pthread_cond_signal (&Modes.data_cond); + pthread_mutex_unlock(&Modes.data_mutex); + + // Process data after releasing the lock, so that the capturing + // thread can read data while we perform computationally expensive + // stuff at the same time. + detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES); + + // Update the timestamp ready for the next block + Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6); + + } else { + pthread_cond_signal (&Modes.data_cond); + pthread_mutex_unlock(&Modes.data_mutex); + } - // Process data after releasing the lock, so that the capturing - // thread can read data while we perform computationally expensive - // stuff * at the same time. (This should only be useful with very - // slow processors). - pthread_mutex_unlock(&Modes.data_mutex); - detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES); - Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6); backgroundTasks(); pthread_mutex_lock(&Modes.data_mutex); - if (Modes.exit) break; } // If --stats were given, print statistics @@ -658,7 +709,11 @@ int main(int argc, char **argv) { pthread_cond_destroy(&Modes.data_cond); // Thread cleanup pthread_mutex_destroy(&Modes.data_mutex); pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit +#ifndef _WIN32 pthread_exit(0); +#else + return (0); +#endif } // //========================================================================= diff --git a/dump1090.h b/dump1090.h index 24d971b..3c6d41c 100644 --- a/dump1090.h +++ b/dump1090.h @@ -81,7 +81,7 @@ #define MODES_DEFAULT_FREQ 1090000000 #define MODES_DEFAULT_WIDTH 1000 #define MODES_DEFAULT_HEIGHT 700 -#define MODES_ASYNC_BUF_NUMBER 12 +#define MODES_ASYNC_BUF_NUMBER 16 #define MODES_ASYNC_BUF_SIZE (16*16384) // 256k #define MODES_ASYNC_BUF_SAMPLES (MODES_ASYNC_BUF_SIZE / 2) // Each sample is 2 bytes #define MODES_AUTO_GAIN -100 // Use automatic gain @@ -225,15 +225,21 @@ struct aircraft { // Program global state struct { // Internal state pthread_t reader_thread; + pthread_mutex_t data_mutex; // Mutex to synchronize buffer access pthread_cond_t data_cond; // Conditional variable associated - uint16_t *data; // Raw IQ samples buffer + uint16_t *pData [MODES_ASYNC_BUF_NUMBER]; // Raw IQ sample buffers from RTL + struct timeb stSystemTimeRTL[MODES_ASYNC_BUF_NUMBER]; // System time when RTL passed us this block + int iDataIn; // Fifo input pointer + int iDataOut; // Fifo output pointer + int iDataReady; // Fifo content count + int iDataLost; // Count of missed buffers + + uint16_t *pFileData; // Raw IQ samples buffer (from a File) uint16_t *magnitude; // Magnitude vector - struct timeb stSystemTimeRTL; // System time when RTL passed us the Latest block uint64_t timestampBlk; // Timestamp of the start of the current block struct timeb stSystemTimeBlk; // System time when RTL passed us currently processing this block int fd; // --ifile option file descriptor - int data_ready; // Data ready to be processed uint32_t *icao_cache; // Recently seen ICAO addresses cache uint16_t *maglut; // I/Q -> Magnitude lookup table int exit; // Exit from the main loop when true @@ -403,7 +409,7 @@ void detectModeS (uint16_t *m, uint32_t mlen); void decodeModesMessage (struct modesMessage *mm, unsigned char *msg); void displayModesMessage(struct modesMessage *mm); void useModesMessage (struct modesMessage *mm); -void computeMagnitudeVector(); +void computeMagnitudeVector(uint16_t *pData); void decodeCPR (struct aircraft *a, int fflag, int surface); int decodeCPRrelative (struct aircraft *a, int fflag, int surface); void modesInitErrorInfo (); diff --git a/mode_s.c b/mode_s.c index 8cd9987..84bef2d 100644 --- a/mode_s.c +++ b/mode_s.c @@ -1372,9 +1372,8 @@ void displayModesMessage(struct modesMessage *mm) { // Turn I/Q samples pointed by Modes.data into the magnitude vector // pointed by Modes.magnitude. // -void computeMagnitudeVector(void) { +void computeMagnitudeVector(uint16_t *p) { uint16_t *m = &Modes.magnitude[MODES_PREAMBLE_SAMPLES+MODES_LONG_MSG_SAMPLES]; - uint16_t *p = Modes.data; uint32_t j; memcpy(Modes.magnitude,&Modes.magnitude[MODES_ASYNC_BUF_SAMPLES], MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE); diff --git a/net_io.c b/net_io.c index 2ff6c94..5d6b834 100644 --- a/net_io.c +++ b/net_io.c @@ -96,7 +96,7 @@ void modesAcceptClients(void) { services[4] = Modes.https; services[5] = Modes.sbsos; - for (j = 0; j < sizeof(services)/sizeof(int); j++) { + for (j = 0; j < MODES_NET_SERVICES_NUM; j++) { fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port); if (fd == -1) continue;