Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
95 commits
Select commit Hold shift + click to select a range
761543f
much more comprehensive perf test with analysis: ezmsg-perf
griffinmilsap Sep 19, 2025
0722bc9
less aggressive test strategy
griffinmilsap Sep 20, 2025
d1fc6e3
slight refactor
griffinmilsap Sep 20, 2025
b884118
minor
griffinmilsap Sep 20, 2025
6beb4ba
html analysis output; ai was a bust
griffinmilsap Sep 20, 2025
a661ce5
report tweaks
griffinmilsap Sep 20, 2025
5464887
added test iters
griffinmilsap Sep 21, 2025
460ca9d
more cmdline
griffinmilsap Sep 22, 2025
173c096
changing sample_rate calculation
griffinmilsap Sep 22, 2025
d17a4fe
fix: n-clients = 0 is useful and works
griffinmilsap Sep 22, 2025
01212d3
better support for msg_size = 0 and n_clients = 0
griffinmilsap Sep 22, 2025
30234ae
also force_tcp on tcp_spread oops
griffinmilsap Sep 22, 2025
e8d1a3d
viztracer is useful for profiling
griffinmilsap Sep 23, 2025
a791365
fix for rare exception on system shutdown
griffinmilsap Sep 23, 2025
17ff66d
Merge branch 'fix/incomplete-read' into feature/more-perf-tests
griffinmilsap Sep 23, 2025
77e38e9
added median latency
griffinmilsap Sep 23, 2025
4693d5c
added median latency to performance report
griffinmilsap Sep 23, 2025
e8f4994
try using time.time
griffinmilsap Sep 23, 2025
59d2c59
fix: graphserver port no-longer lingers
griffinmilsap Aug 13, 2025
79b4339
comments to self
griffinmilsap Aug 13, 2025
77eca8b
write optimization for tcp and SHM
griffinmilsap Jul 18, 2025
48ab00d
formatting
griffinmilsap Jul 18, 2025
7761370
tcp write optimization
griffinmilsap Aug 8, 2025
2e09d62
backend rework: channels
griffinmilsap Aug 20, 2025
bd53ad7
bugfixes
griffinmilsap Aug 20, 2025
e8bd0aa
test scripts
griffinmilsap Aug 20, 2025
1d3948e
backpressure fixed
griffinmilsap Aug 20, 2025
2fed32a
working on bugfixes
griffinmilsap Aug 21, 2025
35f8a10
lots of bugfixes
griffinmilsap Aug 21, 2025
f4bb029
painful bugfixes
griffinmilsap Aug 22, 2025
5ac957c
fixed intermittent failure of test_filter_key
griffinmilsap Aug 26, 2025
ef073b8
Fixed issue with message channels attempting to connect to canonical …
griffinmilsap Sep 16, 2025
afd64d8
fixed backpressure from pub
griffinmilsap Sep 16, 2025
7347424
attach example was broken
griffinmilsap Sep 17, 2025
3cb550b
working around niche race condition
griffinmilsap Sep 17, 2025
6b9a6ee
addressed socket bind race condition on linux
griffinmilsap Sep 17, 2025
6643d7c
comments to document REUSEADDR behavior
griffinmilsap Sep 17, 2025
37506b7
explicit close of socket resource
griffinmilsap Sep 17, 2025
7161403
disable nagles algorithm
griffinmilsap Sep 17, 2025
cb15e57
fixed race condition on channel registration
griffinmilsap Sep 17, 2025
821c7e3
reverting buffer stride calculation change
griffinmilsap Sep 17, 2025
6b12778
much more comprehensive perf test with analysis: ezmsg-perf
griffinmilsap Sep 19, 2025
bdd4052
less aggressive test strategy
griffinmilsap Sep 20, 2025
ae0fb5d
slight refactor
griffinmilsap Sep 20, 2025
e252849
minor
griffinmilsap Sep 20, 2025
386461a
html analysis output; ai was a bust
griffinmilsap Sep 20, 2025
d01deb7
report tweaks
griffinmilsap Sep 20, 2025
2792fa6
local comms with no tcp ack
griffinmilsap Sep 18, 2025
07ad449
remove debug statement
griffinmilsap Sep 18, 2025
380e3b0
bugfix: local backpressure
griffinmilsap Sep 18, 2025
b05096a
added test iters
griffinmilsap Sep 21, 2025
50756f6
more cmdline
griffinmilsap Sep 22, 2025
8823c61
changing sample_rate calculation
griffinmilsap Sep 22, 2025
de37916
bugfix: force_tcp on local channel causes backpressure
griffinmilsap Sep 22, 2025
d714d81
fix: n-clients = 0 is useful and works
griffinmilsap Sep 22, 2025
4bfd958
better support for msg_size = 0 and n_clients = 0
griffinmilsap Sep 22, 2025
4dac9f9
also force_tcp on tcp_spread oops
griffinmilsap Sep 22, 2025
1e54cfc
tcp benchmarks un-necessarily hamstrung by extra TCP transmission to …
griffinmilsap Sep 23, 2025
e780e8a
viztracer is useful for profiling
griffinmilsap Sep 23, 2025
eab4cea
force_tcp bugfix
griffinmilsap Sep 23, 2025
1d59244
bugfix for rare error on shutdown
griffinmilsap Sep 23, 2025
fdb89dc
hotpath optim
griffinmilsap Sep 23, 2025
856802f
added median latency
griffinmilsap Sep 23, 2025
a77eaa8
added median latency to performance report
griffinmilsap Sep 23, 2025
a0490d9
try using time.time
griffinmilsap Sep 23, 2025
dbd48b1
more diagnostic test results in less time
griffinmilsap Oct 28, 2025
7fcfaa8
early quit and test randomization
griffinmilsap Oct 28, 2025
fd876fe
Merge branch 'feature/more-perf-tests' into griff/working
griffinmilsap Oct 28, 2025
2489c78
attempting to stabilize perf test results
griffinmilsap Nov 5, 2025
f3e2560
tests stabilized with median of means and num-buffers = 1
griffinmilsap Nov 6, 2025
f97186f
Merge branch 'feature/more-perf-tests' into griff/working
griffinmilsap Nov 6, 2025
c6b985f
further simplifying tests
griffinmilsap Nov 10, 2025
35c3811
Merge branch 'feature/more-perf-tests' into griff/working
griffinmilsap Nov 10, 2025
b676e77
bugfix: shm race
griffinmilsap Nov 11, 2025
7b7cdce
tweak to remove nonstandard handling of state vars
griffinmilsap Nov 11, 2025
de4080d
Merge branch 'feature/more-perf-tests' into griff/working
griffinmilsap Nov 11, 2025
746b49e
implemented batch writes for stable perf testing with num_buffers > 1
griffinmilsap Nov 12, 2025
f000d94
Merge with dev
griffinmilsap Nov 13, 2025
af8ff47
fix shm closure
griffinmilsap Nov 14, 2025
39d9cc5
revert batch write; eventually should implement proper flow control
griffinmilsap Nov 14, 2025
603f8a4
addressed deadlocks on shutdown and proper shm deallocation
griffinmilsap Nov 17, 2025
c097b77
revert except clause
griffinmilsap Nov 17, 2025
256231c
fixes #204
griffinmilsap Nov 19, 2025
1e45811
refactor classes out of messagechannel
griffinmilsap Nov 19, 2025
592233f
Added docstrings for Channel and ChannelManager
griffinmilsap Nov 19, 2025
5ca5711
chore: ruff formatting
griffinmilsap Nov 20, 2025
b13f7de
added unit tests for new functionality
griffinmilsap Nov 20, 2025
8ff0657
chore: ruff formatting
griffinmilsap Nov 20, 2025
4d3f3d8
fix for python 3.10
griffinmilsap Nov 20, 2025
b0997f4
fix: windows perf tests
griffinmilsap Nov 20, 2025
9f6f94a
update git-blame-ignore-revs with more ruff formatting
griffinmilsap Nov 20, 2025
438b557
addressed review
griffinmilsap Nov 25, 2025
a21073e
updated docstring
griffinmilsap Nov 25, 2025
6132479
Update tests/ez_test_utils.py
griffinmilsap Nov 25, 2025
66bdd6d
Merge branch 'griff/working' of https://github.com/ezmsg-org/ezmsg in…
griffinmilsap Nov 25, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .git-blame-ignore-revs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@
5b4c220154d33cac15a75d3d7d978a75e67f2b8a
# Ran `black` formatter on entire codebase and fixed non-standard line-endings with `dos2unix`.
4e4f20be40a73f2162a565732bae76ea0c812739
# chore: ruff formatting
5ca5711e7714042f23d1286abf946217d75318c2
7 changes: 4 additions & 3 deletions docs/source/conf.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Configuration file for the Sphinx documentation builder.

