diff --git a/config.h.cmake b/config.h.cmake
index 42553fafeed3..2561d38c6194 100644
--- a/config.h.cmake
+++ b/config.h.cmake
@@ -363,6 +363,7 @@
 #define SO_EXT "@CMAKE_SHARED_MODULE_SUFFIX@"
 
+#cmakedefine GMOCK_FOUND 1
 /* From libmysql/CMakeLists.txt */
 #cmakedefine HAVE_UNIX_DNS_SRV @HAVE_UNIX_DNS_SRV@
diff --git a/share/messages_to_clients.txt b/share/messages_to_clients.txt
index 5a510b5e73ca..6cede6fe1c18 100644
--- a/share/messages_to_clients.txt
+++ b/share/messages_to_clients.txt
@@ -10800,6 +10800,15 @@ ER_EXTERNAL_TABLE_ENGINE_NOT_SPECIFIED
 ER_LH_NO_FILES_FOUND_NO_SECURE_FILE_PRIV
   eng "No files found or files provided not under secure-file-priv."
 
+ER_INVALID_CPU_STRING
+  eng "Invalid cpu string '%s'."
+
+ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER
+  eng "Cannot update %s successfully."
+
+ER_CANNOT_UPDATE_SCHED_AFFINITY_NUMA_AWARE
+  eng "Cannot update sched_affinity_numa_aware successfully."
+
 #
 # End of 9.x error messages (server-to-client).
 #
diff --git a/share/messages_to_error_log.txt b/share/messages_to_error_log.txt
index 916eee8b0af0..f269fe216790 100644
--- a/share/messages_to_error_log.txt
+++ b/share/messages_to_error_log.txt
@@ -12847,6 +12847,45 @@ ER_RPL_MTA_ALLOW_COMMIT_OUT_OF_ORDER
 ER_TEMPTABLE_ENGINE_ERROR
   eng "Temptable: %s"
 
