2 changes: 2 additions & 0 deletions modules/infra/control/gr_port.h
@@ -9,6 +9,7 @@
 #include <rte_ethdev.h>
 #include <rte_ether.h>
 #include <rte_mempool.h>
+#include <rte_spinlock.h>
 
 #include <stdint.h>
 #include <sys/queue.h>
@@ -32,6 +33,7 @@ GR_IFACE_INFO(GR_IFACE_TYPE_PORT, iface_info_port, {
 	struct rte_mempool *pool;
 	char *devargs;
 	uint32_t pool_size;
+	rte_spinlock_t txq_locks[RTE_MAX_QUEUES_PER_PORT];
 	struct {
 		mac_filter_flags_t flags;
 		unsigned hw_limit;
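A quick note on the new field: sizing the lock array to RTE_MAX_QUEUES_PER_PORT rather than to the configured TX queue count keeps indexing by queue_id trivial and allocation-free. Assuming DPDK's default RTE_MAX_QUEUES_PER_PORT of 1024 and the usual 4-byte rte_spinlock_t, that is roughly 4 KiB of extra per-port state, a modest price for avoiding any resize logic when queue counts change.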
65 changes: 50 additions & 15 deletions modules/infra/control/graph.c
@@ -74,6 +74,18 @@ void worker_graph_free(struct worker *worker) {
 	}
 }
 
+static struct iface_info_port *find_port(gr_vec struct iface_info_port **ports, uint16_t port_id) {
+	struct iface_info_port *port = NULL;
+	gr_vec_foreach (struct iface_info_port *p, ports) {
+		if (p->port_id == port_id) {
+			port = p;
+			break;
+		}
+	}
+	assert(port != NULL);
+	return port;
+}
+
 static int
 worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_port **ports) {
 	gr_vec const char **graph_nodes = NULL;
@@ -88,7 +100,7 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
 
 	n_rxqs = 0;
 	gr_vec_foreach_ref (qmap, worker->rxqs) {
-		if (qmap->enabled)
+		if (ports != NULL && qmap->enabled)
 			n_rxqs++;
 	}
 	if (n_rxqs == 0) {
@@ -139,13 +151,8 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
 		snprintf(node_name, sizeof(node_name), RX_NODE_FMT, qmap->port_id, qmap->queue_id);
 		node = rte_graph_node_get_by_name(graph_name, node_name);
 		struct rx_node_ctx *ctx = rx_node_ctx(node);
-		gr_vec_foreach (struct iface_info_port *p, ports) {
-			if (p->port_id == qmap->port_id) {
-				ctx->iface = RTE_PTR_SUB(p, offsetof(struct iface, info));
-				break;
-			}
-		}
-		assert(ctx->iface != NULL);
+		struct iface_info_port *port = find_port(ports, qmap->port_id);
+		ctx->iface = RTE_PTR_SUB(port, offsetof(struct iface, info));
 		ctx->rxq.port_id = qmap->port_id;
 		ctx->rxq.queue_id = qmap->queue_id;
 		ctx->burst_size = RTE_GRAPH_BURST_SIZE / gr_vec_len(worker->rxqs);
@@ -154,9 +161,10 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
 	// initialize all tx nodes context to invalid ports and queues
 	gr_vec_foreach (const char *name, tx_node_names) {
 		node = rte_graph_node_get_by_name(graph_name, name);
-		struct port_queue *ctx = port_queue(node);
-		ctx->port_id = UINT16_MAX;
-		ctx->queue_id = UINT16_MAX;
+		struct tx_node_ctx *ctx = tx_node_ctx(node);
+		ctx->txq.port_id = UINT16_MAX;
+		ctx->txq.queue_id = UINT16_MAX;
+		ctx->lock = NULL;
 	}
 
 	// initialize the port_output node context to point to invalid edges
@@ -180,15 +188,35 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_
 		snprintf(node_name, sizeof(node_name), TX_NODE_FMT, qmap->port_id, qmap->queue_id);
 		node = rte_graph_node_get_by_name(graph_name, node_name);
 		// and update its context data to correct values
-		struct port_queue *ctx = port_queue(node);
-		ctx->port_id = qmap->port_id;
-		ctx->queue_id = qmap->queue_id;
+		struct tx_node_ctx *ctx = tx_node_ctx(node);
+		ctx->txq.port_id = qmap->port_id;
+		ctx->txq.queue_id = qmap->queue_id;
+
+		// set spinlock only if multiple workers share this TX queue
+		unsigned txq_users = 0;
+		struct worker *w = NULL;
+		STAILQ_FOREACH (w, &workers, next) {
+			gr_vec_foreach_ref (struct queue_map *q, w->txqs) {
+				if (q->port_id == qmap->port_id && q->queue_id == qmap->queue_id)
+					txq_users++;
+			}
+		}
+		if (txq_users > 1) {
+			struct iface_info_port *port = find_port(ports, qmap->port_id);
+			ctx->lock = &port->txq_locks[qmap->queue_id];
+			LOG(WARNING,
+			    "[CPU %d] port %s txq %u shared by %u workers",
+			    worker->cpu_id,
+			    port->devargs,
+			    qmap->queue_id,
+			    txq_users);
+		}
 
 		for (rte_edge_t edge = 0; edge < gr_vec_len(tx_node_names); edge++) {
 			if (strcmp(tx_node_names[edge], node_name) == 0) {
 				// update the port_output context data to map this port to the
 				// correct edge
-				out->edges[ctx->port_id] = edge;
+				out->edges[ctx->txq.port_id] = edge;
 				break;
 			}
 		}
@@ -307,6 +335,13 @@ int worker_graph_reload_all(gr_vec struct iface_info_port **ports) {
 
 	gr_vec rte_node_t *unused_nodes = worker_graph_nodes_add_missing(ports);
 
+	// stop all workers by switching them to NULL graphs
+	STAILQ_FOREACH (worker, &workers, next) {
+		if ((ret = worker_graph_reload(worker, NULL)) < 0)
+			return ret;
+	}
+
+	// create new graphs and start all workers
 	STAILQ_FOREACH (worker, &workers, next) {
 		if ((ret = worker_graph_reload(worker, ports)) < 0)
 			return ret;
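The net effect of the two graph.c changes: worker_graph_reload_all() now parks every worker on a NULL graph before building the new ones, so no TX queue is touched while queue ownership moves around, and each tx node learns whether its queue is shared via ctx->lock. Below is a minimal sketch of how a TX node's process path might consume that lock on the datapath; tx_node_ctx and its txq/lock fields come from this diff, while the helper name and everything around it are assumptions, not the actual grout implementation:

```c
#include <rte_ethdev.h>
#include <rte_spinlock.h>

// Assumed context layout, matching the fields used in this diff:
// struct tx_node_ctx { struct { uint16_t port_id, queue_id; } txq; rte_spinlock_t *lock; };
//
// Hypothetical drain helper for a tx node. When worker_graph_new() found
// several workers mapped to the same queue, ctx->lock points into the
// port's txq_locks[] array; otherwise it is NULL and the path is identical
// to the pre-patch, lock-free behavior.
static inline uint16_t
tx_drain(struct tx_node_ctx *ctx, struct rte_mbuf **mbufs, uint16_t n) {
	uint16_t sent;

	if (ctx->lock != NULL) {
		// queue shared by several workers: serialize the burst
		rte_spinlock_lock(ctx->lock);
		sent = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, n);
		rte_spinlock_unlock(ctx->lock);
	} else {
		// single owner: keep the lock-free fast path
		sent = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, n);
	}
	return sent;
}
```

Ports with enough hardware queues therefore pay nothing; the spinlock cost only appears where the driver forced queue sharing, which is also exactly when the "shared by N workers" warning fires.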
27 changes: 20 additions & 7 deletions modules/infra/control/port.c
@@ -91,19 +91,32 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) {
 	if (numa_available() != -1)
 		socket_id = rte_eth_dev_socket_id(p->port_id);
 
-	// FIXME: deal with drivers that do not support more than 1 (or N) tx queues
-	p->n_txq = n_txq_min;
+	if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0)
+		return errno_log(-ret, "rte_eth_dev_info_get");
 
 	if (p->n_rxq == 0)
 		p->n_rxq = 1;
 
-	if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0)
-		return errno_log(-ret, "rte_eth_dev_info_get");
+	if (p->n_rxq > info.max_rx_queues)
+		return errno_set(EOVERFLOW);
+
+	// cap number of queues to device maximum
+	p->n_txq = RTE_MIN(n_txq_min, info.max_tx_queues);
 
-	if (strcmp(info.driver_name, "net_tap") == 0) {
-		p->n_txq = RTE_MAX(p->n_txq, p->n_rxq);
-		p->n_rxq = p->n_txq;
+	if (strcmp(info.driver_name, "net_tap") == 0
+	    || strcmp(info.driver_name, "net_virtio") == 0) {
+		// force number of TX queues equal to requested RX queues
+		p->n_txq = p->n_rxq;
+		if (p->n_txq > info.max_tx_queues)
+			return errno_set(EOVERFLOW);
 	}
 
+	if (p->n_txq < n_txq_min)
+		LOG(NOTICE, "port %s TX queues limited to %u", p->devargs, p->n_txq);
+
+	for (uint16_t q = 0; q < p->n_txq; q++)
+		rte_spinlock_init(&p->txq_locks[q]);
+
 	rxq_size = get_rxq_size(p, &info);
 	txq_size = get_txq_size(p, &info);

Review comments (Collaborator):
- on the `max_rx_queues` overflow check: "with virtio ?"
- on the net_tap/net_virtio TX queue clamp: "Why not a tx queue with N virtio ring ?"
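To make the capping concrete, a worked example with assumed numbers: a net_virtio port reporting max_tx_queues = 4, configured with n_rxq = 2 on a 5-worker datapath (n_txq_min = 5). The generic cap first gives n_txq = RTE_MIN(5, 4) = 4, then the virtio/tap branch forces n_txq = n_rxq = 2, which passes the max_tx_queues check; since 2 < 5, the NOTICE "TX queues limited to 2" is logged and the spinlocks for queues 0 and 1 are initialized. Five workers over two queues means queue 0 ends up with three users and queue 1 with two, exactly the situation the locks in graph.c guard.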
70 changes: 34 additions & 36 deletions modules/infra/control/worker.c
@@ -222,16 +222,30 @@ int port_plug(struct iface_info_port *p) {
 	return ret;
 }
 
-static uint16_t worker_txq_id(const cpu_set_t *affinity, unsigned cpu_id) {
-	uint16_t txq = 0;
-	for (unsigned cpu = 0; cpu < CPU_SETSIZE; cpu++) {
-		if (CPU_ISSET(cpu, affinity)) {
-			if (cpu == cpu_id)
-				break;
-			txq++;
+static void worker_txq_distribute(gr_vec struct iface_info_port **ports) {
+	struct iface_info_port *port;
+	struct worker *worker;
+
+	// clear all TX queue assignments
+	STAILQ_FOREACH (worker, &workers, next)
+		gr_vec_free(worker->txqs);
+
+	// assign TX queues only to workers that have RX queues
+	gr_vec_foreach (port, ports) {
+		uint16_t txq_idx = 0;
+		STAILQ_FOREACH (worker, &workers, next) {
+			if (gr_vec_len(worker->rxqs) == 0)
+				continue;
+			assert(port->n_txq > 0);
+			struct queue_map txq = {
+				.port_id = port->port_id,
+				.queue_id = txq_idx % port->n_txq,
+				.enabled = port->started,
+			};
+			gr_vec_add(worker->txqs, txq);
+			txq_idx++;
 		}
 	}
-	return txq;
 }
 
 int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
@@ -275,26 +289,24 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) {
 		break;
 	}
 
-	gr_vec struct iface_info_port **ports = NULL;
-	struct iface *iface = NULL;
-	while ((iface = iface_next(GR_IFACE_TYPE_PORT, iface)) != NULL)
-		gr_vec_add(ports, iface_info_port(iface));
-
-	// ensure source worker has released the rxq
-	if ((ret = worker_graph_reload(src_worker, ports)) < 0)
-		goto end;
-
-	// now it is safe to assign rxq to dst_worker
+	// assign rxq to dst_worker
 	struct queue_map rx_qmap = {
 		.port_id = port_id,
 		.queue_id = rxq_id,
 		.enabled = true,
 	};
 	gr_vec_add(dst_worker->rxqs, rx_qmap);
 
-	ret = worker_graph_reload(dst_worker, ports);
+	// reassign TX queues and reload all graphs
+	gr_vec struct iface_info_port **ports = NULL;
+	struct iface *iface = NULL;
+	while ((iface = iface_next(GR_IFACE_TYPE_PORT, iface)) != NULL)
+		gr_vec_add(ports, iface_info_port(iface));
+
+	worker_txq_distribute(ports);
+
+	ret = worker_graph_reload_all(ports);
 
 end:
 	gr_vec_free(ports);
 	return ret;
 }
@@ -315,12 +327,9 @@ int worker_queue_distribute(const cpu_set_t *affinity, gr_vec struct iface_info_
 	STAILQ_FOREACH_SAFE (worker, &workers, next, tmp) {
 		if (CPU_ISSET(worker->cpu_id, affinity)) {
 			// Remove all RXQ/TXQ from that worker to have a clean slate.
+			// worker_graph_reload_all() will stop all workers first.
 			gr_vec_free(worker->rxqs);
 			gr_vec_free(worker->txqs);
-			if ((ret = worker_graph_reload(worker, ports)) < 0) {
-				errno_log(errno, "worker_graph_reload");
-				goto end;
-			}
 		} else {
 			// This CPU is out of the affinity mask.
 			if ((ret = worker_destroy(worker->cpu_id)) < 0) {
@@ -404,18 +413,7 @@ int worker_queue_distribute(const cpu_set_t *affinity, gr_vec struct iface_info_
 		}
 	}
 
-	// Assign one txq of each port to each worker.
-	// Must be done in a separate loop after all workers have been created.
-	gr_vec_foreach (port, ports) {
-		STAILQ_FOREACH (worker, &workers, next) {
-			struct queue_map txq = {
-				.port_id = port->port_id,
-				.queue_id = worker_txq_id(affinity, worker->cpu_id),
-				.enabled = port->started,
-			};
-			gr_vec_add(worker->txqs, txq);
-		}
-	}
+	worker_txq_distribute(ports);
 
 	ret = worker_graph_reload_all(ports);
 end:
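The round-robin in worker_txq_distribute() is easiest to see on a small example, assuming a port capped at n_txq = 4 and five workers that all own RX queues, visited in list order:

```
txq_idx  : 0  1  2  3  4
queue_id : 0  1  2  3  0    (queue_id = txq_idx % n_txq)
```

The fifth worker wraps back to queue 0, so when the graphs are rebuilt, worker_graph_new() counts txq_users == 2 for that queue, arms the spinlock for both workers and logs the sharing warning. Workers without RX queues are skipped entirely: they run a NULL graph and would never use a TX queue, which is why the old per-CPU worker_txq_id() scheme could hand queues to idle workers while this one cannot.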
37 changes: 19 additions & 18 deletions modules/infra/control/worker_test.c
@@ -26,7 +26,12 @@ static struct worker w2 = {.cpu_id = 2};
 static struct worker w3 = {.cpu_id = 3};
 static struct worker w4 = {.cpu_id = 4};
 static struct worker w5 = {.cpu_id = 5};
-static struct rte_eth_dev_info dev_info = {.driver_name = "net_null", .nb_rx_queues = 2};
+static struct rte_eth_dev_info dev_info = {
+	.driver_name = "net_null",
+	.nb_rx_queues = 2,
+	.max_rx_queues = 4,
+	.max_tx_queues = 4,
+};
 
 // mocked types/functions
 int gr_rte_log_type;
@@ -167,6 +172,7 @@ static int setup(void **) {
 		port->started = true;
 		port->port_id = i;
 		port->n_rxq = 2;
+		port->n_txq = 3;
 		ifaces[i] = iface;
 	}
 	STAILQ_INSERT_TAIL(&workers, &w1, next);
@@ -275,7 +281,7 @@ static void rxq_assign_existing_worker(void **) {
 	assert_qmaps(w3.rxqs);
 	assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0));
 	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w3.txqs);
 }
 
 static void rxq_assign_existing_worker_destroy(void **) {
@@ -288,16 +294,16 @@ static void rxq_assign_existing_worker_destroy(void **) {
 	assert_qmaps(w3.rxqs);
 	assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0));
 	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w3.txqs);
 
 	assert_int_equal(worker_rxq_assign(2, 1, 1), 0);
 	assert_int_equal(worker_count(), 3);
 	assert_qmaps(w1.rxqs, q(0, 0), q(0, 1), q(1, 0), q(1, 1), q(2, 0), q(2, 1));
 	assert_qmaps(w2.rxqs);
 	assert_qmaps(w3.rxqs);
 	assert_qmaps(w1.txqs, q(1, 0), q(2, 0), q(0, 0));
-	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w2.txqs);
+	assert_qmaps(w3.txqs);
 }
 
 static void rxq_assign_new_worker(void **) {
@@ -310,7 +316,7 @@ static void rxq_assign_new_worker(void **) {
 	assert_qmaps(w3.rxqs);
 	assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0));
 	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w3.txqs);
 }
 
 static void rxq_assign_new_worker_destroy(void **) {
@@ -322,8 +328,8 @@ static void rxq_assign_new_worker_destroy(void **) {
 	assert_qmaps(w2.rxqs);
 	assert_qmaps(w3.rxqs, q(2, 1));
 	assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0));
-	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w2.txqs);
+	assert_qmaps(w3.txqs, q(0, 1), q(1, 1), q(2, 1));
 }
 
 static void rxq_assign_new_worker2(void **) {
@@ -399,16 +405,11 @@ static void queue_distribute_increase(void **) {
 	assert_qmaps(w3.rxqs, q(1, 0));
 	assert_qmaps(w4.rxqs, q(1, 1));
 	assert_qmaps(w5.rxqs, q(2, 0));
-	assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0));
-	assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1));
-	assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2));
-	assert_qmaps(w4.txqs, q(0, 3), q(1, 3), q(2, 3));
-	assert_qmaps(w5.txqs, q(0, 4), q(1, 4), q(2, 4));
-
-	for (unsigned i = 0; i < ARRAY_DIM(ifaces); i++) {
-		struct iface_info_port *p = iface_info_port(ifaces[i]);
-		assert_int_equal(p->n_txq, CPU_COUNT(&affinity));
-	}
+	assert_qmaps(w1.txqs, q(0, 2), q(1, 2), q(2, 2));
+	assert_qmaps(w2.txqs, q(0, 3), q(1, 3), q(2, 3));
+	assert_qmaps(w3.txqs, q(0, 0), q(1, 0), q(2, 0));
+	assert_qmaps(w4.txqs, q(0, 0), q(1, 0), q(2, 0));
+	assert_qmaps(w5.txqs, q(0, 1), q(1, 1), q(2, 1));
 }
 
 int main(void) {
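The updated expectations in queue_distribute_increase follow directly from the new capping and distribution rules: the mocked dev_info now reports max_tx_queues = 4, so with 5 CPUs in the affinity mask n_txq is capped at 4 instead of matching CPU_COUNT (hence the removed n_txq assertion), and txq_idx % 4 over five RX-owning workers can only yield queue ids 0 through 3, forcing two workers (w3 and w4 here) onto the same queue 0 of every port.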