2026-04-19 01:52:37 +02:00
#
# Rhythm analysis from songs.
#
# Provides the beat timing from the audio signal.
#
# Cheng, Hart and Walker 2009: Time-frequency Analysis of Musical Rhythm
# Smith 2000: A Multiresolution Time-Frequency Analysis And Interpretation of Musical Rhythm
# PERF:
# "9x" = 16 sec runtime for 2:27 min. song
# - _scalogram() is slowest
# - we could try downsampling the song, and reducing FFT window size W
import numpy as np
from numpy . fft import fft
from scipy . signal import fftconvolve
from scipy . io import wavfile
def viterbi_highest_frequency_path_vectorized ( Scp2 , jump_penalty = 2.0 , use_log_amplitude = True ) :
Scp2 = np . asarray ( Scp2 , dtype = float )
n_freqs , n_frames = Scp2 . shape
if use_log_amplitude :
emission = np . log ( Scp2 + 1e-12 )
else :
emission = Scp2 . copy ( )
dp = np . full ( ( n_freqs , n_frames ) , - np . inf )
backptr = np . full ( ( n_freqs , n_frames ) , - 1 , dtype = int )
dp [ : , 0 ] = emission [ : , 0 ]
idx = np . arange ( n_freqs )
penalty = jump_penalty * np . abs ( idx [ : , None ] - idx [ None , : ] ) # [curr, prev]
for t in range ( 1 , n_frames ) :
scores = dp [ : , t - 1 ] [ None , : ] - penalty # shape (curr, prev)
backptr [ : , t ] = np . argmax ( scores , axis = 1 )
dp [ : , t ] = scores [ np . arange ( n_freqs ) , backptr [ : , t ] ] + emission [ : , t ]
path = np . zeros ( n_frames , dtype = int )
path [ - 1 ] = np . argmax ( dp [ : , - 1 ] )
for t in range ( n_frames - 1 , 0 , - 1 ) :
path [ t - 1 ] = backptr [ path [ t ] , t ]
return path , dp , backptr
def rle ( inarray ) :
""" run length encoding. Partial credit to R rle function.
Multi datatype arrays catered for including non Numpy
returns : tuple ( runlengths , startpositions , values ) """
ia = np . asarray ( inarray ) # force numpy
n = len ( ia )
if n == 0 :
return ( None , None , None )
else :
y = ia [ 1 : ] != ia [ : - 1 ] # pairwise unequal (string safe)
i = np . append ( np . where ( y ) , n - 1 ) # must include last element posi
z = np . diff ( np . append ( - 1 , i ) ) # run lengths
p = np . cumsum ( np . append ( 0 , z ) ) [ : - 1 ] # positions
return ( z , p , ia [ i ] )
def zero_rl ( a ) :
""" :returns: lengths of 0-intervals """
z , p , aa = rle ( a )
idxs = np . where ( aa == 0 ) [ 0 ]
return z [ idxs ]
def gabor_wavelet ( omega , nu , fs , T , tt = None ) :
"""
: param omega : width parameter
: param nu : frequency parameter
: param fs : sampling frequency
: param T : output length
: param tt : time - transforming function
: returns : complex - valued Gabor wavelet
"""
t = ( np . linspace ( 0 , T - 1 , T ) - T / / 2 ) / fs # -T/2: center in window of length T
if tt is not None : t = tt ( t )
psi = 1.0 / np . sqrt ( omega ) * np . exp ( - np . pi * ( t / omega ) * * 2 ) * np . exp ( 1 j * 2 * np . pi * nu * t / omega )
return psi
class BassAnalyzer :
"""
Rhythm analysis from songs .
Provides a beat amplitude signal from the audio signal .
2026-04-19 02:05:29 +02:00
Performs short - time Continuous Wavelet Transform on the signal ,
with log - spaced frequencies .
This allows a fine - grained resolution of very low frequencies ,
as opposed to an STFT where frequency resolution
would be limited by its window size .
Then , we perform Viterbi decoding to find the highest - power frequency at each window .
2026-04-19 01:52:37 +02:00
References :
* Cheng , Hart and Walker 2009 : Time - frequency Analysis of Musical Rhythm
* Smith 2000 : A Multiresolution Time - Frequency Analysis And Interpretation of Musical Rhythm
"""
W = 1024 #: window size (must be even, so that right padding W/2-1 works)
shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows
wavelet_win_sec = 0.175
k_omega , k_nu = 0.12 , 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
viterbi_jump_penalty = 5000.0
def __init__ ( self , fs , sig ) :
"""
: param fs : sampling rate
: param sig : audio signal normalized to [ - 1 , 1 ]
"""
self . D = int ( self . shift_sec * fs ) #: spectrogram step
self . Wp = int ( np . round ( self . wavelet_win_sec * fs / self . W ) * self . W ) # wavelet window - make it an integer multiple of FFT window
self . U = self . Wp / / self . W # ratio
self . f = np . pad ( sig , ( self . W / / 2 , self . W / / 2 - 1 ) ) #: signal padded (W-FFT to determine scalogram parameters)
self . f2 = np . pad ( sig , ( self . Wp / / 2 , self . Wp / / 2 - 1 ) ) #: signal padded (Wp-FFT to apply wavelet transform)
self . L = self . f . shape [ 0 ]
self . M = ( self . L - self . W ) / / self . D + 1 #: number of time steps
self . fs = fs
def viterbi_wavelet_scalogram_amplitudes ( self ) :
"""
Compute scalogram amplitudes from Viterbi path of highest - power frequencies .
NOTE : downsampled from the original ' fs ' .
: returns : ( fsd , sig ) : sampling rate , amplitude signal
"""
Spf = self . _spectrogram ( )
pto = self . _pulse_train ( Spf )
Spf2 = self . _spectrogram_2 ( )
pms = self . _scalogram_params ( pto )
Spsi_ss = self . _scalogram_wavelets ( pms )
Scp2 = self . _scalogram ( Spf2 , Spsi_ss )
path = self . _viterbi_path ( Scp2 )
ampl = self . _viterbi_ampl ( Scp2 , path )
return ampl
def _spectrogram ( self ) :
""" W-FFT (STFTs) to determine scalogram parameters """
# *f
# M, W, D
f = self . f
M , W , D = self . M , self . W , self . D
#
# compute spectrogram: 'Spf' (M x W) <- from 'f'
#
iwss = np . linspace ( W / / 2 , W / / 2 + ( M - 1 ) * D , M , dtype = int ) # 'D'-spaced start time indices of windows on 's'
Spf = np . zeros ( ( M , W ) , dtype = np . complex128 )
for i , iw in zip ( range ( M ) , iwss ) :
iws , iwe = iw - W / / 2 , iw + W / / 2
Spf [ i , : ] = fft ( f [ iws : iwe ] )
return Spf
def _pulse_train ( self , Spf ) :
# *Spf
# fs, M, L, W, Wp, shift_sec
fs , M , L , W , Wp , shift_sec = self . fs , self . M , self . L , self . W , self . Wp , self . shift_sec
# compute parameters for scalogram
g = np . abs ( Spf ) # (M x W)
g_bar = np . mean ( g , axis = 1 ) # (M)
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
A = np . mean ( g_bar ) # amplitude cutoff for pulse train
# compute transitions (pulse train)
pt = ( g_bar > A ) . astype ( int ) # pulse train
pt_re = ( np . diff ( pt ) == 1 ) . astype ( int ) # rising edge
self . B = max ( np . sum ( pt_re ) , 1 ) # total number of pulses in the 'pt' pulse train signal
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
squashed_idxs = np . floor ( np . linspace ( 0 , L - 1 , L ) * ( M / L ) ) . astype ( int )
ptr = pt [ squashed_idxs ]
fsp = fs
# ptr
Dp = int ( shift_sec * fsp ) #: pt-spectrogram step
pto = ptr [ W / / 2 : - ( W / / 2 - 1 ) ] #: 'ptr' signal without padding
ptrp = np . pad ( pto , ( Wp / / 2 , Wp / / 2 - 1 ) ) #: 'ptr' signal re-padded
Lp = ptrp . shape [ 0 ]
Mp = ( Lp - Wp ) / / Dp + 1
self . Dp , self . Lp , self . Mp = Dp , Lp , Mp
return pto
def _spectrogram_2 ( self ) :
""" Wp-FFT (STFTs) to apply wavelet transform """
# *f2
# Mp
f2 = self . f2
Wp , Mp , Dp = self . Wp , self . Mp , self . Dp
#
# compute spectrogram: 'Spf2' (M x Wp) <- from 'f'
#
iwss2 = np . linspace ( Wp / / 2 , Wp / / 2 + ( Mp - 1 ) * Dp , Mp , dtype = int ) # 'D'-spaced start time indices of windows on 's'
Spf2 = np . zeros ( ( Mp , Wp ) , dtype = np . complex128 )
for i , iw in zip ( range ( Mp ) , iwss2 ) :
iws , iwe = iw - Wp / / 2 , iw + Wp / / 2
Spf2 [ i , : ] = fft ( f2 [ iws : iwe ] )
return Spf2
def _scalogram_params ( self , pto ) :
Lp , Wp , B = self . Lp , self . Wp , self . B
fsp = self . fs
k_omega , k_nu = self . k_omega , self . k_nu
# more parameters for scalogram
# TODO: check if 'delta' needs smoothing before we compute it (check: 'delta' should not be too small wrt. beat spacing)
# TODO: check if 'zl' with resampling needs rescaling
#zl = min((zero_rl(pto), pto.shape[0] / 2)) # min: limit min-space at least to 1 pulse
zl = zero_rl ( pto )
delta = np . min ( zl ) / fsp # minimum length of 0-interval (minimum space between two successive pulses)
# magic, see equation (12) from paper
p = 4 * np . sqrt ( np . pi )
T = ( Lp - Wp ) / fsp # un-padded sample count -> time length
#omega = p * T / B # width parameter of Gabor wavelet
#nu = B / (p * T) # frequency parameter of Gabor wavelet
I = int ( np . log2 ( p * * 2 * T * * 2 / ( delta * B * * 2 ) ) - 3 / 2 ) # number of octaves
J = int ( 256 / I ) # number of voices per octave
r = np . linspace ( 0 , I * J - 1 , I * J )
ss = np . array ( [ 2 ] ) * * ( - r / J ) # scales of wavelet
# we need to adjust the frequency range. (not sure why?)
# - maybe due to resampling:
# - we use orig fs and analyze orig signal, instead of 'pt' and its sampling rate
omega , nu = k_omega * ( p * T / B ) , k_nu * B / ( p * T )
freqs = np . array ( [ nu / ( omega * ss [ y ] ) for y in np . arange ( I * J ) ] )
self . T = T
return ss , omega , nu , fsp , Wp , I , J , freqs
def _scalogram_wavelets ( self , pms ) :
# ss, omega, nu, fsp, Wp, I, J
ss , omega , nu , fsp , Wp , I , J , freqs = pms
# compute frequency-domain wavelets
#Scp = np.zeros((Mp, I*J), dtype=np.complex128)
psi_ss = np . zeros ( ( I * J , Wp ) , dtype = np . complex128 )
for i , s in zip ( range ( I * J ) , ss ) :
psi_ss [ i , : ] = 1.0 / np . sqrt ( s ) * np . conj ( gabor_wavelet ( omega , nu , fsp , Wp , tt = lambda t : t / s ) )
Spsi_ss = fft ( psi_ss , axis = 1 )
return Spsi_ss
def _scalogram ( self , Spf2 , Spsi_ss ) :
# *Spf2, Spsi_ss
# T, Lp, Wp
T , Lp , Wp = self . T , self . Lp , self . Wp
# compute convolution with wavelets, by multiplication in freq-domain
# 'Scp2' (M x I*J)
Scp2 = np . matmul ( Spf2 , Spsi_ss . T ) * ( T / ( Lp - Wp ) )
return Scp2
def _viterbi_path ( self , Scp2 ) :
# TODO: check if we should re-weight freq-jumps, because of log-scale frequencies
path , dp , backptr = viterbi_highest_frequency_path_vectorized (
( np . abs ( Scp2 ) * * 2 ) . T ,
jump_penalty = self . viterbi_jump_penalty ,
#max_jump=20,
use_log_amplitude = False ,
)
return path
def _viterbi_ampl ( self , Scp2 , path ) :
max_amplitudes = np . array ( [ np . abs ( Scp2 [ i , path [ i ] ] ) for i in range ( Scp2 . shape [ 0 ] ) ] )
return max_amplitudes