BUGFIX : Missed data causes timestamp slip

The Mutex on the RTL data reader thread does not "force" the data
processing thread to execute. Therefore, if the processor is busy, it is
possible for a second RTL callback to occur before the data from the
first has been processed. This will cause the loss of the first data,
but worse, it will cause a slip in the timestamp. This upsets Beamfinder
and MLAT operation in PlanePlotter.

To solve this, keep a Fifo buffer which is filled by the callback
thread, and emptied by the data processing thread. The fifo is the same
size as the number of buffers requested in the call to
rtlsdr_read_async().

Note - we only put the value of the pointer supplied in the callback
into the fifo. We do not attempt to cache the data in the buffer pointed
to by the pointer.  This would require us to memcopy() 2Mbytes per
second, which we don't want to do if we don't have to because it will
only make the processor loading worse. Instead, we assume that the data
in the buffer will remain valid after the callback returns, at least
until it is overwritten by new data.

It is still possible for us to lose data if we can't process it quickly
enough. However, we can now detect this loss of data when the fifo is
almost full, and correct the timestamp for the lost block/blocks.
This commit is contained in:
Malcolm Robb 2014-02-22 23:11:13 +00:00
parent 24080a22b1
commit 75a4c6ee21
4 changed files with 108 additions and 48 deletions

View file