+ER_CANT_PARSE_CPU_STRING
+  eng "Cannot parse cpu string '%s'."
+
+ER_LIBNUMA_TEST_FAIL
+  eng "libnuma test failed."
+
+ER_NUMA_AVAILABLE_TEST_FAIL
+  eng "numa_available test failed."
+
+ER_CANNOT_SET_THREAD_SCHED_AFFINIFY
+  eng "Cannot set thread %s sched affinity."
+
+ER_CANNOT_UNSET_THREAD_SCHED_AFFINIFY
+  eng "Cannot unset thread %s sched affinity."
+
+ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER
+  eng "Cannot register thread %s to sched affinity manager."
+
+ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER
+  eng "Cannot unregister thread %s from sched affinity manager."
+
+ER_USE_DUMMY_SCHED_AFFINITY_MANAGER
+  eng "Use dummy sched_affinity_manager."
+ +ER_SCHED_AFFINITY_THREAD_PROCESS_CONFLICT + eng "Found sched affinity conflict between threads and process." + +ER_SCHED_AFFINITY_FOREGROUND_BACKGROUND_CONFLICT + eng "Found sched affinity conflict between foreground threads and background threads." + +ER_CANNOT_CREATE_SCHED_AFFINITY_MANAGER + eng "Cannot create sched affinity manager." + +ER_SET_FALLBACK_MODE + eng "sched_affinity_manager is set to fallback mode." + +ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED + eng "sched_affinity_manager is in fallback mode. A fallback version of sched_affinity_manager is called, which does nothing." + # DO NOT add server-to-client messages here; # they go in messages_to_clients.txt # in the same directory as this file. diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 8f4bb2b92103..f18dfff78555 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -20,6 +20,7 @@ # You should have received a copy of the GNU General Public License # along with this program; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA +INCLUDE_DIRECTORIES(${GMOCK_INCLUDE_DIRS}) ADD_WSHADOW_WARNING() @@ -530,6 +531,7 @@ SET(SQL_SHARED_SOURCES rpl_write_set_handler.cc rules_table_service.cc rwlock_scoped_lock.cc + sched_affinity_manager.cc sd_notify.cc sdi_utils.cc session_tracker.cc diff --git a/sql/conn_handler/connection_handler_per_thread.cc b/sql/conn_handler/connection_handler_per_thread.cc index f6ba4257bc25..745b1f147cab 100644 --- a/sql/conn_handler/connection_handler_per_thread.cc +++ b/sql/conn_handler/connection_handler_per_thread.cc @@ -25,6 +25,7 @@ #include #include +#include #include #include @@ -56,6 +57,7 @@ #include "sql/mysqld.h" // max_connections #include "sql/mysqld_thd_manager.h" // Global_THD_manager #include "sql/protocol_classic.h" +#include "sql/sched_affinity_manager.h" #include "sql/sql_class.h" // THD #include "sql/sql_connect.h" // close_connection #include "sql/sql_error.h" @@ -297,6 +299,18 
@@ static void *handle_connection(void *arg) {
     mysql_socket_set_thread_owner(socket);
     thd_manager->add_thd(thd);
 
+    auto sched_affinity_manager =
+        sched_affinity::Sched_affinity_manager::get_instance();
+    bool is_registered_to_sched_affinity = false;
+    auto pid = sched_affinity::gettid();
+    if (sched_affinity_manager == nullptr ||
+        !(is_registered_to_sched_affinity =
+              sched_affinity_manager->register_thread(
+                  sched_affinity::Thread_type::FOREGROUND, pid))) {
+      LogErr(ERROR_LEVEL, ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER,
+             "foreground");
+    }
+
     if (thd_prepare_connection(thd))
       handler_manager->inc_aborted_connects();
     else {
@@ -307,6 +321,13 @@ static void *handle_connection(void *arg) {
     }
     close_connection(thd, 0, false, false);
 
+    if (is_registered_to_sched_affinity &&
+        !sched_affinity_manager->unregister_thread(pid)) {
+      LogErr(ERROR_LEVEL,
+             ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER,
+             "foreground");
+    }
+
     thd->get_stmt_da()->reset_diagnostics_area();
     thd->release_resources();
diff --git a/sql/memory/aligned_atomic.h b/sql/memory/aligned_atomic.h
index 55770f8158b7..54275d9e265a 100644
--- a/sql/memory/aligned_atomic.h
+++ b/sql/memory/aligned_atomic.h
@@ -75,7 +75,9 @@ static inline size_t _cache_line_size() {
   }
   free(buffer);
 
-  return line_size;
+  long size = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
+  if (size == -1 || size == 0) return 64;
+  return static_cast<size_t>(size);
 }
 
 #elif defined(__GLIBC__)
diff --git a/sql/mysqld.cc b/sql/mysqld.cc
index e9c0772ccb8b..0f3e19128952 100644
--- a/sql/mysqld.cc
+++ b/sql/mysqld.cc
@@ -857,6 +857,7 @@ MySQL clients support the protocol:
 #include "sql/rpl_rli.h"  // Relay_log_info
 #include "sql/rpl_source.h"  // max_binlog_dump_events
 #include "sql/rpl_trx_tracking.h"
+#include "sql/sched_affinity_manager.h"
 #include "sql/sd_notify.h"  // sd_notify_connect
 #include "sql/session_tracker.h"
 #include "sql/set_var.h"
@@ -1244,6 +1245,9 @@ bool opt_persist_sensitive_variables_in_plaintext{true};
 
 int argc_cached;
char **argv_cached;
 
+extern std::map<sched_affinity::Thread_type, char *> sched_affinity_parameter;
+extern bool sched_affinity_numa_aware;
+
 #if defined(_WIN32)
 /*
   Thread handle of shutdown event handler thread.
@@ -2897,6 +2901,7 @@ static void clean_up(bool print_message) {
   */
   sys_var_end();
   free_status_vars();
+  sched_affinity::Sched_affinity_manager::free_instance();
   finish_client_errs();
   deinit_errmessage();  // finish server errs
@@ -9829,6 +9834,11 @@ int mysqld_main(int argc, char **argv)
   /* Determine default TCP port and unix socket name */
   set_ports();
 
+  if (sched_affinity::Sched_affinity_manager::create_instance(sched_affinity_parameter, sched_affinity_numa_aware) == nullptr) {
+    LogErr(ERROR_LEVEL, ER_CANNOT_CREATE_SCHED_AFFINITY_MANAGER);
+    unireg_abort(MYSQLD_ABORT_EXIT);
+  }
+
   if (init_server_components()) unireg_abort(MYSQLD_ABORT_EXIT);
 
   if (!server_id_supplied)
@@ -11243,6 +11253,31 @@ static int show_queries(THD *thd, SHOW_VAR *var, char *) {
   return 0;
 }
 
+static int show_sched_affinity_status(THD *, SHOW_VAR *var, char *buff) {
+  var->type = SHOW_CHAR;
+  var->value = buff;
+  std::string group_snapshot = sched_affinity::Sched_affinity_manager::get_instance()->take_group_snapshot();
+  strncpy(buff, group_snapshot.c_str(), SHOW_VAR_FUNC_BUFF_SIZE);
+  buff[SHOW_VAR_FUNC_BUFF_SIZE]='\0';
+  return 0;
+}
+
+static int show_sched_affinity_group_number(THD *, SHOW_VAR *var, char *buff) {
+  var->type = SHOW_SIGNED_INT;
+  var->value = buff;
+  *(reinterpret_cast<int *>(buff)) = sched_affinity::Sched_affinity_manager::get_instance()
+      ->get_total_node_number();
+  return 0;
+}
+
+static int show_sched_affinity_group_capacity(THD *, SHOW_VAR *var, char *buff) {
+  var->type = SHOW_SIGNED_INT;
+  var->value = buff;
+  *(reinterpret_cast<int *>(buff)) = sched_affinity::Sched_affinity_manager::get_instance()
+      ->get_cpu_number_per_node();
+  return 0;
+}
+
 static int show_net_compression(THD *thd, SHOW_VAR *var, char *buff) {
   var->type = SHOW_MY_BOOL;
   var->value = buff;
@@ -11871,6 +11906,12 @@ SHOW_VAR status_vars[] =
{ {"Queries", (char *)&show_queries, SHOW_FUNC, SHOW_SCOPE_ALL}, {"Questions", (char *)offsetof(System_status_var, questions), SHOW_LONGLONG_STATUS, SHOW_SCOPE_ALL}, + {"Sched_affinity_status", + (char *)&show_sched_affinity_status, SHOW_FUNC, SHOW_SCOPE_ALL}, + {"Sched_affinity_group_number", + (char *)&show_sched_affinity_group_number, SHOW_FUNC, SHOW_SCOPE_ALL}, + {"Sched_affinity_group_capacity", + (char *)&show_sched_affinity_group_capacity, SHOW_FUNC, SHOW_SCOPE_ALL}, {"Secondary_engine_execution_count", (char *)offsetof(System_status_var, secondary_engine_execution_count), SHOW_LONGLONG_STATUS, SHOW_SCOPE_ALL}, @@ -13998,6 +14039,7 @@ PSI_mutex_key key_mutex_replica_worker_hash; PSI_mutex_key key_monitor_info_run_lock; PSI_mutex_key key_LOCK_delegate_connection_mutex; PSI_mutex_key key_LOCK_group_replication_connection_mutex; +PSI_mutex_key key_sched_affinity_mutex; /* clang-format off */ static PSI_mutex_info all_server_mutexes[]= @@ -14094,6 +14136,7 @@ static PSI_mutex_info all_server_mutexes[]= { &key_LOCK_group_replication_connection_mutex, "LOCK_group_replication_connection_mutex", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, { &key_LOCK_authentication_policy, "LOCK_authentication_policy", PSI_FLAG_SINGLETON, 0, "A lock to ensure execution of CREATE USER or ALTER USER sql and SET @@global.authentication_policy variable are serialized"}, { &key_LOCK_global_conn_mem_limit, "LOCK_global_conn_mem_limit", PSI_FLAG_SINGLETON, 0, PSI_DOCUMENT_ME}, + { &key_sched_affinity_mutex, "Sched_affinity::m_mutex", 0, 0, PSI_DOCUMENT_ME} }; /* clang-format on */ diff --git a/sql/mysqld.h b/sql/mysqld.h index a404e395cbb4..ee40d1b70ef2 100644 --- a/sql/mysqld.h +++ b/sql/mysqld.h @@ -463,6 +463,8 @@ extern PSI_mutex_key key_LOCK_group_replication_connection_mutex; extern PSI_mutex_key key_commit_order_manager_mutex; extern PSI_mutex_key key_mutex_replica_worker_hash; +extern PSI_mutex_key key_sched_affinity_mutex; + extern PSI_rwlock_key key_rwlock_LOCK_logger; extern 
PSI_rwlock_key key_rwlock_channel_map_lock; extern PSI_rwlock_key key_rwlock_channel_lock; diff --git a/sql/sched_affinity_manager.cc b/sql/sched_affinity_manager.cc new file mode 100644 index 000000000000..a7cc3afaa916 --- /dev/null +++ b/sql/sched_affinity_manager.cc @@ -0,0 +1,615 @@ +/***************************************************************************** +Copyright (c) 2022, Huawei Technologies Co., Ltd. All Rights Reserved. +This program is free software; you can redistribute it and/or modify it under +the terms of the GNU General Public License, version 2.0, as published by the +Free Software Foundation. +This program is distributed in the hope that it will be useful, but WITHOUT +ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS +FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0, +for more details. +*****************************************************************************/ + +#include "sql/sched_affinity_manager.h" + +#include + +#include + +#include "mysql/components/services/log_builtins.h" +#include "mysqld_error.h" +#include "sql/mysqld.h" + +namespace sched_affinity { +const std::vector thread_types = { + Thread_type::FOREGROUND, Thread_type::LOG_WRITER, + Thread_type::LOG_FLUSHER, Thread_type::LOG_WRITE_NOTIFIER, + Thread_type::LOG_FLUSH_NOTIFIER, Thread_type::LOG_CHECKPOINTER, + Thread_type::PURGE_COORDINATOR}; + +const std::map thread_type_names = { + {Thread_type::FOREGROUND, "foreground"}, + {Thread_type::LOG_WRITER, "log_writer"}, + {Thread_type::LOG_FLUSHER, "log_flusher"}, + {Thread_type::LOG_WRITE_NOTIFIER, "log_write_notifier"}, + {Thread_type::LOG_FLUSH_NOTIFIER, "log_flush_notifier"}, + {Thread_type::LOG_CHECKPOINTER, "log_checkpointer"}, + {Thread_type::PURGE_COORDINATOR, "purge_coordinator"}, + {Thread_type::UNDEFINED, "undefined"}}; +} // namespace sched_affinity + + +#ifdef HAVE_LIBNUMA +namespace sched_affinity { +class Lock_guard { + public: + explicit 
Lock_guard(mysql_mutex_t &mutex) { + m_mutex = &mutex; + mysql_mutex_lock(m_mutex); + } + Lock_guard(const Lock_guard &) = delete; + Lock_guard &operator=(const Lock_guard &) = delete; + ~Lock_guard() { mysql_mutex_unlock(m_mutex); } + + private: + mysql_mutex_t *m_mutex; +}; + + +Sched_affinity_manager_numa::Sched_affinity_manager_numa() + : Sched_affinity_manager(), + m_total_cpu_num(0), + m_total_node_num(0), + m_cpu_num_per_node(0), + m_numa_aware(false), + m_root_pid(0), + m_is_fallback(false) { + mysql_mutex_init(key_sched_affinity_mutex, &m_mutex, nullptr); +} + +Sched_affinity_manager_numa::~Sched_affinity_manager_numa() { + mysql_mutex_destroy(&m_mutex); +} + + +bool Sched_affinity_manager_numa::init( + const std::map &sched_affinity_parameter, + bool numa_aware) { + m_total_cpu_num = numa_num_configured_cpus(); + m_total_node_num = numa_num_configured_nodes(); + m_cpu_num_per_node = m_total_cpu_num / m_total_node_num; + m_numa_aware = numa_aware; + m_root_pid = gettid(); + + m_thread_bitmask.clear(); + m_sched_affinity_groups.clear(); + m_thread_pid.clear(); + for (const auto &thread_type : thread_types) { + if (sched_affinity_parameter.find(thread_type) == + sched_affinity_parameter.end()) { + continue; + } + m_thread_pid[thread_type] = std::set(); + auto cpu_string = sched_affinity_parameter.at(thread_type); + if (!init_sched_affinity_info( + cpu_string == nullptr ? 
std::string("") : std::string(cpu_string), + m_thread_bitmask[thread_type])) { + return false; + } + if (is_thread_sched_enabled(thread_type) && + !init_sched_affinity_group( + m_thread_bitmask[thread_type], + m_numa_aware && thread_type == Thread_type::FOREGROUND, + m_sched_affinity_groups[thread_type])) { + return false; + } + } + + return true; +} + +void Sched_affinity_manager_numa::fallback() { + if (!m_is_fallback) { + m_is_fallback = true; + m_fallback_delegate.reset(new Sched_affinity_manager_dummy()); + LogErr(ERROR_LEVEL, ER_SET_FALLBACK_MODE); + } +} + +bool Sched_affinity_manager_numa::init_sched_affinity_info( + const std::string &cpu_string, Bitmask_ptr &group_bitmask) { + group_bitmask.reset(); + if (cpu_string.empty()) { + return true; + } + std::pair normalized_result = + normalize_cpu_string(cpu_string); + if (normalized_result.second == false) { + LogErr(ERROR_LEVEL, ER_CANT_PARSE_CPU_STRING, cpu_string.c_str()); + return false; + } + group_bitmask.reset(numa_parse_cpustring(normalized_result.first.c_str())); + if (!group_bitmask) { + LogErr(ERROR_LEVEL, ER_CANT_PARSE_CPU_STRING, cpu_string.c_str()); + return false; + } + return true; +} + +bool Sched_affinity_manager_numa::init_sched_affinity_group( + const Bitmask_ptr &group_bitmask, const bool numa_aware, + std::vector &sched_affinity_group) { + if (numa_aware) { + sched_affinity_group.resize(m_total_node_num); + for (auto node_id = 0; node_id < m_total_node_num; ++node_id) { + sched_affinity_group[node_id].avail_cpu_num = 0; + sched_affinity_group[node_id].avail_cpu_mask = + Bitmask_ptr(numa_allocate_cpumask()); + sched_affinity_group[node_id].assigned_thread_num = 0; + for (auto cpu_id = m_cpu_num_per_node * node_id; + cpu_id < m_cpu_num_per_node * (node_id + 1); ++cpu_id) { + if (numa_bitmask_isbitset(group_bitmask.get(), cpu_id)) { + numa_bitmask_setbit( + sched_affinity_group[node_id].avail_cpu_mask.get(), cpu_id); + ++sched_affinity_group[node_id].avail_cpu_num; + } + } + } + } else { + 
sched_affinity_group.resize(1); + sched_affinity_group[0].avail_cpu_num = 0; + sched_affinity_group[0].avail_cpu_mask = + Bitmask_ptr(numa_allocate_cpumask()); + copy_bitmask_to_bitmask(group_bitmask.get(), + sched_affinity_group[0].avail_cpu_mask.get()); + sched_affinity_group[0].assigned_thread_num = 0; + for (auto cpu_id = 0; cpu_id < m_total_cpu_num; ++cpu_id) { + if (numa_bitmask_isbitset(group_bitmask.get(), cpu_id)) { + ++sched_affinity_group[0].avail_cpu_num; + } + } + } + return true; +} + + +bool Sched_affinity_manager_numa::rebalance_group( + const char *cpu_string, const Thread_type thread_type) { + const Lock_guard lock(m_mutex); + if (m_is_fallback) { + LogErr(ERROR_LEVEL, ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED); + return m_fallback_delegate->rebalance_group(cpu_string, thread_type); + } + const bool is_previous_sched_enabled = is_thread_sched_enabled(thread_type); + std::vector> group_thread; + if (!reset_sched_affinity_info(cpu_string, thread_type, group_thread)) { + fallback(); + return false; + } + if (!is_thread_sched_enabled(thread_type) && !is_previous_sched_enabled) { + return true; + } + if (!is_thread_sched_enabled(thread_type) && is_previous_sched_enabled) { + Bitmask_ptr root_process_bitmask(numa_allocate_cpumask()); + if (numa_sched_getaffinity(m_root_pid, root_process_bitmask.get()) < 0) { + fallback(); + return false; + } + for (const auto tid : m_thread_pid[thread_type]) { + m_pid_group_id.erase(tid); + if (numa_sched_setaffinity(tid, root_process_bitmask.get()) < 0) { + fallback(); + return false; + } + } + return true; + } + if (is_thread_sched_enabled(thread_type) && !is_previous_sched_enabled) { + for (const auto tid : m_thread_pid[thread_type]) { + if (!bind_to_group(tid)) { + fallback(); + return false; + } + } + return true; + } + auto &sched_affinity_group = m_sched_affinity_groups[thread_type]; + std::vector migrate_thread_num; + migrate_thread_num.resize(sched_affinity_group.size()); + 
count_migrate_thread_num(group_thread, sched_affinity_group, + migrate_thread_num); + if (!migrate_thread_and_setaffinity(group_thread, sched_affinity_group, + migrate_thread_num)) { + fallback(); + return false; + } + return true; +} + +bool Sched_affinity_manager_numa::reset_sched_affinity_info( + const char *cpu_string, const Thread_type &thread_type, + std::vector> &group_thread) { + bool numa_aware = m_numa_aware && thread_type == Thread_type::FOREGROUND; + group_thread.resize(numa_aware ? m_total_node_num : 1, std::set()); + for (const auto tid : m_thread_pid[thread_type]) { + const auto group_index = m_pid_group_id[tid]; + group_thread[group_index].insert(tid); + } + if (!init_sched_affinity_info( + cpu_string == nullptr ? std::string("") : std::string(cpu_string), + m_thread_bitmask[thread_type])) { + return false; + } + if (is_thread_sched_enabled(thread_type) && + !init_sched_affinity_group(m_thread_bitmask[thread_type], numa_aware, + m_sched_affinity_groups[thread_type])) { + return false; + } + return true; +} + +void Sched_affinity_manager_numa::count_migrate_thread_num( + const std::vector> &group_thread, + std::vector &sched_affinity_group, + std::vector &migrate_thread_num) { + int total_thread_num = 0; + int total_avail_cpu_num = 0; + for (auto i = 0u; i < sched_affinity_group.size(); ++i) { + total_thread_num += group_thread[i].size(); + total_avail_cpu_num += sched_affinity_group[i].avail_cpu_num; + } + if (total_avail_cpu_num == 0) { + for (auto i = 0u; i < sched_affinity_group.size(); ++i) { + sched_affinity_group[i].assigned_thread_num = 0; + migrate_thread_num[i] = 0; + } + return; + } + for (auto i = 0u; i < sched_affinity_group.size(); ++i) { + sched_affinity_group[i].assigned_thread_num = + std::ceil(static_cast(total_thread_num * + sched_affinity_group[i].avail_cpu_num) / + total_avail_cpu_num); + migrate_thread_num[i] = + sched_affinity_group[i].assigned_thread_num - group_thread[i].size(); + } +} + +bool 
Sched_affinity_manager_numa::migrate_thread_and_setaffinity( + const std::vector> &group_thread, + const std::vector &sched_affinity_group, + std::vector &migrate_thread_num) { + for (auto i = 0u; i < group_thread.size(); ++i) { + for (auto tid : group_thread[i]) { + if (sched_affinity_group[i].avail_cpu_num != 0 && + numa_sched_setaffinity( + tid, sched_affinity_group[i].avail_cpu_mask.get()) < 0) { + return false; + } + } + } + for (auto i = 0u; i < group_thread.size(); ++i) { + if (migrate_thread_num[i] >= 0) { + continue; + } + std::set::iterator it = group_thread[i].begin(); + for (auto j = 0u; j < group_thread.size(); ++j) { + while (migrate_thread_num[j] > 0 && migrate_thread_num[i] < 0 && + it != group_thread[i].end()) { + m_pid_group_id[*it] = j; + if (numa_sched_setaffinity( + *it, sched_affinity_group[j].avail_cpu_mask.get()) < 0) { + return false; + } + --migrate_thread_num[j]; + ++migrate_thread_num[i]; + ++it; + } + } + } + return true; +} + +bool Sched_affinity_manager_numa::is_thread_sched_enabled( + const Thread_type thread_type) { + auto it = m_thread_bitmask.find(thread_type); + return (it != m_thread_bitmask.end() && it->second) ? 
true : false; +} + +bool Sched_affinity_manager_numa::register_thread(const Thread_type thread_type, + const pid_t pid) { + const Lock_guard lock(m_mutex); + + if (m_is_fallback) { + LogErr(ERROR_LEVEL, ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED); + return m_fallback_delegate->register_thread(thread_type, pid); + } + + m_thread_pid[thread_type].insert(pid); + if (!bind_to_group(pid)) { + LogErr(ERROR_LEVEL, ER_CANNOT_SET_THREAD_SCHED_AFFINIFY, + thread_type_names.at(thread_type).c_str()); + fallback(); + return false; + } + return true; +} + +bool Sched_affinity_manager_numa::unregister_thread(const pid_t pid) { + const Lock_guard lock(m_mutex); + + if (m_is_fallback) { + LogErr(ERROR_LEVEL, ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED); + return m_fallback_delegate->unregister_thread(pid); + } + + auto thread_type = get_thread_type_by_pid(pid); + if (thread_type == Thread_type::UNDEFINED) { + return false; + } + + if (!unbind_from_group(pid)) { + LogErr(ERROR_LEVEL, ER_CANNOT_UNSET_THREAD_SCHED_AFFINIFY, + thread_type_names.at(thread_type).c_str()); + fallback(); + return false; + } + m_thread_pid[thread_type].erase(pid); + return true; +} + +Thread_type Sched_affinity_manager_numa::get_thread_type_by_pid( + const pid_t pid) { + for (const auto &thread_pid : m_thread_pid) { + if (thread_pid.second.find(pid) != thread_pid.second.end()) { + return thread_pid.first; + } + } + return Thread_type::UNDEFINED; +} + +bool Sched_affinity_manager_numa::bind_to_group(const pid_t pid) { + auto thread_type = get_thread_type_by_pid(pid); + if (thread_type == Thread_type::UNDEFINED) { + return false; + } + if (!is_thread_sched_enabled(thread_type)) { + return true; + } + auto &sched_affinity_group = m_sched_affinity_groups[thread_type]; + const int INVALID_INDEX = -1; + auto best_index = INVALID_INDEX; + for (auto i = 0u; i < sched_affinity_group.size(); ++i) { + if (sched_affinity_group[i].avail_cpu_num == 0) { + continue; + } + if (best_index == 
INVALID_INDEX || + sched_affinity_group[i].assigned_thread_num * + sched_affinity_group[best_index].avail_cpu_num < + sched_affinity_group[best_index].assigned_thread_num * + sched_affinity_group[i].avail_cpu_num) { + best_index = i; + } + } + + if (best_index == INVALID_INDEX) { + return false; + } + auto ret = numa_sched_setaffinity( + pid, sched_affinity_group[best_index].avail_cpu_mask.get()); + if (ret == 0) { + ++sched_affinity_group[best_index].assigned_thread_num; + m_pid_group_id[pid] = best_index; + return true; + } + return false; +} + + +bool Sched_affinity_manager_numa::unbind_from_group(const pid_t pid) { + auto thread_type = get_thread_type_by_pid(pid); + if (thread_type == Thread_type::UNDEFINED) { + return false; + } + if (!is_thread_sched_enabled(thread_type)) { + return true; + } + auto &sched_affinity_group = m_sched_affinity_groups[thread_type]; + auto index = m_pid_group_id.find(pid); + if (index == m_pid_group_id.end() || + index->second >= static_cast(sched_affinity_group.size())) { + return false; + } + --sched_affinity_group[index->second].assigned_thread_num; + m_pid_group_id.erase(index); + + return copy_affinity(pid, m_root_pid); +} + +bool Sched_affinity_manager_numa::copy_affinity(pid_t from, pid_t to) { + Bitmask_ptr to_bitmask(numa_allocate_cpumask()); + if (numa_sched_getaffinity(to, to_bitmask.get()) < 0) { + return false; + } + if (numa_sched_setaffinity(from, to_bitmask.get()) < 0) { + return false; + } + return true; +} + +std::string Sched_affinity_manager_numa::take_group_snapshot() { + const Lock_guard lock(m_mutex); + + if (m_is_fallback) { + LogErr(ERROR_LEVEL, ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED); + return m_fallback_delegate->take_group_snapshot(); + } + + std::string group_snapshot = ""; + for (const auto &thread_type : thread_types) { + if (!is_thread_sched_enabled(thread_type)) { + continue; + } + group_snapshot += thread_type_names.at(thread_type) + ": "; + for (const auto &sched_affinity_group : + 
m_sched_affinity_groups[thread_type]) { + group_snapshot += + (std::to_string(sched_affinity_group.assigned_thread_num) + + std::string("/") + + std::to_string(sched_affinity_group.avail_cpu_num) + + std::string("; ")); + } + } + return group_snapshot; +} + +int Sched_affinity_manager_numa::get_total_node_number() { + return m_total_node_num; +} + +int Sched_affinity_manager_numa::get_cpu_number_per_node() { + return m_cpu_num_per_node; +} + +bool Sched_affinity_manager_numa::check_cpu_string( + const std::string &cpu_string) { + auto ret = normalize_cpu_string(cpu_string); + if (!ret.second) { + return false; + } + Bitmask_ptr bitmask(numa_parse_cpustring(ret.first.c_str())); + return bitmask.get() != nullptr; +} + +std::pair Sched_affinity_manager_numa::normalize_cpu_string( + const std::string &cpu_string) { + std::string normalized_cpu_string = ""; + bool invalid_cpu_string = false; + const int INVALID_CORE_ID = -1; + int core_id = INVALID_CORE_ID; + for (auto c : cpu_string) { + switch (c) { + case ' ': + break; + case '-': + case ',': + if (core_id == INVALID_CORE_ID) { + invalid_cpu_string = true; + } else { + normalized_cpu_string += std::to_string(core_id); + normalized_cpu_string += c; + core_id = INVALID_CORE_ID; + } + break; + case '0' ... 
'9': + if (core_id == INVALID_CORE_ID) { + core_id = (c - '0'); + } else { + core_id = core_id * 10 + (c - '0'); + } + break; + default: + invalid_cpu_string = true; + break; + } + if (invalid_cpu_string) { + break; + } + } + if (core_id != INVALID_CORE_ID) { + normalized_cpu_string += std::to_string(core_id); + } + if (!normalized_cpu_string.empty() && + (*normalized_cpu_string.rbegin() == '-' || + *normalized_cpu_string.rbegin() == ',')) { + invalid_cpu_string = true; + } + if (invalid_cpu_string) { + return std::make_pair(std::string(), false); + } + return std::make_pair(normalized_cpu_string, true); +} + +bool Sched_affinity_manager_numa::update_numa_aware(bool numa_aware) { + const Lock_guard lock(m_mutex); + if (m_is_fallback) { + LogErr(ERROR_LEVEL, ER_FALLBACK_DELEGATE_SCHED_AFFINITY_MANAGER_IS_CALLED); + return m_fallback_delegate->update_numa_aware(numa_aware); + } + if (m_numa_aware == numa_aware) { + return true; + } + std::vector pending_pids; + pending_pids.resize(m_pid_group_id.size()); + std::transform(m_pid_group_id.begin(), m_pid_group_id.end(), + pending_pids.begin(), + [](auto &pid_group_id) { return pid_group_id.first; }); + for (const auto &pending_pid : pending_pids) { + if (!unbind_from_group(pending_pid)) { + LogErr(ERROR_LEVEL, ER_CANNOT_UNSET_THREAD_SCHED_AFFINIFY, + thread_type_names.at(get_thread_type_by_pid(pending_pid)).c_str()); + fallback(); + return false; + } + } + m_numa_aware = numa_aware; + for (const auto &thread_type : thread_types) { + if (is_thread_sched_enabled(thread_type) && + !init_sched_affinity_group( + m_thread_bitmask[thread_type], + m_numa_aware && thread_type == Thread_type::FOREGROUND, + m_sched_affinity_groups[thread_type])) { + fallback(); + return false; + } + } + for (const auto &pending_pid : pending_pids) { + if (!bind_to_group(pending_pid)) { + LogErr(ERROR_LEVEL, ER_CANNOT_SET_THREAD_SCHED_AFFINIFY, + thread_type_names.at(get_thread_type_by_pid(pending_pid)).c_str()); + fallback(); + return false; + } + 
}
+  return true;
+}
+} // namespace sched_affinity
+#endif /* HAVE_LIBNUMA */
+
+namespace sched_affinity {
+static Sched_affinity_manager *sched_affinity_manager = nullptr;
+Sched_affinity_manager *Sched_affinity_manager::create_instance(
+    const std::map<Thread_type, char *> &sched_affinity_parameter,
+    bool numa_aware) {
+  Sched_affinity_manager::free_instance();
+#ifdef HAVE_LIBNUMA
+  if (numa_available() == -1) {
+    LogErr(WARNING_LEVEL, ER_NUMA_AVAILABLE_TEST_FAIL);
+    LogErr(INFORMATION_LEVEL, ER_USE_DUMMY_SCHED_AFFINITY_MANAGER);
+    sched_affinity_manager = new Sched_affinity_manager_dummy();
+  } else {
+    sched_affinity_manager = new Sched_affinity_manager_numa();
+  }
+#else
+  LogErr(WARNING_LEVEL, ER_LIBNUMA_TEST_FAIL);
+  LogErr(INFORMATION_LEVEL, ER_USE_DUMMY_SCHED_AFFINITY_MANAGER);
+  sched_affinity_manager = new Sched_affinity_manager_dummy();
+#endif /* HAVE_LIBNUMA */
+  if (!sched_affinity_manager->init(sched_affinity_parameter, numa_aware)) {
+    return nullptr;
+  }
+  return sched_affinity_manager;
+}
+
+Sched_affinity_manager *Sched_affinity_manager::get_instance() {
+  return sched_affinity_manager;
+}
+
+void Sched_affinity_manager::free_instance() {
+  if (sched_affinity_manager != nullptr) {
+    delete sched_affinity_manager;
+    sched_affinity_manager = nullptr;
+  }
+}
+
+pid_t gettid() { return static_cast<pid_t>(syscall(SYS_gettid)); }
+} // namespace sched_affinity
diff --git a/sql/sched_affinity_manager.h b/sql/sched_affinity_manager.h
new file mode 100644
index 000000000000..22d6a7b5b252
--- /dev/null
+++ b/sql/sched_affinity_manager.h
@@ -0,0 +1,217 @@
+/*****************************************************************************
+Copyright (c) 2022, Huawei Technologies Co., Ltd. All Rights Reserved.
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License, version 2.0, as published by the
+Free Software Foundation.
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License, version 2.0,
+for more details.
+*****************************************************************************/
+
+#ifndef SCHED_AFFINITY_MANAGER_H
+#define SCHED_AFFINITY_MANAGER_H
+#include "my_config.h"
+#ifdef HAVE_LIBNUMA
+#include <numa.h>
+#endif
+
+#include <map>
+#include <memory>
+#include <set>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <sys/types.h>
+
+#ifdef GMOCK_FOUND
+#include "gtest/gtest_prod.h"
+#endif
+
+#include "mysql/psi/mysql_mutex.h"
+
+namespace sched_affinity {
+enum class Thread_type {
+  FOREGROUND,
+  LOG_WRITER,
+  LOG_FLUSHER,
+  LOG_WRITE_NOTIFIER,
+  LOG_FLUSH_NOTIFIER,
+  LOG_CHECKPOINTER,
+  PURGE_COORDINATOR,
+  UNDEFINED
+};
+
+extern const std::vector<Thread_type> thread_types;
+extern const std::map<Thread_type, std::string> thread_type_names;
+
+pid_t gettid();
+
+class Sched_affinity_manager {
+ public:
+  virtual ~Sched_affinity_manager(){};
+  static Sched_affinity_manager *create_instance(
+      const std::map<Thread_type, char *> &, bool numa_aware);
+  static Sched_affinity_manager *get_instance();
+  static void free_instance();
+  virtual bool register_thread(const Thread_type thread_type,
+                               const pid_t pid) = 0;
+  virtual bool unregister_thread(const pid_t pid) = 0;
+  virtual bool rebalance_group(const char *cpu_string,
+                               const Thread_type thread_type) = 0;
+  virtual bool update_numa_aware(bool numa_aware) = 0;
+  virtual std::string take_group_snapshot() = 0;
+  virtual int get_total_node_number() = 0;
+  virtual int get_cpu_number_per_node() = 0;
+  virtual bool check_cpu_string(const std::string &cpu_string) = 0;
+
+ protected:
+  virtual bool init(const std::map<Thread_type, char *> &,
+                    bool numa_aware) = 0;
+};
+
+class Sched_affinity_manager_dummy : public Sched_affinity_manager {
+ public:
+  Sched_affinity_manager_dummy(const Sched_affinity_manager_dummy &) = delete;
+  Sched_affinity_manager_dummy &operator=(
+      const 
Sched_affinity_manager_dummy &) = delete;
+  Sched_affinity_manager_dummy(Sched_affinity_manager_dummy &&) = delete;
+  Sched_affinity_manager_dummy &operator=(Sched_affinity_manager_dummy &&) =
+      delete;
+  bool register_thread(const Thread_type, const pid_t) override { return true; }
+  bool unregister_thread(const pid_t) override { return true; }
+  bool rebalance_group(const char *, const Thread_type) override {
+    return true;
+  }
+  bool update_numa_aware(bool) override { return true; }
+  std::string take_group_snapshot() override { return std::string(); }
+  int get_total_node_number() override { return -1; }
+  int get_cpu_number_per_node() override { return -1; }
+  bool check_cpu_string(const std::string &) override { return true; }
+
+ private:
+  Sched_affinity_manager_dummy() : Sched_affinity_manager(){};
+  ~Sched_affinity_manager_dummy() override{};
+  bool init(const std::map<Thread_type, char *> &, bool) override {
+    return true;
+  }
+  friend class Sched_affinity_manager;
+  friend class Sched_affinity_manager_numa;
+
+#ifdef FRIEND_TEST
+  FRIEND_TEST(SchedAffinityManagerDummyTest, Implementation);
+#endif
+};
+
+#ifdef HAVE_LIBNUMA
+
+struct Bitmask_deleter {
+  void operator()(bitmask *ptr) {
+    if (ptr != nullptr) {
+      numa_free_cpumask(ptr);
+    }
+  }
+};
+
+using Bitmask_ptr = std::unique_ptr<bitmask, Bitmask_deleter>;
+
+struct Sched_affinity_group {
+  Bitmask_ptr avail_cpu_mask;
+  int avail_cpu_num;
+  int assigned_thread_num;
+};
+
+class Sched_affinity_manager_numa : public Sched_affinity_manager {
+ public:
+  Sched_affinity_manager_numa(const Sched_affinity_manager_numa &) = delete;
+  Sched_affinity_manager_numa &operator=(const Sched_affinity_manager_numa &) =
+      delete;
+  Sched_affinity_manager_numa(Sched_affinity_manager_numa &&) = delete;
+  Sched_affinity_manager_numa &operator=(Sched_affinity_manager_numa &&) =
+      delete;
+
+  bool register_thread(const Thread_type thread_type, const pid_t pid) override;
+  bool unregister_thread(const pid_t pid) override;
+  bool rebalance_group(const char *cpu_string,
+ const Thread_type thread_type) override; + bool update_numa_aware(bool numa_aware) override; + std::string take_group_snapshot() override; + int get_total_node_number() override; + int get_cpu_number_per_node() override; + bool check_cpu_string(const std::string &cpu_string) override; + + private: + Sched_affinity_manager_numa(); + ~Sched_affinity_manager_numa() override; + bool init(const std::map &, bool) override; + bool init_sched_affinity_info(const std::string &cpu_string, + Bitmask_ptr &group_bitmask); + bool init_sched_affinity_group( + const Bitmask_ptr &group_bitmask, const bool numa_aware, + std::vector &sched_affinity_group); + bool is_thread_sched_enabled(const Thread_type thread_type); + bool bind_to_group(const pid_t pid); + bool unbind_from_group(const pid_t pid); + + bool copy_affinity(pid_t from, pid_t to); + bool reset_sched_affinity_info(const char *cpu_string, const Thread_type &, + std::vector> &); + void count_migrate_thread_num(const std::vector> &, + std::vector &, + std::vector &); + bool migrate_thread_and_setaffinity(const std::vector> &, + const std::vector &, + std::vector &); + Thread_type get_thread_type_by_pid(const pid_t pid); + static std::pair normalize_cpu_string( + const std::string &cpu_string); + /** + The sched_affinity_manager_numa instance's internal state may become + inconsistent due to some previous failure, e.g. libnuma return error. Call + fallback() to use a fallback_delegate to serve further request to + sched_affinity_manager_numa instance's public interface. This method should be + called under the protection of m_mutex. 
+ */ + void fallback(); + + private: + int m_total_cpu_num; + int m_total_node_num; + int m_cpu_num_per_node; + bool m_numa_aware; + pid_t m_root_pid; + bool m_is_fallback; + std::unique_ptr m_fallback_delegate; + std::map> + m_sched_affinity_groups; + std::map m_thread_bitmask; + std::map> m_thread_pid; + std::map m_pid_group_id; + mysql_mutex_t m_mutex; + + friend class Sched_affinity_manager; + +#ifdef FRIEND_TEST + FRIEND_TEST(SchedAffinityManagerTest, InitSchedAffinityInfo); + FRIEND_TEST(SchedAffinityManagerTest, InitSchedAffinityGroup); + FRIEND_TEST(SchedAffinityManagerTest, NormalizeCpuString); + FRIEND_TEST(SchedAffinityManagerTest, BindToGroup); + FRIEND_TEST(SchedAffinityManagerTest, UnbindFromGroup); + FRIEND_TEST(SchedAffinityManagerTest, GetThreadTypeByPid); + FRIEND_TEST(SchedAffinityManagerTest, RegisterThread); + FRIEND_TEST(SchedAffinityManagerTest, UnregisterThread); + FRIEND_TEST(SchedAffinityManagerTest, NumaAwareDisabled); + FRIEND_TEST(SchedAffinityManagerTest, NumaAwareEnabled); + FRIEND_TEST(SchedAffinityManagerTest, RebalanceGroup); + FRIEND_TEST(SchedAffinityManagerTest, IsThreadSchedEnabled); + FRIEND_TEST(SchedAffinityManagerTest, UpdateNumaAware); + FRIEND_TEST(SchedAffinityManagerTest, AllNullptrConfig); + FRIEND_TEST(SchedAffinityManagerTest, EmptyStringConfig); + FRIEND_TEST(SchedAffinityManagerTest, EmptyContainerConfig); + FRIEND_TEST(SchedAffinityManagerTest, Fallback); +#endif +}; +#endif /* HAVE_LIBNUMA */ +} // namespace sched_affinity +#endif /* SCHED_AFFINITY_MANAGER_H */ \ No newline at end of file diff --git a/sql/sys_vars.cc b/sql/sys_vars.cc index cd1a8ab78dea..690140a13490 100644 --- a/sql/sys_vars.cc +++ b/sql/sys_vars.cc @@ -127,6 +127,7 @@ #include "sql/rpl_mta_submode.h" // MTS_PARALLEL_TYPE_DB_NAME #include "sql/rpl_replica.h" // SLAVE_THD_TYPE #include "sql/rpl_rli.h" // Relay_log_info +#include "sql/sched_affinity_manager.h" #include "sql/server_component/log_builtins_filter_imp.h" // until we have pluggable 
variables #include "sql/server_component/log_builtins_imp.h" #include "sql/session_tracker.h" @@ -1612,6 +1613,174 @@ static bool check_binlog_trx_compression(sys_var *self [[maybe_unused]], return false; } +bool sched_affinity_numa_aware = false; + +static bool on_sched_affinity_numa_aware_update(sys_var *, THD *, enum_var_type) +{ + if (sched_affinity::Sched_affinity_manager::get_instance() != nullptr && + !sched_affinity::Sched_affinity_manager::get_instance() + ->update_numa_aware(sched_affinity_numa_aware)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_NUMA_AWARE, MYF(0)); + return true; + } + return false; +} + +static Sys_var_bool Sys_sched_affinity_numa_aware( + "sched_affinity_numa_aware", + "Schedule threads with numa information", + GLOBAL_VAR(sched_affinity_numa_aware), CMD_LINE(OPT_ARG), + DEFAULT(false), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(nullptr), ON_UPDATE(on_sched_affinity_numa_aware_update)); + +std::map sched_affinity_parameter = { + {sched_affinity::Thread_type::FOREGROUND, nullptr}, + {sched_affinity::Thread_type::LOG_WRITER, nullptr}, + {sched_affinity::Thread_type::LOG_FLUSHER, nullptr}, + {sched_affinity::Thread_type::LOG_WRITE_NOTIFIER, nullptr}, + {sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER, nullptr}, + {sched_affinity::Thread_type::LOG_CHECKPOINTER, nullptr}, + {sched_affinity::Thread_type::PURGE_COORDINATOR, nullptr}}; + +static bool check_sched_affinity_parameter(sys_var *, THD *, set_var *var) { + char *c = var->save_result.string_value.str; + if (sched_affinity::Sched_affinity_manager::get_instance() != nullptr && + c != nullptr && + !sched_affinity::Sched_affinity_manager::get_instance()->check_cpu_string( + std::string(c))) { + my_error(ER_INVALID_CPU_STRING, MYF(0), c); + return true; + } + return false; +} + +static bool on_sched_affinity_foreground_thread_update(sys_var *, THD *, + enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + 
sched_affinity_parameter[sched_affinity::Thread_type::FOREGROUND], + sched_affinity::Thread_type::FOREGROUND)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::FOREGROUND).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_foreground_thread( + "sched_affinity_foreground_thread", + "The set of cpus which foreground threads will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::FOREGROUND]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_foreground_thread_update)); + +static bool on_sched_affinity_log_writer_update(sys_var *, THD *, + enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::LOG_WRITER], + sched_affinity::Thread_type::LOG_WRITER)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::LOG_WRITER).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_log_writer( + "sched_affinity_log_writer", + "The set of cpus which log writer thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::LOG_WRITER]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_log_writer_update)); + +static bool on_sched_affinity_log_flusher_update(sys_var *, THD *, enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::LOG_FLUSHER], + sched_affinity::Thread_type::LOG_FLUSHER)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + 
sched_affinity::thread_type_names.at(sched_affinity::Thread_type::LOG_FLUSHER).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_log_flusher( + "sched_affinity_log_flusher", + "The set of cpus which log flusher thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::LOG_FLUSHER]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_log_flusher_update)); + +static bool on_sched_affinity_log_write_notifier_update(sys_var *, THD *, enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::LOG_WRITE_NOTIFIER], + sched_affinity::Thread_type::LOG_WRITE_NOTIFIER)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::LOG_WRITE_NOTIFIER).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_log_write_notifier( + "sched_affinity_log_write_notifier", + "The set of cpus which log write notifier thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::LOG_WRITE_NOTIFIER]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_log_write_notifier_update)); + +static bool on_sched_affinity_log_flush_notifier_update(sys_var *, THD *, enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER], + sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER).c_str()); + return true; + } + return false; +} + 
+static Sys_var_charptr Sys_sched_affinity_log_flush_notifier( + "sched_affinity_log_flush_notifier", + "The set of cpus which log flush notifier thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_log_flush_notifier_update)); + +static bool on_sched_affinity_log_checkpointer_update(sys_var *, THD *, enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::LOG_CHECKPOINTER], + sched_affinity::Thread_type::LOG_CHECKPOINTER)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::LOG_CHECKPOINTER).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_log_checkpointer( + "sched_affinity_log_checkpointer", + "The set of cpus which log checkpointer thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::LOG_CHECKPOINTER]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_log_checkpointer_update)); + +static bool on_sched_affinity_purge_coordinator_update(sys_var *, THD *, enum_var_type) { + if (!sched_affinity::Sched_affinity_manager::get_instance()->rebalance_group( + sched_affinity_parameter[sched_affinity::Thread_type::PURGE_COORDINATOR], + sched_affinity::Thread_type::PURGE_COORDINATOR)) { + my_error(ER_CANNOT_UPDATE_SCHED_AFFINITY_PARAMETER, MYF(0), + sched_affinity::thread_type_names.at(sched_affinity::Thread_type::PURGE_COORDINATOR).c_str()); + return true; + } + return false; +} + +static Sys_var_charptr Sys_sched_affinity_purge_coordinator( + "sched_affinity_purge_coordinator", + "The set of 
cpus which purge coordinator thread will run on.", + GLOBAL_VAR(sched_affinity_parameter[sched_affinity::Thread_type::PURGE_COORDINATOR]), CMD_LINE(REQUIRED_ARG), + IN_FS_CHARSET, DEFAULT(nullptr), NO_MUTEX_GUARD, NOT_IN_BINLOG, ON_CHECK(check_sched_affinity_parameter), + ON_UPDATE(on_sched_affinity_purge_coordinator_update)); + static Sys_var_bool Sys_binlog_trx_compression( "binlog_transaction_compression", "Whether to compress transactions or not. Transactions are compressed " diff --git a/storage/innobase/CMakeLists.txt b/storage/innobase/CMakeLists.txt index cd0f3aaf0963..d5a3d915b3a6 100644 --- a/storage/innobase/CMakeLists.txt +++ b/storage/innobase/CMakeLists.txt @@ -55,8 +55,11 @@ ENDIF() INCLUDE_DIRECTORIES( ${CMAKE_SOURCE_DIR}/sql ${CMAKE_SOURCE_DIR}/sql/auth + ${GMOCK_INCLUDE_DIRS} ) +INCLUDE_DIRECTORIES(${GMOCK_INCLUDE_DIRS}) + # Conflicting YYSTYPE, because we have multiple Bison grammars. # WL#11100 Migrate to Bison 3.x should fix this. # diff --git a/storage/innobase/log/log0chkp.cc b/storage/innobase/log/log0chkp.cc index 9e33ab36f3a8..84de2d639699 100644 --- a/storage/innobase/log/log0chkp.cc +++ b/storage/innobase/log/log0chkp.cc @@ -38,6 +38,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA *******************************************************/ +#include + /* std::chrono::X */ #include @@ -94,6 +96,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA /* os_event_wait_time_low */ #include "os0event.h" +#include "sql/sched_affinity_manager.h" + /* MONITOR_INC, ... 
*/ #include "srv0mon.h" @@ -914,6 +918,17 @@ static void log_consider_checkpoint(log_t &log) { } void log_checkpointer(log_t *log_ptr) { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + bool is_registered_to_sched_affinity = false; + auto pid = sched_affinity::gettid(); + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::LOG_CHECKPOINTER, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "log_checkpointer"; + } + ut_a(log_ptr != nullptr); log_t &log = *log_ptr; @@ -1008,6 +1023,12 @@ void log_checkpointer(log_t *log_ptr) { } } + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "log_checkpointer"; + } + ut_d(destroy_internal_thd(log.m_checkpointer_thd)); } diff --git a/storage/innobase/log/log0write.cc b/storage/innobase/log/log0write.cc index 822fdd9feb16..4dae24c1f2fd 100644 --- a/storage/innobase/log/log0write.cc +++ b/storage/innobase/log/log0write.cc @@ -40,6 +40,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #ifndef UNIV_HOTBACKUP +#include + /* std::memory_order_* */ #include @@ -91,6 +93,8 @@ Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA /* create_internal_thd, destroy_internal_thd */ #include "sql/sql_thd_internal_api.h" +#include "sql/sched_affinity_manager.h" + /* MONITOR_INC, ... 
*/ #include "srv0mon.h" @@ -2226,6 +2230,17 @@ static bool log_writer_is_allowed_to_stop(log_t &log) { } void log_writer(log_t *log_ptr) { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + auto pid = sched_affinity::gettid(); + bool is_registered_to_sched_affinity = false; + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::LOG_WRITER, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "log_writer"; + } + ut_a(log_ptr != nullptr); log_t &log = *log_ptr; @@ -2235,6 +2250,12 @@ void log_writer(log_t *log_ptr) { log_writer_mutex_enter(log); + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "log_writer"; + } + Log_thread_waiting waiting{log, log.writer_event, srv_log_writer_spin_delay, get_srv_log_writer_timeout()}; @@ -2491,6 +2512,17 @@ static void log_flush_low(log_t &log) { } void log_flusher(log_t *log_ptr) { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + bool is_registered_to_sched_affinity = false; + auto pid = sched_affinity::gettid(); + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::LOG_FLUSHER, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "log_flusher"; + } + ut_a(log_ptr != nullptr); log_t &log = *log_ptr; @@ -2500,6 +2532,12 @@ void log_flusher(log_t *log_ptr) { log_flusher_mutex_enter(log); + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "log_flusher"; + } + for (uint64_t step = 0;; ++step) { if (log.should_stop_threads.load()) { if (!log_writer_is_active()) { @@ -2628,6 
+2666,17 @@ void log_flusher(log_t *log_ptr) { /** @{ */ void log_write_notifier(log_t *log_ptr) { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + bool is_registered_to_sched_affinity = false; + auto pid = sched_affinity::gettid(); + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::LOG_WRITE_NOTIFIER, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "log_write_notifier"; + } + ut_a(log_ptr != nullptr); log_t &log = *log_ptr; @@ -2654,6 +2703,12 @@ void log_write_notifier(log_t *log_ptr) { ut_ad(log.write_notifier_resume_lsn.load(std::memory_order_acquire) == 0); log_write_notifier_mutex_exit(log); + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "log_write_notifier"; + } + /* set to acknowledge */ log.write_notifier_resume_lsn.store(lsn, std::memory_order_release); @@ -2750,6 +2805,17 @@ void log_write_notifier(log_t *log_ptr) { /** @{ */ void log_flush_notifier(log_t *log_ptr) { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + bool is_registered_to_sched_affinity = false; + auto pid = sched_affinity::gettid(); + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::LOG_FLUSH_NOTIFIER, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "log_flush_notifier"; + } + ut_a(log_ptr != nullptr); log_t &log = *log_ptr; @@ -2859,6 +2925,12 @@ void log_flush_notifier(log_t *log_ptr) { } log_flush_notifier_mutex_exit(log); + + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "log_flush_notifier"; + } } /** 
@} */ diff --git a/storage/innobase/srv/srv0srv.cc b/storage/innobase/srv/srv0srv.cc index 2300445c54b2..5b1cec7afadd 100644 --- a/storage/innobase/srv/srv0srv.cc +++ b/storage/innobase/srv/srv0srv.cc @@ -50,6 +50,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include #include #include +#include #include #include @@ -78,6 +79,7 @@ this program; if not, write to the Free Software Foundation, Inc., #include "pars0pars.h" #include "que0que.h" #include "row0mysql.h" +#include "sql/sched_affinity_manager.h" #include "sql/sql_class.h" #include "sql_thd_internal_api.h" #include "srv0mon.h" @@ -3037,6 +3039,17 @@ static void srv_purge_coordinator_suspend( /** Purge coordinator thread that schedules the purge tasks. */ void srv_purge_coordinator_thread() { + auto sched_affinity_manager = sched_affinity::Sched_affinity_manager::get_instance(); + bool is_registered_to_sched_affinity = false; + auto pid = sched_affinity::gettid(); + if (sched_affinity_manager != nullptr && + !(is_registered_to_sched_affinity = + sched_affinity_manager->register_thread( + sched_affinity::Thread_type::PURGE_COORDINATOR, pid))) { + ib::error(ER_CANNOT_REGISTER_THREAD_TO_SCHED_AFFINIFY_MANAGER) + << "purge_coordinator"; + } + srv_slot_t *slot; THD *thd = create_internal_thd(); @@ -3150,6 +3163,12 @@ void srv_purge_coordinator_thread() { srv_thread_delay_cleanup_if_needed(false); destroy_internal_thd(thd); + + if (is_registered_to_sched_affinity && + !sched_affinity_manager->unregister_thread(pid)) { + ib::error(ER_CANNOT_UNREGISTER_THREAD_FROM_SCHED_AFFINIFY_MANAGER) + << "purge_coordinator"; + } } /** Enqueues a task to server task queue and releases a worker thread, if there