import os
import sys

# -- Project information --------------------------

Expand Down Expand Up @@ -75,7 +74,7 @@
html_static_path = ["_static"]

# Timestamp is inserted at every page bottom in this strftime format.
html_last_updated_fmt = '%Y-%m-%d'
html_last_updated_fmt = "%Y-%m-%d"

# -- Options for EPUB output --------------------------
epub_show_urls = "footnote"
Expand All @@ -95,8 +94,10 @@ def linkcode_resolve(domain, info):
else:
return f"{code_url}src/{filename}.py"


# -- Options for graphviz -----------------------------
graphviz_output_format = "svg"


def setup(app):
app.add_css_file("custom.css")
app.add_css_file("custom.css")
2 changes: 1 addition & 1 deletion examples/ezmsg_attach.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
if __name__ == "__main__":
print("This example attaches to the system created/run by ezmsg_toy.py.")
log = DebugLog()
ez.run(log, connections=(("TestSystem/PING/OUTPUT", log.INPUT),))
ez.run(LOG=log, connections=(("GLOBAL_PING_TOPIC", log.INPUT),))
2 changes: 1 addition & 1 deletion examples/ezmsg_configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -239,5 +239,5 @@ def network(self) -> ez.NetworkDefinition:
]

for system in test_systems:
ez.logger.info(f"Testing { system.__name__ }")
ez.logger.info(f"Testing {system.__name__}")
ez.run(system())
8 changes: 4 additions & 4 deletions examples/ezmsg_toy.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ def process_components(self):