@ -92,7 +92,7 @@ void modesInit(void) {
// Allocate the various buffers used by Modes // Allocate the various buffers used by Modes
if ( ((Modes.icao_cache = (uint32_t *) malloc(sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2) ) == NULL) || if ( ((Modes.icao_cache = (uint32_t *) malloc(sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2) ) == NULL) ||
((Modes.data = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) || ((Modes.pFileData = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) ||
((Modes.magnitude = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE) ) == NULL) || ((Modes.magnitude = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE) ) == NULL) ||
((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) || ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) ||
((Modes.beastOut = (char *) malloc(MODES_RAWOUT_BUF_SIZE) ) == NULL) || ((Modes.beastOut = (char *) malloc(MODES_RAWOUT_BUF_SIZE) ) == NULL) ||
@ -104,7 +104,7 @@ void modesInit(void) {
// Clear the buffers that have just been allocated, just in-case // Clear the buffers that have just been allocated, just in-case
memset(Modes.icao_cache, 0, sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2); memset(Modes.icao_cache, 0, sizeof(uint32_t) * MODES_ICAO_CACHE_LEN * 2);
memset(Modes.data, 127, MODES_ASYNC_BUF_SIZE); memset(Modes.pFileData,127, MODES_ASYNC_BUF_SIZE);
memset(Modes.magnitude, 0, MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE); memset(Modes.magnitude, 0, MODES_ASYNC_BUF_SIZE+MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE);
// Validate the users Lat/Lon home location inputs // Validate the users Lat/Lon home location inputs
@ -133,8 +133,9 @@ void modesInit(void) {
{Modes.net_output_raw_rate = MODES_RAWOUT_BUF_RATE;} {Modes.net_output_raw_rate = MODES_RAWOUT_BUF_RATE;}
// Initialise the Block Timers to something half sensible // Initialise the Block Timers to something half sensible
ftime(&Modes.stSystemTimeRTL); ftime(&Modes.stSystemTimeBlk);
Modes.stSystemTimeBlk = Modes.stSystemTimeRTL; for (i = 0; i < MODES_ASYNC_BUF_NUMBER; i++)
{Modes.stSystemTimeRTL[i] = Modes.stSystemTimeBlk;}
// Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the // Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the
// unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you // unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you
@ -250,13 +251,35 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
MODES_NOTUSED(ctx); MODES_NOTUSED(ctx);
// Lock the data buffer variables before accessing them
pthread_mutex_lock(&Modes.data_mutex); pthread_mutex_lock(&Modes.data_mutex);
ftime(&Modes.stSystemTimeRTL);
if (len > MODES_ASYNC_BUF_SIZE) len = MODES_ASYNC_BUF_SIZE; rtlsdrStats(buf);
// Read the new data
memcpy(Modes.data, buf, len); Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
Modes.data_ready = 1;
// Signal to the other thread that new data is ready // Get the system time for this block
ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]);
if (len > MODES_ASYNC_BUF_SIZE) {len = MODES_ASYNC_BUF_SIZE;}
// Queue the new data
Modes.pData[Modes.iDataIn] = (uint16_t *) buf;
Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
if (Modes.iDataReady == 0) {
// Ooooops. We've just received the MODES_ASYNC_BUF_NUMBER'th outstanding buffer
// This means that RTLSDR is currently overwriting the MODES_ASYNC_BUF_NUMBER+1
// buffer, but we havent yet processed it, so we're going to lose it. There
// isn't much we can do to recover the lost data, but we can correct things to
// avoid any additional problems.
Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut+1);
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1);
Modes.iDataLost++;
}
// Signal to the other thread that new data is ready, and unlock
pthread_cond_signal(&Modes.data_cond); pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex); pthread_mutex_unlock(&Modes.data_mutex);
} }
@ -268,12 +291,11 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
// //
void readDataFromFile(void) { void readDataFromFile(void) {
pthread_mutex_lock(&Modes.data_mutex); pthread_mutex_lock(&Modes.data_mutex);
while(1) { while(Modes.exit == 0) {
ssize_t nread, toread; ssize_t nread, toread;
unsigned char *p; unsigned char *p;
if (Modes.exit == 1) break; if (Modes.iDataReady) {
if (Modes.data_ready) {
pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex); pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
continue; continue;
} }
@ -287,7 +309,7 @@ void readDataFromFile(void) {
} }
toread = MODES_ASYNC_BUF_SIZE; toread = MODES_ASYNC_BUF_SIZE;
p = (unsigned char *) Modes.data; p = (unsigned char *) Modes.pFileData;
while(toread) { while(toread) {
nread = read(Modes.fd, p, toread); nread = read(Modes.fd, p, toread);
if (nread <= 0) { if (nread <= 0) {
@ -301,7 +323,17 @@ void readDataFromFile(void) {
// Not enough data on file to fill the buffer? Pad with no signal. // Not enough data on file to fill the buffer? Pad with no signal.
memset(p,127,toread); memset(p,127,toread);
} }
Modes.data_ready = 1;
Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
// Get the system time for this block
ftime(&Modes.stSystemTimeRTL[Modes.iDataIn]);
// Queue the new data
Modes.pData[Modes.iDataIn] = Modes.pFileData;
Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
// Signal to the other thread that new data is ready // Signal to the other thread that new data is ready
pthread_cond_signal(&Modes.data_cond); pthread_cond_signal(&Modes.data_cond);
} }
@ -323,7 +355,6 @@ void *readerThreadEntryPoint(void *arg) {
readDataFromFile(); readDataFromFile();
} }
// Signal to the other thread that new data is ready - dummy really so threads don't mutually lock // Signal to the other thread that new data is ready - dummy really so threads don't mutually lock
Modes.data_ready = 1;
pthread_cond_signal(&Modes.data_cond); pthread_cond_signal(&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex); pthread_mutex_unlock(&Modes.data_mutex);
#ifndef _WIN32 #ifndef _WIN32
@ -561,10 +592,6 @@ int main(int argc, char **argv) {
// Initialization // Initialization
modesInit(); modesInit();
//if (Modes.debug & MODES_DEBUG_BADCRC) {
// testAndTimeBitCorrection();
//}
if (Modes.net_only) { if (Modes.net_only) {
fprintf(stderr,"Net-only mode, no RTL device or file open.\n"); fprintf(stderr,"Net-only mode, no RTL device or file open.\n");
} else if (Modes.filename == NULL) { } else if (Modes.filename == NULL) {
@ -589,31 +616,55 @@ int main(int argc, char **argv) {
// Create the thread that will read the data from the device. // Create the thread that will read the data from the device.
pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL); pthread_create(&Modes.reader_thread, NULL, readerThreadEntryPoint, NULL);
pthread_mutex_lock(&Modes.data_mutex); pthread_mutex_lock(&Modes.data_mutex);
while(1) {
if (!Modes.data_ready) {
pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex);
continue;
}
computeMagnitudeVector();
Modes.stSystemTimeBlk = Modes.stSystemTimeRTL;
// Signal to the other thread that we processed the available data while (Modes.exit == 0) {
// and we want more (useful for --ifile)
Modes.data_ready = 0; if (Modes.iDataReady == 0) {
pthread_cond_wait(&Modes.data_cond,&Modes.data_mutex); // This unlocks Modes.data_mutex, and waits for Modes.data_cond
continue; // Once (Modes.data_cond) occurs, it locks Modes.data_mutex
}
// Modes.data_mutex is Locked, and (Modes.iDataReady != 0)
if (Modes.iDataReady) { // Check we have new data, just in case!!
Modes.iDataOut &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase
// Translate the next lot of I/Q samples into Modes.magnitude
computeMagnitudeVector(Modes.pData[Modes.iDataOut]);
Modes.stSystemTimeBlk = Modes.stSystemTimeRTL[Modes.iDataOut];
// Update the input buffer pointer queue
Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut + 1);
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
// If we lost some blocks, correct the timestamp
if (Modes.iDataLost) {
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 6 * Modes.iDataLost);
uRtlLost+= Modes.iDataLost;
Modes.iDataLost = 0;
}
// It's safe to release the lock now
pthread_cond_signal (&Modes.data_cond); pthread_cond_signal (&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
// Process data after releasing the lock, so that the capturing // Process data after releasing the lock, so that the capturing
// thread can read data while we perform computationally expensive // thread can read data while we perform computationally expensive
// stuff * at the same time. (This should only be useful with very // stuff at the same time.
// slow processors).
pthread_mutex_unlock(&Modes.data_mutex);
detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES); detectModeS(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES);
// Update the timestamp ready for the next block
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6); Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6);
} else {
pthread_cond_signal (&Modes.data_cond);
pthread_mutex_unlock(&Modes.data_mutex);
}
backgroundTasks(); backgroundTasks();
pthread_mutex_lock(&Modes.data_mutex); pthread_mutex_lock(&Modes.data_mutex);
if (Modes.exit) break;
} }
// If --stats were given, print statistics // If --stats were given, print statistics
@ -658,7 +709,11 @@ int main(int argc, char **argv) {
pthread_cond_destroy(&Modes.data_cond); // Thread cleanup pthread_cond_destroy(&Modes.data_cond); // Thread cleanup
pthread_mutex_destroy(&Modes.data_mutex); pthread_mutex_destroy(&Modes.data_mutex);
pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit pthread_join(Modes.reader_thread,NULL); // Wait on reader thread exit
#ifndef _WIN32
pthread_exit(0); pthread_exit(0);
#else
return (0);
#endif
} }
// //
//========================================================================= //=========================================================================

