Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# Continuous-integration workflow: installs the package and runs the pytest suite.
name: CI

# Triggers: pull requests targeting any branch, and pushes to main or master.
on:
  pull_request:
    branches: [ "**" ]
  push:
    branches: [ main, master ]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        # Quoted on purpose: an unquoted 3.10 would be parsed as the number 3.1.
        python-version: ["3.10"]
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Install dependencies
        # Upgrades pip, installs the project in editable mode, then adds pytest.
        run: |
          python -m pip install --upgrade pip
          pip install -e .
          pip install pytest

      - name: Run tests
        # Verbose mode lists every collected test by name in the job log.
        run: |
          pytest -v


10 changes: 10 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Pytest settings, read from pyproject.toml.
[tool.pytest.ini_options]
# Refuse to run with a pytest older than this version.
minversion = "7.0"

# Directories searched when pytest is invoked without explicit paths.
testpaths = [
    "tests"
]
# Glob for files collected as test modules.
python_files = ["test_*.py"]
# Function-name globs collected as tests (classic, "it_", and "spec_" styles).
python_functions = ["test_*", "it_*", "spec_*"]


15 changes: 15 additions & 0 deletions tests/test_cli_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from tbview.cli import is_event_file, local_event_name, local_event_dir


def test_is_event_file_and_local_event_name():
    """A tfevents path is recognised and its local name keeps the prefix.

    Only the ``events.out.tfevents.`` prefix of the local name is checked;
    whatever follows the prefix is left unconstrained by this test.
    """
    event_path = "/tmp/events.out.tfevents.1699999999.machine.tag"
    expected_prefix = "events.out.tfevents."

    recognised = is_event_file(event_path)
    assert recognised is True

    local_name = local_event_name(event_path)
    assert local_name.startswith(expected_prefix)


def test_local_event_dir_current_dir_when_empty():
    """An empty directory maps to ``"."``; a non-empty one is returned as is."""
    expectations = (
        ("", "."),
        ("subdir", "subdir"),
    )
    for given, wanted in expectations:
        assert local_event_dir(given) == wanted


37 changes: 37 additions & 0 deletions tests/test_crc32c.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import os
import struct

from tbview.crc32c import crc32c, masked_crc32c, u32


def test_crc32c_known_vectors():
    """Check ``crc32c`` against published CRC-32C (Castagnoli) vectors."""
    # Input bytes -> expected checksum; these vectors appear in many
    # CRC32C test suites.
    expected_by_input = {
        b"": 0x00000000,
        b"123456789": 0xE3069283,
        b"The quick brown fox jumps over the lazy dog": 0x22620404,
        b"\x00" * 32: 0x8A9136AA,
    }
    for payload, checksum in expected_by_input.items():
        assert u32(crc32c(payload)) == checksum


def test_masked_crc32c_roundtrip_properties():
    """Masked CRCs stay in 32 bits, differ from the raw CRC, and unmask back.

    The TFRecord format stores ``rotate_right(crc, 15) + 0xA282EAD8`` (mod
    2**32) instead of the raw CRC. The library exposes no unmask helper, so
    the inverse is applied inline here: subtract the delta, then rotate left
    by 15 bits.

    Samples are fixed byte strings (no ``os.urandom``) so that any failure is
    reproducible from the test source alone.
    """
    mask_delta = 0xA282EAD8
    samples = [b"a", b"abc", bytes(range(64)), b"\x00" * 10]
    for data in samples:
        base = u32(crc32c(data))
        masked = u32(masked_crc32c(data))
        assert 0 <= masked <= 0xFFFFFFFF
        if base != 0:
            assert masked != base
        # unmask(mask(crc)) must give back the raw CRC.
        rotated = u32(masked - mask_delta)
        assert u32((rotated >> 17) | (rotated << 15)) == base


def test_u32_masks_to_32_bits():
    """``u32`` wraps values into the unsigned 32-bit range."""
    one_past_max = 1 << 32  # first value that no longer fits in 32 bits
    all_ones = 0xFFFFFFFF

    assert u32(one_past_max) == 0
    assert u32(-1) == all_ones


113 changes: 113 additions & 0 deletions tests/test_parser_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
import struct
import tempfile
from typing import List

import pytest

from tbview.parser import read_records, read_records_from_offset, test_crc32c as validate_crc32c
from tbview.tf_protobuf.event_pb2 import Event
from tbview.crc32c import masked_crc32c


def write_tfrecord_records(path: str, payloads: List[bytes]):
    """Append each payload to ``path`` framed as one TFRecord.

    Each record is written as::

        uint64  length of the payload
        uint32  masked CRC of the 8 length bytes
        bytes   payload
        uint32  masked CRC of the payload

    Integers are packed explicitly little-endian (``<``), as the TFRecord
    format specifies, rather than in the host's native byte order. On
    little-endian machines the bytes written are unchanged.

    Args:
        path: File to append to; created if it does not exist.
        payloads: Raw record bodies, written in order.
    """
    # Always append to support incremental writes across calls
    with open(path, "ab") as f:
        for payload in payloads:
            length_bytes = struct.pack("<Q", len(payload))
            record = b"".join(
                (
                    length_bytes,
                    struct.pack("<I", masked_crc32c(length_bytes)),
                    payload,
                    struct.pack("<I", masked_crc32c(payload)),
                )
            )
            f.write(record)


def make_event(step: int, tag: str, value: float) -> bytes:
    """Serialize an ``Event`` carrying a single scalar summary value.

    The wall time is set to ``1000.0 + step``.

    Args:
        step: Value stored in the event's ``step`` field.
        tag: Tag of the summary value.
        value: Scalar stored as ``simple_value`` (coerced to ``float``).

    Returns:
        The serialized event bytes.
    """
    event = Event(step=step, wall_time=1000.0 + step)
    event.summary.value.add(tag=tag, simple_value=float(value))
    return event.SerializeToString()


