* fix a threading issue

This commit is contained in:
Chris Cannam
2007-11-26 11:00:47 +00:00
parent 89ce4223ee
commit 48d640f696
7 changed files with 125 additions and 17 deletions

2
TODO
View File

@@ -1,5 +1,4 @@
* Threading lock structure is very slow if it becomes starved of CPUs
* Fix "!!!" points * Fix "!!!" points
* LADSPA plugin has too much "artificial latency" * LADSPA plugin has too much "artificial latency"
* LADSPA plugin probably doesn't want to go any higher than about +2 octaves * LADSPA plugin probably doesn't want to go any higher than about +2 octaves
@@ -7,6 +6,7 @@
* sweeps & tones * sweeps & tones
* ensure default options don't produce garbage at any extreme * ensure default options don't produce garbage at any extreme
DONE * Threading lock structure is very slow if it becomes starved of CPUs
DONE * Rationalise naming further (e.g. use of "lock" for both peak phases DONE * Rationalise naming further (e.g. use of "lock" for both peak phases
and resynchronisation at transients; use of block vs chunk vs frame and resynchronisation at transients; use of block vs chunk vs frame
vs window) vs window)

View File

@@ -945,7 +945,8 @@ RubberBandStretcher::Impl::process(const float *const *input, size_t samples, bo
// cerr << "process: happy with channel " << c << endl; // cerr << "process: happy with channel " << c << endl;
} }
if (!m_threaded && !m_realtime) { if (!m_threaded && !m_realtime) {
processChunks(c); bool any = false, last = false;
processChunks(c, any, last);
} }
} }

View File

@@ -87,7 +87,7 @@ protected:
size_t m_channels; size_t m_channels;
size_t consumeChannel(size_t channel, const float *input, size_t samples); size_t consumeChannel(size_t channel, const float *input, size_t samples);
bool processChunks(size_t channel); // returns "last" void processChunks(size_t channel, bool &any, bool &last);
bool processOneChunk(); // across all channels, for real time use bool processOneChunk(); // across all channels, for real time use
bool processChunkForChannel(size_t channel, size_t phaseIncrement, bool processChunkForChannel(size_t channel, size_t phaseIncrement,
size_t shiftIncrement, bool phaseReset); size_t shiftIncrement, bool phaseReset);

View File

@@ -47,19 +47,23 @@ RubberBandStretcher::Impl::ProcessThread::run()
<< ", readSpace == " << cd.inbuf->getReadSpace() << endl; << ", readSpace == " << cd.inbuf->getReadSpace() << endl;
} }
bool last = m_s->processChunks(m_channel); bool any = false, last = false;
m_s->processChunks(m_channel, any, last);
m_s->m_spaceAvailable.signal();
if (last) break; if (last) break;
if (any) m_s->m_spaceAvailable.signal();
m_s->m_dataAvailable.lock(); m_s->m_dataAvailable.lock();
if (cd.inbuf->getReadSpace() == 0) { if (!m_s->testInbufReadSpace(m_channel)) {
m_s->m_dataAvailable.wait(); m_s->m_dataAvailable.wait();
} else {
m_s->m_dataAvailable.unlock();
} }
} }
m_s->processChunks(m_channel); bool any = false, last = false;
m_s->processChunks(m_channel, any, last);
m_s->m_spaceAvailable.signal(); m_s->m_spaceAvailable.signal();
if (m_s->m_debugLevel > 1) { if (m_s->m_debugLevel > 1) {
@@ -67,8 +71,8 @@ RubberBandStretcher::Impl::ProcessThread::run()
} }
} }
bool void
RubberBandStretcher::Impl::processChunks(size_t c) RubberBandStretcher::Impl::processChunks(size_t c, bool &any, bool &last)
{ {
// Process as many chunks as there are available on the input // Process as many chunks as there are available on the input
// buffer for channel c. This requires that the increments have // buffer for channel c. This requires that the increments have
@@ -76,11 +80,17 @@ RubberBandStretcher::Impl::processChunks(size_t c)
ChannelData &cd = *m_channelData[c]; ChannelData &cd = *m_channelData[c];
bool last = false; last = false;
any = false;
while (!last) { while (!last) {
if (!testInbufReadSpace(c)) break; if (!testInbufReadSpace(c)) {
// cerr << "not enough input" << endl;
break;
}
any = true;
if (!cd.draining) { if (!cd.draining) {
size_t got = cd.inbuf->peek(cd.fltbuf, m_windowSize); size_t got = cd.inbuf->peek(cd.fltbuf, m_windowSize);
@@ -99,8 +109,6 @@ RubberBandStretcher::Impl::processChunks(size_t c)
cerr << "channel " << c << ": last = " << last << ", chunkCount = " << cd.chunkCount << endl; cerr << "channel " << c << ": last = " << last << ", chunkCount = " << cd.chunkCount << endl;
} }
} }
return last;
} }
bool bool
@@ -840,7 +848,8 @@ RubberBandStretcher::Impl::available() const
// cerr << "calling processChunks(" << c << ") from available" << endl; // cerr << "calling processChunks(" << c << ") from available" << endl;
//!!! do we ever actually do this? if so, this method should not be const //!!! do we ever actually do this? if so, this method should not be const
// ^^^ yes, we do sometimes -- e.g. when fed a very short file // ^^^ yes, we do sometimes -- e.g. when fed a very short file
((RubberBandStretcher::Impl *)this)->processChunks(c); bool any = false, last = false;
((RubberBandStretcher::Impl *)this)->processChunks(c, any, last);
} }
} }
} }

