Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added Delay Augmentation, tested on per_batch and per_example #171

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions tests/test_delay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import unittest

import numpy as np
import pytest
import torch
from numpy.testing import assert_almost_equal
from torch_audiomentations import Delay


def get_example():
return (
torch.rand(
size=(8, 2, 32000),
dtype=torch.float32,
device="cuda" if torch.cuda.is_available() else "cpu",
)
- 0.5
)


class TestDelay(unittest.TestCase):
def test_per_example_delay(self):
samples = get_example()
aug = Delay(sample_rate=16000, p=1, mode="per_example", output_type="dict")
aug.randomize_parameters(samples)
results = aug.apply_transform(samples).samples
self.assertEqual(results.shape, samples.shape)
self.assertEqual(results.dtype, torch.float32)

def test_per_batch_shift(self):
samples = get_example()
aug = Delay(sample_rate=16000, p=1, mode="per_batch", output_type="dict")
aug.randomize_parameters(samples)
results = aug.apply_transform(samples).samples
self.assertEqual(results.shape, samples.shape)
self.assertEqual(results.dtype, torch.float32)

2 changes: 2 additions & 0 deletions torch_audiomentations/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
from .augmentations.shuffle_channels import ShuffleChannels
from .augmentations.splice_out import SpliceOut
from .augmentations.time_inversion import TimeInversion
from .augmentations.delay import Delay
from .core.composition import Compose, SomeOf, OneOf
from .utils.config import from_dict, from_yaml
from .utils.convolution import convolve


__version__ = "0.11.0"
174 changes: 174 additions & 0 deletions torch_audiomentations/augmentations/delay.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
from ..core.transforms_interface import BaseWaveformTransform
from ..utils.object_dict import ObjectDict

from torch import Tensor
from typing import Optional
from random import choices
import torch
import numpy as np


class Delay(BaseWaveformTransform):
"""
A rudimentary delay effect which delays the original signal multiple times and applies an attenuation factor to each delayed signal.
The delayed signals are then added to the original signal and the whole delayed signal is reduced by a volume factor.
"""

supported_modes = {"per_batch", "per_example"}

supports_multichannel = True
requires_sample_rate = False

supports_target = True
requires_target = False

def __init__(
self,
min_delay_ms: float = 20.0,
max_delay_ms: float = 100.0,
mode: str = "per_example",
p: float = 0.5,
p_mode: str = None,
sample_rate: int = None,
target_rate: int = None,
output_type: Optional[str] = None,
volume_factor: float = 0.5,
volume_factor_range: float = 0.2,
repeats: int = 2,
repeats_range: int = 1,
attenuation: float = 0.5,
attenuation_range:int = 0.2
):
"""
:param sample_rate:
:param min_delay_ms: Minimum delay in milliseconds (default 20.0)
:param max_delay_ms: Maximum delay in milliseconds (default 100.0)
:param mode: ``per_example``, ``per_channel``, or ``per_batch``. Default ``per_example``.
:param p:
:param p_mode:
:param target_rate:
:param output_type:
:param volume_factor: The factor by which the delayed signal is reduced compared to the original signal. Default 0.5
:param volume_factor_range: The range of the volume factor. Default 0.2
:param repeats: The number of times the delayed signal is added to the original signal. Default 2
:param repeats_range: The range of the number of repeats. Default 1
:param attenuation: The factor by which the delayed signal is attenuated for each repeat. Default 0.5
:param attenuation_range: The range of the attenuation factor. Default 0.2
"""
super().__init__(
mode=mode,
p=p,
p_mode=p_mode,
sample_rate=sample_rate,
target_rate=target_rate,
output_type=output_type,
)

if min_delay_ms > max_delay_ms:
raise ValueError("max_delay_ms must be > min_delay_ms")
if not sample_rate:
raise ValueError("sample_rate is invalid.")
self._sample_rate = sample_rate
self._max_delay_ms = max_delay_ms
self._min_delay_ms = min_delay_ms
self._max_delay_samples = int(max_delay_ms * sample_rate / 1000)
self._min_delay_samples = int(min_delay_ms * sample_rate / 1000)
self._mode = mode
self._volume_factor = volume_factor
self._volume_factor_range = volume_factor_range
self._repeats = repeats
self._repeats_range = repeats_range
self._attenuation = attenuation
self._attenuation_range = attenuation_range

def randomize_parameters(
self,
samples: Tensor = None,
sample_rate: Optional[int] = None,
targets: Optional[Tensor] = None,
target_rate: Optional[int] = None,
):
"""
:param samples: (batch_size, num_channels, num_samples)
:param sample_rate:
"""
batch_size, num_channels, num_samples = samples.shape

if self._mode == "per_example":
self.transform_parameters["delays"] = choices(
range(self._min_delay_samples, self._max_delay_samples + 1), k=batch_size
)
self.transform_parameters['volume_factors'] = np.random.uniform(self._volume_factor-self._volume_factor_range, self._volume_factor+self._volume_factor_range, batch_size)
self.transform_parameters['repeats'] = choices([self._repeats-self._repeats_range, self._repeats+self._repeats_range],k=batch_size)
self.transform_parameters['attenuation'] = choices([self._attenuation-self._attenuation_range, self._attenuation+self._attenuation_range],k=batch_size)

elif self._mode == "per_batch":
self.transform_parameters["delays"] = choices(
range(self._min_delay_samples, self._max_delay_samples + 1), k=1
)
self.transform_parameters['volume_factors'] = np.random.uniform(self._volume_factor-self._volume_factor_range, self._volume_factor+self._volume_factor_range, 1)
self.transform_parameters['repeats'] = choices([self._repeats-self._repeats_range, self._repeats+self._repeats_range],k=1)
self.transform_parameters['attenuation'] = choices([self._attenuation-self._attenuation_range, self._attenuation+self._attenuation_range],k=1)

def apply_transform(

self,
samples: Tensor = None,
sample_rate: Optional[int] = None,
targets: Optional[Tensor] = None,
target_rate: Optional[int] = None,
) -> ObjectDict:
"""
:param samples: (batch_size, num_channels, num_samples)
:param sample_rate:
"""
batch_size, num_channels, num_samples = samples.shape


if self._mode == "per_example":
for i in range(batch_size):
samples[i, ...] = self.delay(
samples[i][None],
self.transform_parameters["delays"][i],
self.transform_parameters['volume_factors'][i],
self.transform_parameters['repeats'][i],
self.transform_parameters['attenuation'][i],
)[0]


elif self._mode == "per_batch":
samples = self.delay(
samples, self.transform_parameters["delays"][0], self.transform_parameters['volume_factors'][0],
self.transform_parameters['repeats'][0],
self.transform_parameters['attenuation'][0],
)

return ObjectDict(
samples=samples,
sample_rate=sample_rate,
targets=targets,
target_rate=target_rate,
)

def delay(self,samples: Tensor, delay_samples: int, volume_factor: float , repeats : int, attenuation : float) -> Tensor:
"""
:param samples: (batch_size, num_channels, num_samples)
:param delay_samples: int
:param volume_factor: float
:param repeats: int
:param attenuation: float

"""

batch_size, num_channels, num_samples = samples.shape

delayed_signal = torch.zeros(batch_size, num_channels, num_samples + repeats * delay_samples, device = samples.device)

for i in range(repeats):
delayed_signal[:,:,i*delay_samples:i*delay_samples+num_samples] += samples * attenuation ** (i)

delayed_signal = delayed_signal[:,:,:num_samples]

samples = (samples + volume_factor * delayed_signal)/2

return samples