def test_read_records_streams_all_events_and_stops_on_truncation():
    """All complete records are yielded; a trailing partial header is ignored."""
    scalars = ((1, "loss", 0.9), (2, "loss", 0.8), (3, "acc", 0.1))

    with tempfile.TemporaryDirectory() as workdir:
        path = os.path.join(workdir, "events.out.tfevents.test")
        write_tfrecord_records(
            path, [make_event(step, tag, value) for step, tag, value in scalars]
        )

        # Three stray bytes after the valid records: shorter than the
        # 12-byte record header, so they look like a cut-off write.
        with open(path, "ab") as tail:
            tail.write(b"\x00\x01\x02")

        steps = [ev.step for ev in read_records(path, warn=lambda m: None)]

        assert steps == [1, 2, 3]


def test_read_records_from_offset_resumes_incrementally():
    """Reading from a saved offset yields only records appended after it."""
    with tempfile.TemporaryDirectory() as workdir:
        path = os.path.join(workdir, "events.out.tfevents.test")

        def read_from(offset):
            # Collect every (event, offset) pair produced from ``offset`` on.
            return list(
                read_records_from_offset(path, offset, warn=lambda m: None)
            )

        write_tfrecord_records(
            path,
            [
                make_event(10, "loss", 1.0),
                make_event(11, "loss", 0.9),
                make_event(12, "acc", 0.2),
            ],
        )

        initial_events, initial_offsets = zip(*read_from(0))
        assert [e.step for e in initial_events] == [10, 11, 12]

        # Nothing was appended yet, so resuming at the last offset is empty.
        resume_at = initial_offsets[-1]
        assert read_from(resume_at) == []

        # After one more record is appended, the resume sees exactly that one.
        write_tfrecord_records(path, [make_event(13, "loss", 0.8)])
        new_events, new_offsets = zip(*read_from(resume_at))
        assert [e.step for e in new_events] == [13]
        assert new_offsets[-1] > resume_at


def test_test_crc32c_validation():
    """The parser's CRC check accepts the right checksum and rejects another."""
    data = b"hello world"
    checksum = masked_crc32c(data)
    off_by_one = (checksum + 1) & 0xFFFFFFFF  # stays within 32 bits

    good = struct.pack("I", checksum)
    bad = struct.pack("I", off_by_one)

    assert validate_crc32c(data, good) is True
    assert validate_crc32c(data, bad) is False


def test_read_records_stops_on_invalid_crc():
    """Reading stops at the first record whose payload CRC does not match."""
    header_size = 8 + 4  # length field + CRC of the length
    footer_size = 4  # CRC of the payload

    with tempfile.TemporaryDirectory() as workdir:
        path = os.path.join(workdir, "events.out.tfevents.test")
        first = make_event(1, "loss", 0.5)
        second = make_event(2, "loss", 0.4)
        write_tfrecord_records(path, [first, second])

        # Position of the second record's payload CRC: skip the whole first
        # record, then the second record's header and payload.
        second_start = header_size + len(first) + footer_size
        crc_pos = second_start + header_size + len(second)

        # Flip every bit of that CRC in place.
        with open(path, "r+b") as handle:
            handle.seek(crc_pos)
            (stored_crc,) = struct.unpack("I", handle.read(4))
            handle.seek(crc_pos)
            handle.write(struct.pack("I", stored_crc ^ 0xFFFFFFFF))

        steps = [ev.step for ev in read_records(path, warn=lambda m: None)]

        # Only the intact first record is yielded.
        assert steps == [1]


46 changes: 46 additions & 0 deletions tests/test_viewer_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
from tbview.viewer import TensorboardViewer


class Dummy:
    """Empty attribute holder passed as ``self`` to unbound viewer methods."""


def test_moving_average_simple_cases():
    """Windows of 0 and 1 return the input; a window of 2 averages pairs."""
    stand_in = Dummy()
    series = [1, 2, 3, 4]
    smooth = TensorboardViewer._moving_average

    for window in (0, 1):
        assert smooth(stand_in, series, window) == series

    # Window 2: each point is the mean of itself and its predecessor; the
    # first point has no predecessor and is kept as is.
    assert smooth(stand_in, series, 2) == [1.0, 1.5, 2.5, 3.5]


def test_format_duration_formats_compactly():
    """Durations render as ``MM:SS`` and gain an hours field past an hour."""
    stand_in = Dummy()
    cases = (
        (5, "00:05"),
        (60, "01:00"),
        (3661, "1:01:01"),
    )
    for seconds, rendered in cases:
        assert TensorboardViewer._format_duration(stand_in, seconds) == rendered


def test_compute_run_epoch_eta_and_speed_from_epoch_series():
    """ETA and step speed are derived from a run's ``train/epoch`` series."""
    run, tag = "runA", "train/epoch"

    # Stand-in for the viewer carrying only the two attributes set here:
    # step -> epoch value, and step -> wall-clock time.
    viewer_stub = Dummy()
    viewer_stub.records_by_run = {run: {tag: {0: 0.0, 10: 0.5, 20: 1.0}}}
    viewer_stub.wall_times_by_run = {
        run: {tag: {0: 100.0, 10: 110.0, 20: 120.0}}
    }

    result = TensorboardViewer._compute_run_epoch_eta(viewer_stub, run)
    assert result is not None
    eta, speed = result

    # Epoch first reaches 1.0 at t=120 with t0=100, so one epoch takes 20 s.
    assert abs(eta - 20.0) < 1e-6
    # 20 steps elapsed over those 20 s -> 1.0 steps per second.
    assert speed is not None and abs(speed - 1.0) < 1e-6