Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bin/mk
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ def parse_command_line():

parser.add_argument('requests',
nargs='*',
help='Whitespace-separated sequence of requests. For a get, watch, or describe request, this is a sequence of keys; for a set request, this is a sequence of key=value statements; for a list request this is a sequence of stores; for a discover request this is a sequence of broker addresses.')
help='Whitespace-separated sequence of requests. For a get, watch, or describe request, this is a sequence of keys; for a set request, this is a sequence of key=value statements; for a list request this is a sequence of stores; for a discover request this is a sequence of registry addresses.')


parsed = parser.parse_args()
Expand Down
8 changes: 4 additions & 4 deletions doc/client.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ at run time.
.. autofunction:: mktl.home

You may need to pre-populate your local cache with information from the
brokers handling the configuration for the store(s) you wish to access.
registries handling the configuration for the store(s) you wish to access.
The :ref:`mk` command line tool will handle this process; for a given
IP address or hostname, you would invoke::

mk discover 192.168.5.34

This discovery process will query that address for any available configurations,
but more importantly, will cache the address of that broker and use it again in
the future if/when discovering new services, without an explicit request to do
so. In other words, it should only be necessary to run this discovery process
but more importantly, will cache the address of that registry and use it again
in the future if/when discovering new services, without an explicit request to
do so. In other words, it should only be necessary to run this discovery process
a single time in order to gain access to a new set of mKTL stores.

The :func:`mktl.get` method is the universal entry point to retrieve a
Expand Down
6 changes: 3 additions & 3 deletions doc/example/daemon.rst
Original file line number Diff line number Diff line change
Expand Up @@ -221,15 +221,15 @@ example:
Starting the daemon
-------------------

The :ref:`mkbrokerd` executable is a persistent application that enables
The :ref:`mkregistryd` executable is a persistent application that enables
clients to easily find authoritative mKTL daemons. Having one instance of
:ref:`mkbrokerd` running on the local network is recommended.
:ref:`mkregistryd` running on the local network is recommended.

The :ref:`mkd` executable provides a common entry point for a persistent
daemon. Assuming the default search path is set up correctly, for the example
outlined here the invocation would resemble::

mkd metal precious --module metal.precious -c precious_metals.json

Both :ref:`mkbrokerd` and :ref:`mkd` should be running before mKTL client
Both :ref:`mkregistryd` and :ref:`mkd` should be running before mKTL client
interactions are attempted.
10 changes: 5 additions & 5 deletions doc/executables.rst
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ The command line arguments describe its usage:
:language: none


.. _mkbrokerd:
.. _mkregistryd:

mkbrokerd
---------
mkregistryd
-----------

The `mkbrokerd` persistent daemon is a discovery aid, listening for UDP
The `mkregistryd` persistent daemon is a discovery aid, listening for UDP
broadcasts on a well-known port number so that clients can be directed to
a specific mKTL daemon handling requests for a specific store, or fraction
of a store. While having a `mkbrokerd` daemon running is not a strict
of a store. While having a `mkregistryd` daemon running is not a strict
requirement it is a key component of automated discovery of mKTL stores on
a local network.
2 changes: 1 addition & 1 deletion doc/mk.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ positional arguments:
keys; for a set request, this is a sequence of
key=value statements; for a list request this is a
sequence of stores; for a discover request this is a
sequence of broker addresses.
sequence of registry addresses.

options:
-h, --help show this help message and exit
Expand Down
39 changes: 20 additions & 19 deletions doc/protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@ listener, operating on a unique port on that host.
A given host providing connectivity for mKTL could have thousands of daemons
running locally, each listening on a unique port number. Each daemon deploys
a UDP listener on a predetermined port number (10111) to enable discovery of
daemons on that host; a :ref:`dedicated "broker" process <mkbrokerd>` also
daemons on that host; a :ref:`dedicated registry process <mkregistryd>` also
listens on a predetermined port number (10103) to streamline discovery from
the client side. This usage pattern expects there to be a single
:ref:`mkbrokerd` process running on every host running one or more mKTL
:ref:`mkregistryd` process running on every host running one or more mKTL
daemons. The discovery exchange is :ref:`described below <discovery>`
in more detail.

Expand Down Expand Up @@ -424,11 +424,11 @@ port, which greatly simplfies periodic discovery.

The discovery of daemons is a two-part process; rather than ask every daemon
to cache the configuration for every other daemon on its local network, the
caching of configuration data is handled by :ref:`mkbrokerd`; when a client
caching of configuration data is handled by :ref:`mkregistryd`; when a client
issues a discovery broadcast, it is not looking for responses from individual
daemons, it is looking for responses from a :ref:`mkbrokerd` process.
daemons, it is looking for responses from a :ref:`mkregistryd` process.

