From f7ab05f70bdbe87c244761e321d4af5211b3e5b0 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Tue, 15 Jul 2025 11:33:39 +0000 Subject: [PATCH 01/11] Force to auto-detect available port using P2P handshake mode --- .../src/transfer_engine.cpp | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/mooncake-transfer-engine/src/transfer_engine.cpp b/mooncake-transfer-engine/src/transfer_engine.cpp index 3f62b1ae5..8637f09de 100644 --- a/mooncake-transfer-engine/src/transfer_engine.cpp +++ b/mooncake-transfer-engine/src/transfer_engine.cpp @@ -55,9 +55,11 @@ int TransferEngine::init(const std::string &metadata_conn_string, rpc_binding_method = "legacy/P2P"; #ifdef USE_ASCEND int device_id = -1; - auto [host_name, port] = parseHostNameWithPortAscend(local_server_name, &device_id); - LOG(INFO) << "Transfer Engine parseHostNameWithPortAscend. Server: " << host_name << " port: " - << port << " device_id: " << device_id; + auto [host_name, port] = + parseHostNameWithPortAscend(local_server_name, &device_id); + LOG(INFO) << "Transfer Engine parseHostNameWithPortAscend. Server: " + << host_name << " port: " << port + << " device_id: " << device_id; #else auto [host_name, port] = parseHostNameWithPort(local_server_name); #endif @@ -67,17 +69,15 @@ int TransferEngine::init(const std::string &metadata_conn_string, if (metadata_conn_string == P2PHANDSHAKE) { rpc_binding_method = "P2P handshake"; - if (port == getDefaultHandshakePort()) { - desc.rpc_port = findAvailableTcpPort(desc.sockfd); - if (desc.rpc_port == 0) { - LOG(ERROR) - << "P2P: No valid port found for local TCP service."; - return -1; - } + desc.rpc_port = findAvailableTcpPort(desc.sockfd); + if (desc.rpc_port == 0) { + LOG(ERROR) << "P2P: No valid port found for local TCP service."; + return -1; } #ifdef USE_ASCEND - local_server_name_ = - desc.ip_or_host_name + ":" + std::to_string(desc.rpc_port) + ":npu_" + std::to_string(device_id); + local_server_name_ = desc.ip_or_host_name + ":" + + std::to_string(desc.rpc_port) + ":npu_" + + std::to_string(device_id); #else local_server_name_ = desc.ip_or_host_name + ":" + std::to_string(desc.rpc_port); @@ -143,7 +143,8 @@ int TransferEngine::init(const std::string &metadata_conn_string, << local_topology_->getHcaList().size() << " HCAs."; #ifdef USE_MNNVL - if (local_topology_->getHcaList().size() > 0 && !getenv("MC_FORCE_MNNVL")) { + if (local_topology_->getHcaList().size() > 0 && + !getenv("MC_FORCE_MNNVL")) { multi_transports_->installTransport("rdma", local_topology_); } else { multi_transports_->installTransport("nvlink", nullptr); From 1cb4370b2bf628b541793cce364cffd045b5e115 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Tue, 15 Jul 2025 11:39:23 +0000 Subject: [PATCH 02/11] Move listen inside findAvailableTcpPort --- .../src/transfer_metadata_plugin.cpp | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 646807f19..1241e63d6 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -629,12 +629,12 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } } - } - if (listen(listen_fd_, listen_backlog_)) { - PLOG(ERROR) << "SocketHandShakePlugin: listen()"; - closeListen(); - return ERR_SOCKET; + if (listen(listen_fd_, listen_backlog_)) { + PLOG(ERROR) << "SocketHandShakePlugin: listen()"; + closeListen(); + return ERR_SOCKET; + } } listener_running_ = true; @@ -1119,6 +1119,14 @@ uint16_t findAvailableTcpPort(int &sockfd) { } } + auto &config = globalConfig(); + auto listen_backlog_ = config.handshake_listen_backlog; + if (listen(sockfd, listen_backlog_)) { + close(sockfd); + sockfd = -1; + continue; + } + return port; } return 0; From 80645f60ef6a3c2fa512385cbea87b22f1cf71d6 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Wed, 16 Jul 2025 01:41:49 +0000 Subject: [PATCH 03/11] Revert "Move listen inside findAvailableTcpPort" This reverts commit 1cb4370b2bf628b541793cce364cffd045b5e115. --- .../src/transfer_metadata_plugin.cpp | 18 +++++------------- 1 file changed, 5 insertions(+), 13 deletions(-) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 1241e63d6..646807f19 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -629,12 +629,12 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } } + } - if (listen(listen_fd_, listen_backlog_)) { - PLOG(ERROR) << "SocketHandShakePlugin: listen()"; - closeListen(); - return ERR_SOCKET; - } + if (listen(listen_fd_, listen_backlog_)) { + PLOG(ERROR) << "SocketHandShakePlugin: listen()"; + closeListen(); + return ERR_SOCKET; } listener_running_ = true; @@ -1119,14 +1119,6 @@ uint16_t findAvailableTcpPort(int &sockfd) { } } - auto &config = globalConfig(); - auto listen_backlog_ = config.handshake_listen_backlog; - if (listen(sockfd, listen_backlog_)) { - close(sockfd); - sockfd = -1; - continue; - } - return port; } return 0; From 67029ecaa2a3945d8362299e627bd1e73fd4d391 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Wed, 16 Jul 2025 01:51:42 +0000 Subject: [PATCH 04/11] Config SO_REUSEPORT to avoid bind success but listen failed --- .../src/transfer_metadata_plugin.cpp | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 646807f19..9ea328eaf 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -600,6 +600,14 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } + int one = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { + PLOG(ERROR) + << "SocketHandShakePlugin: setsockopt(SO_REUSEPORT)"; + closeListen(); + return ERR_SOCKET; + } + if (globalConfig().use_ipv6) { sockaddr_in6 bind_address; memset(&bind_address, 0, sizeof(sockaddr_in6)); @@ -1093,6 +1101,13 @@ uint16_t findAvailableTcpPort(int &sockfd) { continue; } + int one = 1; + if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { + close(sockfd); + sockfd = -1; + continue; + } + if (use_ipv6) { sockaddr_in6 bind_address; memset(&bind_address, 0, sizeof(sockaddr_in6)); From d21d611aaf01d984015198175a26a186df6ddb4e Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Wed, 16 Jul 2025 01:52:01 +0000 Subject: [PATCH 05/11] Fix typos --- mooncake-transfer-engine/src/transfer_metadata_plugin.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 9ea328eaf..3b7eec0ef 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -601,7 +601,7 @@ struct SocketHandShakePlugin : public HandShakePlugin { } int one = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { + if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { PLOG(ERROR) << "SocketHandShakePlugin: setsockopt(SO_REUSEPORT)"; closeListen(); @@ -1102,7 +1102,7 @@ uint16_t findAvailableTcpPort(int &sockfd) { } int one = 1; - if (setsockopt(sock, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { close(sockfd); sockfd = -1; continue; From e268f796c918225273ceec0b7501d315afb1fd25 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Wed, 16 Jul 2025 02:39:38 +0000 Subject: [PATCH 06/11] Retrigger CI --- mooncake-transfer-engine/src/transfer_metadata_plugin.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 3b7eec0ef..f0186e7d4 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -1102,6 +1102,7 @@ uint16_t findAvailableTcpPort(int &sockfd) { } int one = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { close(sockfd); sockfd = -1; From b3b37a906da57383e33b5674c0b60737e9228d5e Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Thu, 24 Jul 2025 02:45:06 +0000 Subject: [PATCH 07/11] Add options to disable reuseaddr feature --- doc/en/transfer-engine.md | 3 +- mooncake-transfer-engine/include/common.h | 57 +++++++++++-------- .../src/transfer_metadata_plugin.cpp | 48 +++++++--------- 3 files changed, 54 insertions(+), 54 deletions(-) diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index db40d54c6..f5814790f 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -440,4 +440,5 @@ For advanced users, TransferEngine provides the following advanced runtime optio - `MC_REDIS_PASSWORD` The password for Redis storage plugin, only takes effect when Redis is specified as the metadata server. If not set, no authentication will be attempted to log in to the Redis. - `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0. - `MC_FRAGMENT_RATIO ` In RdmaTransport::submitTransferTask, if the last data piece after division is ≤ 1/MC_FRAGMENT_RATIO of the block size, it merges with the previous block to reduce overhead. The default value is 4 -- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false \ No newline at end of file +- `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false +- `MC_DISABLE_REUSEADDR` Force TCP deamon NOT to enable REUSEADDR option. This avoids rare cases that two processes use the same port. However, this prevents to reuse port after shutting down previous connections. diff --git a/mooncake-transfer-engine/include/common.h b/mooncake-transfer-engine/include/common.h index a18b19ad0..2677073a3 100644 --- a/mooncake-transfer-engine/include/common.h +++ b/mooncake-transfer-engine/include/common.h @@ -15,10 +15,10 @@ #ifndef COMMON_H #define COMMON_H -#include -#include #include +#include #include +#include #include #include #include @@ -123,48 +123,54 @@ static inline std::string getCurrentDateTime() { uint16_t getDefaultHandshakePort(); -template +template std::optional parseFromString(std::string_view str) { T result = T(); - auto [ptr, ec] = std::from_chars(str.data(), str.data() + str.size(), result); + auto [ptr, ec] = + std::from_chars(str.data(), str.data() + str.size(), result); if (ec != std::errc() || ptr != str.data() + str.size()) { return {}; } return {std::move(result)}; } -static inline uint16_t getPortFromString(std::string_view port_string, uint16_t default_port) { +static inline uint16_t getPortFromString(std::string_view port_string, + uint16_t default_port) { std::optional port = parseFromString(port_string); if (port.has_value()) { return *port; } - LOG(WARNING) << "Illegal port number in " << port_string << ". Use default port " << default_port << " instead"; + LOG(WARNING) << "Illegal port number in " << port_string + << ". Use default port " << default_port << " instead"; return default_port; } -static inline bool isValidIpV6(const std::string& address) { +static inline bool isValidIpV6(const std::string &address) { sockaddr_in6 addr; std::memset(&addr, 0, sizeof(addr)); return inet_pton(AF_INET6, address.c_str(), &addr.sin6_addr) == 1; } -static inline std::string maybeWrapIpV6(const std::string& address) { +static inline std::string maybeWrapIpV6(const std::string &address) { if (isValidIpV6(address)) { return "[" + address + "]"; } return address; } -static inline std::pair parseHostNameWithPort(const std::string &server_name) { +static inline std::pair parseHostNameWithPort( + const std::string &server_name) { uint16_t port = getDefaultHandshakePort(); if (server_name.starts_with("[")) { // [ipv6] or [ipv6]:port const size_t closing_bracket_pos = server_name.find(']'); const size_t colon_pos = server_name.find(':', closing_bracket_pos); - std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1); + std::string potentialHost = + server_name.substr(1, closing_bracket_pos - 1); if (isValidIpV6(potentialHost)) { - return {std::move(potentialHost), getPortFromString(server_name.substr(colon_pos + 1), port)}; + return {std::move(potentialHost), + getPortFromString(server_name.substr(colon_pos + 1), port)}; } // Not valid ipv6, fallback to ipv4/host/etc mode } else if (isValidIpV6(server_name)) { @@ -177,10 +183,13 @@ static inline std::pair parseHostNameWithPort(const std:: if (colon_pos == server_name.npos) { return {server_name, port}; } - return {server_name.substr(0, colon_pos), getPortFromString(server_name.substr(colon_pos + 1), port)}; + return {server_name.substr(0, colon_pos), + getPortFromString(server_name.substr(colon_pos + 1), port)}; } -static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t default_port, int *device_id) { +static inline uint16_t parsePortAndDevice(std::string_view suffix, + uint16_t default_port, + int *device_id) { auto colon_pos = suffix.find(':'); if (colon_pos == suffix.npos) { return getPortFromString(suffix, default_port); @@ -189,8 +198,10 @@ static inline uint16_t parsePortAndDevice(std::string_view suffix, uint16_t defa auto npu_str = suffix.substr(colon_pos + 1); auto npu_ops = npu_str.find('_'); - if (npu_ops != npu_str.npos && npu_ops != 0 && npu_ops != npu_str.size() - 1) { - *device_id = parseFromString(npu_str.substr(npu_ops + 1)).value_or(0); + if (npu_ops != npu_str.npos && npu_ops != 0 && + npu_ops != npu_str.size() - 1) { + *device_id = + parseFromString(npu_str.substr(npu_ops + 1)).value_or(0); } return getPortFromString(port_str, default_port); } @@ -203,12 +214,12 @@ static inline std::pair parseHostNameWithPortAscend( // [ipv6] or [ipv6]:port const size_t closing_bracket_pos = server_name.find(']'); const size_t colon_pos = server_name.find(':', closing_bracket_pos); - std::string potentialHost = server_name.substr(1, closing_bracket_pos - 1); + std::string potentialHost = + server_name.substr(1, closing_bracket_pos - 1); if (isValidIpV6(potentialHost)) { - return { - std::move(potentialHost), - parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id) - }; + return {std::move(potentialHost), + parsePortAndDevice(server_name.substr(colon_pos + 1), port, + device_id)}; } // Not valid ipv6, fallback to ipv4/host/etc mode } else if (isValidIpV6(server_name)) { @@ -222,8 +233,7 @@ static inline std::pair parseHostNameWithPortAscend( return { server_name.substr(0, colon_pos), - parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id) - }; + parsePortAndDevice(server_name.substr(colon_pos + 1), port, device_id)}; } static inline ssize_t writeFully(int fd, const void *buf, size_t len) { @@ -505,10 +515,9 @@ class SimpleRandom { return g_random; } - // 生成下一个伪随机数 uint32_t next() { current = (a * current + c) % m; - return current; + return current >> 12; // Shift right to add randomness of the LSBs } uint32_t next(uint32_t max) { return next() % max; } diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 8a93db0f2..b8457f96e 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -592,20 +592,14 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } - if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, - sizeof(on))) { - PLOG(ERROR) - << "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)"; - closeListen(); - return ERR_SOCKET; - } - - int one = 1; - if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { - PLOG(ERROR) - << "SocketHandShakePlugin: setsockopt(SO_REUSEPORT)"; - closeListen(); - return ERR_SOCKET; + if (!getenv("MC_DISABLE_REUSEADDR")) { + if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, + sizeof(on))) { + PLOG(ERROR) + << "SocketHandShakePlugin: setsockopt(SO_REUSEADDR)"; + closeListen(); + return ERR_SOCKET; + } } if (globalConfig().use_ipv6) { @@ -696,9 +690,11 @@ struct SocketHandShakePlugin : public HandShakePlugin { // old protocol equals Connection type if (type == HandShakeRequestType::Connection || type == HandShakeRequestType::OldProtocol) { - if (on_connection_callback_) on_connection_callback_(peer, local); + if (on_connection_callback_) + on_connection_callback_(peer, local); } else if (type == HandShakeRequestType::Metadata) { - if (on_metadata_callback_) on_metadata_callback_(peer, local); + if (on_metadata_callback_) + on_metadata_callback_(peer, local); } else if (type == HandShakeRequestType::Notify) { if (on_notify_callback_) on_notify_callback_(peer, local); } else { @@ -1094,19 +1090,13 @@ uint16_t findAvailableTcpPort(int &sockfd) { continue; } - int on = 1; - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { - close(sockfd); - sockfd = -1; - continue; - } - - int one = 1; - - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEPORT, &one, sizeof(one))) { - close(sockfd); - sockfd = -1; - continue; + if (!getenv("MC_DISABLE_REUSEADDR")) { + int on = 1; + if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { + close(sockfd); + sockfd = -1; + continue; + } } if (use_ipv6) { From 19913f4d7632f4545f2e05aa9caa013a2544ce7e Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Thu, 24 Jul 2025 02:47:56 +0000 Subject: [PATCH 08/11] Fix typos --- doc/en/transfer-engine.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index f5814790f..f97502130 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -441,4 +441,4 @@ For advanced users, TransferEngine provides the following advanced runtime optio - `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0. - `MC_FRAGMENT_RATIO ` In RdmaTransport::submitTransferTask, if the last data piece after division is ≤ 1/MC_FRAGMENT_RATIO of the block size, it merges with the previous block to reduce overhead. The default value is 4 - `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false -- `MC_DISABLE_REUSEADDR` Force TCP deamon NOT to enable REUSEADDR option. This avoids rare cases that two processes use the same port. However, this prevents to reuse port after shutting down previous connections. +- `MC_DISABLE_REUSEADDR` Force the TCP daemon not to enable the REUSEADDR option. This avoids rare cases where two processes use the same port. However, it also prevents reusing the port immediately after shutting down previous connections. From 3f3a202cbb61b354245c49355d1dfba88ffbd0b8 Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Thu, 24 Jul 2025 04:28:47 +0000 Subject: [PATCH 09/11] Enlarge the TCP port range --- mooncake-transfer-engine/src/transfer_metadata_plugin.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index b8457f96e..37f781fb7 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -1069,7 +1069,7 @@ uint16_t findAvailableTcpPort(int &sockfd) { static std::random_device rand_gen; std::uniform_int_distribution rand_dist; const int min_port = 15000; - const int max_port = 17000; + const int max_port = 25000; const int max_attempts = 500; bool use_ipv6 = globalConfig().use_ipv6; From 543384f83130083191a860952104b3f35f73bd8c Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Thu, 24 Jul 2025 04:32:06 +0000 Subject: [PATCH 10/11] Make REUSEADDR disabled by default --- doc/en/transfer-engine.md | 2 +- mooncake-transfer-engine/src/transfer_metadata_plugin.cpp | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/doc/en/transfer-engine.md b/doc/en/transfer-engine.md index f97502130..17bb4d67a 100644 --- a/doc/en/transfer-engine.md +++ b/doc/en/transfer-engine.md @@ -441,4 +441,4 @@ For advanced users, TransferEngine provides the following advanced runtime optio - `MC_REDIS_DB_INDEX` The database index for Redis storage plugin, must be an integer between 0 and 255. Only takes effect when Redis is specified as the metadata server. If not set or invalid, the default value is 0. - `MC_FRAGMENT_RATIO ` In RdmaTransport::submitTransferTask, if the last data piece after division is ≤ 1/MC_FRAGMENT_RATIO of the block size, it merges with the previous block to reduce overhead. The default value is 4 - `MC_ENABLE_DEST_DEVICE_AFFINITY` Enable device affinity for RDMA performance optimization. When enabled, Transfer Engine will prioritize communication with remote NICs that have the same name as local NICs to reduce QP count and improve network performance in rail-optimized topologies. The default value is false -- `MC_DISABLE_REUSEADDR` Force the TCP daemon not to enable the REUSEADDR option. This avoids rare cases where two processes use the same port. However, it also prevents reusing the port immediately after shutting down previous connections. +- `MC_ENABLE_REUSEADDR` Force the TCP daemon to enable the REUSEADDR option. This avoids rare cases where two processes use the same port. However, it also prevents reusing the port immediately after shutting down previous connections. diff --git a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp index 37f781fb7..fbe9f287c 100644 --- a/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp +++ b/mooncake-transfer-engine/src/transfer_metadata_plugin.cpp @@ -592,7 +592,7 @@ struct SocketHandShakePlugin : public HandShakePlugin { return ERR_SOCKET; } - if (!getenv("MC_DISABLE_REUSEADDR")) { + if (getenv("MC_ENABLE_REUSEADDR")) { if (setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { PLOG(ERROR) @@ -1069,7 +1069,7 @@ uint16_t findAvailableTcpPort(int &sockfd) { static std::random_device rand_gen; std::uniform_int_distribution rand_dist; const int min_port = 15000; - const int max_port = 25000; + const int max_port = 17000; const int max_attempts = 500; bool use_ipv6 = globalConfig().use_ipv6; @@ -1090,7 +1090,7 @@ uint16_t findAvailableTcpPort(int &sockfd) { continue; } - if (!getenv("MC_DISABLE_REUSEADDR")) { + if (getenv("MC_ENABLE_REUSEADDR")) { int on = 1; if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on))) { close(sockfd); From 315aa5aac7e17f0550a9f29d8e5f1fd6de6db27a Mon Sep 17 00:00:00 2001 From: Feng Ren Date: Thu, 24 Jul 2025 10:22:37 +0000 Subject: [PATCH 11/11] Make port range larger --- .../hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp index 0548fb4be..eb3ab233b 100644 --- a/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp +++ b/mooncake-transfer-engine/src/transport/ascend_transport/hccl_transport/ascend_transport_c/hccl_transport_mem_c.cpp @@ -63,7 +63,7 @@ uint16_t findAvailableTcpPort(int &sockfd, bool use_ipv6) { static std::random_device rand_gen; std::mt19937 gen(rand_gen()); const int min_port = 15000; - const int max_port = 17000; + const int max_port = 25000; const int max_attempts = 500; std::uniform_int_distribution<> rand_dist(min_port, max_port);