View File

@@ -19,6 +19,10 @@
#include <sys/time.h> #include <sys/time.h>
#include <time.h> #include <time.h>
//#define DEBUG_THREAD 1
//#define DEBUG_MUTEX 1
//#define DEBUG_CONDITION 1
using std::cerr; using std::cerr;
using std::endl; using std::endl;
@@ -30,13 +34,22 @@ Thread::Thread() :
m_id(0), m_id(0),
m_extant(false) m_extant(false)
{ {
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Created thread object " << this << endl;
#endif
} }
Thread::~Thread() Thread::~Thread()
{ {
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Destroying thread object " << this << ", id " << m_id << endl;
#endif
if (m_extant) { if (m_extant) {
pthread_join(m_id, 0); pthread_join(m_id, 0);
} }
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Destroyed thread object " << this << endl;
#endif
} }
void void
@@ -47,6 +60,9 @@ Thread::start()
cerr << "ERROR: thread creation failed" << endl; cerr << "ERROR: thread creation failed" << endl;
exit(1); exit(1);
} else { } else {
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Created thread " << m_id << " for thread object " << this << endl;
#endif
m_extant = true; m_extant = true;
} }
} }
@@ -55,7 +71,13 @@ void
Thread::wait() Thread::wait()
{ {
if (m_extant) { if (m_extant) {
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Waiting on thread " << m_id << " for thread object " << this << endl;
#endif
pthread_join(m_id, 0); pthread_join(m_id, 0);
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: Waited on thread " << m_id << " for thread object " << this << endl;
#endif
m_extant = false; m_extant = false;
} }
} }
@@ -76,6 +98,9 @@ void *
Thread::staticRun(void *arg) Thread::staticRun(void *arg)
{ {
Thread *thread = (Thread *)arg; Thread *thread = (Thread *)arg;
#ifdef DEBUG_THREAD
cerr << "THREAD DEBUG: " << (void *)pthread_self() << ": Running thread " << thread->m_id << " for thread object " << thread << endl;
#endif
thread->run(); thread->run();
return 0; return 0;
} }
@@ -83,22 +108,42 @@ Thread::staticRun(void *arg)
Mutex::Mutex() Mutex::Mutex()
{ {
pthread_mutex_init(&m_mutex, 0); pthread_mutex_init(&m_mutex, 0);
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Initialised mutex " << &m_mutex << endl;
#endif
} }
Mutex::~Mutex() Mutex::~Mutex()
{ {
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Destroying mutex " << &m_mutex << endl;
#endif
pthread_mutex_destroy(&m_mutex); pthread_mutex_destroy(&m_mutex);
} }
void void
Mutex::lock() Mutex::lock()
{ {
if (m_locked) {
cerr << "ERROR: Deadlock on mutex " << &m_mutex << endl;
}
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Want to lock mutex " << &m_mutex << endl;
#endif
pthread_mutex_lock(&m_mutex); pthread_mutex_lock(&m_mutex);
m_locked = true;
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Locked mutex " << &m_mutex << endl;
#endif
} }
void void
Mutex::unlock() Mutex::unlock()
{ {
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Unlocking mutex " << &m_mutex << endl;
#endif
m_locked = false;
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
} }
@@ -106,8 +151,15 @@ bool
Mutex::trylock() Mutex::trylock()
{ {
if (pthread_mutex_trylock(&m_mutex)) { if (pthread_mutex_trylock(&m_mutex)) {
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Mutex " << &m_mutex << " unavailable" << endl;
#endif
return false; return false;
} else { } else {
m_locked = true;
#ifdef DEBUG_MUTEX
cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Locked mutex " << &m_mutex << " (from trylock)" << endl;
#endif
return true; return true;
} }
} }
@@ -117,10 +169,16 @@ Condition::Condition()
pthread_mutex_init(&m_mutex, 0); pthread_mutex_init(&m_mutex, 0);
m_locked = false; m_locked = false;
pthread_cond_init(&m_condition, 0); pthread_cond_init(&m_condition, 0);
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Initialised condition " << &m_condition << endl;
#endif
} }
Condition::~Condition() Condition::~Condition()
{ {
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Destroying condition " << &m_condition << endl;
#endif
if (m_locked) pthread_mutex_unlock(&m_mutex); if (m_locked) pthread_mutex_unlock(&m_mutex);
pthread_cond_destroy(&m_condition); pthread_cond_destroy(&m_condition);
pthread_mutex_destroy(&m_mutex); pthread_mutex_destroy(&m_mutex);
@@ -129,9 +187,36 @@ Condition::~Condition()
void void
Condition::lock() Condition::lock()
{ {
if (m_locked) return; if (m_locked) {
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Already locked " << &m_condition << endl;
#endif
return;
}
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Want to lock " << &m_condition << endl;
#endif
pthread_mutex_lock(&m_mutex); pthread_mutex_lock(&m_mutex);
m_locked = true; m_locked = true;
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Locked " << &m_condition << endl;
#endif
}
void
Condition::unlock()
{
if (!m_locked) {
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Not locked " << &m_condition << endl;
#endif
return;
}
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Unlocking " << &m_condition << endl;
#endif
m_locked = false;
pthread_mutex_unlock(&m_mutex);
} }
void void
@@ -140,6 +225,9 @@ Condition::wait(int us)
lock(); lock();
if (us == 0) { if (us == 0) {
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Waiting on " << &m_condition << endl;
#endif
pthread_cond_wait(&m_condition, &m_mutex); pthread_cond_wait(&m_condition, &m_mutex);
} else { } else {
@@ -157,9 +245,15 @@ Condition::wait(int us)
timeout.tv_sec = now.tv_sec; timeout.tv_sec = now.tv_sec;
timeout.tv_nsec = now.tv_usec * 1000; timeout.tv_nsec = now.tv_usec * 1000;
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Timed waiting on " << &m_condition << endl;
#endif
pthread_cond_timedwait(&m_condition, &m_mutex, &timeout); pthread_cond_timedwait(&m_condition, &m_mutex, &timeout);
} }
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Wait done on " << &m_condition << endl;
#endif
pthread_mutex_unlock(&m_mutex); pthread_mutex_unlock(&m_mutex);
m_locked = false; m_locked = false;
} }
@@ -167,6 +261,9 @@ Condition::wait(int us)
void void
Condition::signal() Condition::signal()
{ {
#ifdef DEBUG_CONDITION
cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Signalling " << &m_condition << endl;
#endif
pthread_cond_signal(&m_condition); pthread_cond_signal(&m_condition);
} }

View File

@@ -56,6 +56,7 @@ public:
private: private:
pthread_mutex_t m_mutex; pthread_mutex_t m_mutex;
bool m_locked;
}; };
class MutexLocker class MutexLocker
@@ -75,6 +76,7 @@ public:
~Condition(); ~Condition();
void lock(); void lock();
void unlock();
void wait(int us = 0); void wait(int us = 0);
void signal(); void signal();

View File

@@ -25,7 +25,6 @@ system_is_multiprocessor()
static bool tested = false, mp = false; static bool tested = false, mp = false;
if (tested) return mp; if (tested) return mp;
//... //...