add: bass beat analysis (bass amplitude signal)
This commit is contained in:
264
rhythm.py
Normal file
264
rhythm.py
Normal file
@@ -0,0 +1,264 @@
|
|||||||
|
#
|
||||||
|
# Rhythm analysis from songs.
|
||||||
|
#
|
||||||
|
# Provides the beat timing from the audio signal.
|
||||||
|
#
|
||||||
|
# Cheng, Hart and Walker 2009: Time-frequency Analysis of Musical Rhythm
|
||||||
|
# Smith 2000: A Multiresolution Time-Frequency Analysis And Interpretation of Musical Rhythm
|
||||||
|
|
||||||
|
# PERF:
|
||||||
|
# "9x" = 16 sec runtime for 2:27 min. song
|
||||||
|
# - _scalogram() is slowest
|
||||||
|
# - we could try downsampling the song, and reducing FFT window size W
|
||||||
|
|
||||||
|
import numpy as np
|
||||||
|
from numpy.fft import fft
|
||||||
|
from scipy.signal import fftconvolve
|
||||||
|
from scipy.io import wavfile
|
||||||
|
|
||||||
|
def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True):
|
||||||
|
Scp2 = np.asarray(Scp2, dtype=float)
|
||||||
|
n_freqs, n_frames = Scp2.shape
|
||||||
|
|
||||||
|
if use_log_amplitude:
|
||||||
|
emission = np.log(Scp2 + 1e-12)
|
||||||
|
else:
|
||||||
|
emission = Scp2.copy()
|
||||||
|
|
||||||
|
dp = np.full((n_freqs, n_frames), -np.inf)
|
||||||
|
backptr = np.full((n_freqs, n_frames), -1, dtype=int)
|
||||||
|
|
||||||
|
dp[:, 0] = emission[:, 0]
|
||||||
|
|
||||||
|
idx = np.arange(n_freqs)
|
||||||
|
penalty = jump_penalty * np.abs(idx[:, None] - idx[None, :]) # [curr, prev]
|
||||||
|
|
||||||
|
for t in range(1, n_frames):
|
||||||
|
scores = dp[:, t - 1][None, :] - penalty # shape (curr, prev)
|
||||||
|
backptr[:, t] = np.argmax(scores, axis=1)
|
||||||
|
dp[:, t] = scores[np.arange(n_freqs), backptr[:, t]] + emission[:, t]
|
||||||
|
|
||||||
|
path = np.zeros(n_frames, dtype=int)
|
||||||
|
path[-1] = np.argmax(dp[:, -1])
|
||||||
|
for t in range(n_frames - 1, 0, -1):
|
||||||
|
path[t - 1] = backptr[path[t], t]
|
||||||
|
|
||||||
|
return path, dp, backptr
|
||||||
|
|
||||||
|
def rle(inarray):
|
||||||
|
""" run length encoding. Partial credit to R rle function.
|
||||||
|
Multi datatype arrays catered for including non Numpy
|
||||||
|
returns: tuple (runlengths, startpositions, values) """
|
||||||
|
ia = np.asarray(inarray) # force numpy
|
||||||
|
n = len(ia)
|
||||||
|
if n == 0:
|
||||||
|
return (None, None, None)
|
||||||
|
else:
|
||||||
|
y = ia[1:] != ia[:-1] # pairwise unequal (string safe)
|
||||||
|
i = np.append(np.where(y), n - 1) # must include last element posi
|
||||||
|
z = np.diff(np.append(-1, i)) # run lengths
|
||||||
|
p = np.cumsum(np.append(0, z))[:-1] # positions
|
||||||
|
return(z, p, ia[i])
|
||||||
|
|
||||||
|
def zero_rl(a):
|
||||||
|
""":returns: lengths of 0-intervals"""
|
||||||
|
z, p, aa = rle(a)
|
||||||
|
idxs = np.where(aa == 0)[0]
|
||||||
|
return z[idxs]
|
||||||
|
|
||||||
|
def gabor_wavelet(omega, nu, fs, T, tt=None):
|
||||||
|
"""
|
||||||
|
:param omega: width parameter
|
||||||
|
:param nu: frequency parameter
|
||||||
|
:param fs: sampling frequency
|
||||||
|
:param T: output length
|
||||||
|
:param tt: time-transforming function
|
||||||
|
:returns: complex-valued Gabor wavelet
|
||||||
|
"""
|
||||||
|
t = (np.linspace(0, T-1, T) - T//2) / fs # -T/2: center in window of length T
|
||||||
|
if tt is not None: t = tt(t)
|
||||||
|
psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega)
|
||||||
|
return psi
|
||||||
|
|
||||||
|
class BassAnalyzer:
|
||||||
|
"""
|
||||||
|
Rhythm analysis from songs.
|
||||||
|
Provides a beat amplitude signal from the audio signal.
|
||||||
|
|
||||||
|
References:
|
||||||
|
* Cheng, Hart and Walker 2009: Time-frequency Analysis of Musical Rhythm
|
||||||
|
* Smith 2000: A Multiresolution Time-Frequency Analysis And Interpretation of Musical Rhythm
|
||||||
|
"""
|
||||||
|
W = 1024 #: window size (must be even, so that right padding W/2-1 works)
|
||||||
|
shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows
|
||||||
|
wavelet_win_sec = 0.175
|
||||||
|
k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B')
|
||||||
|
viterbi_jump_penalty = 5000.0
|
||||||
|
|
||||||
|
def __init__(self, fs, sig):
|
||||||
|
"""
|
||||||
|
:param fs: sampling rate
|
||||||
|
:param sig: audio signal normalized to [-1,1]
|
||||||
|
"""
|
||||||
|
self.D = int(self.shift_sec * fs) #: spectrogram step
|
||||||
|
self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window
|
||||||
|
self.U = self.Wp // self.W # ratio
|
||||||
|
|
||||||
|
self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters)
|
||||||
|
self.f2 = np.pad(sig, (self.Wp//2, self.Wp//2-1)) #: signal padded (Wp-FFT to apply wavelet transform)
|
||||||
|
|
||||||
|
self.L = self.f.shape[0]
|
||||||
|
self.M = (self.L-self.W) // self.D + 1 #: number of time steps
|
||||||
|
self.fs = fs
|
||||||
|
|
||||||
|
def viterbi_wavelet_scalogram_amplitudes(self):
|
||||||
|
"""
|
||||||
|
Compute scalogram amplitudes from Viterbi path of highest-power frequencies.
|
||||||
|
NOTE: downsampled from the original 'fs'.
|
||||||
|
:returns: (fsd, sig): sampling rate, amplitude signal
|
||||||
|
"""
|
||||||
|
Spf = self._spectrogram()
|
||||||
|
pto = self._pulse_train(Spf)
|
||||||
|
Spf2 = self._spectrogram_2()
|
||||||
|
pms = self._scalogram_params(pto)
|
||||||
|
Spsi_ss = self._scalogram_wavelets(pms)
|
||||||
|
Scp2 = self._scalogram(Spf2, Spsi_ss)
|
||||||
|
path = self._viterbi_path(Scp2)
|
||||||
|
ampl = self._viterbi_ampl(Scp2, path)
|
||||||
|
return ampl
|
||||||
|
|
||||||
|
def _spectrogram(self):
|
||||||
|
"""W-FFT (STFTs) to determine scalogram parameters"""
|
||||||
|
# *f
|
||||||
|
# M, W, D
|
||||||
|
f = self.f
|
||||||
|
M, W, D = self.M, self.W, self.D
|
||||||
|
|
||||||
|
#
|
||||||
|
# compute spectrogram: 'Spf' (M x W) <- from 'f'
|
||||||
|
#
|
||||||
|
iwss = np.linspace(W//2, W//2 + (M-1)*D, M, dtype=int) # 'D'-spaced start time indices of windows on 's'
|
||||||
|
Spf = np.zeros((M, W), dtype=np.complex128)
|
||||||
|
for i, iw in zip(range(M), iwss):
|
||||||
|
iws, iwe = iw-W//2, iw+W//2
|
||||||
|
Spf[i,:] = fft(f[iws:iwe])
|
||||||
|
return Spf
|
||||||
|
|
||||||
|
def _pulse_train(self, Spf):
|
||||||
|
# *Spf
|
||||||
|
# fs, M, L, W, Wp, shift_sec
|
||||||
|
fs, M, L, W, Wp, shift_sec = self.fs, self.M, self.L, self.W, self.Wp, self.shift_sec
|
||||||
|
|
||||||
|
# compute parameters for scalogram
|
||||||
|
g = np.abs(Spf) # (M x W)
|
||||||
|
g_bar = np.mean(g, axis=1) # (M)
|
||||||
|
# TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const.
|
||||||
|
A = np.mean(g_bar) # amplitude cutoff for pulse train
|
||||||
|
|
||||||
|
# compute transitions (pulse train)
|
||||||
|
pt = (g_bar > A).astype(int) # pulse train
|
||||||
|
pt_re = (np.diff(pt) == 1).astype(int) # rising edge
|
||||||
|
self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal
|
||||||
|
|
||||||
|
# resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded)
|
||||||
|
squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int)
|
||||||
|
ptr = pt[squashed_idxs]
|
||||||
|
fsp = fs
|
||||||
|
|
||||||
|
# ptr
|
||||||
|
Dp = int(shift_sec * fsp) #: pt-spectrogram step
|
||||||
|
pto = ptr[W//2:-(W//2-1)] #: 'ptr' signal without padding
|
||||||
|
ptrp = np.pad(pto, (Wp//2, Wp//2-1)) #: 'ptr' signal re-padded
|
||||||
|
Lp = ptrp.shape[0]
|
||||||
|
Mp = (Lp-Wp) // Dp + 1
|
||||||
|
|
||||||
|
self.Dp, self.Lp, self.Mp = Dp, Lp, Mp
|
||||||
|
|
||||||
|
return pto
|
||||||
|
|
||||||
|
def _spectrogram_2(self):
|
||||||
|
"""Wp-FFT (STFTs) to apply wavelet transform"""
|
||||||
|
# *f2
|
||||||
|
# Mp
|
||||||
|
f2 = self.f2
|
||||||
|
Wp, Mp, Dp = self.Wp, self.Mp, self.Dp
|
||||||
|
|
||||||
|
#
|
||||||
|
# compute spectrogram: 'Spf2' (M x Wp) <- from 'f'
|
||||||
|
#
|
||||||
|
iwss2 = np.linspace(Wp//2, Wp//2 + (Mp-1)*Dp, Mp, dtype=int) # 'D'-spaced start time indices of windows on 's'
|
||||||
|
Spf2 = np.zeros((Mp, Wp), dtype=np.complex128)
|
||||||
|
for i, iw in zip(range(Mp), iwss2):
|
||||||
|
iws, iwe = iw-Wp//2, iw+Wp//2
|
||||||
|
Spf2[i,:] = fft(f2[iws:iwe])
|
||||||
|
return Spf2
|
||||||
|
|
||||||
|
def _scalogram_params(self, pto):
|
||||||
|
Lp, Wp, B = self.Lp, self.Wp, self.B
|
||||||
|
fsp = self.fs
|
||||||
|
k_omega, k_nu = self.k_omega, self.k_nu
|
||||||
|
|
||||||
|
# more parameters for scalogram
|
||||||
|
|
||||||
|
# TODO: check if 'delta' needs smoothing before we compute it (check: 'delta' should not be too small wrt. beat spacing)
|
||||||
|
# TODO: check if 'zl' with resampling needs rescaling
|
||||||
|
#zl = min((zero_rl(pto), pto.shape[0] / 2)) # min: limit min-space at least to 1 pulse
|
||||||
|
zl = zero_rl(pto)
|
||||||
|
delta = np.min(zl) / fsp # minimum length of 0-interval (minimum space between two successive pulses)
|
||||||
|
|
||||||
|
# magic, see equation (12) from paper
|
||||||
|
p = 4 * np.sqrt(np.pi)
|
||||||
|
T = (Lp - Wp) / fsp # un-padded sample count -> time length
|
||||||
|
#omega = p * T / B # width parameter of Gabor wavelet
|
||||||
|
#nu = B / (p * T) # frequency parameter of Gabor wavelet
|
||||||
|
I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves
|
||||||
|
J = int(256 / I) # number of voices per octave
|
||||||
|
|
||||||
|
r = np.linspace(0, I*J-1, I*J)
|
||||||
|
ss = np.array([2]) ** (-r/J) # scales of wavelet
|
||||||
|
|
||||||
|
# we need to adjust the frequency range. (not sure why?)
|
||||||
|
# - maybe due to resampling:
|
||||||
|
# - we use orig fs and analyze orig signal, instead of 'pt' and its sampling rate
|
||||||
|
omega, nu = k_omega * (p*T/B), k_nu * B/(p*T)
|
||||||
|
freqs = np.array([nu/(omega*ss[y]) for y in np.arange(I*J)])
|
||||||
|
|
||||||
|
self.T = T
|
||||||
|
|
||||||
|
return ss, omega, nu, fsp, Wp, I, J, freqs
|
||||||
|
|
||||||
|
def _scalogram_wavelets(self, pms):
|
||||||
|
# ss, omega, nu, fsp, Wp, I, J
|
||||||
|
ss, omega, nu, fsp, Wp, I, J, freqs = pms
|
||||||
|
|
||||||
|
# compute frequency-domain wavelets
|
||||||
|
#Scp = np.zeros((Mp, I*J), dtype=np.complex128)
|
||||||
|
psi_ss = np.zeros((I*J, Wp), dtype=np.complex128)
|
||||||
|
for i, s in zip(range(I*J), ss):
|
||||||
|
psi_ss[i,:] = 1.0 / np.sqrt(s) * np.conj(gabor_wavelet(omega, nu, fsp, Wp, tt = lambda t: t / s))
|
||||||
|
Spsi_ss = fft(psi_ss, axis=1)
|
||||||
|
return Spsi_ss
|
||||||
|
|
||||||
|
def _scalogram(self, Spf2, Spsi_ss):
|
||||||
|
# *Spf2, Spsi_ss
|
||||||
|
# T, Lp, Wp
|
||||||
|
T, Lp, Wp = self.T, self.Lp, self.Wp
|
||||||
|
|
||||||
|
# compute convolution with wavelets, by multiplication in freq-domain
|
||||||
|
# 'Scp2' (M x I*J)
|
||||||
|
Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp))
|
||||||
|
return Scp2
|
||||||
|
|
||||||
|
def _viterbi_path(self, Scp2):
|
||||||
|
# TODO: check if we should re-weight freq-jumps, because of log-scale frequencies
|
||||||
|
path, dp, backptr = viterbi_highest_frequency_path_vectorized(
|
||||||
|
(np.abs(Scp2)**2).T,
|
||||||
|
jump_penalty=self.viterbi_jump_penalty,
|
||||||
|
#max_jump=20,
|
||||||
|
use_log_amplitude=False,
|
||||||
|
)
|
||||||
|
return path
|
||||||
|
|
||||||
|
def _viterbi_ampl(self, Scp2, path):
|
||||||
|
max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])])
|
||||||
|
return max_amplitudes
|
||||||
Reference in New Issue
Block a user