google-deepmind/bagz

C++


Description: Bagz is a format for storing a sequence of string records. It supports per-record compression and fast index-based lookup.

Language: C++

License: Apache-2.0

Stars: 64

Forks: 9

Open issues: 5

Created: 2025-07-03T09:15:10Z

Pushed: 2026-04-17T09:57:33Z

Default branch: main

Fork: no

Archived: no

README:

Bagz

Overview

*Bagz* is a format for storing a sequence of byte-array records, typically serialised protocol buffers. It supports per-record compression and fast index-based lookup. All indexing is zero based.
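To make "per-record compression and fast index-based lookup" concrete, here is a minimal pure-Python sketch of the general idea. It is an illustration under stated assumptions, not the actual Bagz on-disk format: the helper names (`pack_records`, `record_count`, `read_record`) are hypothetical, `zlib` stands in for Zstandard, and the layout (compressed records followed by a tail of little-endian 64-bit end offsets, called "limits" here) is assumed for the example.

```python
import struct
import zlib


def pack_records(records: list[bytes]) -> bytes:
    """Compress each record on its own, then append the end offsets ("limits")."""
    body = bytearray()
    limits = []
    for record in records:
        body += zlib.compress(record)  # zlib is a stand-in for Zstandard.
        limits.append(len(body))       # End offset of this record in the body.
    tail = struct.pack(f'<{len(limits)}Q', *limits)
    return bytes(body) + tail


def record_count(blob: bytes) -> int:
    """The last limit equals the body size, so it also locates the limits tail."""
    if not blob:
        return 0
    (body_size,) = struct.unpack_from('<Q', blob, len(blob) - 8)
    return (len(blob) - body_size) // 8


def read_record(blob: bytes, index: int) -> bytes:
    """Look up one record by zero-based index without touching the others."""
    count = record_count(blob)
    if not 0 <= index < count:
        raise IndexError(index)
    limits_start = len(blob) - 8 * count
    (end,) = struct.unpack_from('<Q', blob, limits_start + 8 * index)
    start = 0
    if index > 0:
        (start,) = struct.unpack_from('<Q', blob, limits_start + 8 * (index - 1))
    return zlib.decompress(blob[start:end])


blob = pack_records([b'alpha', b'', b'gamma'])
print(record_count(blob), read_record(blob, 2))
```

Because every record is compressed independently and its byte range comes from two adjacent limits, reading record `i` costs two offset reads and one decompression, regardless of how many records the file holds.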

Installation

To install Bagz from source we recommend using uv. The install will download required dependencies apart from curl (libcurl-devel) and OpenSSL (openssl-devel). These need to be installed in a location that cmake's find_package searches.

uv pip install .

On Linux you can install the latest version from PyPI.

uv pip install bagz

Python API

Python Reader

Reads a single Bagz file or a sharded Bagz file-set.

from collections.abc import Sequence, Iterable

import bagz
import numpy as np

# Bagz Readers support random access. The order of elements within a Bagz
# file is the order in which they are written. Records are returned as `bytes`
# objects.
data = bagz.Reader('/path/to/data.bagz')

# Bagz Readers can be configured like this - here we require that the file was
# written with separate limits.
data_separate_limits = bagz.Reader('/path/to/data.bagz', bagz.Reader.Options(
    limits_placement=bagz.LimitsPlacement.SEPARATE,
))

# Bagz Readers are Sequences and support slicing, iterating, etc.
assert isinstance(data, Sequence)

# Bagz Readers have a length.
assert len(data) > 10

# Can access record by row-index.
fifth_value: bytes = data[5]

# Can slice.
data_from_5: bagz.Reader = data[5:]

# Slices are still Readers.
assert isinstance(data_from_5, bagz.Reader)

assert data_from_5[0] == fifth_value

# Can access records by multiple row-indices.
fourth, second, tenth = data.read_indices([4, 2, 10])
assert fourth == data[4]
assert second == data[2]
assert tenth == data[10]

# Can iterate records.
for record in data:
    do_something_else(record)

# Can read all records. This eager version can be faster than iteration.
all_records = data.read()

# Can iterate sub-range of records.
for record in data[4:9]:
    do_something_else(record)

# Can read a sub-range of records. This eager form can be faster than
# iteration.
sub_range = data[4:9].read()

# Can use an infinite iterator as source of indices. (Reads ahead in parallel.)
def my_generator(size: int) -> Iterable[int]:
    rng = np.random.default_rng(42)
    while True:
        yield rng.integers(size).item()

data_iter: Iterable[bytes] = data.read_indices_iter(my_generator(len(data)))
for i in range(10):
    random_item: bytes = next(data_iter)
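The last example notes that `read_indices_iter` reads ahead in parallel. The sketch below shows one way such read-ahead can work over any sequence of records. It is an assumption-laden illustration, not the bagz implementation: `read_ahead` and its `depth` parameter are hypothetical names, and a thread pool is just one possible mechanism.

```python
from collections import deque
from collections.abc import Iterable, Iterator, Sequence
from concurrent.futures import Future, ThreadPoolExecutor
import itertools


def read_ahead(
    records: Sequence[bytes], indices: Iterable[int], depth: int = 4
) -> Iterator[bytes]:
    """Yields records[i] for each i in indices, keeping `depth` reads in flight."""
    index_iter = iter(indices)
    with ThreadPoolExecutor(max_workers=depth) as pool:
        pending: deque[Future[bytes]] = deque()
        # Prime the pipeline with the first `depth` lookups.
        for index in itertools.islice(index_iter, depth):
            pending.append(pool.submit(records.__getitem__, index))
        while pending:
            # Results are consumed in submission order, so output order
            # matches the order of the requested indices.
            result = pending.popleft().result()
            # Top the pipeline back up before handing the result to the caller.
            for index in itertools.islice(index_iter, 1):
                pending.append(pool.submit(records.__getitem__, index))
            yield result


records = [b'a', b'b', b'c', b'd']
stream = read_ahead(records, itertools.cycle([3, 0, 2]), depth=2)
first_five = list(itertools.islice(stream, 5))
stream.close()  # Shuts the thread pool down once we stop consuming.
print(first_five)
```

The index source can be infinite because only `depth` lookups are ever outstanding; closing the generator releases the worker threads.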

Python Reader - Index and MultiIndex

Use Index to find the index of the first occurrence of a record, and MultiIndex to find the indices of all occurrences of a record.

keys = bagz.Reader('/path/to/keys.bag')
# Get the index of the first occurrence of key.
index = bagz.Index(keys)
key_index: int = index[b'example_key']

# Get all occurrences of key.
multi_index = bagz.MultiIndex(keys)
all_indices: list[int] = multi_index[b'example_key']
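As a mental model of the two lookups, the following pure-Python sketch builds the same kind of mappings from a plain list of keys. The helper names `build_first_index` and `build_multi_index` are hypothetical, and the dictionaries only model the lookup semantics described above; they say nothing about how bagz stores or builds its indices.

```python
from collections.abc import Iterable


def build_first_index(keys: Iterable[bytes]) -> dict[bytes, int]:
    """Maps each key to the zero-based position of its first occurrence."""
    first: dict[bytes, int] = {}
    for position, key in enumerate(keys):
        first.setdefault(key, position)  # Keep only the earliest position.
    return first


def build_multi_index(keys: Iterable[bytes]) -> dict[bytes, list[int]]:
    """Maps each key to every position where it occurs, in ascending order."""
    positions: dict[bytes, list[int]] = {}
    for position, key in enumerate(keys):
        positions.setdefault(key, []).append(position)
    return positions


keys = [b'apple', b'pear', b'apple', b'plum', b'pear']
first_index = build_first_index(keys)
multi_index = build_multi_index(keys)
print(first_index[b'pear'], multi_index[b'pear'])
```

A typical use is to keep keys in one file and values in another, in the same order, so that a position found through the key index can be used to fetch the matching value by row index.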

Python Writer

For writing a single Bagz file.

Example:

import bagz

# Compression is selected based on the file extension:
# `.bagz` will use Zstandard compression with default settings.
# `.bag` will use no compression.
with bagz.Writer('/path/to/data.bagz') as writer:
    for d in generate_records():
        writer.write(d)

# Adjust compression level explicitly.
# Note this will no longer use the extension to determine whether to compress.
with bagz.Writer(
    '/path/to/data.bagz',
    bagz.Writer.Options(
        compression=bagz.CompressionZstd(level=3)
    ),
) as writer:
    for d in generate_records():
        writer.write(d)

Options

Reader Options

bagz.Reader.Options has these optional arguments.

  • compression: Can be one of:
      • bagz.CompressionAutoDetect(): Default - Uses the file extension to determine whether records are compressed (.bagz - compressed with Zstandard, .bag - uncompressed).
      • bagz.CompressionNone(): Records are not decompressed.
      • bagz.CompressionZstd(): Records are decompressed using Zstandard.
  • limits_placement: Can be one of:
      • bagz.LimitsPlacement.TAIL: Default - Reads limits from the tail of the file.
      • bagz.LimitsPlacement.SEPARATE: Reads limits from a separate file.
  • limits_storage: Can be one of:
      • bagz.LimitsStorage.ON_DISK: Default - Reads limits from disk for each read.
      • bagz.LimitsStorage.IN_MEMORY: Reads all limits from disk in one go.
  • max_parallelism: Default number of threads when reading many records.
  • sharding_layout: Can be one of:
      • bagz.ShardingLayout.CONCATENATED: Default - See [Sharding](#sharding)
      • bagz.ShardingLayout.INTERLEAVED: See [Sharding](#sharding)
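The extension rule behind the default compression setting can be pictured with a small stand-alone function. This is a sketch only: `autodetect_compression` is a hypothetical name, the returned strings are placeholders rather than bagz types, and raising an error for other extensions is an assumption made for the example, since the text above does not say how bagz treats them.

```python
from pathlib import PurePosixPath


def autodetect_compression(path: str) -> str:
    """Models the rule: `.bagz` means Zstandard, `.bag` means no compression."""
    suffix = PurePosixPath(path).suffix
    if suffix == '.bagz':
        return 'zstd'
    if suffix == '.bag':
        return 'none'
    # Assumption for this sketch: any other extension is rejected.
    raise ValueError(f'Cannot infer compression from extension {suffix!r}')


print(autodetect_compression('/path/to/data.bagz'))
print(autodetect_compression('/path/to/keys.bag'))
```

Passing an explicit compression option, as in the Writer example with `bagz.CompressionZstd(level=3)`, bypasses this extension-based choice.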

Writer Options

bagz.Writer.Options has these optional arguments.

  • compression: Can be one of:
      • bagz.CompressionAutoDetect(): Default - Uses the file extension to decide whether to compress (.bagz - compressed with Zstandard, .bag - uncompressed).
      • bagz.CompressionNone(): Records are not compressed.
      • bagz.CompressionZstd(level = 3): Records are compressed using Zstandard; the compression level can be specified.
  • limits_placement: Can be one of:
      • bagz.LimitsPlacement.TAIL: Default - Writes limits to the tail of the file.
      • bagz.LimitsPlacement.SEPARATE: Writes limits to a separate file.

Apache Beam Support

Bagz also provides Apache Beam connectors for reading and writing Bagz files in Beam pipelines.

Ensure you have Apache Beam installed.

uv pip install apache_beam

Bagz Source

import apache_beam as beam
from bagz.beam import bagzio
import tensorflow as tf

with beam.Pipeline() as pipeline:
    examples = (
        pipeline
        | 'ReadData' >> bagzio.ReadFromBagz('/path/to/your/data@*.bagz')
        | 'Decode' >> beam.Map(tf.train.Example.FromString)
    )
    # Continue your pipeline.

Bagz Sink

import apache_beam as beam
from bagz.beam import bagzio
import tensorflow as tf

def create_tf_example(data):
    # Replace with your actual feature creation logic.
    feature = {
        'data': tf.train.Feature(bytes_list=tf.train.BytesList(value=[data])),
    }
    return tf.train.Example(features=tf.train.Features(feature=feature))

with beam.Pipeline() as pipeline:
    data = [b'record1', b'record2', b'record3']

    examples = (
        pipeline
        | 'CreateData' >> beam.Create(data)
        |…
