DataPipes

redcat.datapipes.iter

Contain the implementation of IterDataPipes.

redcat.datapipes.iter.BatchExtender

Bases: IterDataPipe[BaseBatch[T]]

Implement a DataPipe to combine several BaseBatch objects into a single BaseBatch object.

Parameters:

Name Type Description Default
datapipe IterDataPipe[BaseBatch[T]]

Specifies the source DataPipe. The DataPipe has to return compatible BaseBatch objects.

required
buffer_size int

Specifies the buffer size, i.e. the number of batches that are combined into a single larger batch.

10
drop_last bool

If True, the remaining batches are dropped when the buffer is not full at the end of the source DataPipe. If False, they are combined and returned as a smaller final batch.

False

Example usage:

>>> import torch
>>> from torch.utils.data.datapipes.iter import IterableWrapper
>>> from redcat import BatchedTensor
>>> from redcat.datapipes.iter import BatchExtender
>>> datapipe = BatchExtender(
...     IterableWrapper([BatchedTensor(torch.ones(2) * i) for i in range(10)]),
...     buffer_size=4,
... )
>>> list(datapipe)
[tensor([0., 0., 1., 1., 2., 2., 3., 3.], batch_dim=0),
 tensor([4., 4., 5., 5., 6., 6., 7., 7.], batch_dim=0),
 tensor([8., 8., 9., 9.], batch_dim=0)]
>>> datapipe = BatchExtender(
...     IterableWrapper([BatchedTensor(torch.ones(2) * i) for i in range(10)]),
...     buffer_size=4,
...     drop_last=True,
... )
>>> list(datapipe)
[tensor([0., 0., 1., 1., 2., 2., 3., 3.], batch_dim=0),
 tensor([4., 4., 5., 5., 6., 6., 7., 7.], batch_dim=0)]
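
The buffering behaviour shown in the example above can be sketched in plain Python. This is a minimal illustration under stated assumptions, not redcat's implementation: batches are modelled as plain lists instead of BaseBatch objects, and `extend_batches` is a hypothetical helper name that does not exist in the library.

```python
from collections.abc import Iterable, Iterator


def extend_batches(
    batches: Iterable[list], buffer_size: int = 10, drop_last: bool = False
) -> Iterator[list]:
    """Hypothetical helper: concatenate every ``buffer_size`` batches into one."""
    buffer: list[list] = []
    for batch in batches:
        buffer.append(batch)
        if len(buffer) == buffer_size:
            # The buffer is full: merge its batches and start a new buffer.
            yield [item for b in buffer for item in b]
            buffer = []
    # Leftover batches form a smaller final batch unless drop_last is set.
    if buffer and not drop_last:
        yield [item for b in buffer for item in b]


# Ten batches of two items each, mirroring the example above.
source = [[float(i)] * 2 for i in range(10)]
combined = list(extend_batches(source, buffer_size=4))
combined_drop = list(extend_batches(source, buffer_size=4, drop_last=True))
```

With ten source batches and `buffer_size=4`, the sketch yields two full batches plus one partial batch, and only the two full batches when `drop_last=True`.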

redcat.datapipes.iter.BatchShuffler

Bases: IterDataPipe[BaseBatch[T]]

Implement a DataPipe to shuffle data in BaseBatch objects.

Parameters:

Name Type Description Default
datapipe IterDataPipe[BaseBatch[T]]

Specifies the source DataPipe. The DataPipe has to return BaseBatch objects.

required
random_seed int

Specifies the random seed used to shuffle the data.

3770589329299158004

Example usage:

>>> import torch
>>> from torch.utils.data.datapipes.iter import IterableWrapper
>>> from redcat import BatchedTensor
>>> from redcat.datapipes.iter import BatchShuffler
>>> datapipe = BatchShuffler(
...     IterableWrapper([BatchedTensor(torch.arange(4).add(i)) for i in range(2)])
... )
>>> list(datapipe)
[tensor([3, 0, 1, 2], batch_dim=0), tensor([1, 3, 4, 2], batch_dim=0)]

redcat.datapipes.iter.BatchShuffler.random_seed property

random_seed: int

The random seed used to initialize the pseudo random generator.
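
The role of the random seed can be illustrated with a plain-Python sketch. It is not redcat's implementation: batches are modelled as lists, `shuffle_batches` is a hypothetical helper name, and using Python's `random.Random` as the generator is an assumption made only for illustration.

```python
import random
from collections.abc import Iterable, Iterator


def shuffle_batches(batches: Iterable[list], random_seed: int = 0) -> Iterator[list]:
    """Hypothetical helper: shuffle the items inside each batch."""
    # One generator, seeded once, so a full pass over the data is reproducible.
    rng = random.Random(random_seed)
    for batch in batches:
        # sample() returns a shuffled copy and leaves the input batch untouched.
        yield rng.sample(batch, k=len(batch))


source = [[0, 1, 2, 3], [1, 2, 3, 4]]
first = list(shuffle_batches(source, random_seed=42))
second = list(shuffle_batches(source, random_seed=42))
```

Two passes with the same seed produce the same permutations, and each output batch contains exactly the items of the corresponding input batch.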

redcat.datapipes.iter.MiniBatcher

Bases: IterDataPipe[BaseBatch[T]]

Implement a DataPipe to generate mini-batches from a batch (BaseBatch object).

Parameters:

Name Type Description Default
datapipe_or_batch IterDataPipe[BaseBatch[T]] | BaseBatch[T]

Specifies the DataPipe of batches, or a single batch, to split. The generated mini-batches have the same structure as the input batches.

required
batch_size int

Specifies the batch size.

required
drop_last bool

If True, the last incomplete mini-batch is dropped when the number of examples is not divisible by the batch size. If False, the last mini-batch is kept and is smaller than the batch size.

False
shuffle bool

If True, each batch is shuffled before it is split into mini-batches. The shuffling is done per batch.

False
random_seed int

Specifies the random seed used to shuffle the batch before splitting it.

5513175564631803238

Example usage:

>>> import torch
>>> from torch.utils.data.datapipes.iter import IterableWrapper
>>> from redcat import BatchedTensor
>>> from redcat.datapipes.iter import MiniBatcher
>>> datapipe = MiniBatcher(
...     IterableWrapper([BatchedTensor(torch.arange(4).add(i * 4)) for i in range(2)]),
...     batch_size=2,
... )
>>> list(datapipe)
[tensor([0, 1], batch_dim=0),
 tensor([2, 3], batch_dim=0),
 tensor([4, 5], batch_dim=0),
 tensor([6, 7], batch_dim=0)]
>>> datapipe = MiniBatcher(BatchedTensor(torch.arange(9)), batch_size=2)
>>> list(datapipe)
[tensor([0, 1], batch_dim=0),
 tensor([2, 3], batch_dim=0),
 tensor([4, 5], batch_dim=0),
 tensor([6, 7], batch_dim=0),
 tensor([8], batch_dim=0)]
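
The splitting rule and the effect of drop_last can be sketched in plain Python. This is an illustration only, not redcat's implementation: a batch is modelled as a list, shuffling is left out, and `split_batch` is a hypothetical helper name.

```python
from collections.abc import Iterator


def split_batch(batch: list, batch_size: int, drop_last: bool = False) -> Iterator[list]:
    """Hypothetical helper: split one batch into consecutive mini-batches."""
    for start in range(0, len(batch), batch_size):
        mini_batch = batch[start : start + batch_size]
        # Only the final slice can be shorter than batch_size.
        if len(mini_batch) < batch_size and drop_last:
            return
        yield mini_batch


# Nine examples with batch_size=2, mirroring the second example above.
kept = list(split_batch(list(range(9)), batch_size=2))
dropped = list(split_batch(list(range(9)), batch_size=2, drop_last=True))
```

With nine examples and `batch_size=2`, the sketch keeps a final mini-batch of one example by default and discards it when `drop_last=True`.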

redcat.datapipes.iter.MiniBatcher.batch_size property

batch_size: int

The batch size.

redcat.datapipes.iter.MiniBatcher.random_seed property

random_seed: int

The random seed used to initialize the pseudo random generator.