ez.run(
SYSTEM=system,
# connections = [
# ( system.PING.OUTPUT, 'PING_OUTPUT' ),
# ( 'FOO_SUB', system.FOOSUB.INPUT )
# ]
connections=[
# Make PING.OUTPUT available on a topic ezmsg_attach.py
(system.PING.OUTPUT, "GLOBAL_PING_TOPIC"),
],
)
119 changes: 119 additions & 0 deletions examples/lowlevel_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import asyncio

import ezmsg.core as ez

PORT = 12345
MAX_COUNT = 100
TOPIC = "/TEST"


async def handle_pub(pub: ez.Publisher) -> None:
print("Publisher Task Launched")

count = 0

while True:
await pub.broadcast(f"{count=}")
await asyncio.sleep(0.1)
count += 1
if count >= MAX_COUNT:
break

print("Publisher Task Concluded")


async def handle_sub(sub: ez.Subscriber) -> None:
print("Subscriber Task Launched")

rx_count = 0
while True:
async with sub.recv_zero_copy() as msg:
# Uncomment if you want to witness backpressure!
# await asyncio.sleep(0.15)
print(msg)

rx_count += 1
if rx_count >= MAX_COUNT:
break

print("Subscriber Task Concluded")


async def host(host: str = "127.0.0.1"):
# Manually create a GraphServer
server = ez.GraphServer()
server.start((host, PORT))

print(f"Created GraphServer @ {server.address}")

try:
test_pub = await ez.Publisher.create(TOPIC, (host, PORT), host=host)
test_sub1 = await ez.Subscriber.create(TOPIC, (host, PORT))
test_sub2 = await ez.Subscriber.create(TOPIC, (host, PORT))

