# # Rhythm analysis from songs. # # Provides the beat timing from the audio signal. # # Cheng, Hart and Walker 2009: Time-frequency Analysis of Musical Rhythm # Smith 2000: A Multiresolution Time-Frequency Analysis And Interpretation of Musical Rhythm # PERF: # "9x" = 16 sec runtime for 2:27 min. song # - _scalogram() is slowest # - we could try downsampling the song, and reducing FFT window size W import numpy as np from numpy.fft import fft from hsh_signal.signal import lowpass_fft import time def viterbi_highest_frequency_path_vectorized(Scp2, jump_penalty=2.0, use_log_amplitude=True): Scp2 = np.asarray(Scp2, dtype=float) n_freqs, n_frames = Scp2.shape if use_log_amplitude: emission = np.log(Scp2 + 1e-12) else: emission = Scp2.copy() dp = np.full((n_freqs, n_frames), -np.inf) backptr = np.full((n_freqs, n_frames), -1, dtype=int) dp[:, 0] = emission[:, 0] idx = np.arange(n_freqs) penalty = jump_penalty * np.abs(idx[:, None] - idx[None, :]) # [curr, prev] for t in range(1, n_frames): scores = dp[:, t - 1][None, :] - penalty # shape (curr, prev) backptr[:, t] = np.argmax(scores, axis=1) dp[:, t] = scores[np.arange(n_freqs), backptr[:, t]] + emission[:, t] path = np.zeros(n_frames, dtype=int) path[-1] = np.argmax(dp[:, -1]) for t in range(n_frames - 1, 0, -1): path[t - 1] = backptr[path[t], t] return path, dp, backptr def rle(inarray): """ run length encoding. Partial credit to R rle function. Multi datatype arrays catered for including non Numpy returns: tuple (runlengths, startpositions, values) """ ia = np.asarray(inarray) # force numpy n = len(ia) if n == 0: return (None, None, None) else: y = ia[1:] != ia[:-1] # pairwise unequal (string safe) i = np.append(np.where(y), n - 1) # must include last element posi z = np.diff(np.append(-1, i)) # run lengths p = np.cumsum(np.append(0, z))[:-1] # positions return(z, p, ia[i]) def zero_rl(a): """:returns: lengths of 0-intervals""" z, p, aa = rle(a) idxs = np.where(aa == 0)[0] return z[idxs] def gabor_wavelet(omega, nu, fs, T, tt=None): """ :param omega: width parameter :param nu: frequency parameter :param fs: sampling frequency :param T: output length :param tt: time-transforming function :returns: complex-valued Gabor wavelet """ t = (np.linspace(0, T-1, T) - T//2) / fs # -T/2: center in window of length T if tt is not None: t = tt(t) psi = 1.0 / np.sqrt(omega) * np.exp(-np.pi * (t / omega)**2) * np.exp(1j*2*np.pi * nu * t / omega) return psi class BassAnalyzer: """ Rhythm analysis from songs. Provides a beat amplitude signal from the audio signal. Performs short-time Continuous Wavelet Transform on the signal, with log-spaced frequencies. This allows a fine-grained resolution of very low frequencies, as opposed to an STFT where frequency resolution would be limited by its window size. Then, we perform Viterbi decoding to find the highest-power frequency at each window. References: * Cheng, Hart and Walker 2009: Time-frequency Analysis of Musical Rhythm * Smith 2000: A Multiresolution Time-Frequency Analysis And Interpretation of Musical Rhythm """ W = 1024 #: window size (must be even, so that right padding W/2-1 works) shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows wavelet_win_sec = 0.175 k_omega, k_nu = 0.12, 5.0 #: adapt scaling to get 'reasonable' frequency range (for pop bass, e.g. 18..1145 Hz, but that range strongly depends on the actual song's 'pt' shortest interval 'B') viterbi_jump_penalty = 5000.0 Wp_force = None I_force = None def __init__(self, fs, sig, Wp_force=None): """ :param fs: sampling rate :param sig: audio signal normalized to [-1,1] """ self.D = int(self.shift_sec * fs) #: spectrogram step if self.Wp_force: self.Wp = self.Wp_force elif Wp_force: self.Wp = Wp_force else: self.Wp = int(np.round(self.wavelet_win_sec * fs / self.W) * self.W) # wavelet window - make it an integer multiple of FFT window self.U = self.Wp // self.W # ratio self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters) self.f2 = np.pad(sig, (self.Wp//2, self.Wp//2-1)) #: signal padded (Wp-FFT to apply wavelet transform) self.L = self.f.shape[0] self.M = (self.L-self.W) // self.D + 1 #: number of time steps self.fs = fs def viterbi_wavelet_scalogram_amplitudes(self, dbg_time=False): """ Compute scalogram amplitudes from Viterbi path of highest-power frequencies. NOTE: downsampled from the original 'fs'. :returns: (fsd, sig): sampling rate, amplitude signal """ t1 = time.time() Spf = self._spectrogram() t2 = time.time() pto = self._pulse_train(Spf) t3 = time.time() Spf2 = self._spectrogram_2() t4 = time.time() pms = self._scalogram_params(pto) t5 = time.time() Spsi_ss = self._scalogram_wavelets(pms) t6 = time.time() Scp2 = self._scalogram(Spf2, Spsi_ss) t7 = time.time() path = self._viterbi_path(Scp2) t8 = time.time() ampl = self._viterbi_ampl(Scp2, path) t9 = time.time() if not dbg_time: return ampl else: return ampl, np.diff([t1, t2, t3, t4, t5, t6, t7, t8, t9]) def _spectrogram(self): """W-FFT (STFTs) to determine scalogram parameters""" # *f # M, W, D f = self.f M, W, D = self.M, self.W, self.D # # compute spectrogram: 'Spf' (M x W) <- from 'f' # iwss = np.linspace(W//2, W//2 + (M-1)*D, M, dtype=int) # 'D'-spaced start time indices of windows on 's' Spf = np.zeros((M, W), dtype=np.complex128) for i, iw in zip(range(M), iwss): iws, iwe = iw-W//2, iw+W//2 Spf[i,:] = fft(f[iws:iwe]) return Spf def _pulse_train(self, Spf): # *Spf # fs, M, L, W, Wp, shift_sec fs, M, L, W, Wp, shift_sec = self.fs, self.M, self.L, self.W, self.Wp, self.shift_sec # compute parameters for scalogram g = np.abs(Spf) # (M x W) g_bar = np.mean(g, axis=1) # (M) # TODO: check if 'A' needs to be a smooth signal slowly varying over time, not a const. #A = np.mean(g_bar) # amplitude cutoff for pulse train ip = int(fs) g_bar_l = lowpass_fft(np.pad(g_bar, (ip, ip), mode='edge'), fps=fs/self.D, cf=0.5, tw=0.05)[ip:-ip] A = g_bar_l # compute transitions (pulse train) pt = (g_bar > A).astype(int) # pulse train pt_re = (np.diff(pt) == 1).astype(int) # rising edge self.B = max(np.sum(pt_re), 1) # total number of pulses in the 'pt' pulse train signal # resample 'pt' (M) at these indices -> 'ptr' (L), like original 'f' (signal padded) squashed_idxs = np.floor(np.linspace(0, L-1, L) * (M/L)).astype(int) ptr = pt[squashed_idxs] fsp = fs # ptr Dp = int(shift_sec * fsp) #: pt-spectrogram step pto = ptr[W//2:-(W//2-1)] #: 'ptr' signal without padding ptrp = np.pad(pto, (Wp//2, Wp//2-1)) #: 'ptr' signal re-padded Lp = ptrp.shape[0] Mp = (Lp-Wp) // Dp + 1 self.Dp, self.Lp, self.Mp = Dp, Lp, Mp return pto def _spectrogram_2(self): """Wp-FFT (STFTs) to apply wavelet transform""" # *f2 # Mp f2 = self.f2 Wp, Mp, Dp = self.Wp, self.Mp, self.Dp # TODO(perf): 5.0 sec runtime # # compute spectrogram: 'Spf2' (M x Wp) <- from 'f' # iwss2 = np.linspace(Wp//2, Wp//2 + (Mp-1)*Dp, Mp, dtype=int) # 'D'-spaced start time indices of windows on 's' Spf2 = np.zeros((Mp, Wp), dtype=np.complex128) for i, iw in zip(range(Mp), iwss2): iws, iwe = iw-Wp//2, iw+Wp//2 Spf2[i,:] = fft(f2[iws:iwe]) return Spf2 def _scalogram_params(self, pto): Lp, Wp, B = self.Lp, self.Wp, self.B fsp = self.fs k_omega, k_nu = self.k_omega, self.k_nu # more parameters for scalogram # TODO: check if 'delta' needs smoothing before we compute it (check: 'delta' should not be too small wrt. beat spacing) # TODO: check if 'zl' with resampling needs rescaling #zl = min((zero_rl(pto), pto.shape[0] / 2)) # min: limit min-space at least to 1 pulse zl = zero_rl(pto) delta = np.min(zl) / fsp # minimum length of 0-interval (minimum space between two successive pulses) # magic, see equation (12) from paper p = 4 * np.sqrt(np.pi) T = (Lp - Wp) / fsp # un-padded sample count -> time length #omega = p * T / B # width parameter of Gabor wavelet #nu = B / (p * T) # frequency parameter of Gabor wavelet if self.I_force: I = self.I_force else: I = int(np.log2(p**2 * T**2 / (delta * B**2)) - 3/2) # number of octaves J = int(256 / I) # number of voices per octave r = np.linspace(0, I*J-1, I*J) ss = np.array([2]) ** (-r/J) # scales of wavelet # we need to adjust the frequency range. (not sure why?) # - maybe due to resampling: # - we use orig fs and analyze orig signal, instead of 'pt' and its sampling rate omega, nu = k_omega * (p*T/B), k_nu * B/(p*T) freqs = np.array([nu/(omega*ss[y]) for y in np.arange(I*J)]) self.T = T return ss, omega, nu, fsp, Wp, I, J, freqs def _scalogram_wavelets(self, pms): # ss, omega, nu, fsp, Wp, I, J ss, omega, nu, fsp, Wp, I, J, freqs = pms # compute frequency-domain wavelets #Scp = np.zeros((Mp, I*J), dtype=np.complex128) psi_ss = np.zeros((I*J, Wp), dtype=np.complex128) for i, s in zip(range(I*J), ss): psi_ss[i,:] = 1.0 / np.sqrt(s) * np.conj(gabor_wavelet(omega, nu, fsp, Wp, tt = lambda t: t / s)) Spsi_ss = fft(psi_ss, axis=1) return Spsi_ss def _scalogram(self, Spf2, Spsi_ss): # *Spf2, Spsi_ss # T, Lp, Wp T, Lp, Wp = self.T, self.Lp, self.Wp # TODO(perf): reduce num of wavelets, and/or parallelize into freq slices # TODO(perf): 3.5 sec runtime # compute convolution with wavelets, by multiplication in freq-domain # 'Scp2' (M x I*J) Scp2 = np.matmul(Spf2, Spsi_ss.T) * (T/(Lp-Wp)) return Scp2 def _viterbi_path(self, Scp2): # TODO(perf): parallelize into time slices # TODO(perf): 4.5 sec runtime # TODO: check if we should re-weight freq-jumps, because of log-scale frequencies path, dp, backptr = viterbi_highest_frequency_path_vectorized( (np.abs(Scp2)**2).T, jump_penalty=self.viterbi_jump_penalty, #max_jump=20, use_log_amplitude=False, ) return path def _viterbi_ampl(self, Scp2, path): max_amplitudes = np.array([np.abs(Scp2[i, path[i]]) for i in range(Scp2.shape[0])]) return max_amplitudes class GuitarAnalyzer: """ Rhythm analysis from songs. Provides a beat amplitude signal from the audio signal. Performs short-time Fourier Transform on the signal, then returns the f1..f2 band power for each window. For low-frequency instruments like percussion, use BassAnalyzer instead. """ W = 1024 #: window size (must be even, so that right padding W/2-1 works) shift_sec = 0.008 #: window shift in sec ('delta_tau') between subsequent windows target_band_f1 = 800.0 #: lower bound of target freq band in Hz target_band_f2 = 4300.0 #: upper bound of target freq band in Hz def __init__(self, fs, sig): """ :param fs: sampling rate :param sig: audio signal normalized to [-1,1] """ self.f = np.pad(sig, (self.W//2, self.W//2-1)) #: signal padded (W-FFT to determine scalogram parameters) self.fs = fs self.D = int(self.shift_sec * fs) #: spectrogram step self.L = self.f.shape[0] self.M = (self.L-self.W) // self.D + 1 #: number of time steps self.fs = fs def spectrogram_power_amplitudes(self): """ Compute spectrogram power from a target frequency range. NOTE: downsampled from the original 'fs'. :returns: (fsd, sig): sampling rate, amplitude signal """ Spf = self._spectrogram() ampl = self._spectrogram_power(Spf) return ampl def _spectrogram_power(self, Spf): # Spf # fs, W fs, W = self.fs, self.W k1, k2 = int(self.target_band_f1/fs*W), int(self.target_band_f2/fs*W) # freq band range in W-FFT # # spectrum power in f1..f2 bands # #hp_slice = highpass(np.sum(np.abs(Spf_slice[:, k1:k2]), axis=1), fps=fs/Dp, cf=2.0, tw=0.2) hp_slice = np.sum(np.abs(Spf[:, k1:k2]), axis=1)-np.mean(np.sum(np.abs(Spf[:, k1:k2]), axis=1)) return hp_slice def _spectrogram(self): """W-FFT (STFTs) to determine scalogram parameters""" # *f # M, W, D f = self.f M, W, D = self.M, self.W, self.D # # compute spectrogram: 'Spf' (M x W) <- from 'f' # iwss = np.linspace(W//2, W//2 + (M-1)*D, M, dtype=int) # 'D'-spaced start time indices of windows on 's' Spf = np.zeros((M, W), dtype=np.complex128) for i, iw in zip(range(M), iwss): iws, iwe = iw-W//2, iw+W//2 Spf[i,:] = fft(f[iws:iwe]) return Spf