From a7dfd2647f3dba2fc5b41fedae0f410a73ddf192 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Mon, 12 Jan 2026 10:24:19 +0100 Subject: [PATCH 1/7] dpdk: add patch to fix iavf max queues This fix has been submitted upstream. Link: https://patches.dpdk.org/project/dpdk/patch/20260109142129.1550058-2-rjarry@redhat.com/ Signed-off-by: Robin Jarry --- subprojects/dpdk.wrap | 1 + ...d-max-TX-and-RX-queues-in-ethdev-inf.patch | 50 +++++++++++++++++++ 2 files changed, 51 insertions(+) create mode 100644 subprojects/packagefiles/dpdk/iavf-fix-reported-max-TX-and-RX-queues-in-ethdev-inf.patch diff --git a/subprojects/dpdk.wrap b/subprojects/dpdk.wrap index 08dfbe930..828812103 100644 --- a/subprojects/dpdk.wrap +++ b/subprojects/dpdk.wrap @@ -2,6 +2,7 @@ url = https://github.com/DPDK/dpdk-stable revision = v25.11 depth = 1 +diff_files = dpdk/iavf-fix-reported-max-TX-and-RX-queues-in-ethdev-inf.patch [provide] dependency_names = libdpdk diff --git a/subprojects/packagefiles/dpdk/iavf-fix-reported-max-TX-and-RX-queues-in-ethdev-inf.patch b/subprojects/packagefiles/dpdk/iavf-fix-reported-max-TX-and-RX-queues-in-ethdev-inf.patch new file mode 100644 index 000000000..9a4882231 --- /dev/null +++ b/subprojects/packagefiles/dpdk/iavf-fix-reported-max-TX-and-RX-queues-in-ethdev-inf.patch @@ -0,0 +1,50 @@ +From 637aa38ece33cb7fe214e8cfe0940b89160b6fa2 Mon Sep 17 00:00:00 2001 +From: Robin Jarry +Date: Fri, 9 Jan 2026 15:17:50 +0100 +Subject: [PATCH dpdk] iavf: fix reported max TX and RX queues in ethdev info + +With a regular iavf device, rte_eth_dev_info_get reports 256 maximum TX +and RX queues. Trying to configure a port with 20 queues returns an +error: + + ERR: IAVF_DRIVER: iavf_dev_configure(): large VF is not supported + +When the large VF feature isn't supported by the PF kernel driver, +ethdev info must not report 256 supported queues but 16. + +Fixes: e436cd43835b ("net/iavf: negotiate large VF and request more queues") +Cc: stable@dpdk.org + +Signed-off-by: Robin Jarry +--- + drivers/net/intel/iavf/iavf_ethdev.c | 10 ++++++++-- + 1 file changed, 8 insertions(+), 2 deletions(-) + +diff --git a/drivers/net/intel/iavf/iavf_ethdev.c b/drivers/net/intel/iavf/iavf_ethdev.c +index 15e49fe24814..59328ebc5ad2 100644 +--- a/drivers/net/intel/iavf/iavf_ethdev.c ++++ b/drivers/net/intel/iavf/iavf_ethdev.c +@@ -1126,12 +1126,18 @@ iavf_dev_info_get(struct rte_eth_dev *dev, struct rte_eth_dev_info *dev_info) + struct iavf_adapter *adapter = + IAVF_DEV_PRIVATE_TO_ADAPTER(dev->data->dev_private); + struct iavf_info *vf = &adapter->vf; ++ uint16_t max_queue_pairs; + + if (adapter->closed) + return -EIO; + +- dev_info->max_rx_queues = IAVF_MAX_NUM_QUEUES_LV; +- dev_info->max_tx_queues = IAVF_MAX_NUM_QUEUES_LV; ++ if (vf->vf_res->vf_cap_flags & VIRTCHNL_VF_LARGE_NUM_QPAIRS) ++ max_queue_pairs = IAVF_MAX_NUM_QUEUES_LV; ++ else ++ max_queue_pairs = IAVF_MAX_NUM_QUEUES_DFLT; ++ ++ dev_info->max_rx_queues = max_queue_pairs; ++ dev_info->max_tx_queues = max_queue_pairs; + dev_info->min_rx_bufsize = IAVF_BUF_SIZE_MIN; + dev_info->max_rx_pktlen = IAVF_FRAME_SIZE_MAX; + dev_info->max_mtu = dev_info->max_rx_pktlen - IAVF_ETH_OVERHEAD; +-- +2.52.0 + From a1af6bba731138731e694b0a968ab70eb79ed157 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Fri, 9 Jan 2026 14:15:50 +0100 Subject: [PATCH 2/7] infra: only assign TX queues to workers with RX queues Replace worker_txq_id() with worker_txq_distribute() which clears all TX queue assignments and reassigns them only to workers that have at least one RX queue. 
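As a simple illustration (assuming three workers where only the first two have RX queues assigned): each port's TX queues 0 and 1 go to those two workers, and the third worker is left with an empty TX queue list, as reflected in the updated unit tests.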
Workers without RX queues do not process packets and therefore have no need for TX queues. Call worker_txq_distribute() from both worker_queue_distribute() during initial setup and worker_rxq_assign() when RX queues are moved at runtime. Since worker_rxq_assign() now calls worker_graph_reload_all() instead of reloading individual workers, remove the now-redundant worker_graph_reload() call from worker_queue_distribute(). To prevent two workers from transmitting on the same TX queue during graph reloads, modify worker_graph_reload_all() to first stop all workers by reloading them with NULL ports, then reload with the actual configuration. Adjust worker_graph_new() to treat NULL ports as a request to create a NULL graph. Update unit tests to expect empty TX queue assignments for workers without RX queues. Signed-off-by: Robin Jarry --- modules/infra/control/graph.c | 9 +++- modules/infra/control/worker.c | 69 ++++++++++++++--------------- modules/infra/control/worker_test.c | 24 +++++----- 3 files changed, 53 insertions(+), 49 deletions(-) diff --git a/modules/infra/control/graph.c b/modules/infra/control/graph.c index 964f81060..330c97707 100644 --- a/modules/infra/control/graph.c +++ b/modules/infra/control/graph.c @@ -88,7 +88,7 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ n_rxqs = 0; gr_vec_foreach_ref (qmap, worker->rxqs) { - if (qmap->enabled) + if (ports != NULL && qmap->enabled) n_rxqs++; } if (n_rxqs == 0) { @@ -307,6 +307,13 @@ int worker_graph_reload_all(gr_vec struct iface_info_port **ports) { gr_vec rte_node_t *unused_nodes = worker_graph_nodes_add_missing(ports); + // stop all workers by switching them to NULL graphs + STAILQ_FOREACH (worker, &workers, next) { + if ((ret = worker_graph_reload(worker, NULL)) < 0) + return ret; + } + + // create new graphs and start all workers STAILQ_FOREACH (worker, &workers, next) { if ((ret = worker_graph_reload(worker, ports)) < 0) return ret; diff --git a/modules/infra/control/worker.c b/modules/infra/control/worker.c index daae8547d..7dff70d59 100644 --- a/modules/infra/control/worker.c +++ b/modules/infra/control/worker.c @@ -222,16 +222,29 @@ int port_plug(struct iface_info_port *p) { return ret; } -static uint16_t worker_txq_id(const cpu_set_t *affinity, unsigned cpu_id) { - uint16_t txq = 0; - for (unsigned cpu = 0; cpu < CPU_SETSIZE; cpu++) { - if (CPU_ISSET(cpu, affinity)) { - if (cpu == cpu_id) - break; - txq++; +static void worker_txq_distribute(gr_vec struct iface_info_port **ports) { + struct iface_info_port *port; + struct worker *worker; + + // clear all TX queue assignments + STAILQ_FOREACH (worker, &workers, next) + gr_vec_free(worker->txqs); + + // assign TX queues only to workers that have RX queues + gr_vec_foreach (port, ports) { + uint16_t txq_idx = 0; + STAILQ_FOREACH (worker, &workers, next) { + if (gr_vec_len(worker->rxqs) == 0) + continue; + struct queue_map txq = { + .port_id = port->port_id, + .queue_id = txq_idx, + .enabled = port->started, + }; + gr_vec_add(worker->txqs, txq); + txq_idx++; } } - return txq; } int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) { @@ -275,16 +288,7 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) { break; } - gr_vec struct iface_info_port **ports = NULL; - struct iface *iface = NULL; - while ((iface = iface_next(GR_IFACE_TYPE_PORT, iface)) != NULL) - gr_vec_add(ports, iface_info_port(iface)); - - // ensure source worker has released the rxq - if ((ret = worker_graph_reload(src_worker, ports)) < 
0) - goto end; - - // now it is safe to assign rxq to dst_worker + // assign rxq to dst_worker struct queue_map rx_qmap = { .port_id = port_id, .queue_id = rxq_id, @@ -292,9 +296,16 @@ int worker_rxq_assign(uint16_t port_id, uint16_t rxq_id, uint16_t cpu_id) { }; gr_vec_add(dst_worker->rxqs, rx_qmap); - ret = worker_graph_reload(dst_worker, ports); + // reassign TX queues and reload all graphs + gr_vec struct iface_info_port **ports = NULL; + struct iface *iface = NULL; + while ((iface = iface_next(GR_IFACE_TYPE_PORT, iface)) != NULL) + gr_vec_add(ports, iface_info_port(iface)); + + worker_txq_distribute(ports); + + ret = worker_graph_reload_all(ports); -end: gr_vec_free(ports); return ret; } @@ -315,12 +326,9 @@ int worker_queue_distribute(const cpu_set_t *affinity, gr_vec struct iface_info_ STAILQ_FOREACH_SAFE (worker, &workers, next, tmp) { if (CPU_ISSET(worker->cpu_id, affinity)) { // Remove all RXQ/TXQ from that worker to have a clean slate. + // worker_graph_reload_all() will stop all workers first. gr_vec_free(worker->rxqs); gr_vec_free(worker->txqs); - if ((ret = worker_graph_reload(worker, ports)) < 0) { - errno_log(errno, "worker_graph_reload"); - goto end; - } } else { // This CPU is out of the affinity mask. if ((ret = worker_destroy(worker->cpu_id)) < 0) { @@ -404,18 +412,7 @@ int worker_queue_distribute(const cpu_set_t *affinity, gr_vec struct iface_info_ } } - // Assign one txq of each port to each worker. - // Must be done in a separate loop after all workers have been created. - gr_vec_foreach (port, ports) { - STAILQ_FOREACH (worker, &workers, next) { - struct queue_map txq = { - .port_id = port->port_id, - .queue_id = worker_txq_id(affinity, worker->cpu_id), - .enabled = port->started, - }; - gr_vec_add(worker->txqs, txq); - } - } + worker_txq_distribute(ports); ret = worker_graph_reload_all(ports); end: diff --git a/modules/infra/control/worker_test.c b/modules/infra/control/worker_test.c index b28f07e68..ee11002ce 100644 --- a/modules/infra/control/worker_test.c +++ b/modules/infra/control/worker_test.c @@ -275,7 +275,7 @@ static void rxq_assign_existing_worker(void **) { assert_qmaps(w3.rxqs); assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0)); assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w3.txqs); } static void rxq_assign_existing_worker_destroy(void **) { @@ -288,7 +288,7 @@ static void rxq_assign_existing_worker_destroy(void **) { assert_qmaps(w3.rxqs); assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0)); assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w3.txqs); assert_int_equal(worker_rxq_assign(2, 1, 1), 0); assert_int_equal(worker_count(), 3); @@ -296,8 +296,8 @@ static void rxq_assign_existing_worker_destroy(void **) { assert_qmaps(w2.rxqs); assert_qmaps(w3.rxqs); assert_qmaps(w1.txqs, q(1, 0), q(2, 0), q(0, 0)); - assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w2.txqs); + assert_qmaps(w3.txqs); } static void rxq_assign_new_worker(void **) { @@ -310,7 +310,7 @@ static void rxq_assign_new_worker(void **) { assert_qmaps(w3.rxqs); assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0)); assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w3.txqs); } static void rxq_assign_new_worker_destroy(void **) { @@ -322,8 +322,8 @@ static void rxq_assign_new_worker_destroy(void **) { assert_qmaps(w2.rxqs); 
assert_qmaps(w3.rxqs, q(2, 1)); assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0)); - assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w2.txqs); + assert_qmaps(w3.txqs, q(0, 1), q(1, 1), q(2, 1)); } static void rxq_assign_new_worker2(void **) { @@ -399,11 +399,11 @@ static void queue_distribute_increase(void **) { assert_qmaps(w3.rxqs, q(1, 0)); assert_qmaps(w4.rxqs, q(1, 1)); assert_qmaps(w5.rxqs, q(2, 0)); - assert_qmaps(w1.txqs, q(0, 0), q(1, 0), q(2, 0)); - assert_qmaps(w2.txqs, q(0, 1), q(1, 1), q(2, 1)); - assert_qmaps(w3.txqs, q(0, 2), q(1, 2), q(2, 2)); - assert_qmaps(w4.txqs, q(0, 3), q(1, 3), q(2, 3)); - assert_qmaps(w5.txqs, q(0, 4), q(1, 4), q(2, 4)); + assert_qmaps(w1.txqs, q(0, 2), q(1, 2), q(2, 2)); + assert_qmaps(w2.txqs, q(0, 3), q(1, 3), q(2, 3)); + assert_qmaps(w3.txqs, q(0, 4), q(1, 4), q(2, 4)); + assert_qmaps(w4.txqs, q(0, 0), q(1, 0), q(2, 0)); + assert_qmaps(w5.txqs, q(0, 1), q(1, 1), q(2, 1)); for (unsigned i = 0; i < ARRAY_DIM(ifaces); i++) { struct iface_info_port *p = iface_info_port(ifaces[i]); From 87a1b13bb8d3063f616799efacfe27c28bfc2760 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Mon, 12 Jan 2026 11:43:33 +0100 Subject: [PATCH 3/7] infra: introduce tx_node_ctx for TX node context Replace direct use of port_queue as the TX node context with a new tx_node_ctx structure that wraps port_queue as a member. This allows extending the TX node context with additional fields in future commits. Convert port_queue from a GR_NODE_CTX_TYPE macro to a plain struct since it is no longer used directly as a node context. It remains used for packet tracing where only port_id and queue_id are recorded. Signed-off-by: Robin Jarry --- modules/infra/control/graph.c | 14 +++++++------- modules/infra/datapath/gr_rxtx.h | 6 ++++-- modules/infra/datapath/port_tx.c | 8 ++++---- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/modules/infra/control/graph.c b/modules/infra/control/graph.c index 330c97707..31174af50 100644 --- a/modules/infra/control/graph.c +++ b/modules/infra/control/graph.c @@ -154,9 +154,9 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ // initialize all tx nodes context to invalid ports and queues gr_vec_foreach (const char *name, tx_node_names) { node = rte_graph_node_get_by_name(graph_name, name); - struct port_queue *ctx = port_queue(node); - ctx->port_id = UINT16_MAX; - ctx->queue_id = UINT16_MAX; + struct tx_node_ctx *ctx = tx_node_ctx(node); + ctx->txq.port_id = UINT16_MAX; + ctx->txq.queue_id = UINT16_MAX; } // initialize the port_output node context to point to invalid edges @@ -180,15 +180,15 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ snprintf(node_name, sizeof(node_name), TX_NODE_FMT, qmap->port_id, qmap->queue_id); node = rte_graph_node_get_by_name(graph_name, node_name); // and update its context data to correct values - struct port_queue *ctx = port_queue(node); - ctx->port_id = qmap->port_id; - ctx->queue_id = qmap->queue_id; + struct tx_node_ctx *ctx = tx_node_ctx(node); + ctx->txq.port_id = qmap->port_id; + ctx->txq.queue_id = qmap->queue_id; for (rte_edge_t edge = 0; edge < gr_vec_len(tx_node_names); edge++) { if (strcmp(tx_node_names[edge], node_name) == 0) { // update the port_output context data to map this port to the // correct edge - out->edges[ctx->port_id] = edge; + out->edges[ctx->txq.port_id] = edge; break; } } diff --git a/modules/infra/datapath/gr_rxtx.h 
b/modules/infra/datapath/gr_rxtx.h index 76d1c7e0c..a5eea22e7 100644 --- a/modules/infra/datapath/gr_rxtx.h +++ b/modules/infra/datapath/gr_rxtx.h @@ -17,10 +17,10 @@ #define TX_NODE_BASE "port_tx" #define TX_NODE_FMT TX_NODE_BASE "-p%uq%u" -GR_NODE_CTX_TYPE(port_queue, { +struct port_queue { uint16_t port_id; uint16_t queue_id; -}); +}; GR_NODE_CTX_TYPE(rx_node_ctx, { const struct iface *iface; @@ -28,6 +28,8 @@ GR_NODE_CTX_TYPE(rx_node_ctx, { uint16_t burst_size; }); +GR_NODE_CTX_TYPE(tx_node_ctx, { struct port_queue txq; }); + struct port_output_edges { rte_edge_t edges[RTE_MAX_ETHPORTS]; }; diff --git a/modules/infra/datapath/port_tx.c b/modules/infra/datapath/port_tx.c index 73055a448..bccb1d701 100644 --- a/modules/infra/datapath/port_tx.c +++ b/modules/infra/datapath/port_tx.c @@ -24,7 +24,7 @@ enum { static uint16_t tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t nb_objs) { - const struct port_queue *ctx = port_queue(node); + const struct tx_node_ctx *ctx = tx_node_ctx(node); struct rte_mbuf **mbufs = (struct rte_mbuf **)objs; const struct iface_info_port *port; const struct iface *iface; @@ -38,7 +38,7 @@ tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t if (gr_mbuf_is_traced(mbufs[i])) { struct port_queue *t; t = gr_mbuf_trace_add(mbufs[i], node, sizeof(*t)); - *t = *ctx; + *t = ctx->txq; } } rte_node_enqueue(graph, node, TX_DOWN, objs, nb_objs); @@ -53,7 +53,7 @@ tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t } } - tx_ok = rte_eth_tx_burst(ctx->port_id, ctx->queue_id, mbufs, nb_objs); + tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs); if (tx_ok < nb_objs) rte_node_enqueue(graph, node, TX_ERROR, &objs[tx_ok], nb_objs - tx_ok); @@ -62,7 +62,7 @@ tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t if (gr_mbuf_is_traced(mbufs[i])) { struct port_queue *t; t = gr_mbuf_trace_add(mbufs[i], node, sizeof(*t)); - *t = *ctx; + *t = ctx->txq; gr_mbuf_trace_finish(mbufs[i]); } } From a66b3443a065fda83919c059c277f373dd8efb3e Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Mon, 12 Jan 2026 11:44:57 +0100 Subject: [PATCH 4/7] infra: factor out find_port helper in graph.c Extract the port lookup loop into a dedicated find_port() function. This will be reused in the next commit when configuring TX node context. 
Signed-off-by: Robin Jarry --- modules/infra/control/graph.c | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/modules/infra/control/graph.c b/modules/infra/control/graph.c index 31174af50..696fdb127 100644 --- a/modules/infra/control/graph.c +++ b/modules/infra/control/graph.c @@ -74,6 +74,18 @@ void worker_graph_free(struct worker *worker) { } } +static struct iface_info_port *find_port(gr_vec struct iface_info_port **ports, uint16_t port_id) { + struct iface_info_port *port = NULL; + gr_vec_foreach (struct iface_info_port *p, ports) { + if (p->port_id == port_id) { + port = p; + break; + } + } + assert(port != NULL); + return port; +} + static int worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_port **ports) { gr_vec const char **graph_nodes = NULL; @@ -139,13 +151,8 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ snprintf(node_name, sizeof(node_name), RX_NODE_FMT, qmap->port_id, qmap->queue_id); node = rte_graph_node_get_by_name(graph_name, node_name); struct rx_node_ctx *ctx = rx_node_ctx(node); - gr_vec_foreach (struct iface_info_port *p, ports) { - if (p->port_id == qmap->port_id) { - ctx->iface = RTE_PTR_SUB(p, offsetof(struct iface, info)); - break; - } - } - assert(ctx->iface != NULL); + struct iface_info_port *port = find_port(ports, qmap->port_id); + ctx->iface = RTE_PTR_SUB(port, offsetof(struct iface, info)); ctx->rxq.port_id = qmap->port_id; ctx->rxq.queue_id = qmap->queue_id; ctx->burst_size = RTE_GRAPH_BURST_SIZE / gr_vec_len(worker->rxqs); From ee6d8b27e96efd16a801367f349267574ffaf2a3 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Mon, 12 Jan 2026 11:52:44 +0100 Subject: [PATCH 5/7] infra: add TX queue sharing support Some network devices report fewer TX queues than the number of datapath workers. Cap the TX queue count to the device maximum and distribute workers across available queues using modulo assignment. When multiple workers share the same TX queue, protect rte_eth_tx_burst() with a per-queue spinlock stored in iface_info_port. The lock pointer in tx_node_ctx is only set for shared queues to avoid locking overhead when a worker has exclusive access. Update unit tests to set max_tx_queues in the mock device info and adjust expected TX queue assignments for modulo wrapping. 
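For example, assuming five workers polling RX queues on a port whose device supports only four TX queues, the modulo assignment hands out TX queues 0, 1, 2, 3 and 0. Queue 0 is then shared by two workers and its rte_eth_tx_burst() calls are serialized by the per-queue spinlock, while the other queues keep a NULL lock pointer and take the lock-free path.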
Signed-off-by: Robin Jarry --- modules/infra/control/gr_port.h | 2 ++ modules/infra/control/graph.c | 21 +++++++++++++++++++++ modules/infra/control/port.c | 15 +++++++++++---- modules/infra/control/worker.c | 3 ++- modules/infra/control/worker_test.c | 15 ++++++++------- modules/infra/datapath/gr_rxtx.h | 6 +++++- modules/infra/datapath/port_tx.c | 10 +++++++++- 7 files changed, 58 insertions(+), 14 deletions(-) diff --git a/modules/infra/control/gr_port.h b/modules/infra/control/gr_port.h index 6a8a8e647..78c74d365 100644 --- a/modules/infra/control/gr_port.h +++ b/modules/infra/control/gr_port.h @@ -9,6 +9,7 @@ #include #include #include +#include #include #include @@ -32,6 +33,7 @@ GR_IFACE_INFO(GR_IFACE_TYPE_PORT, iface_info_port, { struct rte_mempool *pool; char *devargs; uint32_t pool_size; + rte_spinlock_t txq_locks[RTE_MAX_QUEUES_PER_PORT]; struct { mac_filter_flags_t flags; unsigned hw_limit; diff --git a/modules/infra/control/graph.c b/modules/infra/control/graph.c index 696fdb127..fbdc2fdf1 100644 --- a/modules/infra/control/graph.c +++ b/modules/infra/control/graph.c @@ -164,6 +164,7 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ struct tx_node_ctx *ctx = tx_node_ctx(node); ctx->txq.port_id = UINT16_MAX; ctx->txq.queue_id = UINT16_MAX; + ctx->lock = NULL; } // initialize the port_output node context to point to invalid edges @@ -191,6 +192,26 @@ worker_graph_new(struct worker *worker, uint8_t index, gr_vec struct iface_info_ ctx->txq.port_id = qmap->port_id; ctx->txq.queue_id = qmap->queue_id; + // set spinlock only if multiple workers share this TX queue + unsigned txq_users = 0; + struct worker *w = NULL; + STAILQ_FOREACH (w, &workers, next) { + gr_vec_foreach_ref (struct queue_map *q, w->txqs) { + if (q->port_id == qmap->port_id && q->queue_id == qmap->queue_id) + txq_users++; + } + } + if (txq_users > 1) { + struct iface_info_port *port = find_port(ports, qmap->port_id); + ctx->lock = &port->txq_locks[qmap->queue_id]; + LOG(WARNING, + "[CPU %d] port %s txq %u shared by %u workers", + worker->cpu_id, + port->devargs, + qmap->queue_id, + txq_users); + } + for (rte_edge_t edge = 0; edge < gr_vec_len(tx_node_names); edge++) { if (strcmp(tx_node_names[edge], node_name) == 0) { // update the port_output context data to map this port to the diff --git a/modules/infra/control/port.c b/modules/infra/control/port.c index 5823c5505..13e8e3c66 100644 --- a/modules/infra/control/port.c +++ b/modules/infra/control/port.c @@ -91,19 +91,26 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) { if (numa_available() != -1) socket_id = rte_eth_dev_socket_id(p->port_id); - // FIXME: deal with drivers that do not support more than 1 (or N) tx queues - p->n_txq = n_txq_min; + if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0) + return errno_log(-ret, "rte_eth_dev_info_get"); + if (p->n_rxq == 0) p->n_rxq = 1; - if ((ret = rte_eth_dev_info_get(p->port_id, &info)) < 0) - return errno_log(-ret, "rte_eth_dev_info_get"); + // cap number of queues to device maximum + p->n_txq = RTE_MIN(n_txq_min, info.max_tx_queues); if (strcmp(info.driver_name, "net_tap") == 0) { p->n_txq = RTE_MAX(p->n_txq, p->n_rxq); p->n_rxq = p->n_txq; } + if (p->n_txq < n_txq_min) + LOG(NOTICE, "port %s TX queues limited to %u", p->devargs, p->n_txq); + + for (uint16_t q = 0; q < p->n_txq; q++) + rte_spinlock_init(&p->txq_locks[q]); + rxq_size = get_rxq_size(p, &info); txq_size = get_txq_size(p, &info); diff --git a/modules/infra/control/worker.c 
b/modules/infra/control/worker.c index 7dff70d59..ad268d4c6 100644 --- a/modules/infra/control/worker.c +++ b/modules/infra/control/worker.c @@ -236,9 +236,10 @@ static void worker_txq_distribute(gr_vec struct iface_info_port **ports) { STAILQ_FOREACH (worker, &workers, next) { if (gr_vec_len(worker->rxqs) == 0) continue; + assert(port->n_txq > 0); struct queue_map txq = { .port_id = port->port_id, - .queue_id = txq_idx, + .queue_id = txq_idx % port->n_txq, .enabled = port->started, }; gr_vec_add(worker->txqs, txq); diff --git a/modules/infra/control/worker_test.c b/modules/infra/control/worker_test.c index ee11002ce..c6b78c9a1 100644 --- a/modules/infra/control/worker_test.c +++ b/modules/infra/control/worker_test.c @@ -26,7 +26,12 @@ static struct worker w2 = {.cpu_id = 2}; static struct worker w3 = {.cpu_id = 3}; static struct worker w4 = {.cpu_id = 4}; static struct worker w5 = {.cpu_id = 5}; -static struct rte_eth_dev_info dev_info = {.driver_name = "net_null", .nb_rx_queues = 2}; +static struct rte_eth_dev_info dev_info = { + .driver_name = "net_null", + .nb_rx_queues = 2, + .max_rx_queues = 4, + .max_tx_queues = 4, +}; // mocked types/functions int gr_rte_log_type; @@ -167,6 +172,7 @@ static int setup(void **) { port->started = true; port->port_id = i; port->n_rxq = 2; + port->n_txq = 3; ifaces[i] = iface; } STAILQ_INSERT_TAIL(&workers, &w1, next); @@ -401,14 +407,9 @@ static void queue_distribute_increase(void **) { assert_qmaps(w5.rxqs, q(2, 0)); assert_qmaps(w1.txqs, q(0, 2), q(1, 2), q(2, 2)); assert_qmaps(w2.txqs, q(0, 3), q(1, 3), q(2, 3)); - assert_qmaps(w3.txqs, q(0, 4), q(1, 4), q(2, 4)); + assert_qmaps(w3.txqs, q(0, 0), q(1, 0), q(2, 0)); assert_qmaps(w4.txqs, q(0, 0), q(1, 0), q(2, 0)); assert_qmaps(w5.txqs, q(0, 1), q(1, 1), q(2, 1)); - - for (unsigned i = 0; i < ARRAY_DIM(ifaces); i++) { - struct iface_info_port *p = iface_info_port(ifaces[i]); - assert_int_equal(p->n_txq, CPU_COUNT(&affinity)); - } } int main(void) { diff --git a/modules/infra/datapath/gr_rxtx.h b/modules/infra/datapath/gr_rxtx.h index a5eea22e7..7b2b476c7 100644 --- a/modules/infra/datapath/gr_rxtx.h +++ b/modules/infra/datapath/gr_rxtx.h @@ -8,6 +8,7 @@ #include #include +#include #include #include @@ -28,7 +29,10 @@ GR_NODE_CTX_TYPE(rx_node_ctx, { uint16_t burst_size; }); -GR_NODE_CTX_TYPE(tx_node_ctx, { struct port_queue txq; }); +GR_NODE_CTX_TYPE(tx_node_ctx, { + struct port_queue txq; + rte_spinlock_t *lock; +}); struct port_output_edges { rte_edge_t edges[RTE_MAX_ETHPORTS]; diff --git a/modules/infra/datapath/port_tx.c b/modules/infra/datapath/port_tx.c index bccb1d701..261ddb8df 100644 --- a/modules/infra/datapath/port_tx.c +++ b/modules/infra/datapath/port_tx.c @@ -13,6 +13,7 @@ #include #include #include +#include #include @@ -53,7 +54,14 @@ tx_process(struct rte_graph *graph, struct rte_node *node, void **objs, uint16_t } } - tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs); + if (likely(ctx->lock == NULL)) { + tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs); + } else { + rte_spinlock_lock(ctx->lock); + tx_ok = rte_eth_tx_burst(ctx->txq.port_id, ctx->txq.queue_id, mbufs, nb_objs); + rte_spinlock_unlock(ctx->lock); + } + if (tx_ok < nb_objs) rte_node_enqueue(graph, node, TX_ERROR, &objs[tx_ok], nb_objs - tx_ok); From ade3ca67d5481da0dab55f88182296e91d32c894 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Fri, 9 Jan 2026 15:38:12 +0100 Subject: [PATCH 6/7] port: return explicit error when requesting too many rxqs Unlike TX queues 
which are silently capped to the device maximum, RX queues are explicitly requested by the user. Return EOVERFLOW if the requested count exceeds what the device supports. Signed-off-by: Robin Jarry --- modules/infra/control/port.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/modules/infra/control/port.c b/modules/infra/control/port.c index 13e8e3c66..677ead05d 100644 --- a/modules/infra/control/port.c +++ b/modules/infra/control/port.c @@ -97,6 +97,9 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) { if (p->n_rxq == 0) p->n_rxq = 1; + if (p->n_rxq > info.max_rx_queues) + return errno_set(EOVERFLOW); + // cap number of queues to device maximum p->n_txq = RTE_MIN(n_txq_min, info.max_tx_queues); From a588805af2313d02a392904d612ae5b893ad7364 Mon Sep 17 00:00:00 2001 From: Robin Jarry Date: Fri, 9 Jan 2026 15:34:35 +0100 Subject: [PATCH 7/7] port: force equal queue counts for net_tap and net_virtio The net_tap and net_virtio drivers require equal RX and TX queue counts. Set TX queue count to match RX queues for these drivers. This may result in fewer TX queues than workers, which is now handled by the TX queue sharing support. Signed-off-by: Robin Jarry --- modules/infra/control/port.c | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/modules/infra/control/port.c b/modules/infra/control/port.c index 677ead05d..b6e6e420c 100644 --- a/modules/infra/control/port.c +++ b/modules/infra/control/port.c @@ -103,9 +103,12 @@ int port_configure(struct iface_info_port *p, uint16_t n_txq_min) { // cap number of queues to device maximum p->n_txq = RTE_MIN(n_txq_min, info.max_tx_queues); - if (strcmp(info.driver_name, "net_tap") == 0) { - p->n_txq = RTE_MAX(p->n_txq, p->n_rxq); - p->n_rxq = p->n_txq; + if (strcmp(info.driver_name, "net_tap") == 0 + || strcmp(info.driver_name, "net_virtio") == 0) { + // force number of TX queues equal to requested RX queues + p->n_txq = p->n_rxq; + if (p->n_txq > info.max_tx_queues) + return errno_set(EOVERFLOW); } if (p->n_txq < n_txq_min)