Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions docs/explanations/control_system.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# The FastCS class

`FastCS` is the entrypoint for a fastcs application. It connects a `Controller` to one
or more `Transport`s, runs the controller's update loops as background tasks, and
manages the full application lifecycle from startup to shutdown.

## Construction

```python
from fastcs import FastCS
from fastcs.controllers import Controller
from fastcs.transports.epics import EpicsCATransport


class MyController(Controller):
pass


control_system = FastCS(controller=MyController(), transports=[EpicsCATransport()])

control_system.run()
```

## Startup and Runtime

Calling `control_system.run()` (or `await control_system.serve()`) executes the
following steps in order:

1. **Initialise the controller:** `controller.initialise()` is awaited, allowing the
controller to query the device and dynamically add attributes before the API is
built. After that, `controller.post_initialise()` is called to perform any final
setup, such as validating all type hints are satisfied.

2. **Build the API:** `controller.create_api_and_tasks()` returns the `ControllerAPI`
that transports will use, plus two lists of coroutines: *initial tasks* (run once at
startup) and *scan tasks* (run as continuous background loops).

3. **Connect transports:** each transport's `connect()` method is called with the
`ControllerAPI`. This lets the transport inspect the controller's attributes and
commands to prepare its protocol-specific representations before serving begins.

4. **Connect the controller:** `controller.connect()` is called to open the connection
to the device and perform any other setup logic.

5. **Run initial tasks:** each initial-task coroutine is awaited in sequence. These are
`@scan(period=ONCE)` methods and `AttributeIO` update callbacks with
`update_period=ONCE`.

6. **Start scan tasks:** each scan task coroutine is wrapped in an `asyncio.Task` and
run as a background task for the lifetime of the application.

7. **Gather transport coroutines:** `asyncio.gather` runs all transport `serve()`
coroutines concurrently. Each transport begins accepting and responding to protocol
requests.

8. **Scan pause and reconnect**: If any scan tasks raise exceptions, all scan tasks are
paused until `reconnect`.

## The Interactive Shell

