7 changes: 7 additions & 0 deletions LICENSE
@@ -0,0 +1,7 @@
Copyright 2025 Channable

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
87 changes: 87 additions & 0 deletions README.md
@@ -18,6 +18,93 @@ The specific advantages for opsqueue are:
* One standardized queuing system that can be reused again and again
* A single way to implement monitoring, alerting, and debugging workflows

## Getting Started

### 1. Grab the `opsqueue` binary and the Python client library

The binary can be installed from this repo, or built from source by cloning the repo and running `just build` or `cargo build`.
It is a self-contained program: you can run it on a server on its own, include it in a tiny container, etc.

The Python library used by the `Consumer` and `Producer` can be built from source, or (soon) simply installed from PyPI (`pip install opsqueue`, `uv pip install opsqueue`, etc.).
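
A quick way to check the installation is to import the client classes; this is a minimal sketch that only uses the module paths the examples below rely on:

```python
# Minimal installation check: these are the same imports the examples below use.
from opsqueue.producer import ProducerClient
from opsqueue.consumer import ConsumerClient, Strategy

print(ProducerClient, ConsumerClient, Strategy)
```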

### 2. Create a `Producer`

```python
import logging
import re
from opsqueue.producer import ProducerClient
from collections.abc import Iterable

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)


def file_to_words(filename: str) -> Iterable[str]:
    """
    Iterates over the words and inter-word whitespace strings in a file
    while keeping at most one line in memory at a time.
    """
    with open(filename) as input_file:
        for line in input_file:
            # Split on whitespace but keep the whitespace itself as separate
            # tokens, so the output can be reassembled verbatim later.
            for token in re.split(r"(\s+)", line):
                if token:
                    yield token


def print_words(words: Iterable[str]) -> None:
    """
    Prints all words and inter-word whitespace tokens
    without first loading the full string into memory.
    """
    for word in words:
        print(word, end="")


def main() -> None:
    client = ProducerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    stream_of_words = file_to_words("lipsum.txt")
    stream_of_capitalized_words = client.run_submission(stream_of_words, chunk_size=4000)
    print_words(stream_of_capitalized_words)


if __name__ == "__main__":
    main()
```

### 3. Create a `Consumer`

```python
import logging
from opsqueue.consumer import ConsumerClient, Strategy

logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.INFO)


def capitalize_word(word: str) -> str:
    output = word.capitalize()
    # print(f"Capitalized word: {word} -> {output}")
    return output


def main() -> None:
    client = ConsumerClient("localhost:3999", "file:///tmp/opsqueue/capitalize_text/")
    client.run_each_op(capitalize_word, strategy=Strategy.Random())


if __name__ == "__main__":
    main()
```


### 4. Run the Producer, queue and Consumer

- Run `opsqueue`.
- Run `python3 capitalize_text_consumer.py` to run a consumer. Feel free to start multiple instances of this program to try out consumer concurrency.
- Run `python3 capitalize_text_producer.py` to run a producer.

The order you start these in does not matter; systems will reconnect and continue after any kind of failure or disconnect.

By default the queue will listen on `http://localhost:3999`. The exact port can of course be changed.
The Producer and Consumer need to share the same object store location, where the contents of their submission chunks are stored.
In development, this can be a local folder, as shown in the code above.
In production, you probably want to use Google Cloud Storage, Amazon S3, or Azure Blob Storage.
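
As a sketch, moving to production typically only changes the object-store URL passed to both clients; the `gs://` scheme and bucket name below are illustrative assumptions, so check the opsqueue documentation for the URL formats your build supports:

```python
from opsqueue.producer import ProducerClient

# Hypothetical production setup: same queue address pattern as before, but the
# chunk contents live in a shared cloud bucket instead of a local folder.
# The "gs://" URL and bucket name are illustrative assumptions.
client = ProducerClient("my-queue-host:3999", "gs://my-company-opsqueue/capitalize_text/")
```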

Please tinker with the above code!
If you want to look under the hood, run `RUST_LOG=debug opsqueue` to enable extra logging for the queue.
The Producer/Consumer will use whatever log level is configured in Python.
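
For example, the consumer above logs at `INFO`; here is a sketch of turning client-side verbosity up or down using only the standard `logging` module (this affects just the Python process, not the `opsqueue` binary):

```python
import logging

# DEBUG for maximum client-side detail, WARNING to keep the output quiet.
logging.basicConfig(format="%(levelname)s: %(message)s", level=logging.DEBUG)
```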

More examples can be found in `./libs/opsqueue_python/examples/`.

## Project structure

The majority of Opsqueue's code is written in Rust.
23 changes: 2 additions & 21 deletions libs/opsqueue_python/README.md
@@ -1,22 +1,3 @@
# How to run
The Python client library for the Opsqueue lightweight batch processing queue system.

1. Move to the `opsqueue_consumer` subdirectory. With `direnv`, the extra `.envrc` in those directories will load an (essentially empty) Python virtual environment. This is necessary to make the next step work.

2. Any time you change any Rust code, run [maturin](https://github.com/PyO3/maturin), specifically `maturin develop` to update the Rust<->Python library bindings:
```bash
maturin develop
```

3. Now run a Python shell, which (courtesy of the virtual env) has access to the `opsqueue_consumer` module:
```bash
python
```

# Structure

All logic happens inside the main `opsqueue` crate.
Only the Python-specific parts live inside this library.

You will notice that some structs/enums are defined which seem to be 1:1 copies of definitions inside the main crate.
This is because we cannot add PyO3-specific code, macro calls, conversions, etc. inside the main crate.
And note that this duplication is _fake_ duplication: In cases where we want the Python interface to diverge slightly (or significantly) from the Rust crate's to make it more Python-idiomatic, the types will stop being identical.
Find the full README with examples at https://github.com/channable/opsqueue
10 changes: 10 additions & 0 deletions libs/opsqueue_python/pyproject.toml
@@ -4,6 +4,12 @@ build-backend="maturin"

[project]
name = "opsqueue"
readme="README.md"
description = "Python client library for Opsqueue, the lightweight batch processing queue for heavy loads"
license="MIT"
keywords=["queue", "processing", "parallelism", "distributed", "batch", "producer", "consumer"]


requires-python = ">=3.8"
classifiers = [
"Programming Language :: Rust",
@@ -20,6 +26,10 @@ dependencies = [
"opentelemetry-exporter-otlp",
]

[project.urls]
Repository="https://github.com/channable/opsqueue"
Issues="https://github.com/channable/opsqueue/issues"


[tool.maturin]
features = ["pyo3/extension-module"]
