Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,13 @@ toml_dep = declare_dependency(
executable('mctp',
sources: ['src/mctp.c'] + netlink_sources + util_sources + ops_sources,
install: true,
c_args: ['-DHAVE_LIBSYSTEMD=0'],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mainly feel hesitant here. It feels like I am doing something that configure_file supposed to solve. Maybe a separate config.h for each target?

)

mctp_test = executable('test-mctp',
sources: ['src/mctp.c'] + netlink_sources + util_sources + test_ops_sources,
include_directories: include_directories('src'),
c_args: ['-DHAVE_LIBSYSTEMD=0'],
)

executable('mctp-req',
Expand Down Expand Up @@ -92,6 +94,7 @@ if libsystemd.found()
dependencies: [libsystemd, toml_dep],
install: true,
install_dir: get_option('sbindir'),
c_args: ['-DHAVE_LIBSYSTEMD=1'],
)

mctpd_test = executable('test-mctpd',
Expand All @@ -100,6 +103,7 @@ if libsystemd.found()
] + test_ops_sources + netlink_sources + util_sources,
include_directories: include_directories('src'),
dependencies: [libsystemd, toml_dep],
c_args: ['-DHAVE_LIBSYSTEMD=1'],
)
endif

Expand Down
9 changes: 9 additions & 0 deletions src/mctp-ops.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

#include <unistd.h>
#include <linux/netlink.h>
#if HAVE_LIBSYSTEMD
#include <systemd/sd-event.h>
#endif
#include <err.h>

#include "mctp.h"
Expand Down Expand Up @@ -74,6 +77,12 @@ const struct mctp_ops mctp_ops = {
.recvfrom = mctp_op_recvfrom,
.close = mctp_op_close,
},
#if HAVE_LIBSYSTEMD
.sd_event = {
.add_time_relative = sd_event_add_time_relative,
.source_set_time_relative = sd_event_source_set_time_relative,
},
#endif
.bug_warn = mctp_bug_warn,
};

Expand Down
13 changes: 13 additions & 0 deletions src/mctp-ops.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@

#include <sys/socket.h>
#include <stdarg.h>
#if HAVE_LIBSYSTEMD
#include <systemd/sd-event.h>
#endif

#define _GNU_SOURCE

Expand All @@ -24,9 +27,19 @@ struct socket_ops {
int (*close)(int sd);
};

#if HAVE_LIBSYSTEMD
struct sd_event_ops {
typeof(sd_event_add_time_relative) *add_time_relative;
typeof(sd_event_source_set_time_relative) *source_set_time_relative;
};
#endif

struct mctp_ops {
struct socket_ops mctp;
struct socket_ops nl;
#if HAVE_LIBSYSTEMD
struct sd_event_ops sd_event;
#endif
void (*bug_warn)(const char *fmt, va_list args);
};

Expand Down
11 changes: 6 additions & 5 deletions src/mctpd.c
Original file line number Diff line number Diff line change
Expand Up @@ -497,8 +497,9 @@ static int wait_fd_timeout(int fd, short events, uint64_t timeout_usec)
if (rc < 0)
goto out;

rc = sd_event_add_time_relative(ev, NULL, CLOCK_MONOTONIC, timeout_usec,
0, cb_exit_loop_timeout, NULL);
rc = mctp_ops.sd_event.add_time_relative(ev, NULL, CLOCK_MONOTONIC,
timeout_usec, 0,
cb_exit_loop_timeout, NULL);
if (rc < 0)
goto out;

