A minimal, composable pipeline for multimodal data prep with single-responsibility steps and predictable chaining.
Transform raw, unstructured data into training-ready datasets.
- Domain-first steps: MeshTransform, MeshClean, MeshQC, ImageTransform, ImageClean, ImageQC, ImageLabel
- Clear I/O contracts via StepSignature: input modality, output modality, arity
- Pluggable providers (e.g., Blender) behind small interfaces
- Built-in AI labeling with DSPy for caption/prompt generation
This omniflow/ folder is the intended public OSS surface. Scripts under scripts/ are convenience runners for local development.
- Modality: the kind of content currently flowing through the pipeline
  - `3d_mesh`, `2d_image`, `text`, `metrics` (extensible)
- Arity: how many inputs a step consumes and outputs it produces
  - `one_to_one`, `one_to_many`, `many_to_one`
- Step types
- Transform: mutates content (e.g., render, convert mode)
- Clean: drops/fixes content based on rules
- QC: computes metrics/flags, does not mutate content
- Label: enriches metadata with captions/tags/labels (content unchanged)
Each step declares a StepSignature(input_modality, output_modality, arity) and is validated during Pipeline.compile().
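To make the contract concrete, here is a minimal sketch of the kind of chaining check `compile()` could perform. The names mirror the concepts above; this is illustrative, not the actual omniflow implementation.

```python
from dataclasses import dataclass

# Illustrative stand-in for omniflow's StepSignature (not the real API).
@dataclass(frozen=True)
class StepSignature:
    input_modality: str   # e.g. "3d_mesh"
    output_modality: str  # e.g. "2d_image"
    arity: str            # "one_to_one" | "one_to_many" | "many_to_one"

def validate_chain(signatures):
    """Each step's input modality must match the previous step's output."""
    for prev, nxt in zip(signatures, signatures[1:]):
        if prev.output_modality != nxt.input_modality:
            raise ValueError(
                f"cannot chain {prev.output_modality} -> {nxt.input_modality}"
            )

render = StepSignature("3d_mesh", "2d_image", "one_to_many")
label = StepSignature("2d_image", "2d_image", "one_to_one")
validate_chain([render, label])  # ok: 2d_image feeds 2d_image
```

A mismatched chain (e.g. a text step directly after a mesh step) fails at compile time rather than mid-run.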
- Streaming by default: steps consume and yield `Sample` objects lazily.
- Optional batching (default `batch_size=1`): `Pipeline.run(batch_size=N)` processes up to N items end-to-end, then releases them before loading the next batch. This bounds memory.
- Optional parallelism: `Pipeline.run(num_workers=W)` runs up to W batches concurrently end-to-end (threads; portable, good for I/O-bound work). In-flight items ≈ N * W.
- Fanout: one-to-many steps expand within the batch; downstream steps see the expanded items. Keep `views` (or fanout factors) modest, or use a smaller `batch_size`, to bound memory.
- Representations by modality (canonical, in-memory between steps):
  - `3d_mesh`: in-memory `trimesh.Trimesh` (default from sources). Providers that require files (e.g., Blender) transparently export a temporary `.glb` and auto-clean it.
  - `2d_image`: in-memory `PIL.Image`. Early downscale keeps memory low.
- Persistence: only writer steps (e.g., `DestinationLocal`) save artifacts. This provides natural progress saving. Stage checkpoints and resumability may be added later.
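The batching and fanout semantics above can be sketched in a few lines of plain Python. This is an illustrative model of the behavior, not the actual `Pipeline` internals: pull up to `batch_size` items from a lazy source, push the batch through every step (steps may fan out), release it, then load the next batch.

```python
from itertools import islice

def batched(iterable, batch_size):
    """Lazily yield lists of up to batch_size items."""
    it = iter(iterable)
    while batch := list(islice(it, batch_size)):
        yield batch

def run(source, steps, batch_size=1):
    """Toy model of batched end-to-end execution (each step maps one item
    to a list of outputs, so one-to-many fanout expands within the batch)."""
    results = []
    for batch in batched(source, batch_size):
        for step in steps:
            batch = [out for item in batch for out in step(item)]
        results.extend(batch)  # the batch is released before the next load
    return results

# A one-to-many "render" step: each mesh id expands into two views.
fanout = lambda item: [f"{item}-view0", f"{item}-view1"]
views = run(["meshA", "meshB", "meshC"], [fanout], batch_size=2)
```

With `batch_size=2`, at most two source items (and their fanned-out views) are in flight at once, which is what bounds memory.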
Omniflow uses a unified provider system for extensibility. Providers handle external services (rendering, AI models) with consistent interfaces.
- ProviderBase: base class with `alias`, `kind`, `capabilities`, and `preflight()` validation
- Capability protocols: `ProviderCaption` (image captioning), `ProviderMeshRender` (3D rendering)
- Registry: central `provider_registry` for discovery and validation
Mesh Rendering (kind=mesh_render)
- `blender`: headless Blender renderer for GLB/GLTF files
  - Requires: Blender 3.0+ (set `BLENDER_PATH` or install to default location)
  - Capabilities: `render`

AI Models (kind=model)
- `gemini`: Google Gemini vision models (gemini-2.5-flash-lite, etc.)
  - Requires: `dspy-ai` package, `GEMINI_API_KEY` environment variable
  - Capabilities: `caption`
- `openai`: OpenAI GPT-4 vision models (gpt-4o-mini, gpt-4.1-nano, etc.)
  - Requires: `dspy-ai` package, `OPENAI_API_KEY` environment variable
  - Capabilities: `caption`
| Kind | Alias | Capabilities | Requirements |
|---|---|---|---|
| mesh_render | blender | render | Blender 3.0+ (BLENDER_PATH opt) |
| model | gemini | caption | dspy-ai, GEMINI_API_KEY |
| model | openai | caption | dspy-ai, OPENAI_API_KEY |
Steps accept a provider parameter to select the backend:
# Mesh rendering
MeshScreenshots(provider="blender", ...)
# Caption generation
LabelVisionToText(provider="gemini", model="gemini-2.5-flash-lite", ...)
LabelVisionToText(provider="openai", model="gpt-4o-mini", ...)

from omniflow.provider_registry import provider_registry
# List caption-capable model providers
print(provider_registry.providers_for(kind="model", capability="caption"))
# List mesh render providers
print(provider_registry.providers_for(kind="mesh_render", capability="render"))

For local folders, you can choose what the source yields via representation:
# Default: in-memory objects (recommended)
source = SourceFolder(modality="3d_mesh", path="./MODELS", pattern="*.glb", representation="object")
# Power users: stream file paths and let the first transform load lazily
source = SourceFolder(modality="3d_mesh", path="./MODELS", pattern="*.glb", representation="path")

In both cases, streaming keeps memory bounded by batch_size; only the current batch is in memory.
The Pipeline.compile() method calls preflight() on each step, which validates:
- Provider is available and registered
- Required environment variables are set (API keys)
- Dependencies are installed (Blender, dspy-ai, etc.)
- I/O checks (where applicable):
- Writers create/delete a tiny sentinel in their target directories
- Render providers verify temporary directory create/write/delete
This ensures fail-fast behavior before pipeline execution.
- Missing API key: `GEMINI_API_KEY` / `OPENAI_API_KEY` not set
- Missing dependency: `dspy-ai` not installed (for model providers)
- Blender not found: `BLENDER_PATH` not set and auto-detect failed
Example error during compile():
PipelineConfigError: Preflight check failed for step 'label.vision_to_text': Caption provider 'gemini' failed preflight. Available providers: ['gemini', 'openai']. Error: GEMINI_API_KEY environment variable required for ProviderGemini.
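The fail-fast aggregation behind that error can be sketched in plain Python. This is illustrative only: `compile_pipeline` and the `(name, preflight)` tuples are stand-ins, not omniflow's real signatures.

```python
# Toy model of compile()-time preflight: call each step's preflight() and
# re-raise the first failure with the step name attached, before any data
# is processed. PipelineConfigError here is a stand-in class.
class PipelineConfigError(Exception):
    pass

def compile_pipeline(steps):
    for name, preflight in steps:
        try:
            preflight()
        except Exception as err:
            raise PipelineConfigError(
                f"Preflight check failed for step '{name}': {err}"
            ) from err

def missing_key_preflight():
    # Simulates a provider whose required API key is absent.
    raise RuntimeError("GEMINI_API_KEY environment variable required for ProviderGemini.")

try:
    compile_pipeline([("source.folder", lambda: None),
                      ("label.vision_to_text", missing_key_preflight)])
except PipelineConfigError as err:
    print(err)
```

The point of the pattern is that a misconfigured final writer or labeler surfaces before the first mesh is loaded.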
Mesh

- NormalizeMesh (transform): centers/scales a `trimesh.Trimesh` (accepts path or object; loads if path)
- MeshScreenshots (transform, one-to-many): renders views via provider
  - Args: `provider`, `elevation`, `azimuth`, `fov`, `size`, `bg`, `verbose`
  - Input: `3d_mesh` (object or path). If given a `trimesh.Trimesh`, the provider transparently exports a temporary `.glb` for rendering.
  - Output: `2d_image` samples with metadata: `parent.id`, `view.{elevation, azimuth, fov, size, bg, provider}`
Image
- SetBackground (transform): composites RGBA over a solid color to RGB
- SetMode (transform): converts image mode (e.g., RGB/RGBA)
- ResizeTo (transform): resizes images to target dimensions (with optional letterboxing)
- CenterCrop (transform): center-crops to target size
- FlipHorizontal / FlipVertical (transform): flips images for augmentation
- Rotate (transform): rotates by 90/180/270 degrees
Image QC
- ImageQCCroppedAtEdges (qc): flags if foreground touches any image border
  - RGBA: uses alpha; RGB: compares to provided BG color with tolerance
- Writes QC record shape:
metadata["qc"][name] = { ok: bool, cropped: bool, touching_edges: {...}, width, height }
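Writing and reading that record shape looks like this. The helper and the step name `"image.cropped_at_edges"` are hypothetical; only the field names follow the documented record shape.

```python
# Illustrative helper matching the documented QC record shape; the step
# name used as the key ("image.cropped_at_edges") is a made-up example.
def write_qc(metadata, name, ok, cropped, touching_edges, width, height):
    metadata.setdefault("qc", {})[name] = {
        "ok": ok, "cropped": cropped,
        "touching_edges": touching_edges,
        "width": width, "height": height,
    }

metadata = {}
write_qc(metadata, "image.cropped_at_edges",
         ok=True, cropped=False,
         touching_edges={"top": False, "bottom": False, "left": False, "right": False},
         width=512, height=512)

# A downstream Clean or assert step would read it back like so:
record = metadata["qc"]["image.cropped_at_edges"]
```

Because QC steps only annotate metadata, the record travels with the sample and can gate later Clean/assert steps.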
Image Labeling
- LabelVisionToText (label): generates captions using AI vision models
  - Args: `provider`, `model`, `profile`, `field`, `max_chars`, `max_tokens`, `overwrite`, `custom_prompt`
  - Providers: `gemini` (default), `openai`
  - Models: `gemini-2.5-flash-lite`, `gpt-4o-mini`, `gpt-4.1-nano`
  - Profiles: `sdxl` (220 chars, concise for SD/SDXL training), `generic` (256 chars, flexible)
  - Stores caption in `metadata[field]` (default: `labels.caption`)
  - Presets: `LabelVisionToTextStableDiffusion`, `LabelVisionToTextSDXL`
  - Requires: `dspy-ai` package and provider-specific API key
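To illustrate what the profile character budgets mean in practice, here is a hypothetical sketch of fitting a generated caption to a profile's `max_chars` (sdxl=220, generic=256, per the profiles above). The word-boundary truncation shown is an assumption, not omniflow's actual strategy.

```python
# Hypothetical caption-fitting helper; limits come from the documented
# profiles, but the truncation strategy here is illustrative only.
PROFILE_MAX_CHARS = {"sdxl": 220, "generic": 256}

def fit_caption(text, profile="sdxl"):
    limit = PROFILE_MAX_CHARS[profile]
    if len(text) <= limit:
        return text
    # Cut at the last word boundary that still fits the budget.
    return text[:limit].rsplit(" ", 1)[0]

short = fit_caption("a low-poly castle render", "sdxl")
long = fit_caption("a low-poly 3d block model of a castle " * 10, "sdxl")
```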
Destinations (Training Data Writers)
Omniflow provides flexible output writers for common training formats:
Core Writers
- DestinationLocal: simple image writer to a folder (basic checkpointing)
- DestinationJSONL: schema-driven JSONL writer with external media files
- DestinationFolderLayout: strict file/folder layouts (e.g., image.png + prompt.txt)
- DestinationHF: export to HuggingFace Datasets format
Preset Writers (Pre-configured for common tasks)
- DestinationJSONLImageText: SDXL/Diffusers text→image (JSONL + images)
- DestinationFolderPairsSD: SD/SDXL folder pairs (separate image + prompt folders)
- DestinationJSONLImage2Image: I2I/IP-Adapter (input + target images)
- DestinationJSONLChat: chat/conversation datasets for LLM tuning
- DestinationHFImageText: HuggingFace image-text export
Schema Types
- FileRef, ImageFileRef: define how media files are saved/referenced
- ImageTextRow, Image2ImageRow, ChatRow: Pydantic schemas for common formats
See "Output Formats" section below for detailed usage examples.
Use the development script in scripts/run_pipeline.py:
python -m scripts.run_pipeline \
--models ./path/to/glbs \
--out ./out \
--el 15 \
--az 0 45 90 \
--size 512 \
--fov 35 \
--bg "#d0d0d0" \
  --verbose

Outputs:
- PNGs in `--out`
- QC report: `--out/qc_report.json` (override with `--report`)
Notes:
- Set `BLENDER_PATH` if Blender isn't auto-detected
- Requires Pillow (`pip install pillow`)
from omniflow.pipeline import Pipeline
from omniflow.steps import (
MeshTransformNormalizeMesh,
MeshScreenshots,
ImageTransformSetBackground,
LabelVisionToTextStableDiffusion,
)
from omniflow.utils.sources import SourceFolder
from omniflow import DestinationJSONLImageText
# Load 3D models
source = SourceFolder(modality="3d_mesh", path="./models", pattern="*.glb")
# Build pipeline
pipe = (
Pipeline(source)
.add(MeshTransformNormalizeMesh())
.add(MeshScreenshots(
provider="blender", # Mesh render provider
elevation=15,
azimuth=[0, 45, 90, 135, 180, 225, 270, 315],
fov=35,
size=512
))
.add(ImageTransformSetBackground(color="#ffffff"))
.add(LabelVisionToTextStableDiffusion(
provider="gemini", # Caption provider
model="gemini-2.5-flash-lite",
context_hint="3D block model render"
))
.add(DestinationJSONLImageText(
out_path="./training/data.jsonl",
media_dir="./training"
))
)
# Preflight checks providers, API keys, dependencies
pipe.compile()
# Run with batching and parallelism
pipe.run(batch_size=4, num_workers=2)

Environment setup:
export BLENDER_PATH="/path/to/blender" # Optional if Blender is in PATH
export GEMINI_API_KEY="your-api-key-here"
# or
export OPENAI_API_KEY="your-api-key-here"

Output: JSONL file with image paths and captions, ready for SDXL/Diffusers training.
Create a minimal provider by subclassing ProviderBase. For example, a mock caption provider:
from omniflow.provider_registry import ProviderBase, provider_registry
class ProviderMyCaption(ProviderBase):
def __init__(self):
super().__init__(alias="mycaption", kind="model", capabilities={"caption"})
def preflight(self):
# Validate dependencies/env as needed
pass
def caption(self, image, model: str, guidance: str, max_tokens: int | None = None) -> str:
# Implement your captioning logic
return "custom caption"
# Register it at startup
provider_registry.register(ProviderMyCaption())
# Use it in a pipeline
# LabelVisionToText(provider="mycaption", model="ignored")

Omniflow provides flexible output writers to generate training data in the exact format your framework expects.
For text-to-image training with SDXL or Diffusers:
from omniflow import DestinationJSONLImageText
writer = DestinationJSONLImageText(
out_path="./training/data.jsonl",
media_dir="./training"
)
pipe.add(writer)

Output structure:
training/
data.jsonl # {"id": "001", "image_path": "images/001.png", "text": "caption", "meta": {...}}
images/
001.png
002.png
...
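Each line in `data.jsonl` is one JSON object in the shape shown above. A minimal sketch of emitting such a row (illustrative; `DestinationJSONLImageText` handles this, plus actually saving the image file):

```python
import json

# Build one JSONL line in the documented row shape:
# {"id": ..., "image_path": ..., "text": ..., "meta": {...}}
def image_text_row(sample_id, image_rel_path, caption, meta=None):
    return json.dumps({
        "id": sample_id,
        "image_path": image_rel_path,  # relative to the JSONL file
        "text": caption,
        "meta": meta or {},
    })

line = image_text_row("001", "images/001.png", "a red cube on a white background")
```

Keeping `image_path` relative to the JSONL file makes the training folder relocatable.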
For trainers that expect image.png + prompt.txt pairs:
from omniflow import DestinationFolderPairsSD
writer = DestinationFolderPairsSD(
root="./sd_training",
image_format="png"
)
pipe.add(writer)

Output structure:
sd_training/
images/
001.png
002.png
prompts/
001.txt # "caption text"
002.txt
Define your own schema with Pydantic:
from pydantic import BaseModel
from omniflow import DestinationJSONL, ImageFileRef
class MyTrainingRow(BaseModel):
sample_id: str
img: str
prompt: str
quality_score: float
writer = DestinationJSONL(
out_path="custom.jsonl",
media_dir="./data",
schema_model=MyTrainingRow,
mapping={
"sample_id": lambda s: s.id,
"img": lambda s: ImageFileRef(path_template="img/{id}.jpg", format="jpg"),
"prompt": lambda s: s.metadata["labels"]["caption"],
"quality_score": lambda s: s.metadata.get("qc", {}).get("score", 1.0),
}
)

Export directly to HF Datasets format:
from omniflow import DestinationHFImageText
writer = DestinationHFImageText(
dataset_name="username/my-dataset",
split="train",
push_to_hub=True # Requires HF_TOKEN
)
pipe.add(writer)For IP-Adapter, ControlNet, or I2I models:
from omniflow import DestinationJSONLImage2Image
writer = DestinationJSONLImage2Image(
out_path="i2i.jsonl",
media_dir="./i2i_data"
)
pipe.add(writer)

Output: JSONL with input_image_path, target_image_path, and optional text fields.
For LLM or multimodal chat tuning:
from omniflow import DestinationJSONLChat
writer = DestinationJSONLChat(
out_path="conversations.jsonl",
system_message="You are a helpful assistant."
)
pipe.add(writer)

Output: JSONL with a messages array containing system/user/assistant turns.
Default: QC steps are read-only and annotate metadata only.
- Recommended: QC measure → Clean → QC assert
  - Measure: e.g., `ImageQCCroppedAtEdges`
  - Clean (future): e.g., `ImageCleanDropIfCropped`
  - Assert (optional): QC step with `strict=True` to gate curated runs/CI
- Flexible order: you can do measure → clean → assert or measure → assert → clean.
- Writers: only writer steps persist artifacts; place them where you want outputs.
  - Common: write after the final assert.
  - For checkpoints: write after clean, then continue.
- Assert policies (planned): `fail_on_any` (default), `max_fail_rate`, `on_fail={raise|warn|annotate}`.
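A pure-Python sketch of the measure → clean → assert flow, with samples as plain dicts. All helper names here are hypothetical (the Clean and assert steps are planned, per the notes above); the point is that measure only annotates, clean drops flagged samples, and assert gates what remains.

```python
# Hypothetical generator steps modeling measure -> clean -> assert.
def measure(samples):
    # Read-only QC: annotate each sample's metadata, never drop.
    for s in samples:
        s.setdefault("qc", {})["cropped_at_edges"] = {"ok": not s.get("touches_edge", False)}
        yield s

def clean(samples):
    # Drop samples the QC measure flagged.
    for s in samples:
        if s["qc"]["cropped_at_edges"]["ok"]:
            yield s

def assert_ok(samples, strict=True):
    # Gate: with strict=True, any surviving failure aborts the run.
    for s in samples:
        if strict and not s["qc"]["cropped_at_edges"]["ok"]:
            raise AssertionError(f"sample {s['id']} failed QC")
        yield s

samples = [{"id": "a"}, {"id": "b", "touches_edge": True}]
survivors = list(assert_ok(clean(measure(samples))))  # only "a" survives
```

Because each stage is a generator, the chain streams sample-by-sample, consistent with the pipeline's lazy execution model.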
- Prefix by domain and intent: `MeshTransform*`, `MeshClean*`, `MeshQC*`; `ImageTransform*`, `ImageClean*`, `ImageQC*`
- Keep steps small, verb-focused, with clear args and docstrings
- Small provider interface and registry for pluggability
- Minimal Blender helper (engine autodetect, basic lighting)
- Canonical in-memory handoff between steps: sources yield in-memory objects by default (e.g., `trimesh.Trimesh`, `PIL.Image`), and steps operate on them. Providers that require files perform temporary export internally and auto-clean.
- Declarative UX: users describe what they want done; Omniflow internalizes representation adaptation and temp storage. Writers are the only persistent I/O.
- Preflight is fail-fast: dependencies, API keys, provider availability, and basic I/O permissions (sentinel writes) are checked in `compile()`.
- Broadcasting: single `elevation`/`azimuth` values broadcast across views
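The broadcasting rule can be sketched as follows: a scalar value is repeated to match the longer view list, so `elevation=15, azimuth=[0, 45, 90]` yields three views at the same elevation. The helper is illustrative, not omniflow's actual API.

```python
# Hypothetical sketch of scalar-to-list broadcasting for view parameters.
def broadcast_views(elevation, azimuth):
    el = elevation if isinstance(elevation, list) else [elevation]
    az = azimuth if isinstance(azimuth, list) else [azimuth]
    if len(el) == 1:
        el = el * len(az)   # repeat scalar elevation across all azimuths
    if len(az) == 1:
        az = az * len(el)   # and vice versa
    if len(el) != len(az):
        raise ValueError("elevation/azimuth lengths must match or broadcast")
    return list(zip(el, az))

views = broadcast_views(15, [0, 45, 90])  # [(15, 0), (15, 45), (15, 90)]
```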
- ✅ Vision-to-text labeling via DSPy (SDXL, generic profiles)
- ✅ Batching and parallelism for throughput
- ✅ Provenance tracking in metadata
- ✅ Multiple image transforms (resize, crop, flip, rotate)
- ✅ Flexible output writers with schema mapping (JSONL, HuggingFace datasets, folder layouts)
- Dataset-level operations (shuffle, split, deduplicate, sample)
- Camera fit: compute distance from bounds and FOV to avoid cropping
- Clean steps: drop/correct based on QC flags
- Text modality pipeline (cleaning, templating, QC)
- Additional labeling backends (BLIP-2, LLaVA, local models)
- Multi-view grouping/aggregation (select best, combine captions)
- Additional providers (e.g., nvdiffrast)
- Resumability and checkpointing
Core dependencies:
pip install pillow trimesh numpy

For labeling (caption generation):

pip install dspy-ai

For 3D rendering:
- Blender 3.0+ (set `BLENDER_PATH` or install to default location)
1. Install in development mode:

   cd omniflow
   python -m venv .venv
   source .venv/bin/activate
   pip install -r requirements.txt

2. Run tests:

   pytest tests/ -v --cov=omniflow

3. Provider test:

   python -m scripts.test_blender_provider --model ./some.glb --out ./tmp --transparent --verbose

4. Full pipeline (see Quickstart examples)
- Keep steps single-purpose with explicit signatures
- Favor pure transforms and metadata over hidden side effects
- Add concise docstrings (summary, args, I/O, arity, raises)