Alongside the transport coroutines, FastCS launches an embedded
[IPython](https://ipython.org/) shell (unless `interactive=False` is passed). The shell
namespace is pre-populated with:

| Name | Value |
|------|-------|
| `controller` | The root controller instance |
| `transports` | The class names of active transports |
| `run` | A helper that schedules a coroutine on the FastCS event loop from the IPython thread |
| *transport-specific keys* | Any entries exposed via each transport's `context` property |

The shell runs in a separate thread so it does not block the asyncio event loop. When
the user exits the shell, the application begins its shutdown sequence.

When `interactive=False` a simple coroutine that blocks forever keeps the application
alive until the task is cancelled externally (e.g. SIGINT).

## Shutdown Sequence

When then application is stopped, FastCS performs an orderly teardown:

1. **Cancel scan tasks:** each background scan task is cancelled and removed, stopping
all periodic polling.

2. **Disconnect the controller:** `controller.disconnect()` is awaited, allowing the
controller to release device resources cleanly.
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ addopts = """
--tb=native -vv --doctest-modules --doctest-glob="*.md" --ignore-glob docs/snippets/*py --benchmark-sort=mean --benchmark-columns="mean, min, max, outliers, ops, rounds"
"""
# https://iscinumpy.gitlab.io/post/bound-version-constraints/#watch-for-warnings
# https://github.com/DiamondLightSource/FastCS/issues/230
filterwarnings = "error"
# Doctest python code in docs, python code in src docstrings, test functions in tests
testpaths = "docs src tests"
Expand Down
77 changes: 45 additions & 32 deletions src/fastcs/control_system.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,9 @@ class FastCS:
loop: Optional event loop to run the control system in
"""

def __init__(
self,
controller: Controller,
transports: Sequence[Transport],
loop: asyncio.AbstractEventLoop | None = None,
):
def __init__(self, controller: Controller, transports: Sequence[Transport]):
self._controller = controller
self._transports = transports
self._loop = loop or asyncio.get_event_loop()

self._scan_coros: list[ScanCallback] = []
self._initial_coros: list[ScanCallback] = []
Expand All @@ -47,36 +41,25 @@ def run(self, interactive: bool = True):
"""Run the application

This is a convenience method to call `serve` in a synchronous context.
To use in an async context, call `serve` directly.

Args:
interactive: Whether to create an interactive IPython shell

"""
serve = asyncio.ensure_future(self.serve(interactive=interactive))

if os.name != "nt":
self._loop.add_signal_handler(signal.SIGINT, serve.cancel)
self._loop.add_signal_handler(signal.SIGTERM, serve.cancel)
self._loop.run_until_complete(serve)
async def _serve():
"""Wrapper to add signal handlers and call `serve`"""
loop = asyncio.get_running_loop()
task = asyncio.current_task()

async def _run_initial_coros(self):
for coro in self._initial_coros:
await coro()
if os.name != "nt" and task is not None:
loop.add_signal_handler(signal.SIGINT, task.cancel)
loop.add_signal_handler(signal.SIGTERM, task.cancel)

async def _start_scan_tasks(self):
self._scan_tasks = {self._loop.create_task(coro()) for coro in self._scan_coros}
await self.serve(interactive=interactive)

def _stop_scan_tasks(self):
for task in self._scan_tasks:
if not task.done():
try:
task.cancel()
except (asyncio.CancelledError, RuntimeError):
pass
except Exception as e:
raise RuntimeError("Unhandled exception in stop scan tasks") from e

self._scan_tasks.clear()
asyncio.run(_serve())

async def serve(self, interactive: bool = True) -> None:
"""Serve the control system over the given transports on the current event loop
Expand Down Expand Up @@ -110,7 +93,7 @@ async def serve(self, interactive: bool = True) -> None:

coros: list[Coroutine] = []
for transport in self._transports:
transport.connect(controller_api=self.controller_api, loop=self._loop)
transport.connect(controller_api=self.controller_api)
coros.append(transport.serve())
common_context = context.keys() & transport.context.keys()
if common_context:
Expand Down Expand Up @@ -153,16 +136,30 @@ async def block_forever():
self._stop_scan_tasks()
await self._controller.disconnect()

async def _run_initial_coros(self):
for coro in self._initial_coros:
await coro()

async def _start_scan_tasks(self):
self._scan_tasks = {asyncio.create_task(coro()) for coro in self._scan_coros}

async def _interactive_shell(self, context: dict[str, Any]):
"""Spawn interactive shell in another thread and wait for it to complete."""
loop = asyncio.get_running_loop()

def run(coro: Coroutine[None, None, None]):
"""Run coroutine on FastCS event loop from IPython thread."""

def wrapper():
asyncio.create_task(coro)
task = asyncio.create_task(coro)

def _log_exception(t: asyncio.Task):
if not t.cancelled() and (exc := t.exception()):
logger.exception("`run` task raised exception", exc_info=exc)

self._loop.call_soon_threadsafe(wrapper)
task.add_done_callback(_log_exception)

loop.call_soon_threadsafe(wrapper)

async def interactive_shell(
context: dict[str, object], stop_event: asyncio.Event
Expand All @@ -176,8 +173,24 @@ async def interactive_shell(
context["run"] = run

stop_event = asyncio.Event()
self._loop.create_task(interactive_shell(context, stop_event))
shell_task = asyncio.create_task(interactive_shell(context, stop_event))

await stop_event.wait()

if not shell_task.cancelled() and (exc := shell_task.exception()):
logger.exception("Interactive shell raised exception", exc_info=exc)

def _stop_scan_tasks(self):
for task in self._scan_tasks:
if not task.done():
try:
task.cancel()
except (asyncio.CancelledError, RuntimeError):
pass
except Exception as e:
raise RuntimeError("Unhandled exception in stop scan tasks") from e

self._scan_tasks.clear()

def __del__(self):
self._stop_scan_tasks()
5 changes: 1 addition & 4 deletions src/fastcs/launch.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
import inspect
import json
from pathlib import Path
Expand Down Expand Up @@ -158,9 +157,7 @@ def run(
else:
controller = controller_class()

instance = FastCS(
controller, instance_options.transport, loop=asyncio.get_event_loop()
)
instance = FastCS(controller, instance_options.transport)

instance.run()

Expand Down
7 changes: 2 additions & 5 deletions src/fastcs/transports/epics/ca/ioc.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,8 @@ def __init__(
_create_and_link_attribute_pvs(pv_prefix, controller_api)
_create_and_link_command_pvs(pv_prefix, controller_api)

def run(
self,
loop: asyncio.AbstractEventLoop,
) -> None:
dispatcher = AsyncioDispatcher(loop) # Needs running loop
def run(self) -> None:
dispatcher = AsyncioDispatcher(asyncio.get_running_loop())
builder.LoadDatabase()
softioc.iocInit(dispatcher)

Expand Down
10 changes: 2 additions & 8 deletions src/fastcs/transports/epics/ca/transport.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from dataclasses import dataclass, field
from typing import Any

Expand Down Expand Up @@ -28,13 +27,8 @@ class EpicsCATransport(Transport):
gui: EpicsGUIOptions | None = None
"""Options for the GUI. If not set, no GUI will be created."""

def connect(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
) -> None:
def connect(self, controller_api: ControllerAPI) -> None:
self._controller_api = controller_api
self._loop = loop
self._pv_prefix = self.epicsca.pv_prefix
self._ioc = EpicsCAIOC(self.epicsca.pv_prefix, controller_api)

Expand All @@ -47,7 +41,7 @@ def connect(
async def serve(self) -> None:
"""Serve `ControllerAPI` over EPICS Channel Access"""
logger.info("Running IOC", pv_prefix=self._pv_prefix)
self._ioc.run(self._loop)
self._ioc.run()

@property
def context(self) -> dict[str, Any]:
Expand Down
13 changes: 2 additions & 11 deletions src/fastcs/transports/epics/pva/transport.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
import asyncio
from dataclasses import dataclass, field

from fastcs.controllers import ControllerAPI
from fastcs.logging import logger
from fastcs.transports.epics import (
EpicsDocsOptions,
EpicsGUIOptions,
EpicsIOCOptions,
)
from fastcs.transports.epics import EpicsDocsOptions, EpicsGUIOptions, EpicsIOCOptions
from fastcs.transports.epics.docs import EpicsDocs
from fastcs.transports.epics.pva.gui import PvaEpicsGUI
from fastcs.transports.transport import Transport
Expand All @@ -23,11 +18,7 @@ class EpicsPVATransport(Transport):
docs: EpicsDocsOptions | None = None
gui: EpicsGUIOptions | None = None

def connect(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
) -> None:
def connect(self, controller_api: ControllerAPI) -> None:
self._controller_api = controller_api
self._pv_prefix = self.epicspva.pv_prefix
self._ioc = P4PIOC(self.epicspva.pv_prefix, controller_api)
Expand Down
7 changes: 1 addition & 6 deletions src/fastcs/transports/graphql/transport.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from dataclasses import dataclass, field

from fastcs.controllers import ControllerAPI
Expand All @@ -14,11 +13,7 @@ class GraphQLTransport(Transport):

graphql: GraphQLServerOptions = field(default_factory=GraphQLServerOptions)

def connect(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
):
def connect(self, controller_api: ControllerAPI):
self._server = GraphQLServer(controller_api)

async def serve(self) -> None:
Expand Down
7 changes: 1 addition & 6 deletions src/fastcs/transports/rest/transport.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from dataclasses import dataclass, field

from fastcs.controllers import ControllerAPI
Expand All @@ -14,11 +13,7 @@ class RestTransport(Transport):

rest: RestServerOptions = field(default_factory=RestServerOptions)

def connect(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
):
def connect(self, controller_api: ControllerAPI):
self._server = RestServer(controller_api)

async def serve(self) -> None:
Expand Down
12 changes: 4 additions & 8 deletions src/fastcs/transports/tango/transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,9 @@ class TangoTransport(Transport):

tango: TangoDSROptions = field(default_factory=TangoDSROptions)

def connect(
self,
controller_api: ControllerAPI,
loop: asyncio.AbstractEventLoop,
):
self._dsr = TangoDSR(controller_api, loop)
def connect(self, controller_api: ControllerAPI):
self._controller_api = controller_api

async def serve(self) -> None:
coro = asyncio.to_thread(self._dsr.run, self.tango)
await coro
self._dsr = TangoDSR(self._controller_api, asyncio.get_running_loop())
await asyncio.to_thread(self._dsr.run, self.tango)
Loading
Loading