Expand Down Expand Up @@ -3239,8 +3240,8 @@ static int peer_endpoint_recover(sd_event_source *s, uint64_t usec,

reschedule:
if (peer->recovery.npolls > 0) {
rc = sd_event_source_set_time_relative(peer->recovery.source,
peer->recovery.delay);
rc = mctp_ops.sd_event.source_set_time_relative(
peer->recovery.source, peer->recovery.delay);
if (rc >= 0) {
rc = sd_event_source_set_enabled(peer->recovery.source,
SD_EVENT_ONESHOT);
Expand Down Expand Up @@ -3275,7 +3276,7 @@ static int method_endpoint_recover(sd_bus_message *call, void *data,
peer->recovery.npolls = MCTP_I2C_TSYM_MN1_MIN + 1;
peer->recovery.delay =
(MCTP_I2C_TSYM_TRECLAIM_MIN_US / 2) - ctx->mctp_timeout;
rc = sd_event_add_time_relative(
rc = mctp_ops.sd_event.add_time_relative(
ctx->event, &peer->recovery.source, CLOCK_MONOTONIC, 0,
ctx->mctp_timeout, peer_endpoint_recover, peer);
if (rc < 0) {
Expand Down
8 changes: 8 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

import pytest
import asyncdbus
import trio.testing

import mctpenv

Expand Down Expand Up @@ -35,3 +36,10 @@ async def mctpd(nursery, dbus, sysnet, config):
@pytest.fixture
async def mctp(nursery, sysnet):
return mctpenv.MctpWrapper(nursery, sysnet)

@pytest.fixture
def autojump_clock():
"""
Custom autojump clock with a reasonable threshold for non-time I/O waits
"""
return trio.testing.MockClock(autojump_threshold=0.01)
129 changes: 125 additions & 4 deletions tests/mctp-ops-test.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

#define _GNU_SOURCE

#include <assert.h>
#include <err.h>
#include <errno.h>
#include <stdlib.h>
Expand All @@ -18,6 +19,9 @@
#include <sys/socket.h>
#include <sys/un.h>

#if HAVE_LIBSYSTEMD
#include <systemd/sd-event.h>
#endif
#include <linux/netlink.h>

#include "mctp-ops.h"
Expand All @@ -38,10 +42,12 @@ static int mctp_op_socket(int type)
struct iovec iov;
int rc, var, sd;

if (type == AF_MCTP)
if (type == CONTROL_OP_SOCKET_MCTP)
req.type = CONTROL_OP_SOCKET_MCTP;
else if (type == AF_NETLINK)
else if (type == CONTROL_OP_SOCKET_NL)
req.type = CONTROL_OP_SOCKET_NL;
else if (type == CONTROL_OP_TIMER)
req.type = CONTROL_OP_TIMER;
else
errx(EXIT_FAILURE, "invalid socket type?");

Expand Down Expand Up @@ -72,12 +78,12 @@ static int mctp_op_socket(int type)

static int mctp_op_mctp_socket(void)
{
return mctp_op_socket(AF_MCTP);
return mctp_op_socket(CONTROL_OP_SOCKET_MCTP);
}

static int mctp_op_netlink_socket(void)
{
return mctp_op_socket(AF_NETLINK);
return mctp_op_socket(CONTROL_OP_SOCKET_NL);
}

static int mctp_op_bind(int sd, struct sockaddr *addr, socklen_t addrlen)
Expand Down Expand Up @@ -221,6 +227,115 @@ static void mctp_bug_warn(const char *fmt, va_list args)
abort();
}

#if HAVE_LIBSYSTEMD
struct wrapped_time_userdata {
sd_event_time_handler_t callback;
void *userdata;
};

int wrapped_time_callback(sd_event_source *source, int fd, uint revents,
void *userdata)
{
struct wrapped_time_userdata *wrapud = userdata;
uint64_t usec;
ssize_t rc;

rc = read(fd, &usec, sizeof(usec));
if (rc != 8)
errx(EXIT_FAILURE, "ops protocol error");

rc = wrapud->callback(source, usec, wrapud->userdata);
warnx("%ld", rc);

return 0;
}

void wrapped_time_destroy(void *wrapud)
{
free(wrapud);
}

static int mctp_op_sd_event_add_time_relative(
sd_event *e, sd_event_source **ret, clockid_t clock, uint64_t usec,
uint64_t accuracy, sd_event_time_handler_t callback, void *userdata)
{
struct wrapped_time_userdata *wrapud = NULL;
sd_event_source *source = NULL;
int sd = -1;
int rc = 0;

sd = mctp_op_socket(CONTROL_OP_TIMER);
if (sd < 0)
return -errno;

rc = write(sd, &usec, sizeof(usec));
if (rc != 8)
errx(EXIT_FAILURE, "ops protocol error");

wrapud = malloc(sizeof(*wrapud));
if (!wrapud) {
rc = -ENOMEM;
goto fail;
}

wrapud->callback = callback;
wrapud->userdata = userdata;

rc = sd_event_add_io(e, &source, sd, EPOLLIN, wrapped_time_callback,
wrapud);
if (rc < 0)
goto fail;

rc = sd_event_source_set_destroy_callback(source, wrapped_time_destroy);
if (rc < 0)
goto fail;

wrapud = NULL;

rc = sd_event_source_set_io_fd_own(source, 1);
if (rc < 0)
goto fail;

sd = -1;

rc = sd_event_source_set_enabled(source, SD_EVENT_ONESHOT);
if (rc < 0)
goto fail;

if (!ret) {
rc = sd_event_source_set_floating(source, 1);
if (rc < 0)
goto fail;

sd_event_source_unref(source);
} else {
*ret = source;
}

return 0;

fail:
if (sd > 0)
close(sd);
free(wrapud);
sd_event_source_disable_unref(*ret);
return rc;
}

static int mctp_op_sd_event_source_set_time_relative(sd_event_source *s,
uint64_t usec)
{
int sd = sd_event_source_get_io_fd(s);
ssize_t rc;

rc = write(sd, &usec, sizeof(usec));
if (rc != 8)
errx(EXIT_FAILURE, "ops protocol error");

return 0;
}
#endif

const struct mctp_ops mctp_ops = {
.mctp = {
.socket = mctp_op_mctp_socket,
Expand All @@ -238,6 +353,12 @@ const struct mctp_ops mctp_ops = {
.recvfrom = mctp_op_recvfrom,
.close = mctp_op_close,
},
#if HAVE_LIBSYSTEMD
.sd_event = {
.add_time_relative = mctp_op_sd_event_add_time_relative,
.source_set_time_relative = mctp_op_sd_event_source_set_time_relative,
},
#endif
.bug_warn = mctp_bug_warn,
};

Expand Down
34 changes: 34 additions & 0 deletions tests/mctpenv/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import array
import enum
import errno
import math
import os
import signal
import socket
Expand Down Expand Up @@ -1113,6 +1114,31 @@ async def notify_delroute(self, route):
await self._notify_route(route, rtnl.RTM_DELROUTE);


class TimerSocket(BaseSocket):
def __init__(self, sock):
super().__init__(sock)
self.delay = sys.maxsize

async def run(self):
while True:
try:
with trio.move_on_after(self.delay / 1000000) as scope:
# mctpd requests a new uint64_t delay
data = await self.sock.recv(8)
if len(data) == 0:
break

(next_delay,) = struct.unpack('@Q', data)
self.delay = next_delay

# timed out
if scope.cancelled_caught:
await self.sock.send(struct.pack('@Q', math.floor(trio.current_time() * 1000000)))
self.delay = sys.maxsize
except (ConnectionResetError, BrokenPipeError) as ex:
break


async def send_fd(sock, fd):
fdarray = array.array("i", [fd])
await sock.sendmsg([b'x'], [
Expand Down Expand Up @@ -1158,6 +1184,14 @@ async def handle_control(self, nursery):
remote.close()
nursery.start_soon(nl.run)

elif op == 0x03:
# Timer socket
(local, remote) = self.socketpair()
sd = TimerSocket(local)
await send_fd(self.sock_local, remote.fileno())
remote.close()
nursery.start_soon(sd.run)

else:
print(f"unknown op {op}")

Expand Down
1 change: 1 addition & 0 deletions tests/test-proto.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ enum {
CONTROL_OP_INIT,
CONTROL_OP_SOCKET_MCTP,
CONTROL_OP_SOCKET_NL,
CONTROL_OP_TIMER,
};

struct control_msg_req {
Expand Down
Loading