This two-step approach, of contacting the broker process, and subsequently
This two-step approach, of contacting the registry process, and subsequently
contacting the authoritative daemon, could be avoided if every local daemon
caching the configuration of every other local daemon; however, a typical
client will cache the response, and discovery is only invoked if the cached
Expand All @@ -439,24 +439,25 @@ exponentially with the number of locally reachable daemons.

There are four shared secrets used in the discovery exchange:

=============== ===============================================================
*Secret* *Description*
=============== ===============================================================
**broker port** The UDP port used to discover locally accessible
:ref:`mkbrokerd` processes. Clients use this port to find
all such processes. The port number is 10103.
================= ==============================================================
*Secret* *Description*
================= ==============================================================
**registry port** The UDP port used to discover locally accessible
:ref:`mkregistryd` processes. Clients use this port to find
all such processes. The port number is 10103.

**daemon port** The UDP port used to discover locally accessible mKTL daemons.
:ref:`mkbrokerd` uses this port to find all such daemons.
The port number is 10111.
**daemon port** The UDP port used to discover locally accessible mKTL daemons.
:ref:`mkregistryd` uses this port to find all such daemons.
The port number is 10111.

**call** An arbitrary string used by the discoverer to trigger a
response from the listener. The string value is ``I heard it``.
**call** An arbitrary string used by the discoverer to trigger a
response from the listener. The string value is
``I heard it``.

**response** An arbitrary string used by the listener to respond to any
received calls. The string value is ``on the X:``.
**response** An arbitrary string used by the listener to respond to any
received calls. The string value is ``on the X:``.

=============== ===============================================================
================= ==============================================================

The purpose of discovery is to convey a single piece of information: what is
the port number of an actual mKTL request handler on this host? That port
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ repository = "https://github.com/KeckObservatory/mKTL"
[tool.poetry.scripts]
mk = { reference = "bin/mk", type = "file" }
mkd = { reference = "sbin/mkd", type = "file" }
mkbrokerd = { reference = "sbin/mkbrokerd", type = "file" }
mkpersistd = { reference = "sbin/mkpersistd", type = "file" }
mkregistryd = { reference = "sbin/mkregistryd", type = "file" }

[build-system]
requires = ["poetry-core"]
Expand Down
10 changes: 5 additions & 5 deletions sbin/mkbrokerd → sbin/mkregistryd
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
#! /usr/bin/env python3

description = """
The broker daemon acts as a clearing house for configuration requests;
when daemons first start running they will seek out any brokers on their
The registry daemon acts as a clearing house for configuration requests;
when daemons first start running they will seek out any registries on their
local network and announce configuration updates; any clients broadcasting
will discover a local broker, and ask for any available configuration
will discover a local registry, and ask for any available configuration
information.

Daemons and clients can both bypass broadcast discovery for a local broker
if they already know where to find a broker.
Daemons and clients can both bypass broadcast discovery for a local registry
if they already know where to find a registry.
"""

import argparse
Expand Down
20 changes: 10 additions & 10 deletions src/mktl/begin.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,26 @@ def _clear(store):