View file

@ -81,7 +81,7 @@
#define MODES_DEFAULT_FREQ 1090000000 #define MODES_DEFAULT_FREQ 1090000000
#define MODES_DEFAULT_WIDTH 1000 #define MODES_DEFAULT_WIDTH 1000
#define MODES_DEFAULT_HEIGHT 700 #define MODES_DEFAULT_HEIGHT 700
#define MODES_ASYNC_BUF_NUMBER 12 #define MODES_ASYNC_BUF_NUMBER 16
#define MODES_ASYNC_BUF_SIZE (16*16384) // 256k #define MODES_ASYNC_BUF_SIZE (16*16384) // 256k
#define MODES_ASYNC_BUF_SAMPLES (MODES_ASYNC_BUF_SIZE / 2) // Each sample is 2 bytes #define MODES_ASYNC_BUF_SAMPLES (MODES_ASYNC_BUF_SIZE / 2) // Each sample is 2 bytes
#define MODES_AUTO_GAIN -100 // Use automatic gain #define MODES_AUTO_GAIN -100 // Use automatic gain
@ -225,15 +225,21 @@ struct aircraft {
// Program global state // Program global state
struct { // Internal state struct { // Internal state
pthread_t reader_thread; pthread_t reader_thread;
pthread_mutex_t data_mutex; // Mutex to synchronize buffer access pthread_mutex_t data_mutex; // Mutex to synchronize buffer access
pthread_cond_t data_cond; // Conditional variable associated pthread_cond_t data_cond; // Conditional variable associated
uint16_t *data; // Raw IQ samples buffer uint16_t *pData [MODES_ASYNC_BUF_NUMBER]; // Raw IQ sample buffers from RTL
struct timeb stSystemTimeRTL[MODES_ASYNC_BUF_NUMBER]; // System time when RTL passed us this block
int iDataIn; // Fifo input pointer
int iDataOut; // Fifo output pointer
int iDataReady; // Fifo content count
int iDataLost; // Count of missed buffers
uint16_t *pFileData; // Raw IQ samples buffer (from a File)
uint16_t *magnitude; // Magnitude vector uint16_t *magnitude; // Magnitude vector
struct timeb stSystemTimeRTL; // System time when RTL passed us the Latest block
uint64_t timestampBlk; // Timestamp of the start of the current block uint64_t timestampBlk; // Timestamp of the start of the current block
struct timeb stSystemTimeBlk; // System time when RTL passed us currently processing this block struct timeb stSystemTimeBlk; // System time when RTL passed us currently processing this block
int fd; // --ifile option file descriptor int fd; // --ifile option file descriptor
int data_ready; // Data ready to be processed
uint32_t *icao_cache; // Recently seen ICAO addresses cache uint32_t *icao_cache; // Recently seen ICAO addresses cache
uint16_t *maglut; // I/Q -> Magnitude lookup table uint16_t *maglut; // I/Q -> Magnitude lookup table
int exit; // Exit from the main loop when true int exit; // Exit from the main loop when true
@ -403,7 +409,7 @@ void detectModeS (uint16_t *m, uint32_t mlen);
void decodeModesMessage (struct modesMessage *mm, unsigned char *msg); void decodeModesMessage (struct modesMessage *mm, unsigned char *msg);
void displayModesMessage(struct modesMessage *mm); void displayModesMessage(struct modesMessage *mm);
void useModesMessage (struct modesMessage *mm); void useModesMessage (struct modesMessage *mm);
void computeMagnitudeVector(); void computeMagnitudeVector(uint16_t *pData);
void decodeCPR (struct aircraft *a, int fflag, int surface); void decodeCPR (struct aircraft *a, int fflag, int surface);
int decodeCPRrelative (struct aircraft *a, int fflag, int surface); int decodeCPRrelative (struct aircraft *a, int fflag, int surface);
void modesInitErrorInfo (); void modesInitErrorInfo ();

View file

@ -1372,9 +1372,8 @@ void displayModesMessage(struct modesMessage *mm) {
// Turn I/Q samples pointed by Modes.data into the magnitude vector // Turn I/Q samples pointed by Modes.data into the magnitude vector
// pointed by Modes.magnitude. // pointed by Modes.magnitude.
// //
void computeMagnitudeVector(void) { void computeMagnitudeVector(uint16_t *p) {
uint16_t *m = &Modes.magnitude[MODES_PREAMBLE_SAMPLES+MODES_LONG_MSG_SAMPLES]; uint16_t *m = &Modes.magnitude[MODES_PREAMBLE_SAMPLES+MODES_LONG_MSG_SAMPLES];
uint16_t *p = Modes.data;
uint32_t j; uint32_t j;
memcpy(Modes.magnitude,&Modes.magnitude[MODES_ASYNC_BUF_SAMPLES], MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE); memcpy(Modes.magnitude,&Modes.magnitude[MODES_ASYNC_BUF_SAMPLES], MODES_PREAMBLE_SIZE+MODES_LONG_MSG_SIZE);

View file

@ -96,7 +96,7 @@ void modesAcceptClients(void) {
services[4] = Modes.https; services[4] = Modes.https;
services[5] = Modes.sbsos; services[5] = Modes.sbsos;
for (j = 0; j < sizeof(services)/sizeof(int); j++) { for (j = 0; j < MODES_NET_SERVICES_NUM; j++) {
fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port); fd = anetTcpAccept(Modes.aneterr, services[j], NULL, &port);
if (fd == -1) continue; if (fd == -1) continue;