await asyncio.sleep(1.0)

pub_task = asyncio.Task(handle_pub(test_pub))
sub_task_1 = asyncio.Task(handle_sub(test_sub1))
sub_task_2 = asyncio.Task(handle_sub(test_sub2))

await asyncio.wait([pub_task, sub_task_1, sub_task_2])

test_pub.close()
test_sub1.close()
test_sub2.close()

for future in asyncio.as_completed(
[
test_pub.wait_closed(),
test_sub1.wait_closed(),
test_sub2.wait_closed(),
]
):
await future

finally:
server.stop()

print("Done")


async def attach_client(host: str = "127.0.0.1"):

sub = await ez.Subscriber.create(TOPIC, (host, PORT))

try:
while True:
async with sub.recv_zero_copy() as msg:
# Uncomment if you want to see EXTREME backpressure!
# await asyncio.sleep(1.0)
print(msg)

except asyncio.CancelledError:
pass

finally:
sub.close()
await sub.wait_closed()
print("Detached")


if __name__ == "__main__":
from dataclasses import dataclass
from argparse import ArgumentParser

parser = ArgumentParser()
parser.add_argument("--attach", action="store_true", help="attach to running graph")
parser.add_argument("--host", default="0.0.0.0", help="hostname for graphserver")

@dataclass
class Args:
attach: bool
host: str

args = Args(**vars(parser.parse_args()))

if args.attach:
asyncio.run(attach_client(host=args.host))
else:
asyncio.run(host(host=args.host))
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ dev = [
{include-group = "lint"},
{include-group = "test"},
"pre-commit>=4.3.0",
"viztracer>=1.0.4",
]
lint = [
"flake8>=7.3.0",
Expand All @@ -35,6 +36,7 @@ test = [
"pytest-asyncio>=1.1.0",
"pytest-cov>=6.2.1",
"xarray>=2025.6.1",
"psutil>=7.1.0",
]
docs = [
{include-group = "axisarray"},
Expand All @@ -51,6 +53,7 @@ axisarray = [

[project.scripts]
ezmsg = "ezmsg.core.command:cmdline"
ezmsg-perf = "ezmsg.util.perf.command:command"

[project.optional-dependencies]
axisarray = [
Expand Down
4 changes: 4 additions & 0 deletions src/ezmsg/core/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
"GraphServer",
"GraphContext",
"run_command",
"Publisher",
"Subscriber",
# All following are deprecated
"System",
"run_system",
Expand All @@ -42,6 +44,8 @@
from .graphserver import GraphServer
from .graphcontext import GraphContext
from .command import run_command
from .pubclient import Publisher
from .subclient import Subscriber

# Following imports are deprecated
from .backend import run_system
Expand Down
15 changes: 8 additions & 7 deletions src/ezmsg/core/addressable.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@
class Addressable:
"""
Base class for objects that can be addressed within the ezmsg system.
Addressable objects have a hierarchical address structure consisting of

Addressable objects have a hierarchical address structure consisting of
a location path and a name, similar to a filesystem path.
"""

_name: str | None
_location: list[str] | None

def __init__(self) -> None:
"""
Initialize an Addressable object.

The name and location are initially None and must be set before
the object can be properly addressed. This is achieved through
the ``_set_name()`` and ``_set_location()`` methods.
Expand All @@ -38,7 +39,7 @@ def _set_location(self, location: list[str] | None = None):
def name(self) -> str:
"""
Get the name of this addressable object.

:return: The object's name
:rtype: str
:raises AssertionError: If name has not been set
Expand All @@ -51,7 +52,7 @@ def name(self) -> str:
def location(self) -> list[str]:
"""
Get the location path of this addressable object.

:return: List of path components representing the object's location
:rtype: list[str]
:raises AssertionError: If location has not been set
Expand All @@ -64,10 +65,10 @@ def location(self) -> list[str]:
def address(self) -> str:
"""
Get the full address of this object.

The address is constructed by joining the location path and name
with forward slashes, similar to a filesystem path.

:return: The full address string
:rtype: str
"""
Expand Down
Loading