ya-dataflow is a modern, data-centric AI framework designed to orchestrate complex, large-scale workflows for Large Language Models (LLMs) and Vision-Language Models (VLMs).
It provides a unified, modular, and highly scalable architecture to manage the entire lifecycle of AI data processing, from raw data ingestion and sophisticated multi-modal reasoning to high-throughput trajectory synthesis and large-scale evaluation.
In the era of LLMs, the bottleneck is no longer just model capacity, but the quality and scale of the data pipelines that feed them. ya-dataflow is built to address this by treating data as a first-class citizen, providing a robust engine to transform raw, unstructured data into high-quality, structured datasets for training, fine-tuning, and evaluation.
At the heart of ya-dataflow is a highly extensible operator-based design. Every task, whether it is a simple text-cleaning step, a complex RAG retrieval, or a sophisticated VLM reasoning process, is implemented as a standardized Operator. This allows complex pipelines to be composed seamlessly.
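The operator idea can be pictured with a small, self-contained sketch. The `Operator` base class and the `run(rows, input_key, output_key)` signature below are illustrative assumptions for intuition only, not ya-dataflow's actual API:

```python
# Illustrative sketch of an operator-based design: every step implements one
# uniform contract, so arbitrary steps compose into a pipeline.
# (Hypothetical interface, not ya-dataflow's real Operator class.)
from abc import ABC, abstractmethod


class Operator(ABC):
    @abstractmethod
    def run(self, rows: list[dict], input_key: str, output_key: str) -> list[dict]:
        ...


class StripWhitespace(Operator):
    def run(self, rows, input_key, output_key):
        for row in rows:
            row[output_key] = row[input_key].strip()
        return rows


class Lowercase(Operator):
    def run(self, rows, input_key, output_key):
        for row in rows:
            row[output_key] = row[input_key].lower()
        return rows


# Compose operators: each stage reads the previous stage's output key.
rows = [{"raw_text": "  Hello WORLD  "}]
rows = StripWhitespace().run(rows, input_key="raw_text", output_key="stripped")
rows = Lowercase().run(rows, input_key="stripped", output_key="cleaned_text")
print(rows[0]["cleaned_text"])  # hello world
```

Because every operator exposes the same contract, swapping a cleaning step for a VLM reasoning step changes one line, not the pipeline's shape.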
Designed for massive workloads, ya-dataflow supports advanced execution strategies like PartitionPipelineParallelRun. It enables high-throughput, parallel processing across distributed environments, making it capable of handling billions of tokens or millions of multi-modal assets.
Leveraging a powerful binary and structured file generation system, ya-dataflow can synthesize diverse datasets, including JSON, XML, Markdown, HTML, and complex binary formats such as PDF, XLSX, and PPTX, to create robust testing and training environments.
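To make the multi-format synthesis idea concrete, here is a minimal, framework-free sketch that renders one record into several structured formats. This is not ya-dataflow's generator API; binary targets such as PDF, XLSX, or PPTX would additionally require libraries like reportlab or openpyxl:

```python
# Illustrative only: render a single record into JSON, Markdown, and HTML.
# (Not ya-dataflow's actual file-generation API.)
import json
from html import escape

record = {"title": "Q3 Report", "body": "Revenue grew 12% quarter over quarter."}

as_json = json.dumps(record, indent=2)
as_markdown = f"# {record['title']}\n\n{record['body']}\n"
as_html = (
    "<html><body>"
    f"<h1>{escape(record['title'])}</h1><p>{escape(record['body'])}</p>"
    "</body></html>"
)

print(as_markdown)
```

Emitting the same records in many formats is what makes the synthesized corpora useful as parsing and extraction test beds.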
ya-dataflow is deeply integrated with the OpenClaw and Nanobot ecosystems. Through `CLIOpenClawServing` and `NanobotServing`, your data pipelines become accessible intelligence that can be dynamically invoked and orchestrated by autonomous agents.
While many frameworks handle simple tasks, ya-dataflow is engineered for production-grade, massive-scale AI data engineering.
- Cloud-Native Storage: Native, seamless integration with S3 and other cloud storage providers.
- Read-Write Separation: Decoupled `DataSource` and `Storage` layers for maximum flexibility and control.
- Intelligent Caching: Advanced caching mechanisms (`CacheStorage`) to optimize I/O and accelerate repetitive workloads.
- Checkpointing & Resumption: Built-in support for checkpointing. If a massive job (running for days) is interrupted, it can resume exactly from where it left off.
- Fine-Grained Parallelism: Beyond simple task parallelism, our Partition-level parallelism allows you to scale throughput by splitting datasets into granular work units.
- OpenClaw Powered: Use `CLIOpenClawServing` to bridge the gap between agentic reasoning and heavy-duty data pipelines.
- Nanobot Ready: Integrated with `NanobotServing` for lightweight, high-performance serving within the Nanobot SDK ecosystem.
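The partitioning and checkpoint-resumption ideas above can be sketched without the framework. The splitting scheme, the JSON checkpoint file, and the thread pool below are illustrative assumptions, not ya-dataflow internals:

```python
# Illustrative sketch: split a dataset into independent partitions, process
# them in parallel, and persist completed partition ids so an interrupted
# job can skip finished work on restart. (Not ya-dataflow internals; real
# deployments use processes or remote workers rather than threads.)
import json
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor

CHECKPOINT = Path("checkpoint.json")


def partition(dataset, num_partitions):
    """Split the dataset into roughly equal, independently processable units."""
    size = -(-len(dataset) // num_partitions)  # ceiling division
    return [dataset[i:i + size] for i in range(0, len(dataset), size)]


def load_done():
    """Read the set of already-completed partition ids, if any."""
    return set(json.loads(CHECKPOINT.read_text())) if CHECKPOINT.exists() else set()


dataset = [f"row-{i}" for i in range(10)]
parts = partition(dataset, num_partitions=4)
done = load_done()

# Only unfinished partitions are scheduled; each is an independent work unit.
todo = [(pid, rows) for pid, rows in enumerate(parts) if pid not in done]
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(lambda a: [r.upper() for r in a[1]], todo))

# Persist progress after the batch completes so a restart resumes here.
done.update(pid for pid, _ in todo)
CHECKPOINT.write_text(json.dumps(sorted(done)))

print(sum(len(r) for r in results))  # 10 on a fresh run; fewer after a resume
```

Checkpointing at partition granularity is what lets a days-long job resume without redoing completed work.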
Install the core framework:

```shell
pip install ya-dataflow
```

Install with specialized capabilities via extras:

```shell
# For RAG workflows
pip install ya-dataflow[rag]

# For Multimodal (VLM) and PDF processing
pip install ya-dataflow[pdf2vqa]

# For LLM serving and high-performance evaluation
pip install ya-dataflow[vllm,eval]

# For Code and Math reasoning tasks
pip install ya-dataflow[code,reasoning]
```

In production, you define a pipeline by inheriting from `PartitionPipelineParallelRun` and implementing the `forward` method to orchestrate your operators.
```python
from dataflow.pipeline import PartitionPipelineParallelRun
from dataflow.operators.core_text import TextCleaningOperator
from dataflow.utils.storage import FileDataSource, FileStorage, FileCacheStorage
from dataflow.serving.api_llm_serving_request import APILLMServing_request


class MyProductionPipeline(PartitionPipelineParallelRun):
    def __init__(self, source: FileDataSource, storage: FileStorage, llm_serving: APILLMServing_request):
        # 1. Initialize CacheStorage (crucial: cannot be None)
        cache_storage = FileCacheStorage(cache_path="./cache")
        # 2. Initialize the base class with cache_storage and explicit partitions
        super().__init__(cache_storage=cache_storage, partitions=10)
        self.storage = storage
        self.llm_serving = llm_serving

        # Define operators (SomeRefineOperator stands in for any
        # second-stage operator you chain after cleaning)
        self.clean_op = TextCleaningOperator(self.llm_serving)
        self.refine_op = SomeRefineOperator(self.llm_serving)

    def forward(self):
        # Step 1: Clean the raw text
        # .step() retrieves the current partition's data
        self.clean_op.run(
            self.storage.step(),
            input_key="raw_text",
            output_key="cleaned_text"
        )
        # Step 2: Refine based on the cleaned text (depends on cleaned_text)
        self.refine_op.run(
            self.storage.step(),
            input_key="raw_text",
            output_key="final_result",
            input_prev_1="cleaned_text"
        )


# Usage
source = FileDataSource(paths=["./input.jsonl"])
storage = FileStorage(data_source=source, id_key="id", cache_path="./cache")
llm = APILLMServing_request(api_url="...", model_name="...")

pipeline = MyProductionPipeline(source, storage, llm)
pipeline.compile()
pipeline.run()
```

Generate data on-the-fly without relying on pre-existing files.

Use this pattern when you have base data and want to enhance it with LLM-generated fields:
```python
from dataflow.utils.storage import GeneratorDataSource, FileCacheStorage
from dataflow.serving.agent.cli_openclaw_serving import CLIOpenClawServing


# Define your base data generator
def task_generator():
    """Yield base task data."""
    for i in range(1000):
        yield {
            "index": i,
            "scene": "search" if i % 2 == 0 else "analysis",
            "keywords": "Tesla" if i % 2 == 0 else "financial data"
        }


# Create a data source with LLM enhancement
source = GeneratorDataSource(
    generator_fn=task_generator,
    total_rows=1000,
    name="enhanced_tasks",
    serving=CLIOpenClawServing(agent_id="main"),
    prompt_templates={
        "question": "Based on the scene {scene} and the keywords {keywords}, generate a realistic skill-usage question. Return JSON: {{\"question\": \"...\"}}",
        "target_skills": "Based on the scene {scene}, select 2-3 suitable skills. Return JSON: {{\"target_skills\": [...]}}",
    },
    fields_from_base=["index", "scene", "keywords"],
)

# Read data (LLM fields are generated on the fly)
for row in source.read(chunk_size=32):
    print(row)  # Contains: index, scene, keywords, question, target_skills
```

Use this pattern when you want the LLM to generate all data from scratch:
```python
from dataflow.utils.storage import LLMGeneratorDataSource
from dataflow.serving.agent.cli_openclaw_serving import CLIOpenClawServing

# Pure LLM generation - no base data needed
source = LLMGeneratorDataSource(
    serving=CLIOpenClawServing(agent_id="main"),
    prompts={
        "question": "Generate a realistic OpenClaw skill-usage question. Return JSON: {{\"question\": \"...\"}}",
        "target_skills": "Select 2-3 suitable skills for this question. Return JSON: {{\"target_skills\": [...]}}",
        "difficulty": "Rate the question's difficulty (1-5). Return JSON: {{\"difficulty\": 3}}",
    },
    num_rows=10000,
    batch_size=32,
    name="llm_generated_tasks",
)

# Read generated data
for row in source.read(chunk_size=32):
    print(row)  # Contains: question, target_skills, difficulty
```

Both source types can also be built through the `create_data_source` factory:

```python
from dataflow.utils.storage import create_data_source

# Generator data source
source = create_data_source(
    ["enhanced_tasks"],
    source_type="generator",
    generator_fn=task_generator,
    total_rows=1000,
    serving=CLIOpenClawServing(agent_id="main"),
    prompt_templates={
        "question": "Generate a question based on the scene {scene}",
    },
    fields_from_base=["index", "scene"],
)

# LLM generator data source
source = create_data_source(
    ["llm_tasks"],
    source_type="llm_generator",
    serving=CLIOpenClawServing(agent_id="main"),
    prompts={
        "question": "Generate a skill-usage question",
    },
    num_rows=5000,
)
```

```python
from dataflow.pipeline import PartitionPipelineParallelRun
from dataflow.utils.storage import S3DataSource, S3Storage, S3CacheStorage

# Configure a massive S3-based workflow with checkpointing
source = S3DataSource(
    endpoint="https://s3.example.com",
    ak="YOUR_AK", sk="YOUR_SK",
    s3_paths=["s3://my-bucket/massive-dataset/"],
)
storage = S3Storage(
    data_source=source,
    id_key="task_id",
    cache_path="./local_cache",
    cache_type="jsonl"
)

# Enable checkpointing via CacheStorage
progress_storage = S3CacheStorage(
    endpoint="https://s3.example.com",
    ak="YOUR_AK", sk="YOUR_SK",
    cache_file="s3://my-bucket/checkpoints/pipeline_v1.json"
)

pipeline = PartitionPipelineParallelRun(
    steps=[...],
    data_source=source,
    storage=storage,
    cache_storage=progress_storage,
    partitions=1000,  # Scale to thousands of partitions
    max_parallelism=32
)

# Run with automatic resumption
pipeline.run(resume_from_last=True)
```

```
dataflow/
├── core/        # Core engine, registry, and base abstractions
├── operators/   # Extensive library of built-in operators
│   ├── core_text/    # Text processing, cleaning, and extraction
│   ├── core_vision/  # VLM and image reasoning
│   ├── code/         # Code synthesis and execution
│   ├── reasoning/    # Math and logical reasoning
│   └── ...           # Specialized domains (RAG, PDF2VQA, etc.)
├── pipeline/    # Pipeline orchestration and execution logic
├── serving/     # LLM/VLM serving integrations (vLLM, OpenAI, etc.)
├── utils/       # Storage, registry, and utility helpers
└── ...
```
ya-dataflow is an evolving ecosystem. We welcome community contributions that expand its operator library and improve its performance. Please see the main repository for contribution guidelines.
This project is licensed under the Apache-2.0 license.