A comprehensive Python framework for fleet telemetry data. Provides both a type-safe API client for direct provider interaction and an automated ETL pipeline for building unified datasets from multiple telematics platforms (Motive, Samsara).
Fleet Telemetry Hub is a dual-purpose system:
- API Abstraction Framework - Type-safe, provider-agnostic interface for direct API interaction
- Data Pipeline System - Automated ETL for collecting, normalizing, and persisting telemetry data
Whether you need one-off API queries or continuous data collection, Fleet Telemetry Hub provides the right abstraction.
- Multi-Provider Support: Unified interface for Motive and Samsara APIs
- Type-Safe: Full type hints and Pydantic models for request/response validation
- Automatic Pagination: Seamlessly iterate through paginated results (offset & cursor-based)
- Smart Retry Logic: Exponential backoff with configurable retry strategies
- Rate Limit Handling: Automatic throttling and retry-after support
- Network Resilience: Designed to work behind corporate proxies (Zscaler, etc.)
- SSL Flexibility: Configurable SSL verification with custom CA bundle support
- Connection Pooling: Efficient HTTP connection management
- Unified Schema: Normalizes data from all providers to common format
- Incremental Updates: Intelligent lookback for late-arriving data
- Batch Processing: Configurable time-based batches for memory efficiency
- Atomic Writes: No data corruption even if process crashes mid-write
- Automatic Deduplication: Handles overlapping data from multiple runs
- Date-Partitioned Parquet Storage: Daily partitions for BigQuery compatibility and scalable storage
- Independent Provider Fetching: One provider's failure doesn't block others
- Comprehensive Logging: Track progress and debug issues
pip install fleet-telemetry-hub

Or install from source:

git clone https://github.com/andrewjordan3/fleet_telemetry_hub.git
cd fleet_telemetry_hub
pip install -e .

# For development tools (ruff, mypy, pytest)
pip install -e ".[dev]"
# For TLS/truststore support (Windows + Zscaler)
pip install -e ".[tls]"
# Install everything
pip install -e ".[all]"The pipeline automatically collects, normalizes, and stores data from all configured providers in date-partitioned Parquet files.
1. Create a configuration file at config/telemetry_config.yaml:
providers:
  motive:
    enabled: true
    base_url: "https://api.gomotive.com"
    api_key: "your-motive-api-key"
    request_timeout: [10, 30]
    max_retries: 5
    retry_backoff_factor: 2.0
    verify_ssl: true
  samsara:
    enabled: true
    base_url: "https://api.samsara.com"
    api_key: "your-samsara-api-key"
    request_timeout: [10, 30]
    max_retries: 5
    retry_backoff_factor: 2.0
    verify_ssl: true

pipeline:
  default_start_date: "2024-01-01"
  lookback_days: 7
  batch_increment_days: 1.0

storage:
  parquet_path: "data/telemetry"
  parquet_compression: "snappy"

logging:
  file_path: "logs/fleet_telemetry.log"
  console_level: "INFO"
  file_level: "DEBUG"

2. Run the pipeline:
from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline
# One-liner for scheduled jobs (cron, etc.)
PartitionedTelemetryPipeline('config/telemetry_config.yaml').run()
# Or work with specific date ranges
pipeline = PartitionedTelemetryPipeline('config/telemetry_config.yaml')
pipeline.run()
# Load data for specific date range (for analysis)
from datetime import date
df = pipeline.load_date_range(
    start_date=date(2024, 1, 1),
    end_date=date(2024, 1, 31),
)
print(f"Loaded {len(df)} records from January 2024")

# Implement data retention (delete old partitions)
deleted = pipeline.delete_old_partitions(retention_days=90)
print(f"Deleted {deleted} partitions older than 90 days")

Storage Structure:
The pipeline creates a Hive-style directory structure compatible with BigQuery:
data/telemetry/
├── date=2024-01-15/
│ └── data.parquet
├── date=2024-01-16/
│ └── data.parquet
├── date=2024-01-17/
│ └── data.parquet
└── _metadata.json
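Because this is plain Hive-partitioned Parquet, the data can also be read directly with pandas/pyarrow outside the pipeline; a minimal sketch, assuming the default parquet_path shown above (pyarrow skips the _metadata.json sidecar because it ignores underscore-prefixed files):

import pandas as pd

# pyarrow treats the date=YYYY-MM-DD directories as a Hive partition
# column, so `date` shows up as a column in the resulting frame.
df = pd.read_parquet("data/telemetry")
print(df.groupby("date").size())

For date-bounded reads, pipeline.load_date_range() remains the supported path, since it only touches the partitions in range.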
3. Schedule it (optional):
# Run daily at 2 AM
0 2 * * * cd /path/to/project && python -c "from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline; PartitionedTelemetryPipeline('config.yaml').run()"

The pipeline will:
- Fetch data from all enabled providers
- Normalize to unified schema (14 columns: VIN, timestamp, GPS, speed, driver, etc.)
- Deduplicate on the (provider, provider_vehicle_id, timestamp) key, as sketched below
- Save incrementally to date-partitioned Parquet files with atomic writes
- Resume from last run with configurable lookback
- Organize data by date for efficient BigQuery loading and retention management
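The deduplication step amounts to dropping repeated rows on that three-column key; a rough pandas equivalent of the rule (illustrative only, not the pipeline's internal code):

import pandas as pd

def deduplicate(frame: pd.DataFrame) -> pd.DataFrame:
    """Keep one row per (provider, provider_vehicle_id, timestamp)."""
    # Which duplicate survives is arbitrary here; overlapping runs
    # carry identical readings for the same key.
    return (
        frame
        .drop_duplicates(subset=["provider", "provider_vehicle_id", "timestamp"], keep="last")
        .reset_index(drop=True)
    )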
For one-off queries or custom integrations, use the Provider interface:
from fleet_telemetry_hub import Provider
from fleet_telemetry_hub.config.loader import load_config
# Load config
config = load_config('config/telemetry_config.yaml')
# Create provider
motive = Provider.from_config('motive', config.providers['motive'])
# Fetch data
for vehicle in motive.fetch_all('vehicles'):
    print(f"Vehicle: {vehicle.number} - VIN: {vehicle.vin}")

# Or convert directly to DataFrame
df = motive.to_dataframe('vehicles')
print(df.head())

Each provider requires the following settings:
- enabled: Whether to use this provider (boolean)
- base_url: API base URL (no trailing slash)
- api_key: Your API token/key
- request_timeout: Tuple of [connect_timeout, read_timeout] in seconds
- max_retries: Maximum retry attempts for failed requests
- retry_backoff_factor: Exponential backoff multiplier
- verify_ssl: SSL verification (true/false or path to CA bundle)
- rate_limit_requests_per_second: Client-side rate limiting
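These settings are validated by the Pydantic models in config/config_models.py; the sketch below shows roughly what such a model looks like (field names follow the list above, but the class name and defaults are illustrative rather than the package's actual definitions):

from pydantic import BaseModel

class ProviderSettings(BaseModel):
    """Illustrative shape of a per-provider configuration block."""
    enabled: bool = True
    base_url: str                                      # no trailing slash
    api_key: str
    request_timeout: tuple[float, float] = (10, 30)    # (connect, read)
    max_retries: int = 5
    retry_backoff_factor: float = 2.0
    verify_ssl: bool | str = True                      # true/false or path to CA bundle
    rate_limit_requests_per_second: float | None = None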
For corporate environments with MITM proxies (Zscaler, etc.):
# Disable SSL verification (not recommended for production)
verify_ssl: false

# Or provide custom CA bundle
verify_ssl: "/path/to/ca-bundle.pem"

# Or use system trust store (Windows)
pipeline:
  use_truststore: true

The request_timeout tuple has two values:
- Connect Timeout: Time allowed to establish the TCP connection (default: 10s)
- Read Timeout: Time to wait for the server's response (default: 30s)
request_timeout: [10, 30]  # [connect, read]
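Because the client is built on httpx (a declared dependency), the [connect, read] pair corresponds conceptually to an httpx.Timeout; a minimal sketch of that mapping (how the package wires it internally may differ):

import httpx

# 30s default for read/write/pool operations, 10s to establish the connection.
timeout = httpx.Timeout(30.0, connect=10.0)

with httpx.Client(base_url="https://api.gomotive.com", timeout=timeout) as client:
    ...  # every request sent through this client uses the timeouts above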
The pipeline creates Hive-style date partitions that are natively compatible with BigQuery:

from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline

# Run pipeline to populate partitioned storage
pipeline = PartitionedTelemetryPipeline('config.yaml')
pipeline.run()

Option 1: BigQuery External Table (recommended for GCS)
- Upload partitioned data to Google Cloud Storage:
gsutil -m cp -r data/telemetry/* gs://your-bucket/telemetry/

- Create external table in BigQuery:
CREATE EXTERNAL TABLE `project.dataset.fleet_telemetry`
WITH PARTITION COLUMNS (
  date DATE
)
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://your-bucket/telemetry/date=*/*.parquet'],
  hive_partition_uri_prefix = 'gs://your-bucket/telemetry/'
);

Option 2: BigQuery LOAD DATA (for one-time imports)
LOAD DATA INTO `project.dataset.fleet_telemetry`
WITH PARTITION COLUMNS (date DATE)
FROM FILES (
  format = 'PARQUET',
  uris = ['gs://your-bucket/telemetry/date=*/*.parquet'],
  hive_partition_uri_prefix = 'gs://your-bucket/telemetry/'
);
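Either statement can also be issued from Python with the google-cloud-bigquery client (an optional dependency, not part of this project's requirements); a minimal sketch using the external-table DDL from Option 1:

from google.cloud import bigquery

client = bigquery.Client(project="your-project")

ddl = """
CREATE EXTERNAL TABLE `your-project.dataset.fleet_telemetry`
WITH PARTITION COLUMNS (date DATE)
OPTIONS (
  format = 'PARQUET',
  uris = ['gs://your-bucket/telemetry/date=*/*.parquet'],
  hive_partition_uri_prefix = 'gs://your-bucket/telemetry/'
)
"""
client.query(ddl).result()  # blocks until the DDL job completes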
Benefits of Partitioned Storage:

- Query only specific date ranges (reduces cost and latency)
- Automatic partition pruning in BigQuery
- Efficient data retention (delete old partitions easily)
- Scales to billions of records without memory constraints
Fetch historical data by overriding the default start date:
from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline
from fleet_telemetry_hub.config.loader import load_config
config = load_config('config.yaml')
config.pipeline.default_start_date = '2023-01-01'
config.pipeline.batch_increment_days = 7.0 # Larger batches for backfill
pipeline = PartitionedTelemetryPipeline.from_config(config)
pipeline.run()

Handle pipeline failures and inspect partial progress with PartitionedPipelineError:

from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline, PartitionedPipelineError
import logging
logger = logging.getLogger(__name__)
try:
    pipeline = PartitionedTelemetryPipeline('config.yaml')
    pipeline.run()
    stats = pipeline.file_handler.get_statistics()
    logger.info(f"Pipeline success: {stats['partition_count']} partitions, {stats['total_size_mb']} MB")
except PartitionedPipelineError as e:
    logger.error(f"Pipeline failed: {e}")
    if e.partial_data_saved:
        logger.warning(f"Partial data saved up to batch {e.batch_index}")
    # Send alert, retry with different config, etc.

The pipeline produces a DataFrame with standardized columns across all providers:
from fleet_telemetry_hub.pipeline_partitioned import PartitionedTelemetryPipeline
from datetime import date
import pandas as pd
pipeline = PartitionedTelemetryPipeline('config.yaml')
pipeline.run()
# Load specific date range
df = pipeline.load_date_range(
    start_date=date(2025, 1, 1),
    end_date=date(2025, 1, 31),
)
# Columns available (14 total):
# provider, provider_vehicle_id, vin, fleet_number, timestamp,
# latitude, longitude, speed_mph, heading_degrees, engine_state,
# driver_id, driver_name, location_description, odometer
# Filter by VIN
vehicle_data = df[df['vin'] == 'ABC123XYZ']
# Analyze by provider
print(df.groupby('provider')['vin'].nunique())
# Calculate average speed by vehicle
avg_speeds = df.groupby('fleet_number')['speed_mph'].mean()
# Export subsets
df.to_csv('january_data.csv', index=False)

The client handles pagination automatically:
from fleet_telemetry_hub import Provider
provider = Provider.from_config('motive', config.providers['motive'])
# Iterate through all items (automatic pagination)
with provider.client() as client:
    for item in client.fetch_all(provider.endpoint('vehicles')):
        process(item)

Convert API data to pandas DataFrame for analysis:
from fleet_telemetry_hub import Provider
from fleet_telemetry_hub.config.loader import load_config
from datetime import date

config = load_config("config.yaml")
motive = Provider.from_config("motive", config.providers["motive"])
# Get all vehicles as DataFrame
df = motive.to_dataframe("vehicles")
print(df.head())
# With parameters
locations_df = motive.to_dataframe(
    "vehicle_locations",
    vehicle_id=12345,
    start_date=date(2025, 1, 1),
)
# Save to various formats
df.to_parquet("vehicles.parquet", compression="snappy")
df.to_csv("vehicles.csv", index=False)
df.to_excel("vehicles.xlsx", index=False)

Reuse a single client session to batch multiple requests over one connection:

with motive.client() as client:
    # Single SSL handshake, connection pooling
    vehicles = list(client.fetch_all(motive.endpoint('vehicles')))
    groups = list(client.fetch_all(motive.endpoint('groups')))
    users = list(client.fetch_all(motive.endpoint('users')))

# Clone repository
git clone https://github.com/andrewjordan3/fleet_telemetry_hub.git
cd fleet_telemetry_hub
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
# Install with dev dependencies
pip install -e ".[dev]"# Format code
ruff format .
# Lint code
ruff check .
# Type checking
mypy src/
# Run tests
pytest
# Run tests with coverage
pytest --cov=fleet_telemetry_hub --cov-report=html

fleet-telemetry-hub/
├── src/
│ └── fleet_telemetry_hub/
│ ├── pipeline_partitioned.py # Pipeline orchestrator
│ ├── schema.py # Unified telemetry schema
│ ├── client.py # HTTP client (API abstraction)
│ ├── provider.py # Provider facade (API abstraction)
│ ├── registry.py # Endpoint discovery
│ │
│ ├── common/ # Common utilities
│ │ ├── partitioned_file_io.py # Date-partitioned Parquet handler
│ │ ├── logger.py # Centralized logging setup
│ │ └── truststore_context.py # SSL/TLS utilities
│ │
│ ├── config/ # Configuration models and loader
│ │ ├── config_models.py # Pydantic config models
│ │ └── loader.py # YAML config loader
│ │
│ ├── models/ # Request/response models
│ │ ├── shared_request_models.py # RequestSpec, HTTPMethod
│ │ ├── shared_response_models.py # EndpointDefinition, ParsedResponse
│ │ ├── motive_requests.py # Motive endpoint definitions
│ │ ├── motive_responses.py # Motive Pydantic models
│ │ ├── samsara_requests.py # Samsara endpoint definitions
│ │ └── samsara_responses.py # Samsara Pydantic models
│ │
│ └── operations/ # Data transformation functions
│ ├── fetch_data.py # Provider fetch orchestration
│ ├── motive_funcs.py # Motive flatten functions
│ └── samsara_funcs.py # Samsara flatten functions
│
├── config/
│ └── telemetry_config.yaml # Example configuration
├── examples/ # Example scripts
├── tests/ # Test suite
├── pyproject.toml # Project metadata and dependencies
├── ARCHITECTURE.md # Detailed architecture documentation
└── README.md # This file
- Python 3.12 or higher
- Dependencies:
  - pydantic>=2.0.0 - Data validation and type safety
  - httpx>=0.28.0 - Modern HTTP client
  - tenacity>=9.1.0 - Retry logic with exponential backoff
  - pyyaml>=6.0.0 - YAML configuration parsing
  - pandas>=2.0.0 - DataFrame manipulation
  - pyarrow>=14.0.0 - Parquet I/O and columnar storage
For detailed architecture documentation, see ARCHITECTURE.md.
Key architectural highlights:
- Two-tier system: Data Pipeline (high-level ETL) + API Abstraction Framework (low-level access)
- Self-describing endpoints: All API knowledge encapsulated in endpoint definitions
- Unified schema: 14-column normalized format for cross-provider analytics
- Provider independence: Add/remove providers without changing core logic
- Type safety: Pydantic models from API → DataFrame
- Atomic writes: Temp file + rename guarantees no data corruption
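The atomic-write point is the standard write-to-temp-then-rename pattern; a minimal sketch of the general idea (illustrative, not the exact code in partitioned_file_io.py):

import os
import tempfile
import pandas as pd

def atomic_write_parquet(frame: pd.DataFrame, final_path: str) -> None:
    """Write to a temp file in the target directory, then rename into place."""
    directory = os.path.dirname(final_path) or "."
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".parquet.tmp")
    os.close(fd)
    try:
        frame.to_parquet(tmp_path, compression="snappy")
    except BaseException:
        os.remove(tmp_path)
        raise
    # os.replace is atomic on the same filesystem, so readers only ever
    # see the old file or the complete new file, never a partial write.
    os.replace(tmp_path, final_path)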
- API Documentation: https://developer.gomotive.com/
- Features: Vehicle tracking, driver logs, fuel data, ELD compliance
- Pagination: Offset-based (page_no, per_page)
- Authentication: X-API-Key header
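For orientation, one page of Motive's offset-style pagination looks roughly like this at the raw HTTP level; the endpoint path and response key are assumptions based on Motive's public docs, and in practice client.fetch_all() runs this loop for you:

import httpx

headers = {"X-API-Key": "your-motive-api-key"}
page_no = 1

with httpx.Client(base_url="https://api.gomotive.com", headers=headers) as client:
    while True:
        # page_no / per_page are the offset-style pagination parameters.
        response = client.get("/v1/vehicles", params={"page_no": page_no, "per_page": 100})
        response.raise_for_status()
        vehicles = response.json().get("vehicles", [])  # response key assumed
        if not vehicles:
            break
        for vehicle in vehicles:
            print(vehicle)
        page_no += 1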
- API Documentation: https://developers.samsara.com/
- Features: Fleet management, vehicle health, driver safety, route optimization
- Pagination: Cursor-based (after parameter)
- Authentication: Bearer token
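The cursor-based equivalent for Samsara feeds each response's end cursor back through the after parameter; the endpoint path and pagination field names are assumptions based on Samsara's public docs, and again fetch_all() abstracts this away:

import httpx

headers = {"Authorization": "Bearer your-samsara-api-key"}
params: dict[str, str] = {}

with httpx.Client(base_url="https://api.samsara.com", headers=headers) as client:
    while True:
        response = client.get("/fleet/vehicles", params=params)
        response.raise_for_status()
        payload = response.json()
        for vehicle in payload.get("data", []):
            print(vehicle)
        pagination = payload.get("pagination", {})
        if not pagination.get("hasNextPage"):
            break
        params["after"] = pagination["endCursor"]  # cursor for the next page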
Contributions are welcome! Please follow these guidelines:
- Fork the repository
- Create a feature branch (git checkout -b feature/amazing-feature)
- Make your changes
- Run tests and linting (pytest, ruff check, mypy)
- Commit your changes (git commit -m 'Add amazing feature')
- Push to the branch (git push origin feature/amazing-feature)
- Open a Pull Request
Please ensure:
- All tests pass
- Code is formatted with Ruff
- Type hints are complete and mypy passes
- Documentation is updated
This project is licensed under the MIT License - see the LICENSE file for details.
Andrew Jordan - andrewjordan3@gmail.com
Project Link: https://github.com/andrewjordan3/fleet-telemetry-hub
- Built with Pydantic for data validation
- HTTP client powered by HTTPX
- Retry logic using Tenacity
- Type checking with mypy
- Code quality with Ruff
- Unified schema for multi-provider data normalization
- Automated ETL pipeline with incremental updates
- Parquet storage with atomic writes
- Date-partitioned storage for BigQuery compatibility
- Python 3.12+ with modern type syntax
- Comprehensive logging system
- Batch processing with configurable time windows
- Automatic deduplication
- Add support for additional providers (Geotab, Verizon Connect)
- Async client support with asyncio
- Real-time data streaming mode
- GraphQL API support
- Webhook integration for push notifications
- Data quality metrics and validation reports
If you encounter SSL errors behind a corporate proxy:
# Option 1: Use system trust store (Windows)
pipeline:
  use_truststore: true

# Option 2: Provide CA bundle
providers:
  motive:
    verify_ssl: "/path/to/ca-bundle.pem"

# Option 3: Disable verification (development only)
providers:
  motive:
    verify_ssl: false
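The truststore route works because the truststore package (pulled in by the .[tls] extra) patches Python's ssl module to use the operating system's certificate store; a minimal standalone sketch of the same idea, independent of the package's own truststore_context helper:

import truststore
import httpx

# Patch ssl.SSLContext to trust the OS certificate store (Windows
# certificate store, macOS Keychain), so proxy-issued certificates
# such as Zscaler's are accepted.
truststore.inject_into_ssl()

response = httpx.get("https://api.samsara.com")
print(response.status_code)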
If you're hitting rate limits frequently:

- Increase request_delay_seconds in the pipeline config
- Reduce rate_limit_requests_per_second for the provider
- Check provider documentation for current rate limits
For slow API responses, increase the read timeout:
providers:
  motive:
    request_timeout: [10, 60]  # Increase read timeout to 60s