diff --git a/common/network.h b/common/network.h index 3a383a62..b5e80dce 100644 --- a/common/network.h +++ b/common/network.h @@ -60,4 +60,6 @@ struct net { enum ip_family { ip_family_ip4, ip_family_ip6, -}; \ No newline at end of file +}; + +enum transport_proto { tcp, udp }; \ No newline at end of file diff --git a/modules/balancer2/api/controlplane.c b/modules/balancer2/api/controlplane.c new file mode 100644 index 00000000..68f033ac --- /dev/null +++ b/modules/balancer2/api/controlplane.c @@ -0,0 +1,1012 @@ +#include "controlplane.h" + +#include "common/memory.h" +#include "common/memory_address.h" +#include "common/network.h" +#include "common/rcu.h" +#include "common/rng.h" +#include "common/ttlmap/ttlmap.h" + +#include "filter/compiler.h" +#include "filter/rule.h" + +#include "modules/balancer2/dataplane/config.h" +#include "modules/balancer2/dataplane/real.h" +#include "modules/balancer2/dataplane/selector.h" +#include "modules/balancer2/dataplane/session.h" +#include "modules/balancer2/dataplane/types/stats.h" +#include "modules/balancer2/dataplane/vs.h" + +#include "lib/controlplane/agent/agent.h" +#include "lib/controlplane/config/cp_module.h" +#include "lib/dataplane/config/zone.h" + +#include +#include +#include +#include +#include +#include +#include +#include + +const char *const balancer_vs_counter_prefix = "vs"; +const char *const balancer_vs_acl_counter_prefix = "vs_acl"; +const char *const balancer_real_counter_prefix = "real"; +const char *const balancer_common_counter_name = "common"; +const char *const balancer_l4_counter_name = "l4"; + +FILTER_COMPILER_DECLARE(vs_acl_ip4, net4_fast_src, port_fast_src); +FILTER_COMPILER_DECLARE(vs_acl_ip6, net6_fast_src, port_fast_src); + +FILTER_COMPILER_DECLARE( + vs_matcher_ip4, net4_fast_dst, port_fast_dst, proto_range +); +FILTER_COMPILER_DECLARE( + vs_matcher_ip6, net6_fast_dst, port_fast_dst, proto_range +); + +struct balancer_handle { + struct balancer_module_config module_config; +}; + +static size_t 
+real_selector_size(size_t workers) { + return sizeof(struct real_selector) + + sizeof(struct rr_counter) * workers; +} + +static void +vs_prefix(const struct balancer_vs_config *config, char *buf, size_t buf_size) { + char addr_str[INET6_ADDRSTRLEN]; + if (config->ip_family == ip_family_ip4) { + inet_ntop( + AF_INET, + config->dst.v4.bytes, + addr_str, + sizeof(addr_str) + ); + } else { + inet_ntop( + AF_INET6, + config->dst.v6.bytes, + addr_str, + sizeof(addr_str) + ); + } + snprintf( + buf, + buf_size, + "%s:%u/%s", + addr_str, + config->port, + config->transport == tcp ? "tcp" : "udp" + ); +} + +static void +real_addr_str( + const struct balancer_real_config *config, char *buf, size_t buf_size +) { + if (config->ip_family == ip_family_ip4) { + inet_ntop(AF_INET, config->dst.v4.bytes, buf, buf_size); + } else { + inet_ntop(AF_INET6, config->dst.v6.bytes, buf, buf_size); + } +} + +static void +fill_acl_rule( + struct filter_rule *rule, + uint32_t action, + const struct balancer_allowed_sources *src, + enum ip_family family +) { + rule->action = action; + + if (family == ip_family_ip4) { + rule->net4.src_count = src->net4s.count; + rule->net4.srcs = src->net4s.items; + } else { + rule->net6.src_count = src->net6s.count; + rule->net6.srcs = src->net6s.items; + } + + rule->transport.src_count = src->port_ranges.count; + rule->transport.srcs = src->port_ranges.items; +} + +static int +build_vs_acl( + struct memory_context *mctx, + struct virtual_service *vs, + const struct balancer_vs_config *config +) { + const struct filter_compiler *compiler = + config->ip_family == ip_family_ip4 ? 
vs_acl_ip4 : vs_acl_ip6; + + size_t rule_count = config->allowed_sources_count; + if (rule_count == 0) { + return filter_init(&vs->acl, compiler, NULL, 0, mctx); + } + + int res = -1; + + struct filter_rule *rules = calloc(rule_count, sizeof(*rules)); + const struct filter_rule **rule_ptrs = + malloc(rule_count * sizeof(*rule_ptrs)); + if (rules == NULL || rule_ptrs == NULL) { + goto cleanup; + } + + for (size_t idx = 0; idx < rule_count; ++idx) { + fill_acl_rule( + &rules[idx], + (uint32_t)idx, + &config->allowed_sources[idx], + config->ip_family + ); + rule_ptrs[idx] = &rules[idx]; + } + + res = filter_init( + &vs->acl, compiler, rule_ptrs, (uint32_t)rule_count, mctx + ); + +cleanup: + free(rules); + free(rule_ptrs); + + return res; +} + +static void +build_real(struct real *real, const struct balancer_real_config *config) { + memset(real, 0, sizeof(*real)); + switch (config->ip_family) { + case ip_family_ip4: + real->addr.v4 = config->dst.v4; + real->src.v4 = config->src.v4; + break; + case ip_family_ip6: + real->addr.v6 = config->dst.v6; + real->src.v6 = config->src.v6; + real->flags |= real_ip6; + break; + } + real->counter_id = (uint64_t)-1; +} + +static int +build_vs_reals( + struct memory_context *mctx, + struct virtual_service *vs, + const struct balancer_vs_config *config +) { + if (config->real_count == 0) { + vs->reals_count = 0; + SET_OFFSET_OF(&vs->reals, NULL); + return 0; + } + + struct real *reals = + memory_balloc(mctx, sizeof(struct real) * config->real_count); + if (reals == NULL) { + return -1; + } + for (size_t idx = 0; idx < config->real_count; ++idx) { + build_real(reals + idx, config->reals + idx); + } + + vs->reals_count = config->real_count; + SET_OFFSET_OF(&vs->reals, reals); + return 0; +} + +static uint8_t +build_vs_flags(const struct balancer_vs_config *config) { + uint8_t flags = 0; + if (config->scheduler == balancer_vs_sched_op) { + flags |= vs_ops; + } + if (config->fix_mss) { + flags |= vs_fix_mss; + } + if (config->tunnel == 
balancer_tunnel_kind_gre) { + flags |= vs_gre; + } + if (config->ip_family == ip_family_ip6) { + flags |= vs_ip6; + } + return flags; +} + +static int +build_real_selector( + struct memory_context *mctx, + struct virtual_service *vs, + size_t workers, + const struct balancer_vs_config *config +) { + const size_t size = real_selector_size(workers); + struct real_selector *selector = memory_balloc(mctx, size); + if (selector == NULL) { + return -1; + } + memset(selector, 0, size); + if (config->scheduler == balancer_vs_sched_sh) { + selector->packet_hash_mask = (uint64_t)-1; + } + + /* Rings are built lazily on the first weight update. */ + SET_OFFSET_OF(&vs->selector, selector); + return 0; +} + +static int +register_acl_counters( + struct memory_context *mctx, + struct counter_registry *registry, + struct virtual_service *vs, + const char *prefix, + const struct balancer_allowed_sources *sources, + size_t source_count +) { + uint64_t *ids = memory_balloc(mctx, sizeof(uint64_t) * source_count); + if (ids == NULL && source_count > 0) { + return -1; + } + for (size_t idx = 0; idx < source_count; ++idx) { + if (sources[idx].tag == NULL) { + ids[idx] = (uint64_t)-1; + continue; + } + char name[COUNTER_NAME_LEN]; + snprintf( + name, + sizeof(name), + "%s_%s_%s", + balancer_vs_acl_counter_prefix, + prefix, + sources[idx].tag + ); + ids[idx] = counter_registry_register(registry, name, 1); + if (ids[idx] == COUNTER_INVALID) { + memory_bfree( + mctx, ids, sizeof(uint64_t) * source_count + ); + return -1; + } + } + vs->rule_count = source_count; + SET_OFFSET_OF(&vs->rule_counter_ids, ids); + return 0; +} + +static int +register_real_counters( + struct counter_registry *registry, + struct virtual_service *vs, + const char *prefix, + const struct balancer_real_config *real_configs +) { + struct real *reals = ADDR_OF(&vs->reals); + for (size_t idx = 0; idx < vs->reals_count; ++idx) { + char dst_str[INET6_ADDRSTRLEN]; + real_addr_str(&real_configs[idx], dst_str, sizeof(dst_str)); 
+ + char name[COUNTER_NAME_LEN]; + snprintf( + name, + sizeof(name), + "%s_%s_%s", + balancer_real_counter_prefix, + prefix, + dst_str + ); + reals[idx].counter_id = counter_registry_register( + registry, + name, + sizeof(struct balancer_real_stats) / sizeof(uint64_t) + ); + if (reals[idx].counter_id == COUNTER_INVALID) { + return -1; + } + } + return 0; +} + +static int +register_vs_counters( + struct memory_context *mctx, + struct counter_registry *registry, + struct virtual_service *vs, + const struct balancer_vs_config *config +) { + char prefix[COUNTER_NAME_LEN]; + vs_prefix(config, prefix, sizeof(prefix)); + + char name[COUNTER_NAME_LEN]; + snprintf( + name, sizeof(name), "%s_%s", balancer_vs_counter_prefix, prefix + ); + vs->counter_id = counter_registry_register( + registry, + name, + sizeof(struct balancer_vs_stats) / sizeof(uint64_t) + ); + if (vs->counter_id == COUNTER_INVALID) { + return -1; + } + + if (register_acl_counters( + mctx, + registry, + vs, + prefix, + config->allowed_sources, + config->allowed_sources_count + ) != 0) { + return -1; + } + + if (register_real_counters(registry, vs, prefix, config->reals) != 0) { + return -1; + } + + return 0; +} + +static void +free_vs_reals(struct memory_context *mctx, struct virtual_service *vs) { + struct real *reals = ADDR_OF(&vs->reals); + if (reals == NULL) { + return; + } + memory_bfree(mctx, reals, sizeof(struct real) * vs->reals_count); + SET_OFFSET_OF(&vs->reals, NULL); + vs->reals_count = 0; +} + +static void +free_vs_acl(struct memory_context *mctx, struct virtual_service *vs) { + if (vs->flags & vs_ip6) { + filter_free(&vs->acl, vs_acl_ip6); + } else { + filter_free(&vs->acl, vs_acl_ip4); + } + + uint64_t *ids = ADDR_OF(&vs->rule_counter_ids); + if (ids != NULL) { + memory_bfree(mctx, ids, sizeof(uint64_t) * vs->rule_count); + SET_OFFSET_OF(&vs->rule_counter_ids, NULL); + vs->rule_count = 0; + } +} + +static void +free_real_selector( + struct memory_context *mctx, struct virtual_service *vs, 
size_t workers +) { + struct real_selector *selector = ADDR_OF(&vs->selector); + if (selector == NULL) { + return; + } + memory_bfree(mctx, selector, real_selector_size(workers)); + SET_OFFSET_OF(&vs->selector, NULL); +} + +static void +free_vs(struct memory_context *mctx, struct virtual_service *vs, size_t workers +) { + free_vs_reals(mctx, vs); + free_vs_acl(mctx, vs); + free_real_selector(mctx, vs, workers); +} + +static int +init_vs(struct memory_context *mctx, + struct counter_registry *registry, + struct virtual_service *vs, + const struct balancer_vs_config *config, + size_t workers) { + memset(vs, 0, sizeof(*vs)); + vs->counter_id = (uint64_t)-1; + + vs->flags = build_vs_flags(config); + + if (build_vs_acl(mctx, vs, config) != 0) { + return -1; + } + + if (build_vs_reals(mctx, vs, config) != 0) { + free_vs(mctx, vs, workers); + return -1; + } + + if (build_real_selector(mctx, vs, workers, config) != 0) { + free_vs(mctx, vs, workers); + return -1; + } + + if (register_vs_counters(mctx, registry, vs, config) != 0) { + free_vs(mctx, vs, workers); + return -1; + } + + return 0; +} + +static int +register_balancer_counters( + struct balancer_module_config *cfg, struct counter_registry *registry +) { + cfg->common_counter_id = counter_registry_register( + registry, + balancer_common_counter_name, + sizeof(struct balancer_common_stats) / sizeof(uint64_t) + ); + if (cfg->common_counter_id == COUNTER_INVALID) { + return -1; + } + cfg->l4_counter_id = counter_registry_register( + registry, + balancer_l4_counter_name, + sizeof(struct balancer_l4_stats) / sizeof(uint64_t) + ); + if (cfg->l4_counter_id == COUNTER_INVALID) { + return -1; + } + return 0; +} + +static struct virtual_service * +build_vs_array( + struct memory_context *mctx, + struct counter_registry *registry, + const struct balancer_vs_config *configs, + size_t count, + size_t workers +) { + struct virtual_service *vs = + memory_balloc(mctx, sizeof(struct virtual_service) * count); + if (vs == NULL) { + 
return NULL; + } + for (size_t idx = 0; idx < count; ++idx) { + if (init_vs(mctx, registry, &vs[idx], &configs[idx], workers) != + 0) { + for (size_t j = 0; j < idx; ++j) { + free_vs(mctx, &vs[j], workers); + } + memory_bfree( + mctx, vs, sizeof(struct virtual_service) * count + ); + return NULL; + } + } + return vs; +} + +static void +free_vs_array( + struct memory_context *mctx, + struct virtual_service *vs, + size_t count, + size_t workers +) { + for (size_t idx = 0; idx < count; ++idx) { + free_vs(mctx, &vs[idx], workers); + } + memory_bfree(mctx, vs, sizeof(struct virtual_service) * count); +} + +static void +free_matcher_rule(struct filter_rule *rule) { + free(rule->net4.dsts); + free(rule->net6.dsts); + free(rule->transport.dsts); + free(rule->transport.protos); +} + +static int +fill_matcher_rule( + struct filter_rule *rule, + uint32_t action, + const struct balancer_vs_config *cfg +) { + rule->action = action; + + if (cfg->ip_family == ip_family_ip4) { + struct net4 *n4 = malloc(sizeof(struct net4)); + if (n4 == NULL) { + goto err; + } + memcpy(n4->addr, cfg->dst.v4.bytes, NET4_LEN); + memset(n4->mask, 0xFF, NET4_LEN); + rule->net4.dst_count = 1; + rule->net4.dsts = n4; + } else { + struct net6 *n6 = malloc(sizeof(struct net6)); + if (n6 == NULL) { + goto err; + } + memcpy(n6->addr, cfg->dst.v6.bytes, NET6_LEN); + memset(n6->mask, 0xFF, NET6_LEN); + rule->net6.dst_count = 1; + rule->net6.dsts = n6; + } + + struct filter_port_range *port_range = + malloc(sizeof(struct filter_port_range)); + if (port_range == NULL) { + goto err; + } + if (cfg->port != 0) { + port_range->from = cfg->port; + port_range->to = cfg->port; + } else { + port_range->from = 0; + port_range->to = 65535; + } + rule->transport.dst_count = 1; + rule->transport.dsts = port_range; + + struct filter_proto_range *proto_range = + malloc(sizeof(struct filter_proto_range)); + if (proto_range == NULL) { + goto err; + } + uint16_t transport = cfg->transport == tcp ? 
IPPROTO_TCP : IPPROTO_UDP; + proto_range->from = transport * 256; + proto_range->to = transport * 256 + 255; + rule->transport.proto_count = 1; + rule->transport.protos = proto_range; + + return 0; + +err: + free_matcher_rule(rule); + return -1; +} + +static int +build_vs_matcher( + struct memory_context *mctx, + struct filter *matcher, + enum ip_family family, + const struct balancer_vs_config *configs, + size_t config_count +) { + const struct filter_compiler *compiler = + family == ip_family_ip4 ? vs_matcher_ip4 : vs_matcher_ip6; + + size_t rule_count = 0; + for (size_t idx = 0; idx < config_count; ++idx) { + if (configs[idx].ip_family == family) { + ++rule_count; + } + } + if (rule_count == 0) { + return filter_init(matcher, compiler, NULL, 0, mctx); + } + + int res = -1; + size_t filled = 0; + + struct filter_rule *rules = calloc(rule_count, sizeof(*rules)); + const struct filter_rule **rule_ptrs = + malloc(rule_count * sizeof(*rule_ptrs)); + if (rules == NULL || rule_ptrs == NULL) { + goto cleanup; + } + + for (size_t idx = 0; idx < config_count; ++idx) { + if (configs[idx].ip_family != family) { + continue; + } + if (fill_matcher_rule( + &rules[filled], (uint32_t)idx, &configs[idx] + ) != 0) { + goto cleanup; + } + rule_ptrs[filled] = &rules[filled]; + ++filled; + } + + res = filter_init( + matcher, compiler, rule_ptrs, (uint32_t)rule_count, mctx + ); + +cleanup: + for (size_t idx = 0; idx < filled; ++idx) { + free_matcher_rule(&rules[idx]); + } + free(rules); + free(rule_ptrs); + + return res; +} + +static int +build_vs_matchers( + struct memory_context *mctx, + struct balancer_module_config *cfg, + const struct balancer_vs_config *configs, + size_t count +) { + if (build_vs_matcher( + mctx, &cfg->vs_matcher_ip4, ip_family_ip4, configs, count + ) != 0) { + return -1; + } + + if (build_vs_matcher( + mctx, &cfg->vs_matcher_ip6, ip_family_ip6, configs, count + ) != 0) { + filter_free(&cfg->vs_matcher_ip4, vs_matcher_ip4); + return -1; + } + + return 0; +} + 
+static void +free_vs_matchers(struct balancer_module_config *cfg) { + filter_free(&cfg->vs_matcher_ip4, vs_matcher_ip4); + filter_free(&cfg->vs_matcher_ip6, vs_matcher_ip6); +} + +static void +free_module_config(struct agent *agent, struct balancer_module_config *cfg) { + const size_t workers = ADDR_OF(&agent->dp_config)->worker_count; + struct memory_context *mctx = &agent->memory_context; + free_vs_matchers(cfg); + free_vs_array(mctx, ADDR_OF(&cfg->vs), cfg->vs_count, workers); + cp_module_fini(&cfg->cp_module); +} + +static int +init_module_config( + struct agent *agent, + struct balancer_module_config *cfg, + const char *name, + struct balancer_session_table_chain *session_table_chain, + struct balancer_session_timeouts *timeouts, + const struct balancer_vs_config *vs_configs, + size_t vs_count +) { + struct memory_context *mctx = &agent->memory_context; + const size_t workers = ADDR_OF(&agent->dp_config)->worker_count; + + if (cp_module_init(&cfg->cp_module, agent, "balancer", name) != 0) { + return -1; + } + + struct counter_registry *registry = &cfg->cp_module.counter_registry; + + if (register_balancer_counters(cfg, registry) != 0) { + free_module_config(agent, cfg); + return -1; + } + + struct virtual_service *vs = + build_vs_array(mctx, registry, vs_configs, vs_count, workers); + if (vs == NULL) { + free_module_config(agent, cfg); + return -1; + } + + if (build_vs_matchers(mctx, cfg, vs_configs, vs_count) != 0) { + free_module_config(agent, cfg); + return -1; + } + + if (rcu_init(&cfg->rcu, mctx, workers) != 0) { + free_module_config(agent, cfg); + return -1; + } + + SET_OFFSET_OF(&cfg->vs, vs); + cfg->vs_count = (uint32_t)vs_count; + SET_OFFSET_OF(&cfg->st_chain, session_table_chain); + cfg->session_timeouts = *timeouts; + + return 0; +} + +struct balancer_handle * +balancer_create( + struct agent *agent, + const char *name, + struct balancer_session_table_chain *session_table_chain, + struct balancer_session_timeouts *timeouts, + struct 
balancer_vs_config *vs_configs, + uint32_t vs_count +) { + struct memory_context *mctx = &agent->memory_context; + + struct balancer_handle *handle = memory_balloc(mctx, sizeof(*handle)); + if (handle == NULL) { + return NULL; + } + memset(handle, 0, sizeof(*handle)); + + if (init_module_config( + agent, + &handle->module_config, + name, + session_table_chain, + timeouts, + vs_configs, + vs_count + ) != 0) { + memory_bfree(mctx, handle, sizeof(*handle)); + return NULL; + } + + return handle; +} + +int +balancer_install(struct agent *agent, struct balancer_handle *handle) { + struct cp_module *module = &handle->module_config.cp_module; + return agent_update_modules(agent, 1, &module); +} + +void +balancer_free(struct agent *agent, struct balancer_handle *handle) { + if (handle == NULL) { + return; + } + struct memory_context *mctx = &agent->memory_context; + struct balancer_module_config *cfg = &handle->module_config; + free_module_config(agent, cfg); + memory_bfree(mctx, handle, sizeof(*handle)); +} + +/* This procedure does not respect disabled reals. + * It is expected user manually sets zero weights for disabled reals + * if needed. 
+ */ +static int +build_ring( + struct ring *ring, + const uint32_t *weights, + uint32_t reals_count, + struct memory_context *mctx, + uint64_t shuffle_seed +) { + uint64_t total_weight = 0; + for (uint32_t i = 0; i < reals_count; ++i) { + total_weight += weights[i]; + } + + if (total_weight == 0) { + memset(ring, 0, sizeof(*ring)); + return 0; + } + + size_t bytes = total_weight * sizeof(uint32_t); + if (big_array_init(&ring->real_ids, bytes, mctx) != 0) { + return -1; + } + + size_t pos = 0; + for (uint32_t i = 0; i < reals_count; ++i) { + for (uint32_t j = 0; j < weights[i]; ++j) { + uint32_t *slot = big_array_get( + &ring->real_ids, pos * sizeof(uint32_t) + ); + *slot = i; + ++pos; + } + } + + uint64_t rng = 0xdeadbeef ^ shuffle_seed; + for (size_t i = pos; i > 1; --i) { + uint32_t *a = big_array_get( + &ring->real_ids, (i - 1) * sizeof(uint32_t) + ); + uint32_t *b = big_array_get( + &ring->real_ids, (rng % i) * sizeof(uint32_t) + ); + uint32_t tmp = *a; + *a = *b; + *b = tmp; + rng = rng_next(&rng); + } + + return 0; +} + +int +balancer_vs_update_real_weights( + struct balancer_handle *balancer, + uint32_t vs_idx, + const uint32_t *weights +) { + struct balancer_module_config *cfg = &balancer->module_config; + if (vs_idx >= cfg->vs_count) { + return -1; + } + + struct memory_context *mctx = &cfg->cp_module.agent->memory_context; + struct virtual_service *vs = ADDR_OF(&cfg->vs) + vs_idx; + struct real_selector *selector = ADDR_OF(&vs->selector); + + size_t cur_ring = + atomic_load_explicit(&selector->ring_id, memory_order_relaxed); + size_t new_ring = cur_ring ^ 1; + + if (build_ring( + &selector->rings[new_ring], + weights, + vs->reals_count, + mctx, + vs_idx + ) != 0) { + return -2; + } + + /* Blocks until all workers see new value (or will see on demand) */ + rcu_update(&cfg->rcu, &selector->ring_id, new_ring); + + big_array_free(&selector->rings[cur_ring].real_ids); + + return 0; +} + +int +balancer_vs_update_real_states( + struct balancer_handle *balancer, 
uint32_t vs_idx, const bool *states +) { + struct balancer_module_config *cfg = &balancer->module_config; + if (vs_idx >= cfg->vs_count) { + return -1; + } + + struct virtual_service *vs = ADDR_OF(&cfg->vs) + vs_idx; + struct real *reals = ADDR_OF(&vs->reals); + + for (uint32_t i = 0; i < vs->reals_count; ++i) { + uint8_t flags = atomic_load_explicit( + &reals[i].flags, memory_order_relaxed + ); + uint8_t new_flags = 0; + if (states[i]) { + new_flags = flags | real_enabled; + } else { + new_flags = flags & ~real_enabled; + } + if (flags != new_flags) { + atomic_store_explicit( + &reals[i].flags, new_flags, memory_order_relaxed + ); + } + } + return 0; +} + +struct balancer_session_table * +balancer_create_session_table(struct agent *agent, size_t capacity) { + if (capacity == 0) { + return NULL; + } + + struct memory_context *mctx = &agent->memory_context; + + struct balancer_session_table *table = + memory_balloc(mctx, sizeof(*table)); + if (table == NULL) { + return NULL; + } + + if (TTLMAP_INIT( + &table->map, + mctx, + struct balancer_session_id, + struct balancer_session_state, + capacity + ) != 0) { + memory_bfree(mctx, table, sizeof(*table)); + return NULL; + } + + return table; +} + +/* + * Slot mapping for the chain follows + * idx(gen) = ((gen + 1) & 0b11) >> 1 + * so even gens are steady (only the front slot is used; back is NULL) + * and odd gens are transitions (both slots populated, workers fall + * back to the back map for sessions that have not migrated yet). 
+ */ + +int +balancer_session_table_chain_push_front( + struct balancer_session_table_chain *chain, + struct balancer_session_table *front_table +) { + uint64_t gen = rcu_load(&chain->rcu, &chain->gen); + if (gen & 1) { + return -1; + } + uint32_t cur_idx = (uint32_t)(((gen + 1) & 0b11) >> 1); + uint32_t back_idx = cur_idx ^ 1; + + assert(chain->tables[back_idx] == NULL); + + SET_OFFSET_OF(&chain->tables[back_idx], front_table); + rcu_update(&chain->rcu, &chain->gen, gen + 1); + return 0; +} + +int +balancer_session_table_chain_pop_back(struct balancer_session_table_chain *chain +) { + uint64_t gen = rcu_load(&chain->rcu, &chain->gen); + if ((gen & 1) == 0) { + return -1; + } + + uint32_t cur_idx = (uint32_t)(((gen + 1) & 0b11) >> 1); + uint32_t back_idx = cur_idx ^ 1; + + assert(chain->tables[back_idx] != NULL); + + rcu_update(&chain->rcu, &chain->gen, gen + 1); + SET_OFFSET_OF(&chain->tables[back_idx], NULL); + return 0; +} + +int +balancer_free_session_table( + struct agent *agent, struct balancer_session_table *table +) { + if (table == NULL) { + return 1; + } + struct memory_context *mctx = &agent->memory_context; + TTLMAP_FREE(&table->map); + memory_bfree(mctx, table, sizeof(*table)); + return 1; +} + +struct balancer_session_table_chain * +balancer_create_session_table_chain( + struct agent *agent, struct balancer_session_table *front_table +) { + struct memory_context *mctx = &agent->memory_context; + const size_t workers = ADDR_OF(&agent->dp_config)->worker_count; + + struct balancer_session_table_chain *chain = + memory_balloc(mctx, sizeof(*chain)); + if (chain == NULL) { + return NULL; + } + memset(chain, 0, sizeof(*chain)); + + if (rcu_init(&chain->rcu, mctx, workers) != 0) { + memory_bfree(mctx, chain, sizeof(*chain)); + return NULL; + } + + SET_OFFSET_OF(&chain->tables[0], front_table); + SET_OFFSET_OF(&chain->tables[1], NULL); + + return chain; +} + +void +balancer_free_session_table_chain( + struct agent *agent, struct balancer_session_table_chain 
*chain +) { + if (chain == NULL) { + return; + } + struct memory_context *mctx = &agent->memory_context; + rcu_free(&chain->rcu, mctx); + memory_bfree(mctx, chain, sizeof(*chain)); +} diff --git a/modules/balancer2/api/controlplane.h b/modules/balancer2/api/controlplane.h index 878754c1..92edf2de 100644 --- a/modules/balancer2/api/controlplane.h +++ b/modules/balancer2/api/controlplane.h @@ -6,9 +6,7 @@ #include "modules/balancer2/dataplane/types/session.h" struct agent; -struct balancer_session_table; struct balancer_handle; -struct balancer_vs_handle; enum balancer_tunnel_kind { balancer_tunnel_kind_ip, @@ -20,11 +18,8 @@ enum balancer_tunnel_kind { */ struct balancer_real_config { struct net_addr dst; - enum ip_family ip_family; - struct net src; - - enum balancer_tunnel_kind tunnel; + enum ip_family ip_family; }; /* @@ -37,6 +32,7 @@ struct balancer_allowed_sources { struct filter_net4s net4s; struct filter_net6s net6s; struct filter_port_ranges port_ranges; + const char *tag; }; enum balancer_vs_sched { @@ -65,51 +61,19 @@ struct balancer_vs_config { /* Destination port, host byte order. */ uint16_t port; - uint8_t transport_proto; + enum transport_proto transport; - struct balancer_allowed_sources allowed_sources; + struct balancer_allowed_sources *allowed_sources; + size_t allowed_sources_count; enum balancer_vs_sched scheduler; + enum balancer_tunnel_kind tunnel; struct balancer_real_config *reals; size_t real_count; -}; - -/* - * Creates a VS handle from the supplied configuration. The handle is - * used to mutate per-real state (weights, enabled flags) after the - * containing balancer is installed. - */ -struct balancer_vs_handle * -balancer_create_vs( - struct agent *agent, const struct balancer_vs_config *config -); - -/* - * Returns 0 if the handle is still referenced by a balancer, or 1 if - * it was actually freed. 
- */ -int -balancer_free_vs(struct agent *agent, struct balancer_vs_handle *vs); - -/* - * Creates a session table with the given capacity (number of session - * entries). - */ -struct balancer_session_table * -balancer_create_session_table(struct agent *agent, size_t capacity); -/* - * Returns 0 if the session table is still referenced by a balancer, or 1 - * if it was actually freed. - */ -int -balancer_free_session_table( - struct agent *agent, struct balancer_session_table *table -); - -// TODO: -// session table iter. + bool fix_mss; +}; /* * Bounded chain of session tables consulted by workers on each packet. @@ -129,30 +93,11 @@ balancer_free_session_table( */ struct balancer_session_table_chain; -/* - * Creates a session table chain seeded with the given front table. - * The table is not owned by the chain and must outlive it. - * Returns NULL on allocation failure. - */ -struct balancer_session_table_chain * -balancer_create_session_table_chain( - struct agent *agent, struct balancer_session_table *front_table -); - -/* - * Frees the session table chain. The session tables it referenced - * are not freed — the caller owns them. - */ -void -balancer_free_session_table_chain( - struct agent *agent, struct balancer_session_table_chain *chain -); - /* * Creates a balancer handle from its full configuration. * - * The session table and each VS handle must outlive the returned - * balancer handle; they are not owned by it. + * The session table chain must outlive the returned balancer handle; + * it is referenced, not owned. */ struct balancer_handle * balancer_create( @@ -160,35 +105,8 @@ balancer_create( const char *name, struct balancer_session_table_chain *session_table_chain, struct balancer_session_timeouts *timeouts, - struct balancer_vs_handle **vs, - size_t vs_count -); - -/* - * Pushes the given table as the new front (primary) session table. 
- * - * Workers look up sessions in the front table first and fall back to - * the previous (back) table; a session found in the back table is - * copied forward. New sessions are always created in the front table. - * - * Returns -1 if two session tables are already attached. - */ -int -balancer_session_table_chain_push_front( - struct balancer_session_table_chain *session_table_chain, - struct balancer_session_table *front_table -); - -/* - * Detaches the back session table. - * - * After this call, new workers ignore the detached table for lookups. - * - * Returns -1 if only one session table is attached. - */ -int -balancer_session_table_chain_pop_back( - struct balancer_session_table_chain *session_table_chain + struct balancer_vs_config *vs, + uint32_t vs_count ); /* @@ -204,8 +122,8 @@ int balancer_install(struct agent *agent, struct balancer_handle *handle); /* - * Frees a balancer handle. The session tables and VS handles - * attached to the balancer are not freed — the caller owns them. + * Frees a balancer handle. The session table chain attached to the + * balancer is not freed — the caller owns it. */ void balancer_free(struct agent *agent, struct balancer_handle *handle); @@ -215,11 +133,13 @@ balancer_free(struct agent *agent, struct balancer_handle *handle); * length equal to the number of reals configured for the VS and be * indexed in the same order as they were passed at VS creation. * Returns 0 on success, -1 if the length does not match the number - * of reals, or -2 on allocation failure. + * of reals or vs_idx exceeds the number of VS, or -2 on allocation failure. */ int balancer_vs_update_real_weights( - struct balancer_vs_handle *vs, const uint32_t *weights + struct balancer_handle *balancer, + uint32_t vs_idx, + const uint32_t *weights ); /* @@ -227,11 +147,11 @@ balancer_vs_update_real_weights( * length equal to the number of reals configured for the VS and be * indexed in the same order as they were passed at VS creation. 
* Returns 0 on success, -1 if the length does not match the number - * of reals, or -2 on allocation failure. + * of reals or vs_idx exceeds the number of VS, or -2 on allocation failure. */ int balancer_vs_update_real_states( - struct balancer_vs_handle *vs, const bool *states + struct balancer_handle *balancer, uint32_t vs_idx, const bool *states ); /* @@ -256,4 +176,81 @@ extern const char *const balancer_real_counter_prefix; extern const char *const balancer_common_counter_name; -extern const char *const balancer_l4_counter_name; \ No newline at end of file +extern const char *const balancer_l4_counter_name; + +/* + * A session table holds active session entries — one per tracked + * flow — mapping a connection key to its selected real. The table + * has a fixed capacity, set at creation time, that bounds the number + * of concurrent sessions it can store. + * + * A session table is used by a balancer through a session table + * chain; see the balancer_session_table_chain documentation above + * for how front and back tables interact during lookups and inserts. + */ +struct balancer_session_table; + +/* + * Creates a session table with the given capacity (number of session + * entries). + */ +struct balancer_session_table * +balancer_create_session_table(struct agent *agent, size_t capacity); + +/* + * Pushes the given table as the new front (primary) session table. + * + * Workers look up sessions in the front table first and fall back to + * the previous (back) table; a session found in the back table is + * copied forward. New sessions are always created in the front table. + * + * Returns -1 if two session tables are already attached. + */ +int +balancer_session_table_chain_push_front( + struct balancer_session_table_chain *session_table_chain, + struct balancer_session_table *front_table +); + +/* + * Detaches the back session table. + * + * After this call, new workers ignore the detached table for lookups. 
+ * + * Returns -1 if only one session table is attached. + */ +int +balancer_session_table_chain_pop_back( + struct balancer_session_table_chain *session_table_chain +); + +/* + * Returns 0 if the session table is still referenced by a balancer, or 1 + * if it was actually freed. + */ +int +balancer_free_session_table( + struct agent *agent, struct balancer_session_table *table +); + +// TODO: +// session table iter. + +/* + * Creates a session table chain seeded with the given front table. + * The table is not owned by the chain and must outlive it. + * Returns NULL on allocation failure. + */ +struct balancer_session_table_chain * +balancer_create_session_table_chain( + struct agent *agent, struct balancer_session_table *front_table +); + +/* + * Frees the session table chain. The session tables it referenced + * are not freed — the caller owns them. + */ +void +balancer_free_session_table_chain( + struct agent *agent, struct balancer_session_table_chain *chain +); diff --git a/modules/balancer2/api/meson.build b/modules/balancer2/api/meson.build new file mode 100644 index 00000000..e69de29b diff --git a/modules/balancer2/dataplane/config.h b/modules/balancer2/dataplane/config.h new file mode 100644 index 00000000..46b10198 --- /dev/null +++ b/modules/balancer2/dataplane/config.h @@ -0,0 +1,35 @@ +#pragma once + +#include "filter/filter.h" + +#include "lib/controlplane/config/cp_module.h" + +#include "types/session.h" + +#include "common/rcu.h" + +struct balancer_session_table_chain; +struct virtual_service; + +struct balancer_module_config { + struct cp_module cp_module; + + uint64_t common_counter_id; + uint64_t l4_counter_id; + + struct filter vs_matcher_ip4; + struct filter vs_matcher_ip6; + + struct virtual_service *vs; + uint32_t vs_count; + + struct balancer_session_timeouts session_timeouts; + + struct balancer_session_table_chain *st_chain; + + /* + * RCU guard for the inner atomic changes on the config, + * including changes on reals ring of virtual 
services. + */ + rcu_t rcu; +}; \ No newline at end of file diff --git a/modules/balancer2/dataplane/context.h b/modules/balancer2/dataplane/context.h new file mode 100644 index 00000000..fdeca51d --- /dev/null +++ b/modules/balancer2/dataplane/context.h @@ -0,0 +1,23 @@ +#pragma once + +#include + +struct packet_front; +struct balancer_config; +struct counter_storage; + +struct worker_context { + struct packet_front *packet_front; + struct balancer_module_config *config; + struct counter_storage *counter_storage; + + struct balancer_common_stats *common_stats; + struct balancer_l4_stats *l4_stats; + struct balancer_icmp_stats *icmp_v4_stats; + struct balancer_icmp_stats *icmp_v6_stats; + + uint32_t worker_idx; + + /* Current time in seconds. */ + uint32_t now; +}; \ No newline at end of file diff --git a/modules/balancer2/dataplane/dataplane.c b/modules/balancer2/dataplane/dataplane.c new file mode 100644 index 00000000..1b15753a --- /dev/null +++ b/modules/balancer2/dataplane/dataplane.c @@ -0,0 +1,189 @@ +#include +#include +#include +#include + +#include "common/memory_address.h" + +#include "lib/counters/counters.h" +#include "lib/dataplane/config/zone.h" +#include "lib/dataplane/module/packet_front.h" +#include "lib/dataplane/packet/data.h" +#include "lib/dataplane/pipeline/econtext.h" + +#include "types/stats.h" + +#include "config.h" +#include "context.h" +#include "dataplane.h" + +#include "icmp/handle.h" +#include "l4/handle.h" + +#define MAX_BATCH_SIZE 64 + +typedef void (*batch_handler)( + struct worker_context *context, + struct packet **packets, + size_t packets_count +); + +struct packet_batcher { + struct packet *packets[MAX_BATCH_SIZE]; + size_t count; + batch_handler handler; +}; + +static void +batcher_add( + struct packet_batcher *batcher, + struct worker_context *context, + struct packet *packet +) { + batcher->packets[batcher->count++] = packet; + if (batcher->count == MAX_BATCH_SIZE) { + batcher->handler(context, batcher->packets, 
batcher->count); + batcher->count = 0; + } +} + +static void +batcher_flush(struct packet_batcher *batcher, struct worker_context *context) { + if (batcher->count > 0) { + batcher->handler(context, batcher->packets, batcher->count); + batcher->count = 0; + } +} + +static uint64_t * +context_get_counter(struct worker_context *context, uint64_t counter_id) { + return counter_get_address( + counter_id, context->worker_idx, context->counter_storage + ); +} + +static void +build_context( + struct worker_context *ctx, + struct dp_worker *dp_worker, + struct module_ectx *module_ectx, + struct packet_front *packet_front +) { + struct balancer_module_config *config = container_of( + ADDR_OF(&module_ectx->cp_module), + struct balancer_module_config, + cp_module + ); + ctx->config = config; + ctx->packet_front = packet_front; + ctx->counter_storage = ADDR_OF(&module_ectx->counter_storage); + ctx->worker_idx = dp_worker->idx; + ctx->now = dp_worker->current_time / (1000 * 1000 * 1000); /* ns -> s */ + + ctx->common_stats = (struct balancer_common_stats *)context_get_counter( + ctx, config->common_counter_id + ); + ctx->l4_stats = (struct balancer_l4_stats *)context_get_counter( + ctx, config->l4_counter_id + ); +} + +static inline bool +valid_packet_network(uint16_t network_type) { + return network_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV4) || + network_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV6); +} + +static inline bool +valid_packet_transport(uint16_t transport_type) { + /* For now, ICMP packets are invalid for the balancer module. */ + /* Further, we will handle ICMP packets as well. 
*/ + return transport_type == IPPROTO_TCP || transport_type == IPPROTO_UDP; +} + +void +balancer2_handle_packets( + struct dp_worker *dp_worker, + struct module_ectx *module_ectx, + struct packet_front *packet_front +) { + struct worker_context context; + build_context(&context, dp_worker, module_ectx, packet_front); + + /* + * Classify incoming packets by protocol (L4/ICMP) and IP version + * (IPv4/IPv6), accumulating them into per-category batches. + * + * Batched processing allows the filter engine to evaluate multiple + * packets at once, which is significantly faster than one-at-a-time + * lookups due to memory prefetching and reduced per-packet overhead. + * + * Batches are flushed when they reach MAX_BATCH_SIZE or when all + * input packets have been classified. + */ + enum { l4_ip4, l4_ip6, icmp_ip4, icmp_ip6, batcher_count }; + struct packet_batcher batchers[batcher_count] = { + [l4_ip4] = {.handler = balancer_handle_l4_ip4}, + [l4_ip6] = {.handler = balancer_handle_l4_ip6}, + /* For the future ICMP support. 
*/ + [icmp_ip4] = {.handler = balancer_handle_icmp_ip4}, + [icmp_ip6] = {.handler = balancer_handle_icmp_ip6}, + }; + + context.common_stats->incoming_packets += packet_front->input.count; + + struct packet *packet; + while ((packet = packet_list_pop(&packet_front->input)) != NULL) { + context.common_stats->incoming_bytes += packet_data_len(packet); + + uint16_t network_type = packet->network_header.type; + if (!valid_packet_network(network_type)) { + context.common_stats->unexpected_network_proto += 1; + packet_front_output(packet_front, packet); + continue; + } + + uint16_t transport_type = packet->transport_header.type; + if (!valid_packet_transport(transport_type)) { + context.common_stats->unexpected_transport_proto += 1; + packet_front_output(packet_front, packet); + continue; + } + + int is_ip6 = + network_type == rte_cpu_to_be_16(RTE_ETHER_TYPE_IPV6); + int is_icmp = transport_type == IPPROTO_ICMP || + transport_type == IPPROTO_ICMPV6; + int idx = is_icmp * 2 + is_ip6; + batcher_add(&batchers[idx], &context, packet); + } + + for (int i = 0; i < batcher_count; ++i) { + batcher_flush(&batchers[i], &context); + } +} + +struct balancer_module { + struct module module; +}; + +struct module * +new_module_balancer2() { + struct balancer_module *module = + (struct balancer_module *)malloc(sizeof(struct balancer_module) + ); + + if (module == NULL) { + return NULL; + } + + snprintf( + module->module.name, + sizeof(module->module.name), + "%s", + "balancer2" + ); + module->module.handler = balancer2_handle_packets; + + return &module->module; +} diff --git a/modules/balancer2/dataplane/dataplane.h b/modules/balancer2/dataplane/dataplane.h new file mode 100644 index 00000000..6f9f176c --- /dev/null +++ b/modules/balancer2/dataplane/dataplane.h @@ -0,0 +1,4 @@ +#pragma once + +struct module * +new_module_balancer2(); diff --git a/modules/balancer2/dataplane/icmp/handle.c b/modules/balancer2/dataplane/icmp/handle.c new file mode 100644 index 00000000..f2a5871a --- 
/dev/null +++ b/modules/balancer2/dataplane/icmp/handle.c @@ -0,0 +1,27 @@ +#include "handle.h" + +#include "lib/dataplane/module/packet_front.h" + +#include "context.h" + +void +balancer_handle_icmp_ip4( + struct worker_context *context, + struct packet **packets, + size_t packets_count +) { + for (size_t i = 0; i < packets_count; i++) { + packet_front_drop(context->packet_front, packets[i]); + } +} + +void +balancer_handle_icmp_ip6( + struct worker_context *context, + struct packet **packets, + size_t packets_count +) { + for (size_t i = 0; i < packets_count; i++) { + packet_front_drop(context->packet_front, packets[i]); + } +} \ No newline at end of file diff --git a/modules/balancer2/dataplane/icmp/handle.h b/modules/balancer2/dataplane/icmp/handle.h new file mode 100644 index 00000000..fee6aabc --- /dev/null +++ b/modules/balancer2/dataplane/icmp/handle.h @@ -0,0 +1,20 @@ +#pragma once + +#include + +struct packet; +struct worker_context; + +void +balancer_handle_icmp_ip4( + struct worker_context *context, + struct packet **packets, + size_t packets_count +); + +void +balancer_handle_icmp_ip6( + struct worker_context *context, + struct packet **packets, + size_t packets_count +); \ No newline at end of file diff --git a/modules/balancer2/dataplane/l4/group.c b/modules/balancer2/dataplane/l4/group.c new file mode 100644 index 00000000..de436f11 --- /dev/null +++ b/modules/balancer2/dataplane/l4/group.c @@ -0,0 +1,60 @@ +#include "common/likely.h" +#include + +#include "group.h" + +/* + * Maximum number of distinct VS IDs we track per batch. + * Practically 1-5 in normal traffic; 16 covers even degenerate cases. + * If exceeded, grouping is skipped (packets still processed correctly, + * just not batched per-VS for ACL queries). + */ +#define MAX_GROUPS 16 + +void +group_by_id(uint32_t *vs_ids, uint8_t *order, size_t count) { + /* Pass 1: collect unique VS IDs and count per group. 
*/ + uint32_t unique_vs[MAX_GROUPS]; + size_t counts[MAX_GROUPS]; + size_t ngroups = 0; + + for (size_t i = 0; i < count; ++i) { + for (size_t g = 0; g < ngroups; ++g) { + if (unique_vs[g] == vs_ids[i]) { + counts[g]++; + goto next; + } + } + if (unlikely(ngroups == MAX_GROUPS)) { + return; + } + unique_vs[ngroups] = vs_ids[i]; + counts[ngroups] = 1; + ngroups++; + next:; + } + + /* Compute scatter offsets from prefix sums. */ + size_t offsets[MAX_GROUPS]; + offsets[0] = 0; + for (size_t g = 1; g < ngroups; ++g) { + offsets[g] = offsets[g - 1] + counts[g - 1]; + } + + /* Pass 2: scatter into temporary arrays. */ + uint32_t tmp_ids[count]; + uint8_t tmp_order[count]; + for (size_t i = 0; i < count; ++i) { + for (size_t g = 0; g < ngroups; ++g) { + if (unique_vs[g] == vs_ids[i]) { + size_t pos = offsets[g]++; + tmp_ids[pos] = vs_ids[i]; + tmp_order[pos] = order[i]; + break; + } + } + } + + memcpy(vs_ids, tmp_ids, count * sizeof(uint32_t)); + memcpy(order, tmp_order, count * sizeof(uint8_t)); +} \ No newline at end of file diff --git a/modules/balancer2/dataplane/l4/group.h b/modules/balancer2/dataplane/l4/group.h new file mode 100644 index 00000000..033ba0b3 --- /dev/null +++ b/modules/balancer2/dataplane/l4/group.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +/* + * Rearrange vs_ids[] and order[] so that entries with the same vs_id + * are contiguous. Uses a two-pass counting sort with a small group + * table, giving O(n * k) time for k distinct VS IDs. + * + * If there are too many distinct VS IDs, the grouping is skipped. 
+ */ +void +group_by_id(uint32_t *ids, uint8_t *order, size_t count); \ No newline at end of file diff --git a/modules/balancer2/dataplane/l4/handle.c b/modules/balancer2/dataplane/l4/handle.c new file mode 100644 index 00000000..deec46f7 --- /dev/null +++ b/modules/balancer2/dataplane/l4/handle.c @@ -0,0 +1,376 @@ +#include + +#include "common/likely.h" +#include "common/memory_address.h" + +#include "lib/dataplane/module/packet_front.h" +#include "lib/dataplane/packet/data.h" + +#include "filter/query.h" + +#include "config.h" +#include "context.h" +#include "group.h" +#include "packet.h" +#include "select.h" +#include "session.h" +#include "tunnel.h" +#include "vs.h" + +#include "../session.h" + +#include "types/stats.h" + +FILTER_QUERY_DECLARE( + ipv4_vs_matcher, net4_fast_dst, port_fast_dst, proto_range_fast +); +FILTER_QUERY_DECLARE(ipv4_vs_acl, net4_fast_src, port_fast_src); + +FILTER_QUERY_DECLARE( + ipv6_vs_matcher, net6_fast_dst, port_fast_dst, proto_range_fast +); +FILTER_QUERY_DECLARE(ipv6_vs_acl, net6_fast_src, port_fast_src); + +/* + * Batch ACL query for a group of packets sharing the same VS. + * Updates incoming and ACL stats, drops packets that fail. + * Appends passing packets to pkt_ctxs. + * Returns the number of packets that passed. 
+ */ +static size_t +filter_vs_group( + struct worker_context *context, + struct virtual_service *vs, + struct packet **group_pkts, + size_t group_size, + struct packet_context *pkt_ctxs, + bool is_ipv6 +) { + struct balancer_vs_stats *vs_stats = vs_fetch_stats( + vs, context->worker_idx, context->counter_storage + ); + + uint32_t acl_results[group_size]; + if (is_ipv6) { + filter_query( + &vs->acl, + ipv6_vs_acl, + group_pkts, + acl_results, + group_size + ); + } else { + filter_query( + &vs->acl, + ipv4_vs_acl, + group_pkts, + acl_results, + group_size + ); + } + + size_t passed = 0; + for (size_t i = 0; i < group_size; ++i) { + uint32_t rule_idx = acl_results[i]; + if (rule_idx == FILTER_RULE_INVALID) { + vs_stats->packet_src_not_allowed += 1; + packet_front_drop(context->packet_front, group_pkts[i]); + continue; + } + + vs_stats->incoming_packets += 1; + vs_stats->incoming_bytes += packet_data_len(group_pkts[i]); + + uint64_t *rule_counter = vs_fetch_acl_stats( + vs, + context->worker_idx, + context->counter_storage, + rule_idx + ); + + /* Don't track stats for rules with an empty tag. */ + if (rule_counter != NULL) { + *rule_counter += 1; + } + + pkt_ctxs[passed++] = (struct packet_context){ + .packet = group_pkts[i], + .matched_vs = vs, + .matched_vs_stats = vs_stats, + }; + } + + return passed; +} + +/* + * Match packets to virtual services, then filter by ACL. + * + * 1. Batch VS lookup — classify all packets at once. + * 2. Group matched packets by VS. + * 3. Batch ACL per group — drop packets that fail. + * + * Returns the number of packets that passed both checks. + * is_ipv6 is a compile-time constant so the compiler + * eliminates the dead branches after inlining. + */ +static size_t +service_lookup( + struct worker_context *context, + struct packet **packets, + size_t packets_count, + struct packet_context *pkt_ctxs, + bool is_ipv6 +) { + /* Batch VS lookup. 
*/ + uint32_t vs_results[packets_count]; + if (is_ipv6) { + filter_query( + &context->config->vs_matcher_ip6, + ipv6_vs_matcher, + packets, + vs_results, + packets_count + ); + } else { + filter_query( + &context->config->vs_matcher_ip4, + ipv4_vs_matcher, + packets, + vs_results, + packets_count + ); + } + + /* Drop unmatched, record indices and vs_ids of the rest. */ + uint8_t order[packets_count]; + uint32_t vs_ids[packets_count]; + size_t matched_count = 0; + + for (size_t i = 0; i < packets_count; ++i) { + uint32_t vs_id = vs_results[i]; + if (vs_id == FILTER_RULE_INVALID) { + context->l4_stats->select_vs_failed += 1; + packet_front_drop(context->packet_front, packets[i]); + continue; + } + + order[matched_count] = i; + vs_ids[matched_count] = vs_id; + matched_count++; + } + + if (unlikely(matched_count == 0)) { + return 0; + } + + /* + * Group by VS for batched ACL queries. If grouping fails + * (too many distinct VSes), the loop below still works + * correctly with ungrouped packets — just smaller batches. + */ + group_by_id(vs_ids, order, matched_count); + + struct virtual_service *virtual_services = + ADDR_OF(&context->config->vs); + + /* Run ACL per VS group. */ + size_t total_passed = 0; + size_t pkt_idx = 0; + struct packet *group_pkts[matched_count]; + while (pkt_idx < matched_count) { + uint32_t cur_vs_id = vs_ids[pkt_idx]; + + /* Collect contiguous packets for this VS. 
*/ + size_t group_size = 0; + while (pkt_idx < matched_count && vs_ids[pkt_idx] == cur_vs_id + ) { + group_pkts[group_size++] = packets[order[pkt_idx]]; + pkt_idx++; + } + + total_passed += filter_vs_group( + context, + virtual_services + cur_vs_id, + group_pkts, + group_size, + pkt_ctxs + total_passed, + is_ipv6 + ); + } + + return total_passed; +} + +static size_t +select_reals( + struct worker_context *context, + struct packet_context *pkt_ctxs, + struct session *sessions, + size_t packet_count, + struct balancer_session_table_chain *st_chain, + uint64_t st_chain_gen +) { + const size_t prefetch_distance = 4; + + size_t kept = 0; + for (size_t pkt_idx = 0; pkt_idx < packet_count; ++pkt_idx) { + if (pkt_idx + prefetch_distance < packet_count) { + st_chain_prefetch_session( + st_chain, + st_chain_gen, + &sessions[pkt_idx + prefetch_distance].id + ); + } + + struct real *real = select_real( + context, + &pkt_ctxs[pkt_idx], + &sessions[pkt_idx], + st_chain, + st_chain_gen + ); + + if (unlikely(real == NULL)) { + packet_front_drop( + context->packet_front, pkt_ctxs[pkt_idx].packet + ); + continue; + } + + if (kept != pkt_idx) { + pkt_ctxs[kept] = pkt_ctxs[pkt_idx]; + } + pkt_ctxs[kept].selected_real = real; + pkt_ctxs[kept].selected_real_stats = real_fetch_stats( + real, context->worker_idx, context->counter_storage + ); + kept++; + } + + return kept; +} + +static void +tunnel_packets( + struct worker_context *context, + struct packet_context *pkt_ctxs, + size_t packet_count, + bool is_ipv6 +) { + struct packet_front *packet_front = context->packet_front; + for (size_t pkt_idx = 0; pkt_idx < packet_count; ++pkt_idx) { + struct packet_context *pkt_ctx = &pkt_ctxs[pkt_idx]; + + int res; + if (is_ipv6) { + res = tunnel_ip6_packet(pkt_ctx); + } else { + res = tunnel_ip4_packet(pkt_ctx); + } + + if (unlikely(res != 0)) { + context->l4_stats->tunnel_failed += 1; + packet_front_drop(packet_front, pkt_ctx->packet); + continue; + } + + packet_front_output(packet_front, 
pkt_ctx->packet); + + uint64_t pkt_len = packet_data_len(pkt_ctx->packet); + + context->common_stats->outgoing_packets += 1; + context->common_stats->outgoing_bytes += pkt_len; + + context->l4_stats->outgoing_packets += 1; + + pkt_ctx->matched_vs_stats->outgoing_packets += 1; + pkt_ctx->matched_vs_stats->outgoing_bytes += pkt_len; + + pkt_ctx->selected_real_stats->packets += 1; + pkt_ctx->selected_real_stats->bytes += pkt_len; + } +} + +/* + * L4 packet processing pipeline. + * + * 1. match_and_filter: VS lookup + ACL check (batched). + * Drops packets with no VS or failing ACL. + * + * 2. fill_sessions: parse IP/transport headers into + * session_id, timeout, and reschedule flag. + * + * 3. select_reals: look up or create a session, pick a + * real server. Runs inside two critical + * sections: + * - reals_selector_guard: RCU guard for real selector rings, + * held across both select and tunnel + * so the ring isn't freed mid-use. + * - session table CS: pins the current session map generation, + * released after all lookups are done. + * + * 4. tunnel_packets: encapsulate and forward to the chosen real. 
+ */ +static void +balancer_handle_l4_packets( + struct worker_context *context, + struct packet **packets, + size_t packets_count, + bool is_ipv6 +) { + context->l4_stats->incoming_packets += packets_count; + + struct packet_context pkt_ctxs[packets_count]; + struct session sessions[packets_count]; + + size_t count = service_lookup( + context, packets, packets_count, pkt_ctxs, is_ipv6 + ); + + fill_sessions( + sessions, + pkt_ctxs, + count, + &context->config->session_timeouts, + is_ipv6 + ); + + struct balancer_session_table_chain *st_chain = + ADDR_OF(&context->config->st_chain); + + rcu_t *reals_selector_guard = &context->config->rcu; + rcu_read_begin(reals_selector_guard, context->worker_idx); + uint64_t st_chain_gen = + st_chain_begin_cs(st_chain, context->worker_idx); + + count = select_reals( + context, pkt_ctxs, sessions, count, st_chain, st_chain_gen + ); + + st_chain_end_cs(st_chain, context->worker_idx); + + rcu_read_end(reals_selector_guard, context->worker_idx); + + tunnel_packets(context, pkt_ctxs, count, is_ipv6); +} + +void +balancer_handle_l4_ip4( + struct worker_context *context, + struct packet **packets, + size_t packets_count +) { + const bool is_ipv6 = false; + balancer_handle_l4_packets(context, packets, packets_count, is_ipv6); +} + +void +balancer_handle_l4_ip6( + struct worker_context *context, + struct packet **packets, + size_t packets_count +) { + const bool is_ipv6 = true; + balancer_handle_l4_packets(context, packets, packets_count, is_ipv6); +} diff --git a/modules/balancer2/dataplane/l4/handle.h b/modules/balancer2/dataplane/l4/handle.h new file mode 100644 index 00000000..bef45c51 --- /dev/null +++ b/modules/balancer2/dataplane/l4/handle.h @@ -0,0 +1,21 @@ +#pragma once + +#include +#include + +struct packet; +struct worker_context; + +void +balancer_handle_l4_ip4( + struct worker_context *context, + struct packet **packets, + size_t packets_count +); + +void +balancer_handle_l4_ip6( + struct worker_context *context, + struct 
packet **packets, + size_t packets_count +); \ No newline at end of file diff --git a/modules/balancer2/dataplane/l4/packet.h b/modules/balancer2/dataplane/l4/packet.h new file mode 100644 index 00000000..5ee1d01b --- /dev/null +++ b/modules/balancer2/dataplane/l4/packet.h @@ -0,0 +1,14 @@ +#pragma once + +#include "real.h" +#include "vs.h" + +struct packet_context { + struct packet *packet; + + struct virtual_service *matched_vs; + struct balancer_vs_stats *matched_vs_stats; + + struct real *selected_real; + struct balancer_real_stats *selected_real_stats; +}; diff --git a/modules/balancer2/dataplane/l4/select.c b/modules/balancer2/dataplane/l4/select.c new file mode 100644 index 00000000..9439f43f --- /dev/null +++ b/modules/balancer2/dataplane/l4/select.c @@ -0,0 +1,288 @@ +#include +#include +#include + +#include +#include + +#include "common/big_array.h" +#include "common/memory_address.h" +#include "common/network.h" +#include "common/ttlmap/detail/lock.h" + +#include "lib/dataplane/packet/packet.h" + +#include "../session.h" +#include "context.h" +#include "packet.h" +#include "real.h" +#include "selector.h" +#include "session.h" +#include "vs.h" + +#include "types/stats.h" + +#define INVALID_REAL_IDX ((uint32_t)-1) + +static uint32_t +ring_get(struct ring *ring, uint64_t index) { + if (ring->real_ids.size > 0) { + uint32_t pos = index % (ring->real_ids.size / sizeof(uint32_t)); + uint32_t val; + memcpy(&val, + big_array_get(&ring->real_ids, pos * sizeof(uint32_t)), + sizeof(val)); + return val; + } else { + return INVALID_REAL_IDX; + } +} + +static uint32_t +selector_select( + struct real_selector *selector, uint32_t worker, uint32_t hash +) { + // TODO: confirm whether an atomic load is actually required here. 
+ size_t ring_id = + atomic_load_explicit(&selector->ring_id, memory_order_relaxed); + + struct ring *ring = &selector->rings[ring_id]; + + uint64_t idx = (selector->workers_rr_counter[worker].value++ & + ~selector->packet_hash_mask) | + (hash & selector->packet_hash_mask); + return ring_get(ring, idx); +} + +static void +update_session_state( + struct balancer_session_state *session_state, + uint32_t now, + uint32_t session_timeout +) { + session_state->last_packet_timestamp = now; + session_state->timeout = session_timeout; +} + +static void +create_session_state( + struct balancer_session_state *session_state, + struct real *real, + uint32_t now, + uint32_t session_timeout +) { + session_state->create_timestamp = now; + session_state->last_packet_timestamp = now; + session_state->real_ip = real->addr; + session_state->ip_family = + (real_flags(real) & real_ip6 ? ip_family_ip6 : ip_family_ip4); + session_state->timeout = session_timeout; +} + +static struct real * +vs_lookup_real( + struct virtual_service *vs, uint8_t *dst, enum ip_family ip_family +) { + struct real *reals = ADDR_OF(&vs->reals); + uint8_t ip6_flag = real_ip6; + uint32_t addr_len = NET6_LEN; + if (ip_family == ip_family_ip4) { + ip6_flag = 0; + addr_len = NET4_LEN; + } + for (size_t i = 0; i < vs->reals_count; ++i) { + struct real *real = &reals[i]; + if ((real_flags(real) & ip6_flag) == ip6_flag && + memcmp(&real->addr, dst, addr_len) == 0) { + return real; + } + } + return NULL; +} + +/* + * Try to reuse the real from an existing session. + * Returns the real if still exists and enabled, NULL otherwise. + */ +static struct real * +try_reuse_session_real( + struct worker_context *context, + struct packet_context *pkt_ctx, + struct balancer_session_state *session_state +) { + struct virtual_service *vs = pkt_ctx->matched_vs; + struct balancer_vs_stats *vs_stats = pkt_ctx->matched_vs_stats; + + /* TODO: lookup real with hashtable. 
*/ + struct real *real = vs_lookup_real( + vs, (uint8_t *)&session_state->real_ip, session_state->ip_family + ); + if (unlikely(real == NULL)) { + return NULL; + } + + if (unlikely((real_flags(real) & real_enabled) == 0)) { + struct balancer_real_stats *stats = real_fetch_stats( + real, context->worker_idx, context->counter_storage + ); + stats->packets_real_disabled += 1; + vs_stats->real_is_disabled += 1; + return NULL; + } + + return real; +} + +static struct real * +select_real_ops( + struct worker_context *context, + struct packet_context *pkt_ctx, + struct virtual_service *vs +) { + uint32_t real_idx = selector_select( + ADDR_OF(&vs->selector), + context->worker_idx, + pkt_ctx->packet->hash + ); + if (unlikely(real_idx == INVALID_REAL_IDX)) { + pkt_ctx->matched_vs_stats->no_reals += 1; + return NULL; + } + return ADDR_OF(&vs->reals) + real_idx; +} + +/* + * Select a new real from the ring and write it into the session slot. + * Returns the real, or NULL if the ring is empty. + */ +static struct real * +schedule_new_real( + struct worker_context *context, + struct packet_context *pkt_ctx, + struct session *session, + struct virtual_service *vs, + struct balancer_session_state *session_state +) { + uint32_t real_id = selector_select( + ADDR_OF(&vs->selector), + context->worker_idx, + pkt_ctx->packet->hash + ); + if (unlikely(real_id == INVALID_REAL_IDX)) { + pkt_ctx->matched_vs_stats->no_reals += 1; + return NULL; + } + + struct real *real = ADDR_OF(&vs->reals) + real_id; + create_session_state( + session_state, real, context->now, session->timeout + ); + + struct balancer_real_stats *real_stats = real_fetch_stats( + real, context->worker_idx, context->counter_storage + ); + real_stats->created_sessions += 1; + pkt_ctx->matched_vs_stats->created_sessions += 1; + + return real; +} + +struct real * +select_real( + struct worker_context *context, + struct packet_context *pkt_ctx, + struct session *session, + struct balancer_session_table_chain *st_chain, + 
uint64_t current_table_gen +) { + struct virtual_service *vs = pkt_ctx->matched_vs; + struct balancer_vs_stats *vs_stats = pkt_ctx->matched_vs_stats; + + /* OPS: stateless selection, no session involved. */ + if (vs->flags & vs_ops) { + return select_real_ops(context, pkt_ctx, vs); + } + + /* + * Acquire a session slot from the session table. + * + * st_get_or_create_session either finds an existing entry + * or allocates a new one. In both cases it returns a locked + * pointer to the session_state. The caller must eventually + * call st_unlock_session to release the lock. + * + * Possible results: + * - SESSION_FOUND: existing entry, session_state is populated. + * - SESSION_CREATED: new (or recycled) entry, session_state is blank. + * - SESSION_TABLE_OVERFLOW: table is full, no slot acquired, no lock + * held. + */ + struct balancer_session_state *session_state = NULL; + ttlmap_lock_t *session_lock; + int result = st_chain_get_or_create_session( + st_chain, + current_table_gen, + context->now, + session->timeout, + &session->id, + &session_state, + &session_lock + ); + + if (unlikely(result == SESSION_TABLE_OVERFLOW)) { + vs_stats->session_table_overflow += 1; + return NULL; + } + + /* + * From here on, session_lock is held and must be released + * before returning. session_state points to the locked slot. + */ + + /* Try to reuse the real from an existing session. */ + if (result == SESSION_FOUND) { + struct real *real = + try_reuse_session_real(context, pkt_ctx, session_state); + if (real != NULL) { + /* Real is still valid -- prolong the session. */ + update_session_state( + session_state, context->now, session->timeout + ); + st_chain_unlock_session(session_lock); + return real; + } + /* + * Real is gone (disabled or removed). The session slot + * is still allocated and locked -- fall through to + * reschedule, which will overwrite it with a new real. + */ + } + + /* + * At this point we need to assign a new real to the session slot. 
+ * This happens when: + * - SESSION_CREATED: no prior session existed (new flow). + * - SESSION_FOUND but real is stale (disabled/removed). + * + * Only connection-initiating packets may create or overwrite + * sessions. Non-initiating packets (TCP non-SYN) are dropped + * and the session slot is freed. + */ + if (unlikely(!session->can_reschedule)) { + vs_stats->not_rescheduled_packets += 1; + st_chain_remove_session(session_state); + st_chain_unlock_session(session_lock); + return NULL; + } + + /* Select a new real and write it into the session slot. */ + struct real *real = + schedule_new_real(context, pkt_ctx, session, vs, session_state); + if (unlikely(real == NULL)) { + st_chain_remove_session(session_state); + } + + st_chain_unlock_session(session_lock); + + return real; +} \ No newline at end of file diff --git a/modules/balancer2/dataplane/l4/select.h b/modules/balancer2/dataplane/l4/select.h new file mode 100644 index 00000000..a92bc260 --- /dev/null +++ b/modules/balancer2/dataplane/l4/select.h @@ -0,0 +1,54 @@ +#pragma once + +#include + +struct worker_context; +struct session; +struct packet_context; +struct balancer_real; +struct balancer_session_table_chain; + +/* + * Select the real server that should handle this packet. + * + * In OPS (one-packet-scheduling) mode, each packet is independently + * assigned to a real via the selector ring. No session is created + * or consulted. + * + * Otherwise, a session slot is acquired from the session table via + * st_get_or_create_session, which either finds an existing entry or + * allocates a new one. In both cases a locked pointer to the + * session_state is returned. The slot is used as follows: + * + * - Session found, real is valid and enabled: + * The session is prolonged (timestamps and timeout updated) + * and the same real is returned. + * + * - Session found, but the real was disabled or removed: + * The session is stale. 
If the packet is reschedulable + * (TCP SYN or any UDP), a new real is selected from the ring + * and the session slot is overwritten with the new real. + * Non-reschedulable packets (TCP non-SYN) are dropped and + * the session slot is freed. + * + * - No session found (new flow): + * A blank slot is allocated. If the packet is reschedulable, + * a new real is selected and written into the slot. + * Non-reschedulable packets are dropped and the slot is + * freed -- this handles stray TCP ACKs arriving after + * session timeout. + * + * The session lock is always released before returning. + * + * Returns the selected real, or NULL if the packet should be + * dropped. The caller is responsible for dropping NULL-result + * packets. + */ +struct real * +select_real( + struct worker_context *context, + struct packet_context *pkt_ctx, + struct session *session, + struct balancer_session_table_chain *st_chain, + uint64_t st_chain_gen +); diff --git a/modules/balancer2/dataplane/l4/session.c b/modules/balancer2/dataplane/l4/session.c new file mode 100644 index 00000000..301ccdf9 --- /dev/null +++ b/modules/balancer2/dataplane/l4/session.c @@ -0,0 +1,142 @@ +#include +#include +#include +#include +#include + +#include "common/network.h" + +#include "lib/dataplane/packet/data.h" + +#include "packet.h" +#include "session.h" +#include "types/session.h" + +static void +extract_ipv4(struct packet *packet, struct session *session) { + struct rte_ipv4_hdr *hdr = rte_pktmbuf_mtod_offset( + packet->mbuf, + struct rte_ipv4_hdr *, + packet->network_header.offset + ); + rte_memcpy(&session->id.client_ip, (uint8_t *)&hdr->src_addr, NET4_LEN); + rte_memcpy(&session->id.vip, (uint8_t *)&hdr->dst_addr, NET4_LEN); +} + +static void +extract_ipv6(struct packet *packet, struct session *session) { + struct rte_ipv6_hdr *hdr = rte_pktmbuf_mtod_offset( + packet->mbuf, + struct rte_ipv6_hdr *, + packet->network_header.offset + ); + rte_memcpy(&session->id.client_ip, hdr->src_addr, NET6_LEN); + 
rte_memcpy(&session->id.vip, hdr->dst_addr, NET6_LEN); +} + +static void +extract_network(struct packet *packet, struct session *session, bool is_ipv6) { + if (is_ipv6) { + extract_ipv6(packet, session); + } else { + extract_ipv4(packet, session); + } +} + +static uint32_t +tcp_timeout(struct balancer_session_timeouts *timeouts, uint16_t tcp_flags) { + if ((tcp_flags & RTE_TCP_SYN_FLAG) == RTE_TCP_SYN_FLAG) { + if ((tcp_flags & RTE_TCP_ACK_FLAG) == RTE_TCP_ACK_FLAG) { + return timeouts->tcp_syn_ack; + } + return timeouts->tcp_syn; + } + if (tcp_flags & RTE_TCP_FIN_FLAG) { + return timeouts->tcp_fin; + } + return timeouts->tcp; +} + +static void +extract_tcp( + struct packet *packet, + struct session *session, + struct balancer_session_timeouts *timeouts +) { + struct rte_tcp_hdr *hdr = rte_pktmbuf_mtod_offset( + packet->mbuf, + struct rte_tcp_hdr *, + packet->transport_header.offset + ); + + uint8_t flags = hdr->tcp_flags; + + session->id.client_port = hdr->src_port; + session->id.vs_port = hdr->dst_port; + session->id.transport = tcp; + + session->timeout = tcp_timeout(timeouts, flags); + session->can_reschedule = (flags & (RTE_TCP_SYN_FLAG | RTE_TCP_RST_FLAG) + ) == RTE_TCP_SYN_FLAG; +} + +static void +extract_udp( + struct packet *packet, + struct session *session, + struct balancer_session_timeouts *timeouts +) { + struct rte_udp_hdr *hdr = rte_pktmbuf_mtod_offset( + packet->mbuf, + struct rte_udp_hdr *, + packet->transport_header.offset + ); + + session->id.client_port = hdr->src_port; + session->id.vs_port = hdr->dst_port; + session->id.transport = udp; + + session->timeout = timeouts->udp; + session->can_reschedule = true; +} + +static void +extract_transport( + struct packet *packet, + struct session *session, + struct balancer_session_timeouts *timeouts +) { + if (packet->transport_header.type == IPPROTO_TCP) { + extract_tcp(packet, session, timeouts); + } else { + extract_udp(packet, session, timeouts); + } +} + +void +fill_sessions( + struct session 
*sessions, + struct packet_context *pkt_ctxs, + size_t count, + struct balancer_session_timeouts *timeouts, + bool is_ipv6 +) { + const size_t prefetch_distance = 2; + + for (size_t pkt_idx = 0; pkt_idx < count; ++pkt_idx) { + if (pkt_idx + prefetch_distance < count) { + struct rte_mbuf *mbuf = packet_to_mbuf( + pkt_ctxs[pkt_idx + prefetch_distance].packet + ); + rte_prefetch0(mbuf); + } + + struct packet *packet = pkt_ctxs[pkt_idx].packet; + struct session *session = &sessions[pkt_idx]; + + memset(&session->id, 0, sizeof(struct balancer_session_id)); + + extract_network(packet, session, is_ipv6); + extract_transport(packet, session, timeouts); + } +} diff --git a/modules/balancer2/dataplane/l4/session.h b/modules/balancer2/dataplane/l4/session.h new file mode 100644 index 00000000..c8f04254 --- /dev/null +++ b/modules/balancer2/dataplane/l4/session.h @@ -0,0 +1,36 @@ +#pragma once + +#include +#include +#include + +#include "types/session.h" + +struct session { + struct balancer_session_id id; + uint32_t timeout; + bool can_reschedule; +}; + +struct packet_context; + +/* + * Parse packet headers into session state for a batch of packets. + * + * For each packet in pkt_ctxs, fills the corresponding entry in + * sessions with the session id (vs_id + client_ip + client_port), + * the timeout derived from the transport protocol, and the + * can_reschedule flag that tells select_real whether a new real + * may be assigned to this flow. + * + * pkt_ctxs and sessions are parallel arrays of pkt_ctx_count + * elements. 
+ */ +void +fill_sessions( + struct session *sessions, + struct packet_context *pkt_ctxs, + size_t count, + struct balancer_session_timeouts *timeouts, + bool is_ipv6 +); diff --git a/modules/balancer2/dataplane/l4/tunnel.c b/modules/balancer2/dataplane/l4/tunnel.c new file mode 100644 index 00000000..a7b502a5 --- /dev/null +++ b/modules/balancer2/dataplane/l4/tunnel.c @@ -0,0 +1,124 @@ +#include + +#include +#include +#include + +#include "common/network.h" + +#include "lib/dataplane/packet/data.h" +#include "lib/dataplane/packet/encap.h" + +#include "packet.h" +#include "real.h" +#include "tunnel.h" + +/* + * Build an outer IPv4 tunnel source by embedding client source bits into + * the unmasked positions of the configured src network address: + */ +static inline void +build_outer_src4( + uint8_t *out, const uint8_t *client_src, const struct net4 *src_net +) { + memcpy(out, src_net->addr, NET4_LEN); + for (size_t i = 0; i < NET4_LEN; ++i) { + out[i] |= client_src[i] & ~src_net->mask[i]; + } +} + +static inline void +build_outer_src6( + uint8_t *out, + const uint8_t *client_src, + uint8_t client_len, + const struct net6 *src_net +) { + memcpy(out, src_net->addr, NET6_LEN); + for (uint8_t i = 0; i < client_len; ++i) { + out[i] |= client_src[i] & ~src_net->mask[i]; + } +} + +/* + * Encapsulate an inner-IPv4 packet in an IP tunnel and embed the + * client source address into the outer header. 
+ */ +static int +encapsulate_ipv4(struct packet *packet, struct real *real) { + struct rte_mbuf *mbuf = packet_to_mbuf(packet); + bool is_outer_ipv6 = real_flags(real) & real_ip6; + + struct rte_ipv4_hdr *inner = rte_pktmbuf_mtod_offset( + mbuf, struct rte_ipv4_hdr *, packet->network_header.offset + ); + const uint8_t *client_src = (const uint8_t *)&inner->src_addr; + + if (is_outer_ipv6) { + uint8_t outer_src[NET6_LEN]; + build_outer_src6( + outer_src, client_src, NET4_LEN, &real->src.v6 + ); + return packet_ip6_encap(packet, real->addr.v6.bytes, outer_src); + } else { + uint8_t outer_src[NET4_LEN]; + build_outer_src4(outer_src, client_src, &real->src.v4); + return packet_ip4_encap(packet, real->addr.v4.bytes, outer_src); + } +} + +static int +encapsulate_ipv6(struct packet *packet, struct real *real) { + struct rte_mbuf *mbuf = packet_to_mbuf(packet); + bool is_outer_ipv6 = real_flags(real) & real_ip6; + + struct rte_ipv6_hdr *inner = rte_pktmbuf_mtod_offset( + mbuf, struct rte_ipv6_hdr *, packet->network_header.offset + ); + const uint8_t *client_src = (const uint8_t *)inner->src_addr; + + if (is_outer_ipv6) { + uint8_t outer_src[NET6_LEN]; + build_outer_src6( + outer_src, client_src, NET6_LEN, &real->src.v6 + ); + return packet_ip6_encap(packet, real->addr.v6.bytes, outer_src); + } else { + uint8_t outer_src[NET4_LEN]; + build_outer_src4(outer_src, client_src, &real->src.v4); + return packet_ip4_encap(packet, real->addr.v4.bytes, outer_src); + } +} + +/* + * Tunnel a packet whose inner layer is IPv4. + */ +int +tunnel_ip4_packet(struct packet_context *pkt_ctx) { + struct packet *packet = pkt_ctx->packet; + struct real *real = pkt_ctx->selected_real; + + /* No MSS clamping for inner IPv4. */ + + /* TODO: check GRE encapsulation */ + + return encapsulate_ipv4(packet, real); +} + +/* + * Tunnel a packet whose inner layer is IPv6. + * + * MSS clamping is applied when the VS has balancer_vs_fix_mss set. 
+ */ +int +tunnel_ip6_packet(struct packet_context *pkt_ctx) { + struct packet *packet = pkt_ctx->packet; + struct real *real = pkt_ctx->selected_real; + + /* TODO: fix mss */ + /* On error, update corresponding VS counter and continue (no drop). */ + + /* TODO: check GRE encapsulation */ + + return encapsulate_ipv6(packet, real); +} diff --git a/modules/balancer2/dataplane/l4/tunnel.h b/modules/balancer2/dataplane/l4/tunnel.h new file mode 100644 index 00000000..3cf1b799 --- /dev/null +++ b/modules/balancer2/dataplane/l4/tunnel.h @@ -0,0 +1,27 @@ +#pragma once + +struct packet_context; + +/* + * Tunnel a packet whose inner layer is IPv4. + * + * Encapsulates the packet in an IP-in-IP tunnel towards the resolved + * real server, embeds the client source IP into the outer header, + * optionally wraps in GRE, and updates forwarding statistics. + * + * MSS clamping is not performed — it only applies to inner IPv6. + */ +int +tunnel_ip4_packet(struct packet_context *pkt_ctx); + +/* + * Tunnel a packet whose inner layer is IPv6. + * + * Encapsulates the packet in an IP-in-IP tunnel towards the resolved + * real server, embeds the client source IP into the outer header, + * optionally wraps in GRE, and updates forwarding statistics. + * + * MSS clamping is applied when the VS has balancer_vs_fix_mss set. 
+ */ +int +tunnel_ip6_packet(struct packet_context *pkt_ctx); diff --git a/modules/balancer2/dataplane/meson.build b/modules/balancer2/dataplane/meson.build new file mode 100644 index 00000000..81562462 --- /dev/null +++ b/modules/balancer2/dataplane/meson.build @@ -0,0 +1,38 @@ +dp_dependencies = [ + lib_common_dep, + lib_packet_dp_dep, + lib_module_dp_dep, + lib_worker_dp_dep, + lib_filter_query_dep, +] + +includes = include_directories('.', '../../../') + +dp_sources = files( + 'dataplane.c', + 'l4/handle.c', + 'l4/select.c', + 'l4/tunnel.c', + 'l4/group.c', + 'l4/session.c', + 'icmp/handle.c', +) + +lib_balancer2_dp = static_library( + 'balancer2_dp', + dp_sources, + c_args: yanet_c_args, + link_args: yanet_link_args, + dependencies: dp_dependencies, + include_directories: includes, + install: false, +) + +lib_balancer2_dp_dep = declare_dependency( + link_with: lib_balancer2_dp, + link_args: [ + '-Wl,--defsym', + '-Wl,new_module_balancer2=new_module_balancer2', + '-Wl,--export-dynamic-symbol=new_module_balancer2', + ], +) diff --git a/modules/balancer2/dataplane/real.h b/modules/balancer2/dataplane/real.h new file mode 100644 index 00000000..bcc5852b --- /dev/null +++ b/modules/balancer2/dataplane/real.h @@ -0,0 +1,56 @@ +#pragma once + +#include +#include + +#include "common/network.h" + +#include "lib/counters/counters.h" + +enum real_flags { + real_enabled = 1u << 0, + real_ip6 = 1u << 1, +}; + +/* A real (backend) server within a virtual service. */ +struct real { + /* Destination IP address of the real server (IPv4 or IPv6). */ + struct net_addr addr; + + /* + * Source network for the outer tunnel header. + * + * Contains both the base address (src.v4.addr / src.v6.addr) + * and the mask (src.v4.mask / src.v6.mask). + * + * INVARIANT: the address bytes must be pre-masked by the + * controlplane, i.e. (addr[i] & mask[i]) == addr[i] for every + * byte i. 
The tunnel code relies on this to embed client source + * IP bits into the unmasked positions without an extra AND: + * + * outer_src[i] = addr[i] | (client_src[i] & ~mask[i]) + * + * Use v4 when real_ip6 is clear, v6 when set. + */ + struct net src; + + uint64_t counter_id; + + _Atomic uint8_t flags; +}; + +static inline struct balancer_real_stats * +real_fetch_stats( + struct real *real, + uint32_t worker, + struct counter_storage *counter_storage +) { + return (struct balancer_real_stats *)counter_get_address( + real->counter_id, worker, counter_storage + ); +} + +static inline uint8_t +real_flags(struct real *real) { + return atomic_load_explicit(&real->flags, memory_order_relaxed); +} \ No newline at end of file diff --git a/modules/balancer2/dataplane/selector.h b/modules/balancer2/dataplane/selector.h new file mode 100644 index 00000000..d7104d42 --- /dev/null +++ b/modules/balancer2/dataplane/selector.h @@ -0,0 +1,52 @@ +#pragma once + +#include +#include + +#include "common/big_array.h" + +/** + * Ring containing real indices. + * + * Each backend appears multiple times according to its weight. + * The ring is shuffled to distribute selections evenly. + */ +struct ring { + /* + * Indices of backend servers. + * + * Stored in a big array because the weighted list can exceed + * the allocator's maximum block size. + */ + struct big_array real_ids; +}; + +/** + * Round-robin counter. + * + * Used to track the position of the current real in the ring. + */ +struct rr_counter { + uint64_t value; +} __attribute__((aligned(64))); + +/** + * Real backend selector. + * + * Maintains two rings for RCU-swapped updates and per-worker RR counters. + * Uses either round-robin or hash-based selection, + * depending on the virtual server scheduler. + */ +struct real_selector { + /* Double-buffered rings. */ + struct ring rings[2]; + + /* Active ring index. 
*/ + _Atomic uint64_t ring_id; + + /* TODO: docs */ + uint64_t packet_hash_mask; + + /* Array of per-worker round-robin counters. */ + struct rr_counter workers_rr_counter[]; +}; diff --git a/modules/balancer2/dataplane/session.h b/modules/balancer2/dataplane/session.h new file mode 100644 index 00000000..76c40b18 --- /dev/null +++ b/modules/balancer2/dataplane/session.h @@ -0,0 +1,177 @@ +#pragma once + +#include + +#include "common/memory_address.h" +#include "common/rcu.h" +#include "common/ttlmap/ttlmap.h" + +#include "types/session.h" + +struct balancer_session_table { + struct ttlmap map; +}; + +/* Mark a session slot as empty. Must be called while the lock is held. */ +static inline void +st_chain_remove_session(struct balancer_session_state *session_state) { + TTLMAP_REMOVE(struct balancer_session_id, session_state); +} + +/* Release the session lock. */ +static inline void +st_chain_unlock_session(ttlmap_lock_t *lock) { + ttlmap_release_lock(lock); +} + +/* + * All access to session table chain must happen within a critical + * section (st_begin_cs / st_end_cs). The critical section pins the + * current generation, ensuring the controlplane does not + * free a map while workers are still reading it. + */ +struct balancer_session_table_chain { + _Atomic uint64_t gen; + struct balancer_session_table *tables[2]; + rcu_t rcu; +}; + +/* + * Enter a session table chain critical section. + * Returns the current generation, which must be passed to + * all subsequent st_* calls within this critical section. + */ +static inline uint64_t +st_chain_begin_cs( + struct balancer_session_table_chain *st_chain, uint32_t worker +) { + return RCU_READ_BEGIN(&st_chain->rcu, worker, &st_chain->gen); +} + +/* + * Leave a session table chain critical section. + * After this call, the controlplane may update the chain. 
+ */ +static inline void +st_chain_end_cs( + struct balancer_session_table_chain *st_chain, uint32_t worker +) { + RCU_READ_END(&st_chain->rcu, worker); +} + +static inline uint32_t +st_chain_current_session_table_index(uint64_t gen) { + return ((gen + 1) & 0b11) >> 1; + ; +} + +static inline void +st_chain_read_front_back( + struct balancer_session_table_chain *st_chain, + uint64_t gen, + struct balancer_session_table **front, + struct balancer_session_table **back +) { + uint32_t cur = st_chain_current_session_table_index(gen); + *front = ADDR_OF(&st_chain->tables[cur]); + *back = ADDR_OF(&st_chain->tables[cur ^ 1]); +} + +/* + * Whether the previous map should be consulted. + * True during transition generations (odd), when sessions may + * still reside in the session table pending migration. + */ +static inline int +st_chain_prev_table_used(uint64_t table_gen) { + return table_gen & 1; +} + +#define SESSION_TABLE_OVERFLOW 0 +#define SESSION_FOUND 1 +#define SESSION_CREATED 2 + +/* + * Look up or create a session entry. + * + * First tries the current map. If the session is found, returns + * SESSION_FOUND with session_state pointing to the existing entry. + * + * If the session is not found, a new slot is allocated in the + * current map. During transition generations (odd), the previous + * map is also checked -- if the session exists there, its state + * is copied into the new slot and SESSION_FOUND is returned. + * This handles sessions that have not yet been migrated. + * + * If neither map has the session, returns SESSION_CREATED with + * a blank slot allocated in the current map. + * + * Returns SESSION_TABLE_OVERFLOW if the current map is full and + * no slot could be allocated. In this case no lock is held. + * + * On SESSION_FOUND or SESSION_CREATED, the returned session_state + * is locked via *lock. The caller must call st_unlock_session + * after modifying the state. 
+ */ +static inline int +st_chain_get_or_create_session( + struct balancer_session_table_chain *st_chain, + uint64_t gen, + uint32_t now, + uint32_t timeout, + struct balancer_session_id *session_id, + struct balancer_session_state **session_state, + ttlmap_lock_t **lock +) { + struct balancer_session_table *front; + struct balancer_session_table *back; + st_chain_read_front_back(st_chain, gen, &front, &back); + + int res = TTLMAP_GET( + &front->map, session_id, session_state, lock, now, timeout + ); + int status = TTLMAP_STATUS(res); + + int result_status; + if (status == TTLMAP_FOUND) { + result_status = SESSION_FOUND; + } else if (status == TTLMAP_INSERTED || status == TTLMAP_REPLACED) { + if (!st_chain_prev_table_used(gen)) { + result_status = SESSION_CREATED; + } else { + /* + * New slot allocated in front map. During transition + * (odd gen), check the back map for a session that + * has not been migrated yet. + */ + int lookup_res = TTLMAP_LOOKUP( + &back->map, session_id, *session_state, now + ); + if (TTLMAP_STATUS(lookup_res) == TTLMAP_FAILED) { + result_status = SESSION_CREATED; + } else { + result_status = SESSION_FOUND; + } + } + } else { // status == TTLMAP_FAILED + result_status = SESSION_TABLE_OVERFLOW; + } + + return result_status; +} + +static inline void +st_chain_prefetch_session( + struct balancer_session_table_chain *st_chain, + uint64_t gen, + struct balancer_session_id *session_id +) { + struct balancer_session_table *front; + struct balancer_session_table *back; + + st_chain_read_front_back(st_chain, gen, &front, &back); + + TTLMAP_PREFETCH( + &front->map, session_id, struct balancer_session_state, 1, 0 + ); +} \ No newline at end of file diff --git a/modules/balancer2/dataplane/types/session.h b/modules/balancer2/dataplane/types/session.h index ea05eb01..982d5a8f 100644 --- a/modules/balancer2/dataplane/types/session.h +++ b/modules/balancer2/dataplane/types/session.h @@ -1,7 +1,26 @@ #pragma once +#include "common/network.h" + +#include 
#include +struct balancer_session_id { + uint16_t client_port; + uint16_t vs_port; + struct net_addr vip; + struct net_addr client_ip; + enum transport_proto transport; +}; + +struct balancer_session_state { + uint32_t last_packet_timestamp; + uint32_t create_timestamp; + uint32_t timeout; + struct net_addr real_ip; + enum ip_family ip_family; +}; + struct balancer_session_timeouts { uint32_t tcp_syn_ack; uint32_t tcp_syn; diff --git a/modules/balancer2/dataplane/types/stats.h b/modules/balancer2/dataplane/types/stats.h new file mode 100644 index 00000000..4678cadb --- /dev/null +++ b/modules/balancer2/dataplane/types/stats.h @@ -0,0 +1,168 @@ +#pragma once + +#include + +// TODO: docs +struct balancer_common_stats { + uint64_t incoming_packets; + uint64_t incoming_bytes; + uint64_t unexpected_network_proto; + uint64_t unexpected_transport_proto; + uint64_t decap_successful; + uint64_t decap_failed; + uint64_t outgoing_packets; + uint64_t outgoing_bytes; +}; + +// TODO: docs +struct balancer_l4_stats { + uint64_t incoming_packets; + uint64_t select_vs_failed; + uint64_t tunnel_failed; + uint64_t select_real_failed; + uint64_t outgoing_packets; +}; + +/** + * Per-virtual-service runtime counters. + * + * Tracks packet processing statistics for a specific virtual service, + * including successful forwards, various failure conditions, and + * session management metrics. + */ +struct balancer_vs_stats { + /* Total packets received matching this virtual service. */ + uint64_t incoming_packets; + + /* Total bytes received matching this virtual service. */ + uint64_t incoming_bytes; + + /* Packets dropped due to source address not in allowlist. */ + uint64_t packet_src_not_allowed; + + /* + * Packets that failed real server selection. + * + * Incremented when: + * - No real servers are configured + * - All real servers are disabled + * - All real servers have zero weight. + */ + uint64_t no_reals; + + /* Session creation failures due to table capacity. 
*/ + uint64_t session_table_overflow; + + /* ICMP echo packets processed. */ + uint64_t echo_icmp_packets; + + /* + * ICMP error packets forwarded to real servers. + * + * Tracks ICMP errors (destination unreachable, time exceeded, + * etc.) that were matched to sessions and forwarded to the + * appropriate real server. + */ + uint64_t error_icmp_packets; + + /* + * Packets for sessions where the real server is disabled. + * + * Incremented when: + * - Session exists for a specific real + * - That real is currently disabled + * - Packet arrives for the session + * + * These packets are dropped. + */ + uint64_t real_is_disabled; + + /* + * Packets for sessions where the real server was removed. + * + * Incremented when: + * - Session exists for a specific real + * - That real is no longer in the configuration + * - Packet arrives for the session + * + * These packets are dropped. + */ + uint64_t real_is_removed; + + /* + * Packets that couldn't be rescheduled. + * + * Incremented when: + * - No existing session found + * - Packet doesn't start a new session (e.g., TCP non-SYN) + * + * Common for: + * - TCP packets without SYN flag when no session exists + * - Packets arriving after session timeout + */ + uint64_t not_rescheduled_packets; + + /* + * ICMP packets broadcasted to peer balancers. + * + * Incremented when: + * - ICMP error has this VS as source + * - Packet is cloned and sent to configured peers + * - Used for distributed ICMP error handling + */ + uint64_t broadcasted_icmp_packets; + + /* + * Total sessions created for this virtual service. + * + * Tracks the cumulative number of sessions created since + * the balancer started or statistics were reset. Does not + * include OPS packets (which don't create sessions). + */ + uint64_t created_sessions; + + /* Packets successfully forwarded to real servers. */ + uint64_t outgoing_packets; + + /* Failed to fix MSS because of malformed TCP options. 
*/ + uint64_t malformed_tcp; + + /* Failed to fix MSS because of rte_mbuf_prepend failure. */ + uint64_t mss_prepend_failed; + + /* Bytes successfully forwarded to real servers (IP layer). */ + uint64_t outgoing_bytes; +}; + +/* + * Per-real-server statistics. + * + * Tracks packet processing and session creation for a specific real + * server within a virtual service. + */ +struct balancer_real_stats { + /* + * Packets for sessions assigned to this real when it was disabled. + * + * Incremented when: + * - A session exists for this real + * - The real is currently disabled + * - A packet arrives for that session + * + * This indicates packets that were dropped or rescheduled because + * the real was disabled after the session was created. + */ + uint64_t packets_real_disabled; + + /* ICMP error packets forwarded to this real server. */ + uint64_t error_icmp_packets; + + /* Total number of new sessions created with this real as backend. */ + uint64_t created_sessions; + + /* Total packets forwarded to this real server. */ + uint64_t packets; + + /* Total bytes forwarded to this real server. 
*/ + uint64_t bytes; +}; diff --git a/modules/balancer2/dataplane/vs.h b/modules/balancer2/dataplane/vs.h new file mode 100644 index 00000000..1e71c001 --- /dev/null +++ b/modules/balancer2/dataplane/vs.h @@ -0,0 +1,59 @@ +#pragma once + +#include +#include +#include + +#include "filter/filter.h" + +#include "selector.h" + +#include "lib/counters/counters.h" + +enum vs_flags { + vs_fix_mss = 1u << 0, + vs_ops = 1u << 1, + vs_gre = 1u << 2, + vs_ip6 = 1u << 3 +}; + +struct virtual_service { + struct real *reals; + uint32_t reals_count; + + uint64_t counter_id; + + struct filter acl; + + uint64_t *rule_counter_ids; + size_t rule_count; + + uint8_t flags; + + struct real_selector *selector; +}; + +static inline struct balancer_vs_stats * +vs_fetch_stats( + struct virtual_service *vs, + uint32_t worker, + struct counter_storage *counter_storage +) { + return (struct balancer_vs_stats *)counter_get_address( + vs->counter_id, worker, counter_storage + ); +} + +static inline uint64_t * +vs_fetch_acl_stats( + struct virtual_service *vs, + uint32_t worker, + struct counter_storage *counter_storage, + uint32_t rule_idx +) { + // Rule counter is undefined if tag is empty + uint64_t id = ADDR_OF(&vs->rule_counter_ids)[rule_idx]; + return id != (uint64_t)-1 + ? counter_get_address(id, worker, counter_storage) + : NULL; +} diff --git a/modules/balancer2/meson.build b/modules/balancer2/meson.build new file mode 100644 index 00000000..c999500d --- /dev/null +++ b/modules/balancer2/meson.build @@ -0,0 +1,2 @@ +subdir('dataplane') +subdir('api') \ No newline at end of file diff --git a/modules/meson.build b/modules/meson.build index db02d722..4a3ddbdf 100644 --- a/modules/meson.build +++ b/modules/meson.build @@ -8,3 +8,4 @@ subdir('forward') subdir('dscp') subdir('nat64') subdir('pdump') +subdir('balancer2') \ No newline at end of file