def discover(*targets):
""" Look for mKTL brokers, both by broadcasting on the local network
""" Look for mKTL registries, both by broadcasting on the local network
and by directly querying any/all supplied addresses. The goal of
this discovery is to populate a local cache including any/all stores
known to these brokers, for all future queries. This is especially
known to these registries, for all future queries. This is especially
helpful if the local client is not on the same network as the
brokers of interest.
registries of interest.
"""

brokers = protocol.discover.search(wait=True, targets=targets)
registries = protocol.discover.search(wait=True, targets=targets)

if len(brokers) == 0:
raise RuntimeError('no brokers available')
if len(registries) == 0:
raise RuntimeError('no registries available')

# Hacking the timeout for discovery, this is not expected to throw
# errors with minimal delay.

old_timeout = protocol.request.Client.timeout
protocol.request.Client.timeout = 0.5

for address,port in brokers:
for address,port in registries:
request = protocol.message.Request('HASH')
try:
payload = protocol.request.send(address, port, request)
Expand Down Expand Up @@ -133,11 +133,11 @@ def get(store, key=None):

if len(configuration) == 0:
# Nothing valid cached locally. Broadcast for responses.
brokers = protocol.discover.search()
if len(brokers) == 0:
registries = protocol.discover.search()
if len(registries) == 0:
raise RuntimeError("no configuration available for '%s' (local or remote)" % (store))

hostname,port = brokers[0]
hostname,port = registries[0]
message = protocol.message.Request('CONFIG', store)
payload = protocol.request.send(hostname, port, message)

Expand Down
6 changes: 3 additions & 3 deletions src/mktl/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -1217,9 +1217,9 @@ def announce(config, uuid, override=False):
payload.add_origin()
message = protocol.message.Request('CONFIG', store, payload)

brokers = protocol.discover.search(wait=True)
registries = protocol.discover.search(wait=True)

for address,port in brokers:
for address,port in registries:
try:
payload = protocol.request.send(address, port, message)
except TimeoutError:
Expand All @@ -1229,7 +1229,7 @@ def announce(config, uuid, override=False):
if error is None or error == '':
continue

# The broker daemon will return errors for a variety of circumstances,
# The registry daemon will return errors for a variety of circumstances,
# but in every case the immediate meaning is the same: do not proceed.

e_type = error['type']
Expand Down
45 changes: 23 additions & 22 deletions src/mktl/protocol/discover.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@

default_port = 10103

# The default port is used for discovery of intermediaries, brokers that are
# The default port is used for discovery of intermediaries, registries that are
# willing to cache and share second-hand information aggregated from one or
# more authoritative daemons, and be the first stop for any new clients on
# the network. The direct port is used by last-stop daemons, those that are
# authoritative for their respective stores; the brokers will use this direct
# authoritative for their respective stores; the registries will use this direct
# port to discover them.

direct_port = 10111
Expand Down Expand Up @@ -145,7 +145,7 @@ def search(port=default_port, wait=False, targets=tuple()):
targets = list(targets)

if port == default_port:
cached = preload_brokers()
cached = preload_registries()
targets.extend(cached)

for target in targets:
Expand Down Expand Up @@ -204,7 +204,7 @@ def search(port=default_port, wait=False, targets=tuple()):
sock.settimeout(expiration - elapsed)

if port == default_port:
remember_brokers(found)
remember_registries(found)

return found

Expand All @@ -217,14 +217,15 @@ def search_direct(port=direct_port, wait=True):
return search(port, wait)


def preload_brokers():
def preload_registries():
""" Helper method to parse environment variables and cached files on
disk to build a list of addresses to check for broker availability.
disk to build a list of addresses to check for registry availability.
"""

directory = config.directory()
manual = os.path.join(directory, 'client', 'brokers')
cached = manual + '.cache'
client = os.path.join(directory, 'client')
manual = os.path.join(client, 'registries')
cached = os.path.join(client, 'registries.cache')

lines = list()

Expand All @@ -243,33 +244,33 @@ def preload_brokers():
lines.extend(contents.split('\n'))


brokers = ''
registries = ''

for line in lines:
line = line.split('#')[0]
brokers = brokers + ' ' + line
registries = registries + ' ' + line

brokers = brokers.strip()
registries = registries.strip()

if brokers == '' or brokers is None:
brokers = tuple()
if registries == '' or registries is None:
registries = tuple()
else:
brokers = brokers.split()
registries = registries.split()

return brokers
return registries


def remember_brokers(found):
""" Cache any found brokers for future requests. There is no provision
for removing brokers that no longer respond, this may become necessary
in the future-- if the cached set grows unbounded there may become
a point where the occasional UDP broadcast becomes a burden rather
than a minor inefficiency.
def remember_registries(found):
""" Cache any found registries for future requests. There is no provision
for removing registries that no longer respond, this may become
necessary in the future-- if the cached set grows unbounded there may
become a point where the occasional UDP broadcast becomes a burden
rather than a minor inefficiency.
"""

directory = config.directory()
client = os.path.join(directory, 'client')
cached = os.path.join(client, 'brokers.cache')
cached = os.path.join(client, 'registries.cache')

if os.path.exists(client):
pass
Expand Down
8 changes: 4 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@


@pytest.fixture(scope="session")
def run_mkbrokerd():
def run_mkregistryd():

arguments = list()
arguments.append(sys.executable)
arguments.append('../sbin/mkbrokerd')
arguments.append('../sbin/mkregistryd')

pipe = subprocess.PIPE
mkbrokerd = subprocess.Popen(arguments, stdout=pipe, stderr=pipe)
mkregistryd = subprocess.Popen(arguments, stdout=pipe, stderr=pipe)

yield

mkbrokerd.terminate()
mkregistryd.terminate()


@pytest.fixture(scope="session")
Expand Down
Loading
Loading