From 48d640f69618b5b95462cf085748ffdef9682bcc Mon Sep 17 00:00:00 2001 From: Chris Cannam Date: Mon, 26 Nov 2007 11:00:47 +0000 Subject: [PATCH] * fix a threading issue --- TODO | 2 +- src/StretcherImpl.cpp | 3 +- src/StretcherImpl.h | 2 +- src/StretcherProcess.cpp | 33 +++++++++----- src/Thread.cpp | 99 +++++++++++++++++++++++++++++++++++++++- src/Thread.h | 2 + src/sysutils.cpp | 1 - 7 files changed, 125 insertions(+), 17 deletions(-) diff --git a/TODO b/TODO index fcb1814..4562425 100644 --- a/TODO +++ b/TODO @@ -1,5 +1,4 @@ -* Threading lock structure is very slow if it becomes starved of CPUs * Fix "!!!" points * LADSPA plugin has too much "artificial latency" * LADSPA plugin probably doesn't want to go any higher than about +2 octaves @@ -7,6 +6,7 @@ * sweeps & tones * ensure default options don't produce garbage at any extreme +DONE * Threading lock structure is very slow if it becomes starved of CPUs DONE * Rationalise naming further (e.g. use of "lock" for both peak phases and resynchronisation at transients; use of block vs chunk vs frame vs window) diff --git a/src/StretcherImpl.cpp b/src/StretcherImpl.cpp index f3c5d3e..6309ab1 100644 --- a/src/StretcherImpl.cpp +++ b/src/StretcherImpl.cpp @@ -945,7 +945,8 @@ RubberBandStretcher::Impl::process(const float *const *input, size_t samples, bo // cerr << "process: happy with channel " << c << endl; } if (!m_threaded && !m_realtime) { - processChunks(c); + bool any = false, last = false; + processChunks(c, any, last); } } diff --git a/src/StretcherImpl.h b/src/StretcherImpl.h index 19d996a..762d419 100644 --- a/src/StretcherImpl.h +++ b/src/StretcherImpl.h @@ -87,7 +87,7 @@ protected: size_t m_channels; size_t consumeChannel(size_t channel, const float *input, size_t samples); - bool processChunks(size_t channel); // returns "last" + void processChunks(size_t channel, bool &any, bool &last); bool processOneChunk(); // across all channels, for real time use bool processChunkForChannel(size_t channel, size_t phaseIncrement, size_t shiftIncrement, bool phaseReset); diff --git a/src/StretcherProcess.cpp b/src/StretcherProcess.cpp index 85b1421..ea38557 100644 --- a/src/StretcherProcess.cpp +++ b/src/StretcherProcess.cpp @@ -47,19 +47,23 @@ RubberBandStretcher::Impl::ProcessThread::run() << ", readSpace == " << cd.inbuf->getReadSpace() << endl; } - bool last = m_s->processChunks(m_channel); - - m_s->m_spaceAvailable.signal(); + bool any = false, last = false; + m_s->processChunks(m_channel, any, last); if (last) break; + if (any) m_s->m_spaceAvailable.signal(); + m_s->m_dataAvailable.lock(); - if (cd.inbuf->getReadSpace() == 0) { + if (!m_s->testInbufReadSpace(m_channel)) { m_s->m_dataAvailable.wait(); + } else { + m_s->m_dataAvailable.unlock(); } } - m_s->processChunks(m_channel); + bool any = false, last = false; + m_s->processChunks(m_channel, any, last); m_s->m_spaceAvailable.signal(); if (m_s->m_debugLevel > 1) { @@ -67,8 +71,8 @@ RubberBandStretcher::Impl::ProcessThread::run() } } -bool -RubberBandStretcher::Impl::processChunks(size_t c) +void +RubberBandStretcher::Impl::processChunks(size_t c, bool &any, bool &last) { // Process as many chunks as there are available on the input // buffer for channel c. This requires that the increments have @@ -76,11 +80,17 @@ RubberBandStretcher::Impl::processChunks(size_t c) ChannelData &cd = *m_channelData[c]; - bool last = false; + last = false; + any = false; while (!last) { - if (!testInbufReadSpace(c)) break; + if (!testInbufReadSpace(c)) { +// cerr << "not enough input" << endl; + break; + } + + any = true; if (!cd.draining) { size_t got = cd.inbuf->peek(cd.fltbuf, m_windowSize); @@ -99,8 +109,6 @@ RubberBandStretcher::Impl::processChunks(size_t c) cerr << "channel " << c << ": last = " << last << ", chunkCount = " << cd.chunkCount << endl; } } - - return last; } bool @@ -840,7 +848,8 @@ RubberBandStretcher::Impl::available() const // cerr << "calling processChunks(" << c << ") from available" << endl; //!!! do we ever actually do this? if so, this method should not be const // ^^^ yes, we do sometimes -- e.g. when fed a very short file - ((RubberBandStretcher::Impl *)this)->processChunks(c); + bool any = false, last = false; + ((RubberBandStretcher::Impl *)this)->processChunks(c, any, last); } } } diff --git a/src/Thread.cpp b/src/Thread.cpp index d096cb6..b985597 100644 --- a/src/Thread.cpp +++ b/src/Thread.cpp @@ -19,6 +19,10 @@ #include #include +//#define DEBUG_THREAD 1 +//#define DEBUG_MUTEX 1 +//#define DEBUG_CONDITION 1 + using std::cerr; using std::endl; @@ -30,13 +34,22 @@ Thread::Thread() : m_id(0), m_extant(false) { +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Created thread object " << this << endl; +#endif } Thread::~Thread() { +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Destroying thread object " << this << ", id " << m_id << endl; +#endif if (m_extant) { pthread_join(m_id, 0); } +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Destroyed thread object " << this << endl; +#endif } void @@ -47,6 +60,9 @@ Thread::start() cerr << "ERROR: thread creation failed" << endl; exit(1); } else { +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Created thread " << m_id << " for thread object " << this << endl; +#endif m_extant = true; } } @@ -55,7 +71,13 @@ void Thread::wait() { if (m_extant) { +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Waiting on thread " << m_id << " for thread object " << this << endl; +#endif pthread_join(m_id, 0); +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: Waited on thread " << m_id << " for thread object " << this << endl; +#endif m_extant = false; } } @@ -76,6 +98,9 @@ void * Thread::staticRun(void *arg) { Thread *thread = (Thread *)arg; +#ifdef DEBUG_THREAD + cerr << "THREAD DEBUG: " << (void *)pthread_self() << ": Running thread " << thread->m_id << " for thread object " << thread << endl; +#endif thread->run(); return 0; } @@ -83,22 +108,42 @@ Thread::staticRun(void *arg) Mutex::Mutex() { pthread_mutex_init(&m_mutex, 0); +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Initialised mutex " << &m_mutex << endl; +#endif } Mutex::~Mutex() { +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Destroying mutex " << &m_mutex << endl; +#endif pthread_mutex_destroy(&m_mutex); } void Mutex::lock() { + if (m_locked) { + cerr << "ERROR: Deadlock on mutex " << &m_mutex << endl; + } +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Want to lock mutex " << &m_mutex << endl; +#endif pthread_mutex_lock(&m_mutex); + m_locked = true; +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Locked mutex " << &m_mutex << endl; +#endif } void Mutex::unlock() { +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Unlocking mutex " << &m_mutex << endl; +#endif + m_locked = false; pthread_mutex_unlock(&m_mutex); } @@ -106,8 +151,15 @@ bool Mutex::trylock() { if (pthread_mutex_trylock(&m_mutex)) { +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Mutex " << &m_mutex << " unavailable" << endl; +#endif return false; } else { + m_locked = true; +#ifdef DEBUG_MUTEX + cerr << "MUTEX DEBUG: " << (void *)pthread_self() << ": Locked mutex " << &m_mutex << " (from trylock)" << endl; +#endif return true; } } @@ -117,10 +169,16 @@ Condition::Condition() pthread_mutex_init(&m_mutex, 0); m_locked = false; pthread_cond_init(&m_condition, 0); +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Initialised condition " << &m_condition << endl; +#endif } Condition::~Condition() { +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Destroying condition " << &m_condition << endl; +#endif if (m_locked) pthread_mutex_unlock(&m_mutex); pthread_cond_destroy(&m_condition); pthread_mutex_destroy(&m_mutex); @@ -129,9 +187,36 @@ Condition::~Condition() void Condition::lock() { - if (m_locked) return; + if (m_locked) { +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Already locked " << &m_condition << endl; +#endif + return; + } +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Want to lock " << &m_condition << endl; +#endif pthread_mutex_lock(&m_mutex); m_locked = true; +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Locked " << &m_condition << endl; +#endif +} + +void +Condition::unlock() +{ + if (!m_locked) { +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Not locked " << &m_condition << endl; +#endif + return; + } +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Unlocking " << &m_condition << endl; +#endif + m_locked = false; + pthread_mutex_unlock(&m_mutex); } void @@ -140,6 +225,9 @@ Condition::wait(int us) lock(); if (us == 0) { +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Waiting on " << &m_condition << endl; +#endif pthread_cond_wait(&m_condition, &m_mutex); } else { @@ -157,9 +245,15 @@ Condition::wait(int us) timeout.tv_sec = now.tv_sec; timeout.tv_nsec = now.tv_usec * 1000; +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Timed waiting on " << &m_condition << endl; +#endif pthread_cond_timedwait(&m_condition, &m_mutex, &timeout); } +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Wait done on " << &m_condition << endl; +#endif pthread_mutex_unlock(&m_mutex); m_locked = false; } @@ -167,6 +261,9 @@ Condition::wait(int us) void Condition::signal() { +#ifdef DEBUG_CONDITION + cerr << "CONDITION DEBUG: " << (void *)pthread_self() << ": Signalling " << &m_condition << endl; +#endif pthread_cond_signal(&m_condition); } diff --git a/src/Thread.h b/src/Thread.h index a58c9f4..282992d 100644 --- a/src/Thread.h +++ b/src/Thread.h @@ -56,6 +56,7 @@ public: private: pthread_mutex_t m_mutex; + bool m_locked; }; class MutexLocker @@ -75,6 +76,7 @@ public: ~Condition(); void lock(); + void unlock(); void wait(int us = 0); void signal(); diff --git a/src/sysutils.cpp b/src/sysutils.cpp index 784e455..6ac481a 100644 --- a/src/sysutils.cpp +++ b/src/sysutils.cpp @@ -25,7 +25,6 @@ system_is_multiprocessor() static bool tested = false, mp = false; if (tested) return mp; - //...