Rearrangements to the receive thread.
Magnitude conversion now happens immediately when sample data is received, so there is no risk of newly received data clobbering old data under CPU overload.
This commit is contained in:
parent
e6c81251bf
commit
15ea5ba3da
11 changed files with 234 additions and 160 deletions
283
dump1090.c
283
dump1090.c
|
|
@ -167,17 +167,23 @@ void modesInit(void) {
|
|||
|
||||
// Allocate the various buffers used by Modes
|
||||
Modes.trailing_samples = (Modes.oversample ? (MODES_OS_PREAMBLE_SAMPLES + MODES_OS_LONG_MSG_SAMPLES) : (MODES_PREAMBLE_SAMPLES + MODES_LONG_MSG_SAMPLES)) + 16;
|
||||
if ( ((Modes.pFileData = (uint16_t *) malloc(MODES_ASYNC_BUF_SIZE) ) == NULL) ||
|
||||
((Modes.magnitude = (uint16_t *) calloc(MODES_ASYNC_BUF_SAMPLES+Modes.trailing_samples, 2) ) == NULL) ||
|
||||
((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) ||
|
||||
if ( ((Modes.maglut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) ||
|
||||
((Modes.log10lut = (uint16_t *) malloc(sizeof(uint16_t) * 256 * 256) ) == NULL) )
|
||||
{
|
||||
fprintf(stderr, "Out of memory allocating data buffer.\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
// Clear the buffers that have just been allocated, just in-case
|
||||
memset(Modes.pFileData,127, MODES_ASYNC_BUF_SIZE);
|
||||
for (i = 0; i < MODES_MAG_BUFFERS; ++i) {
|
||||
if ( (Modes.mag_buffers[i].data = calloc(MODES_MAG_BUF_SAMPLES+Modes.trailing_samples, sizeof(uint16_t))) == NULL ) {
|
||||
fprintf(stderr, "Out of memory allocating magnitude buffer.\n");
|
||||
exit(1);
|
||||
}
|
||||
|
||||
Modes.mag_buffers[i].length = 0;
|
||||
Modes.mag_buffers[i].dropped = 0;
|
||||
Modes.mag_buffers[i].sampleTimestamp = 0;
|
||||
}
|
||||
|
||||
// Validate the users Lat/Lon home location inputs
|
||||
if ( (Modes.fUserLat > 90.0) // Latitude must be -90 to +90
|
||||
|
|
@ -206,11 +212,6 @@ void modesInit(void) {
|
|||
if (Modes.net_sndbuf_size > (MODES_NET_SNDBUF_MAX))
|
||||
{Modes.net_sndbuf_size = MODES_NET_SNDBUF_MAX;}
|
||||
|
||||
// Initialise the Block Timers to something half sensible
|
||||
clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeBlk);
|
||||
for (i = 0; i < MODES_ASYNC_BUF_NUMBER; i++)
|
||||
{Modes.stSystemTimeRTL[i] = Modes.stSystemTimeBlk;}
|
||||
|
||||
// Each I and Q value varies from 0 to 255, which represents a range from -1 to +1. To get from the
|
||||
// unsigned (0-255) range you therefore subtract 127 (or 128 or 127.5) from each I and Q, giving you
|
||||
// a range from -127 to +128 (or -128 to +127, or -127.5 to +127.5)..
|
||||
|
|
@ -382,43 +383,105 @@ int modesInitRTLSDR(void) {
|
|||
static struct timespec reader_thread_start;
|
||||
|
||||
void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
|
||||
struct mag_buf *outbuf;
|
||||
struct mag_buf *lastbuf;
|
||||
uint16_t *p, *q;
|
||||
uint32_t slen;
|
||||
unsigned next_free_buffer;
|
||||
unsigned free_bufs;
|
||||
unsigned block_duration;
|
||||
|
||||
static int was_odd = 0; // paranoia!!
|
||||
static int dropping = 0;
|
||||
|
||||
MODES_NOTUSED(ctx);
|
||||
|
||||
// Lock the data buffer variables before accessing them
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
|
||||
if (Modes.exit) {
|
||||
rtlsdr_cancel_async(Modes.dev); // ask our caller to exit
|
||||
}
|
||||
|
||||
Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
|
||||
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
|
||||
outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
|
||||
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
|
||||
free_bufs = (Modes.first_filled_buffer - next_free_buffer + MODES_MAG_BUFFERS) % MODES_MAG_BUFFERS;
|
||||
|
||||
// Get the system time for this block
|
||||
clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeRTL[Modes.iDataIn]);
|
||||
// Paranoia! Unlikely, but let's go for belt and suspenders here
|
||||
|
||||
if (len > MODES_ASYNC_BUF_SIZE) {len = MODES_ASYNC_BUF_SIZE;}
|
||||
if (len != MODES_RTL_BUF_SIZE) {
|
||||
fprintf(stderr, "weirdness: rtlsdr gave us a block with an unusual size (got %u bytes, expected %u bytes)\n",
|
||||
(unsigned)len, (unsigned)MODES_RTL_BUF_SIZE);
|
||||
|
||||
// Queue the new data
|
||||
Modes.pData[Modes.iDataIn] = (uint16_t *) buf;
|
||||
Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
|
||||
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
|
||||
|
||||
if (Modes.iDataReady == 0) {
|
||||
// Ooooops. We've just received the MODES_ASYNC_BUF_NUMBER'th outstanding buffer
|
||||
// This means that RTLSDR is currently overwriting the MODES_ASYNC_BUF_NUMBER+1
|
||||
// buffer, but we havent yet processed it, so we're going to lose it. There
|
||||
// isn't much we can do to recover the lost data, but we can correct things to
|
||||
// avoid any additional problems.
|
||||
Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut+1);
|
||||
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1);
|
||||
Modes.iDataLost++;
|
||||
if (len > MODES_RTL_BUF_SIZE) {
|
||||
// wat?! Discard the start.
|
||||
unsigned discard = (len - MODES_RTL_BUF_SIZE + 1) / 2;
|
||||
outbuf->dropped += discard;
|
||||
buf += discard*2;
|
||||
len -= discard*2;
|
||||
}
|
||||
}
|
||||
|
||||
if (was_odd) {
|
||||
// Drop a sample so we are in sync with I/Q samples again (hopefully)
|
||||
++buf;
|
||||
--len;
|
||||
++outbuf->dropped;
|
||||
}
|
||||
|
||||
was_odd = (len & 1);
|
||||
slen = len/2;
|
||||
|
||||
if (free_bufs == 0 || (dropping && free_bufs < MODES_MAG_BUFFERS/2)) {
|
||||
// FIFO is full. Drop this block.
|
||||
dropping = 1;
|
||||
outbuf->dropped += slen;
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
return;
|
||||
}
|
||||
|
||||
dropping = 0;
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
|
||||
// Compute the sample timestamp and system timestamp for the start of the block
|
||||
if (Modes.oversample) {
|
||||
outbuf->sampleTimestamp = lastbuf->sampleTimestamp + (lastbuf->length + outbuf->dropped) * 5;
|
||||
block_duration = slen * 5000U / 12;
|
||||
} else {
|
||||
outbuf->sampleTimestamp = lastbuf->sampleTimestamp + (lastbuf->length + outbuf->dropped) * 6;
|
||||
block_duration = slen * 6000U / 12;
|
||||
}
|
||||
|
||||
// Get the approx system time for the start of this block
|
||||
clock_gettime(CLOCK_REALTIME, &outbuf->sysTimestamp);
|
||||
outbuf->sysTimestamp.tv_nsec -= block_duration;
|
||||
normalize_timespec(&outbuf->sysTimestamp);
|
||||
|
||||
// Copy trailing data from last block (or reset if not valid)
|
||||
if (outbuf->dropped == 0 && lastbuf->length >= Modes.trailing_samples) {
|
||||
memcpy(outbuf->data, lastbuf->data + lastbuf->length - Modes.trailing_samples, Modes.trailing_samples * sizeof(uint16_t));
|
||||
} else {
|
||||
memset(outbuf->data, 127, Modes.trailing_samples * sizeof(uint16_t));
|
||||
}
|
||||
|
||||
// Convert the new data
|
||||
outbuf->length = slen;
|
||||
p = (uint16_t*)buf;
|
||||
q = &outbuf->data[Modes.trailing_samples];
|
||||
while (slen-- > 0)
|
||||
*q++ = Modes.maglut[*p++];
|
||||
|
||||
// Push the new data to the demodulation thread
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
|
||||
Modes.mag_buffers[next_free_buffer].dropped = 0;
|
||||
Modes.mag_buffers[next_free_buffer].length = 0; // just in case
|
||||
Modes.first_free_buffer = next_free_buffer;
|
||||
|
||||
// accumulate CPU while holding the mutex, and restart measurement
|
||||
end_cpu_timing(&reader_thread_start, &Modes.reader_cpu_accumulator);
|
||||
start_cpu_timing(&reader_thread_start);
|
||||
|
||||
// Signal to the other thread that new data is ready, and unlock
|
||||
|
||||
pthread_cond_signal(&Modes.data_cond);
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
}
|
||||
|
|
@ -429,60 +492,93 @@ void rtlsdrCallback(unsigned char *buf, uint32_t len, void *ctx) {
|
|||
// instead of using an RTLSDR device
|
||||
//
|
||||
void readDataFromFile(void) {
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
while(Modes.exit == 0) {
|
||||
ssize_t nread, toread;
|
||||
unsigned char *p;
|
||||
int eof = 0;
|
||||
struct timespec next_buffer_delivery;
|
||||
|
||||
if (Modes.iDataReady) {
|
||||
clock_gettime(CLOCK_MONOTONIC, &next_buffer_delivery);
|
||||
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
while (!Modes.exit && !eof) {
|
||||
ssize_t nread, toread;
|
||||
void *r;
|
||||
uint16_t *p;
|
||||
struct mag_buf *outbuf, *lastbuf;
|
||||
unsigned next_free_buffer;
|
||||
unsigned slen;
|
||||
|
||||
next_free_buffer = (Modes.first_free_buffer + 1) % MODES_MAG_BUFFERS;
|
||||
if (next_free_buffer == Modes.first_filled_buffer) {
|
||||
// no space for output yet
|
||||
pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (Modes.interactive) {
|
||||
// When --ifile and --interactive are used together, slow down
|
||||
// playing at the natural rate of the RTLSDR received.
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
usleep(64000);
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
outbuf = &Modes.mag_buffers[Modes.first_free_buffer];
|
||||
lastbuf = &Modes.mag_buffers[(Modes.first_free_buffer + MODES_MAG_BUFFERS - 1) % MODES_MAG_BUFFERS];
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
|
||||
// Compute the sample timestamp and system timestamp for the start of the block
|
||||
if (Modes.oversample) {
|
||||
outbuf->sampleTimestamp = lastbuf->sampleTimestamp + lastbuf->length * 5;
|
||||
} else {
|
||||
outbuf->sampleTimestamp = lastbuf->sampleTimestamp + lastbuf->length * 6;
|
||||
}
|
||||
|
||||
toread = MODES_ASYNC_BUF_SIZE;
|
||||
p = (unsigned char *) Modes.pFileData;
|
||||
// Copy trailing data from last block (or reset if not valid)
|
||||
if (lastbuf->length >= Modes.trailing_samples) {
|
||||
memcpy(outbuf->data, lastbuf->data + lastbuf->length - Modes.trailing_samples, Modes.trailing_samples * sizeof(uint16_t));
|
||||
} else {
|
||||
memset(outbuf->data, 127, Modes.trailing_samples * sizeof(uint16_t));
|
||||
}
|
||||
|
||||
// Get the system time for the start of this block
|
||||
clock_gettime(CLOCK_REALTIME, &outbuf->sysTimestamp);
|
||||
|
||||
toread = MODES_RTL_BUF_SIZE;
|
||||
r = (void *) (outbuf->data + Modes.trailing_samples);
|
||||
while(toread) {
|
||||
nread = read(Modes.fd, p, toread);
|
||||
nread = read(Modes.fd, r, toread);
|
||||
if (nread <= 0) {
|
||||
// Done.
|
||||
Modes.exit = 1; // Signal the other threads to exit.
|
||||
goto OUT;
|
||||
eof = 1;
|
||||
break;
|
||||
}
|
||||
p += nread;
|
||||
r += nread;
|
||||
toread -= nread;
|
||||
}
|
||||
if (toread) {
|
||||
// Not enough data on file to fill the buffer? Pad with no signal.
|
||||
memset(p,127,toread);
|
||||
|
||||
slen = outbuf->length = (MODES_RTL_BUF_SIZE - toread) / 2;
|
||||
|
||||
// Convert the new data
|
||||
p = (uint16_t*) (outbuf->data + Modes.trailing_samples);
|
||||
while (slen-- > 0) {
|
||||
*p = Modes.maglut[*p];
|
||||
++p;
|
||||
}
|
||||
|
||||
Modes.iDataIn &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase!!!
|
||||
if (Modes.interactive) {
|
||||
// Wait until we are allowed to release this buffer to the main thread
|
||||
while (clock_nanosleep(CLOCK_MONOTONIC, TIMER_ABSTIME, &next_buffer_delivery, NULL) == EINTR)
|
||||
;
|
||||
|
||||
// Get the system time for this block
|
||||
clock_gettime(CLOCK_REALTIME, &Modes.stSystemTimeRTL[Modes.iDataIn]);
|
||||
|
||||
// Queue the new data
|
||||
Modes.pData[Modes.iDataIn] = Modes.pFileData;
|
||||
Modes.iDataIn = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn + 1);
|
||||
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
|
||||
// compute the time we can deliver the next buffer.
|
||||
next_buffer_delivery.tv_nsec += (slen * (Modes.oversample ? 5000 : 6000) / 12);
|
||||
normalize_timespec(&next_buffer_delivery);
|
||||
}
|
||||
|
||||
// Push the new data to the main thread
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
Modes.first_free_buffer = next_free_buffer;
|
||||
// accumulate CPU while holding the mutex, and restart measurement
|
||||
end_cpu_timing(&reader_thread_start, &Modes.reader_cpu_accumulator);
|
||||
start_cpu_timing(&reader_thread_start);
|
||||
|
||||
// Signal to the other thread that new data is ready
|
||||
pthread_cond_signal(&Modes.data_cond);
|
||||
}
|
||||
|
||||
OUT:
|
||||
// Wait for the main thread to consume all data
|
||||
while (!Modes.exit && Modes.first_filled_buffer != Modes.first_free_buffer)
|
||||
pthread_cond_wait(&Modes.data_cond, &Modes.data_mutex);
|
||||
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
}
|
||||
//
|
||||
|
|
@ -500,8 +596,8 @@ void *readerThreadEntryPoint(void *arg) {
|
|||
if (Modes.filename == NULL) {
|
||||
while (!Modes.exit) {
|
||||
rtlsdr_read_async(Modes.dev, rtlsdrCallback, NULL,
|
||||
MODES_ASYNC_BUF_NUMBER,
|
||||
MODES_ASYNC_BUF_SIZE);
|
||||
MODES_RTL_BUFFERS,
|
||||
MODES_RTL_BUF_SIZE);
|
||||
|
||||
if (!Modes.exit) {
|
||||
log_with_timestamp("Warning: lost the connection to the RTLSDR device.");
|
||||
|
|
@ -1027,7 +1123,7 @@ int main(int argc, char **argv) {
|
|||
while (Modes.exit == 0) {
|
||||
struct timespec start_time;
|
||||
|
||||
if (Modes.iDataReady == 0) {
|
||||
if (Modes.first_free_buffer == Modes.first_filled_buffer) {
|
||||
/* wait for more data.
|
||||
* we should be getting data every 50-60ms. wait for max 100ms before we give up and do some background work.
|
||||
* this is fairly aggressive as all our network I/O runs out of the background work!
|
||||
|
|
@ -1039,63 +1135,44 @@ int main(int argc, char **argv) {
|
|||
normalize_timespec(&ts);
|
||||
|
||||
pthread_cond_timedwait(&Modes.data_cond, &Modes.data_mutex, &ts); // This unlocks Modes.data_mutex, and waits for Modes.data_cond
|
||||
// Once (Modes.data_cond) occurs, it locks Modes.data_mutex
|
||||
}
|
||||
|
||||
// Modes.data_mutex is Locked, and possibly (Modes.iDataReady != 0)
|
||||
// Modes.data_mutex is locked, and possibly we have data.
|
||||
|
||||
// copy out reader CPU time and reset it
|
||||
add_timespecs(&Modes.reader_cpu_accumulator, &Modes.stats_current.reader_cpu, &Modes.stats_current.reader_cpu);
|
||||
Modes.reader_cpu_accumulator.tv_sec = 0;
|
||||
Modes.reader_cpu_accumulator.tv_nsec = 0;
|
||||
|
||||
if (Modes.iDataReady) { // Check we have new data, just in case!!
|
||||
if (Modes.first_free_buffer != Modes.first_filled_buffer) {
|
||||
// FIFO is not empty, process one buffer.
|
||||
|
||||
struct mag_buf *buf;
|
||||
|
||||
start_cpu_timing(&start_time);
|
||||
|
||||
Modes.iDataOut &= (MODES_ASYNC_BUF_NUMBER-1); // Just incase
|
||||
|
||||
// Translate the next lot of I/Q samples into Modes.magnitude
|
||||
computeMagnitudeVector(Modes.pData[Modes.iDataOut]);
|
||||
|
||||
Modes.stSystemTimeBlk = Modes.stSystemTimeRTL[Modes.iDataOut];
|
||||
|
||||
// Update the input buffer pointer queue
|
||||
Modes.iDataOut = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataOut + 1);
|
||||
Modes.iDataReady = (MODES_ASYNC_BUF_NUMBER-1) & (Modes.iDataIn - Modes.iDataOut);
|
||||
|
||||
// If we lost some blocks, correct the timestamp
|
||||
if (Modes.iDataLost) {
|
||||
if (Modes.oversample)
|
||||
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 5 * Modes.iDataLost);
|
||||
else
|
||||
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES * 6 * Modes.iDataLost);
|
||||
Modes.stats_current.blocks_dropped += Modes.iDataLost;
|
||||
Modes.iDataLost = 0;
|
||||
}
|
||||
|
||||
// It's safe to release the lock now
|
||||
pthread_cond_signal (&Modes.data_cond);
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
buf = &Modes.mag_buffers[Modes.first_filled_buffer];
|
||||
|
||||
// Process data after releasing the lock, so that the capturing
|
||||
// thread can read data while we perform computationally expensive
|
||||
// stuff at the same time.
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
|
||||
if (Modes.oversample)
|
||||
demodulate2400(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES);
|
||||
demodulate2400(buf);
|
||||
else
|
||||
demodulate2000(Modes.magnitude, MODES_ASYNC_BUF_SAMPLES);
|
||||
|
||||
// Update the timestamp ready for the next block
|
||||
if (Modes.oversample)
|
||||
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*5);
|
||||
else
|
||||
Modes.timestampBlk += (MODES_ASYNC_BUF_SAMPLES*6);
|
||||
Modes.stats_current.blocks_processed++;
|
||||
demodulate2000(buf);
|
||||
|
||||
Modes.stats_current.samples_processed += buf->length;
|
||||
Modes.stats_current.samples_dropped += buf->dropped;
|
||||
end_cpu_timing(&start_time, &Modes.stats_current.demod_cpu);
|
||||
|
||||
// Mark the buffer we just processed as completed.
|
||||
pthread_mutex_lock(&Modes.data_mutex);
|
||||
Modes.first_filled_buffer = (Modes.first_filled_buffer + 1) % MODES_MAG_BUFFERS;
|
||||
pthread_cond_signal(&Modes.data_cond);
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
} else {
|
||||
pthread_cond_signal (&Modes.data_cond);
|
||||
// Nothing to process this time around.
|
||||
pthread_mutex_unlock(&Modes.data_mutex);
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue