From 070967e895c9530c5a24ec9ae40bdfeb65dd48bc Mon Sep 17 00:00:00 2001 From: chcwww Date: Sun, 26 Jan 2025 02:34:42 +0800 Subject: [PATCH 01/11] Add parallel training for OVR --- libmultilabel/linear/linear.py | 8 +- libmultilabel/linear/parallel.py | 157 +++++++++++++++++++++++++++++++ 2 files changed, 161 insertions(+), 4 deletions(-) create mode 100644 libmultilabel/linear/parallel.py diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index 7ea9b9bb..f4c0c048 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -2,6 +2,7 @@ import logging import os +import re import numpy as np import scipy.sparse as sparse @@ -101,9 +102,8 @@ def train_1vsrest( if verbose: logging.info(f"Training one-vs-rest model on {num_class} labels") - for i in tqdm(range(num_class), disable=not verbose): - yi = y[:, i].toarray().reshape(-1) - weights[:, i] = _do_train(2 * yi - 1, x, options).ravel() + from .parallel import train_parallel_1vsrest + train_parallel_1vsrest(y, x, options, num_class, weights, verbose) return FlatModel( name="1vsrest", @@ -159,7 +159,7 @@ def _prepare_options(x: sparse.csr_matrix, options: str) -> tuple[sparse.csr_mat options_split.append(f"-m {int(os.cpu_count() / 2)}") options = " ".join(options_split) - return x, options, bias + return x, re.sub(r"-m\s+\d+", "", options), bias def train_thresholding( diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py new file mode 100644 index 00000000..7212044b --- /dev/null +++ b/libmultilabel/linear/parallel.py @@ -0,0 +1,157 @@ +import os +import threading +from queue import Queue +from tqdm import tqdm + +import numpy as np +import scipy.sparse as sparse +from liblinear.liblinearutil import train, parameter, problem + +from ctypes import c_double + +class ParallelTrainer(threading.Thread): + """A trainer for parallel 1vsrest training.""" + y: sparse.csc_matrix + x: sparse.csr_matrix + prob_var: dict + param: parameter + weights: np.ndarray + pbar: tqdm + queue: Queue[int] + lock: threading.Lock + + def __init__(self): + threading.Thread.__init__(self) + + @classmethod + def init_trainer( + cls, + y: sparse.csc_matrix, + x: sparse.csr_matrix, + options: str, + num_class: int, + weights: np.ndarray, + verbose: bool, + ): + """Initialize the parallel trainer, setting y, x, parameter and threading related + variables as static variable of ParallelTrainer. + + Args: + y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. + x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. + options (str): The option string passed to liblinear. + num_class (int): Number of class. + weights (np.ndarray): the weights. + verbose (bool): Output extra progress information. + """ + cls.x = x + cls.y = y + prob = problem(np.ones((y.shape[0],)), x) # pre-compute x for every OVR training tasks + cls.prob_var = {key: getattr(prob, key) for key in problem._names} + cls.param = parameter(options) + cls.param.w_recalc = True # only works for solving L1/L2-SVM dual + cls.weights = weights + cls.pbar= tqdm(total=num_class, disable=not verbose) + cls.queue = Queue() + cls.lock = threading.Lock() + + for i in range(num_class): + cls.queue.put(i) + + @staticmethod + def _do_parallel_train(prob: problem, param: parameter) -> np.matrix: + """Wrapper around liblinear.liblinearutil.train. + Forcibly suppresses all IO regardless of options. + + Args: + prob (problem): A liblinear.problem ready to train. + param (parameter): The liblinear.parameter passed to liblinear. + + Returns: + np.matrix: the weights. + """ + if prob.l == 0: + return np.matrix(np.zeros((prob.n, 1))) + + model = train(prob, param) + + w = np.ctypeslib.as_array(model.w, (prob.n, 1)) + w = np.asmatrix(w) + # Liblinear flips +1/-1 labels so +1 is always the first label, + # but not if all labels are -1. + # For our usage, we need +1 to always be the first label, + # so the check is necessary. + if model.get_labels()[0] == -1: + return -w + else: + # The memory is freed on model deletion so we make a copy. + return w.copy() + + def set_problem(self, label_idx: int) -> problem: + """Prepare a problem for parallel training with given label index and + pre-computed x (**feature_node). + + Args: + label_idx (int): label index for the problem currently solving. + + Returns: + problem: A problem prepared for liblinear.train. + """ + # Build a new problem in small cost (because we'll call train, which is C API + # from liblinear later, GIL may break and race condition could happend in python. + # Thus, instead of pre-compute a problem as class variable, we have to build + # a new one for every OVR tasks) + prob = problem(np.ones((1, 1)), np.ones((1, 1))) + for key in problem._names: + setattr(prob, key, self.prob_var[key]) # overwrite with pre-computed attributes + + # Build y pointer with label index + yi = self.y[:, label_idx].toarray().reshape(-1) + y_prob = (c_double * prob.l)() + np.ctypeslib.as_array(y_prob, (prob.l,))[:] = 2 * yi - 1 + prob.y = y_prob + return prob + + def run(self): + while self.queue.qsize() > 0: + label_idx = self.queue.get() + + weight = self._do_parallel_train(self.set_problem(label_idx), self.param).ravel() + + self.lock.acquire() + self.weights[:, label_idx] = weight + self.pbar.update() + self.lock.release() + +def train_parallel_1vsrest( + y: sparse.csc_matrix, + x: sparse.csr_matrix, + options: str, + num_class: int, + weights: np.ndarray, + verbose: bool, + ): + """Parallel training on labels when using one-vs-rest strategy, + and save trained weights by reference. + + Args: + y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. + x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. + options (str): The option string passed to liblinear. + num_class (int): Number of class. + weights (np.ndarray): the weights. + verbose (bool): Output extra progress information. + """ + ParallelTrainer.init_trainer(y, x, options, num_class, weights, verbose) + num_thread = int(os.cpu_count() / 2) + # stderr = os.dup(2) + trainers = [ParallelTrainer() for _ in range(num_thread)] + + for trainer in trainers: + trainer.start() + for trainer in trainers: + trainer.join() + + # os.dup2(stderr, 2) + # os.close(stderr) + ParallelTrainer.pbar.close() From 1da3d9fbbf49ebb4dc99da548ddd11acc9479577 Mon Sep 17 00:00:00 2001 From: chcwww Date: Sun, 26 Jan 2025 15:49:17 +0800 Subject: [PATCH 02/11] Use simplequeue, remove lock and build problem faster --- libmultilabel/linear/parallel.py | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index 7212044b..64f8e08a 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -1,6 +1,6 @@ import os import threading -from queue import Queue +from queue import SimpleQueue from tqdm import tqdm import numpy as np @@ -17,8 +17,7 @@ class ParallelTrainer(threading.Thread): param: parameter weights: np.ndarray pbar: tqdm - queue: Queue[int] - lock: threading.Lock + queue: SimpleQueue[int] def __init__(self): threading.Thread.__init__(self) @@ -52,8 +51,7 @@ def init_trainer( cls.param.w_recalc = True # only works for solving L1/L2-SVM dual cls.weights = weights cls.pbar= tqdm(total=num_class, disable=not verbose) - cls.queue = Queue() - cls.lock = threading.Lock() + cls.queue = SimpleQueue() for i in range(num_class): cls.queue.put(i) @@ -61,7 +59,6 @@ def init_trainer( @staticmethod def _do_parallel_train(prob: problem, param: parameter) -> np.matrix: """Wrapper around liblinear.liblinearutil.train. - Forcibly suppresses all IO regardless of options. Args: prob (problem): A liblinear.problem ready to train. @@ -101,7 +98,7 @@ def set_problem(self, label_idx: int) -> problem: # from liblinear later, GIL may break and race condition could happend in python. # Thus, instead of pre-compute a problem as class variable, we have to build # a new one for every OVR tasks) - prob = problem(np.ones((1, 1)), np.ones((1, 1))) + prob = problem([0], [[0]]) for key in problem._names: setattr(prob, key, self.prob_var[key]) # overwrite with pre-computed attributes @@ -117,11 +114,8 @@ def run(self): label_idx = self.queue.get() weight = self._do_parallel_train(self.set_problem(label_idx), self.param).ravel() - - self.lock.acquire() self.weights[:, label_idx] = weight self.pbar.update() - self.lock.release() def train_parallel_1vsrest( y: sparse.csc_matrix, From 8b0410f6f6054c14765c1bb5230f7b4be7109137 Mon Sep 17 00:00:00 2001 From: chcwww Date: Sun, 26 Jan 2025 16:21:06 +0800 Subject: [PATCH 03/11] Fix annotation for python 3.8 --- libmultilabel/linear/parallel.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index 64f8e08a..ad907094 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -17,7 +17,7 @@ class ParallelTrainer(threading.Thread): param: parameter weights: np.ndarray pbar: tqdm - queue: SimpleQueue[int] + queue: SimpleQueue def __init__(self): threading.Thread.__init__(self) From cd801a8fd81209b2b3efb18686009477851428e2 Mon Sep 17 00:00:00 2001 From: chcwww Date: Tue, 18 Feb 2025 12:54:39 +0000 Subject: [PATCH 04/11] adjust comments --- libmultilabel/linear/parallel.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index ad907094..9899ab61 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -32,8 +32,8 @@ def init_trainer( weights: np.ndarray, verbose: bool, ): - """Initialize the parallel trainer, setting y, x, parameter and threading related - variables as static variable of ParallelTrainer. + """Initialize the parallel trainer by setting y, x, parameter and threading related + variables as class variable of ParallelTrainer. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. @@ -61,8 +61,8 @@ def _do_parallel_train(prob: problem, param: parameter) -> np.matrix: """Wrapper around liblinear.liblinearutil.train. Args: - prob (problem): A liblinear.problem ready to train. - param (parameter): The liblinear.parameter passed to liblinear. + prob (problem): A preprocessed liblinear.problem instance. + param (parameter): A preprocessed liblinear.parameter instance. Returns: np.matrix: the weights. @@ -85,8 +85,8 @@ def _do_parallel_train(prob: problem, param: parameter) -> np.matrix: return w.copy() def set_problem(self, label_idx: int) -> problem: - """Prepare a problem for parallel training with given label index and - pre-computed x (**feature_node). + """Prepare a problem instance for parallel training using the + given label index and pre-computed x (POINTER(feature_node) * l). Args: label_idx (int): label index for the problem currently solving. @@ -94,13 +94,13 @@ def set_problem(self, label_idx: int) -> problem: Returns: problem: A problem prepared for liblinear.train. """ - # Build a new problem in small cost (because we'll call train, which is C API - # from liblinear later, GIL may break and race condition could happend in python. - # Thus, instead of pre-compute a problem as class variable, we have to build + # Build a new problem in small cost. (Since we'll call train, which is C API from + # liblinear later, GIL may released and race condition could happen in python. + # Thus, instead of pre-computing a problem as class variable, we have to build # a new one for every OVR tasks) prob = problem([0], [[0]]) for key in problem._names: - setattr(prob, key, self.prob_var[key]) # overwrite with pre-computed attributes + setattr(prob, key, self.prob_var[key]) # restore pre-computed attributes # Build y pointer with label index yi = self.y[:, label_idx].toarray().reshape(-1) @@ -126,7 +126,7 @@ def train_parallel_1vsrest( verbose: bool, ): """Parallel training on labels when using one-vs-rest strategy, - and save trained weights by reference. + and saving trained weights by reference. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. From 145331263ae40b9635d8a4e0070d81a37546aa40 Mon Sep 17 00:00:00 2001 From: chcwww Date: Tue, 25 Feb 2025 15:02:30 +0800 Subject: [PATCH 05/11] use a partial problem instead of building a complete one --- libmultilabel/linear/parallel.py | 28 ++++++++++++++++++---------- 1 file changed, 18 insertions(+), 10 deletions(-) diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index 9899ab61..3652ccd9 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -5,9 +5,19 @@ import numpy as np import scipy.sparse as sparse -from liblinear.liblinearutil import train, parameter, problem +from liblinear.liblinearutil import train, parameter, problem, feature_node -from ctypes import c_double +import ctypes +from dataclasses import dataclass + +@dataclass +class PartialProblem(problem): + """A liblinear.problem with only C attributes""" + l: ctypes.c_int + n: ctypes.c_int + y: ctypes.POINTER(ctypes.c_double) + x: ctypes.POINTER(ctypes.POINTER(feature_node)) + bias: ctypes.c_double class ParallelTrainer(threading.Thread): """A trainer for parallel 1vsrest training.""" @@ -94,17 +104,15 @@ def set_problem(self, label_idx: int) -> problem: Returns: problem: A problem prepared for liblinear.train. """ - # Build a new problem in small cost. (Since we'll call train, which is C API from - # liblinear later, GIL may released and race condition could happen in python. - # Thus, instead of pre-computing a problem as class variable, we have to build - # a new one for every OVR tasks) - prob = problem([0], [[0]]) - for key in problem._names: - setattr(prob, key, self.prob_var[key]) # restore pre-computed attributes + # Build a new problem with prob_var (avoid x copy) + prob = PartialProblem(**self.prob_var) + # prob = problem([0], [[0]]) + # for key in problem._names: + # setattr(prob, key, self.prob_var[key]) # overwrite with pre-computed attributes # Build y pointer with label index yi = self.y[:, label_idx].toarray().reshape(-1) - y_prob = (c_double * prob.l)() + y_prob = (ctypes.c_double * prob.l)() np.ctypeslib.as_array(y_prob, (prob.l,))[:] = 2 * yi - 1 prob.y = y_prob return prob From e130504863cf0067dc4f18179cfa31e3a0827977 Mon Sep 17 00:00:00 2001 From: chcwww Date: Wed, 28 May 2025 18:16:50 +0800 Subject: [PATCH 06/11] use python-level parallel as default in ovr training --- libmultilabel/linear/linear.py | 23 ++--- libmultilabel/linear/parallel.py | 142 +++++++++++++------------------ 2 files changed, 66 insertions(+), 99 deletions(-) diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index f4c0c048..0e27956f 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -2,11 +2,11 @@ import logging import os -import re import numpy as np import scipy.sparse as sparse from liblinear.liblinearutil import train, problem, parameter +from .parallel import train_parallel_1vsrest from tqdm import tqdm __all__ = [ @@ -95,15 +95,7 @@ def train_1vsrest( # Follows the MATLAB implementation at https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/multilabel/ x, options, bias = _prepare_options(x, options) - y = y.tocsc() - num_class = y.shape[1] - num_feature = x.shape[1] - weights = np.zeros((num_feature, num_class), order="F") - - if verbose: - logging.info(f"Training one-vs-rest model on {num_class} labels") - from .parallel import train_parallel_1vsrest - train_parallel_1vsrest(y, x, options, num_class, weights, verbose) + weights = train_parallel_1vsrest(y, x, options, verbose) return FlatModel( name="1vsrest", @@ -159,7 +151,7 @@ def _prepare_options(x: sparse.csr_matrix, options: str) -> tuple[sparse.csr_mat options_split.append(f"-m {int(os.cpu_count() / 2)}") options = " ".join(options_split) - return x, re.sub(r"-m\s+\d+", "", options), bias + return x, options, bias def train_thresholding( @@ -341,10 +333,11 @@ def _do_train(y: np.ndarray, x: sparse.csr_matrix, options: str) -> np.matrix: w = np.ctypeslib.as_array(model.w, (x.shape[1], 1)) w = np.asmatrix(w) - # Liblinear flips +1/-1 labels so +1 is always the first label, - # but not if all labels are -1. - # For our usage, we need +1 to always be the first label, - # so the check is necessary. + # When all labels are -1, we must flip the sign of the weights + # because LIBLINEAR treats the first label as positive, which + # is -1 in this case. But for our usage we need them to be negative. + # For labels with both +1 and -1, LIBLINEAR guarantees that +1 + # is always the first label. if model.get_labels()[0] == -1: return -w else: diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index 3652ccd9..dab4ad17 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -1,33 +1,28 @@ +from __future__ import annotations + import os +import re +import logging import threading -from queue import SimpleQueue +import queue from tqdm import tqdm import numpy as np import scipy.sparse as sparse -from liblinear.liblinearutil import train, parameter, problem, feature_node +from liblinear.liblinearutil import train, parameter, problem, solver_names -import ctypes -from dataclasses import dataclass +from ctypes import c_double -@dataclass -class PartialProblem(problem): - """A liblinear.problem with only C attributes""" - l: ctypes.c_int - n: ctypes.c_int - y: ctypes.POINTER(ctypes.c_double) - x: ctypes.POINTER(ctypes.POINTER(feature_node)) - bias: ctypes.c_double class ParallelTrainer(threading.Thread): """A trainer for parallel 1vsrest training.""" y: sparse.csc_matrix x: sparse.csr_matrix - prob_var: dict + prob: problem param: parameter weights: np.ndarray pbar: tqdm - queue: SimpleQueue + queue: queue.SimpleQueue def __init__(self): threading.Thread.__init__(self) @@ -38,122 +33,101 @@ def init_trainer( y: sparse.csc_matrix, x: sparse.csr_matrix, options: str, - num_class: int, - weights: np.ndarray, verbose: bool, ): """Initialize the parallel trainer by setting y, x, parameter and threading related variables as class variable of ParallelTrainer. Args: - y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. + y (sparse.csc_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. - num_class (int): Number of class. - weights (np.ndarray): the weights. verbose (bool): Output extra progress information. """ + cls.y = y.tocsc() cls.x = x - cls.y = y - prob = problem(np.ones((y.shape[0],)), x) # pre-compute x for every OVR training tasks - cls.prob_var = {key: getattr(prob, key) for key in problem._names} - cls.param = parameter(options) - cls.param.w_recalc = True # only works for solving L1/L2-SVM dual - cls.weights = weights - cls.pbar= tqdm(total=num_class, disable=not verbose) - cls.queue = SimpleQueue() - - for i in range(num_class): + num_instances, num_classes = cls.y.shape + num_features = cls.x.shape[1] + cls.prob = problem(np.ones((num_instances,)), cls.x) + cls.param = parameter(re.sub(r"-m\s+\d+", "", options)) + if cls.param.solver_type in [solver_names.L2R_L1LOSS_SVC_DUAL, solver_names.L2R_L2LOSS_SVC_DUAL]: + cls.param.w_recalc = True # only works for solving L1/L2-SVM dual + cls.weights = np.zeros((num_features, num_classes), order="F") + cls.pbar = tqdm(total=num_classes, disable=not verbose) + cls.queue = queue.SimpleQueue() + + if verbose: + logging.info(f"Training one-vs-rest model on {num_classes} labels") + for i in range(num_classes): cls.queue.put(i) - @staticmethod - def _do_parallel_train(prob: problem, param: parameter) -> np.matrix: - """Wrapper around liblinear.liblinearutil.train. + def _do_parallel_train(self, y: np.ndarray) -> np.matrix: + """Wrap around liblinear.liblinearutil.train. Args: - prob (problem): A preprocessed liblinear.problem instance. - param (parameter): A preprocessed liblinear.parameter instance. + y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. Returns: - np.matrix: the weights. + np.matrix: The weights. """ - if prob.l == 0: - return np.matrix(np.zeros((prob.n, 1))) + if y.shape[0] == 0: + return np.matrix(np.zeros((self.prob.n, 1))) - model = train(prob, param) + prob = self.prob.copy() + prob.y = (c_double * prob.l)(*y) + model = train(prob, self.param) - w = np.ctypeslib.as_array(model.w, (prob.n, 1)) + w = np.ctypeslib.as_array(model.w, (self.prob.n, 1)) w = np.asmatrix(w) - # Liblinear flips +1/-1 labels so +1 is always the first label, - # but not if all labels are -1. - # For our usage, we need +1 to always be the first label, - # so the check is necessary. + # When all labels are -1, we must flip the sign of the weights + # because LIBLINEAR treats the first label as positive, which + # is -1 in this case. But for our usage we need them to be negative. + # For labels with both +1 and -1, LIBLINEAR guarantees that +1 + # is always the first label. if model.get_labels()[0] == -1: return -w else: # The memory is freed on model deletion so we make a copy. return w.copy() - def set_problem(self, label_idx: int) -> problem: - """Prepare a problem instance for parallel training using the - given label index and pre-computed x (POINTER(feature_node) * l). - - Args: - label_idx (int): label index for the problem currently solving. - - Returns: - problem: A problem prepared for liblinear.train. - """ - # Build a new problem with prob_var (avoid x copy) - prob = PartialProblem(**self.prob_var) - # prob = problem([0], [[0]]) - # for key in problem._names: - # setattr(prob, key, self.prob_var[key]) # overwrite with pre-computed attributes - - # Build y pointer with label index - yi = self.y[:, label_idx].toarray().reshape(-1) - y_prob = (ctypes.c_double * prob.l)() - np.ctypeslib.as_array(y_prob, (prob.l,))[:] = 2 * yi - 1 - prob.y = y_prob - return prob - def run(self): - while self.queue.qsize() > 0: - label_idx = self.queue.get() + while True: + try: + label_idx = self.queue.get_nowait() + except queue.Empty: + break + + yi = self.y[:, label_idx].toarray().reshape(-1) + self.weights[:, label_idx] = self._do_parallel_train(2 * yi - 1).ravel() - weight = self._do_parallel_train(self.set_problem(label_idx), self.param).ravel() - self.weights[:, label_idx] = weight self.pbar.update() + def train_parallel_1vsrest( - y: sparse.csc_matrix, + y: sparse.csr_matrix, x: sparse.csr_matrix, options: str, - num_class: int, - weights: np.ndarray, verbose: bool, - ): - """Parallel training on labels when using one-vs-rest strategy, - and saving trained weights by reference. + ) -> np.matrix: + """Parallel training on labels when using one-vs-rest strategy. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. - num_class (int): Number of class. - weights (np.ndarray): the weights. verbose (bool): Output extra progress information. + + Returns: + np.matrix: The weights. """ - ParallelTrainer.init_trainer(y, x, options, num_class, weights, verbose) - num_thread = int(os.cpu_count() / 2) - # stderr = os.dup(2) - trainers = [ParallelTrainer() for _ in range(num_thread)] + ParallelTrainer.init_trainer(y, x, options, verbose) + num_threads = int(os.cpu_count() / 2) + trainers = [ParallelTrainer() for _ in range(num_threads)] for trainer in trainers: trainer.start() for trainer in trainers: trainer.join() - # os.dup2(stderr, 2) - # os.close(stderr) ParallelTrainer.pbar.close() + return ParallelTrainer.weights From 1db54f6cbdb7b33d59e06227d41c5ebf048c2d97 Mon Sep 17 00:00:00 2001 From: chcwww Date: Sat, 31 May 2025 15:21:20 +0800 Subject: [PATCH 07/11] apply black formatting --- libmultilabel/linear/linear.py | 2 +- libmultilabel/linear/parallel.py | 11 ++++++----- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index 0e27956f..94426c07 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -327,7 +327,7 @@ def _do_train(y: np.ndarray, x: sparse.csr_matrix, options: str) -> np.matrix: prob = problem(y, x) param = parameter(options) - param.w_recalc = True # only works for solving L1/L2-SVM dual + param.w_recalc = True # only works for solving L1/L2-SVM dual with silent_stderr(): model = train(prob, param) diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py index dab4ad17..3271b010 100644 --- a/libmultilabel/linear/parallel.py +++ b/libmultilabel/linear/parallel.py @@ -16,6 +16,7 @@ class ParallelTrainer(threading.Thread): """A trainer for parallel 1vsrest training.""" + y: sparse.csc_matrix x: sparse.csr_matrix prob: problem @@ -104,11 +105,11 @@ def run(self): def train_parallel_1vsrest( - y: sparse.csr_matrix, - x: sparse.csr_matrix, - options: str, - verbose: bool, - ) -> np.matrix: + y: sparse.csr_matrix, + x: sparse.csr_matrix, + options: str, + verbose: bool, +) -> np.matrix: """Parallel training on labels when using one-vs-rest strategy. Args: From 1d3ff6db1c885e9108543341e732084a16b11a84 Mon Sep 17 00:00:00 2001 From: chcwww Date: Wed, 11 Jun 2025 17:24:14 +0800 Subject: [PATCH 08/11] move parallel module to linear.py and fix comments - add del_trainer as a classmethod to ParallelOVRTrainer to ensure a proper close --- libmultilabel/linear/linear.py | 128 ++++++++++++++++++++++++++--- libmultilabel/linear/parallel.py | 134 ------------------------------- 2 files changed, 119 insertions(+), 143 deletions(-) delete mode 100644 libmultilabel/linear/parallel.py diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index 4a67928a..2b3157ca 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -2,12 +2,18 @@ import logging import os +import psutil +import threading +import queue +import re import numpy as np import scipy.sparse as sparse from liblinear.liblinearutil import train, problem, parameter, solver_names from tqdm import tqdm +from ctypes import c_double + __all__ = [ "train_1vsrest", "train_thresholding", @@ -72,6 +78,104 @@ def predict_values(self, x: sparse.csr_matrix) -> np.ndarray: return (x * self.weights).A + self.thresholds +class ParallelOVRTrainer(threading.Thread): + """A trainer for parallel 1vsrest training.""" + + y: sparse.csc_matrix + x: sparse.csr_matrix + bias: float + prob: problem + param: parameter + weights: np.ndarray + pbar: tqdm + queue: queue.SimpleQueue + + def __init__(self): + threading.Thread.__init__(self) + + @classmethod + def init_trainer( + cls, + y: sparse.csc_matrix, + x: sparse.csr_matrix, + options: str, + verbose: bool, + ): + """Initialize the parallel trainer by setting y, x, parameter and threading related + variables as class variables of ParallelOVRTrainer. + + Args: + y (sparse.csc_matrix): A 0/1 matrix with dimensions number of instances * number of classes. + x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. + options (str): The option string passed to liblinear. + verbose (bool): Output extra progress information. + """ + x, options, bias = _prepare_options(x, options) + cls.y = y.tocsc() + cls.x = x + cls.bias = bias + num_instances, num_classes = cls.y.shape + num_features = cls.x.shape[1] + cls.prob = problem(np.ones((num_instances,)), cls.x) + cls.param = parameter(re.sub(r"-m\s+\d+", "", options)) + if cls.param.solver_type in [solver_names.L2R_L1LOSS_SVC_DUAL, solver_names.L2R_L2LOSS_SVC_DUAL]: + cls.param.w_recalc = True # only works for solving L1/L2-SVM dual + cls.weights = np.zeros((num_features, num_classes), order="F") + cls.queue = queue.SimpleQueue() + + if verbose: + logging.info(f"Training an one-vs-rest model on {num_classes} labels") + for i in range(num_classes): + cls.queue.put(i) + cls.pbar = tqdm(total=num_classes, disable=not verbose) + + @classmethod + def del_trainer(cls): + cls.pbar.close() + for key in list(cls.__annotations__): + delattr(cls, key) + + def _do_parallel_train(self, y: np.ndarray) -> np.matrix: + """Wrap around liblinear.liblinearutil.train. + + Args: + y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. + + Returns: + np.matrix: The weights. + """ + if y.shape[0] == 0: + return np.matrix(np.zeros((self.prob.n, 1))) + + prob = self.prob.copy() + prob.y = (c_double * prob.l)(*y) + model = train(prob, self.param) + + w = np.ctypeslib.as_array(model.w, (self.prob.n, 1)) + w = np.asmatrix(w) + # When all labels are -1, we must flip the sign of the weights + # because LIBLINEAR treats the first label as positive, which + # is -1 in this case. But for our usage we need them to be negative. + # For data with both +1 and -1 for labels, LIBLINEAR guarantees + # that +1 is always the first label. + if model.get_labels()[0] == -1: + return -w + else: + # The memory is freed on model deletion so we make a copy. + return w.copy() + + def run(self): + while True: + try: + label_idx = self.queue.get_nowait() + except queue.Empty: + break + yi = self.y[:, label_idx].toarray().reshape(-1) + self.weights[:, label_idx] = self._do_parallel_train(2 * yi - 1).ravel() + + self.pbar.update() + + def train_1vsrest( y: sparse.csr_matrix, x: sparse.csr_matrix, @@ -79,7 +183,7 @@ def train_1vsrest( options: str = "", verbose: bool = True, ) -> FlatModel: - """Train a linear model for multi-label data using a one-vs-rest strategy. + """Train a linear model parallel on labels for multi-label data using a one-vs-rest strategy. Args: y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. @@ -92,9 +196,15 @@ def train_1vsrest( A model which can be used in predict_values. """ # Follows the MATLAB implementation at https://www.csie.ntu.edu.tw/~cjlin/libsvmtools/multilabel/ - x, options, bias = _prepare_options(x, options) - - weights = train_parallel_1vsrest(y, x, options, verbose) + ParallelOVRTrainer.init_trainer(y, x, options, verbose) + num_threads = psutil.cpu_count(logical=False) + trainers = [ParallelOVRTrainer() for _ in range(num_threads)] + for trainer in trainers: + trainer.start() + for trainer in trainers: + trainer.join() + weights, bias = ParallelOVRTrainer.weights, ParallelOVRTrainer.bias + ParallelOVRTrainer.del_trainer() return FlatModel( name="1vsrest", @@ -147,7 +257,7 @@ def _prepare_options(x: sparse.csr_matrix, options: str) -> tuple[sparse.csr_mat if not "-q" in options_split: options_split.append("-q") if not "-m" in options: - options_split.append(f"-m {int(os.cpu_count() / 2)}") + options_split.append(f"-m {psutil.cpu_count(logical=False)}") options = " ".join(options_split) return x, options, bias @@ -189,7 +299,7 @@ def train_thresholding( thresholds = np.zeros(num_class) if verbose: - logging.info("Training thresholding model on %s labels", num_class) + logging.info("Training a thresholding model on %s labels", num_class) num_positives = np.sum(y, 2) label_order = np.flip(np.argsort(num_positives)).flat @@ -336,7 +446,7 @@ def _do_train(y: np.ndarray, x: sparse.csr_matrix, options: str) -> np.matrix: # When all labels are -1, we must flip the sign of the weights # because LIBLINEAR treats the first label as positive, which # is -1 in this case. But for our usage we need them to be negative. - # For labels with both +1 and -1, LIBLINEAR guarantees that +1 + # For data with both +1 and -1, LIBLINEAR guarantees that +1 # is always the first label. if model.get_labels()[0] == -1: return -w @@ -418,7 +528,7 @@ def train_cost_sensitive( weights = np.zeros((num_feature, num_class), order="F") if verbose: - logging.info(f"Training cost-sensitive model for Macro-F1 on {num_class} labels") + logging.info(f"Training a cost-sensitive model for Macro-F1 on {num_class} labels") for i in tqdm(range(num_class), disable=not verbose): yi = y[:, i].toarray().reshape(-1) w = _cost_sensitive_one_label(2 * yi - 1, x, options) @@ -527,7 +637,7 @@ def train_cost_sensitive_micro( bestScore = -np.Inf if verbose: - logging.info(f"Training cost-sensitive model for Micro-F1 on {num_class} labels") + logging.info(f"Training a cost-sensitive model for Micro-F1 on {num_class} labels") for a in param_space: tp = fn = fp = 0 for i in tqdm(range(num_class), disable=not verbose): diff --git a/libmultilabel/linear/parallel.py b/libmultilabel/linear/parallel.py deleted file mode 100644 index 3271b010..00000000 --- a/libmultilabel/linear/parallel.py +++ /dev/null @@ -1,134 +0,0 @@ -from __future__ import annotations - -import os -import re -import logging -import threading -import queue -from tqdm import tqdm - -import numpy as np -import scipy.sparse as sparse -from liblinear.liblinearutil import train, parameter, problem, solver_names - -from ctypes import c_double - - -class ParallelTrainer(threading.Thread): - """A trainer for parallel 1vsrest training.""" - - y: sparse.csc_matrix - x: sparse.csr_matrix - prob: problem - param: parameter - weights: np.ndarray - pbar: tqdm - queue: queue.SimpleQueue - - def __init__(self): - threading.Thread.__init__(self) - - @classmethod - def init_trainer( - cls, - y: sparse.csc_matrix, - x: sparse.csr_matrix, - options: str, - verbose: bool, - ): - """Initialize the parallel trainer by setting y, x, parameter and threading related - variables as class variable of ParallelTrainer. - - Args: - y (sparse.csc_matrix): A 0/1 matrix with dimensions number of instances * number of classes. - x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. - options (str): The option string passed to liblinear. - verbose (bool): Output extra progress information. - """ - cls.y = y.tocsc() - cls.x = x - num_instances, num_classes = cls.y.shape - num_features = cls.x.shape[1] - cls.prob = problem(np.ones((num_instances,)), cls.x) - cls.param = parameter(re.sub(r"-m\s+\d+", "", options)) - if cls.param.solver_type in [solver_names.L2R_L1LOSS_SVC_DUAL, solver_names.L2R_L2LOSS_SVC_DUAL]: - cls.param.w_recalc = True # only works for solving L1/L2-SVM dual - cls.weights = np.zeros((num_features, num_classes), order="F") - cls.pbar = tqdm(total=num_classes, disable=not verbose) - cls.queue = queue.SimpleQueue() - - if verbose: - logging.info(f"Training one-vs-rest model on {num_classes} labels") - for i in range(num_classes): - cls.queue.put(i) - - def _do_parallel_train(self, y: np.ndarray) -> np.matrix: - """Wrap around liblinear.liblinearutil.train. - - Args: - y (np.ndarray): A +1/-1 array with dimensions number of instances * 1. - - Returns: - np.matrix: The weights. - """ - if y.shape[0] == 0: - return np.matrix(np.zeros((self.prob.n, 1))) - - prob = self.prob.copy() - prob.y = (c_double * prob.l)(*y) - model = train(prob, self.param) - - w = np.ctypeslib.as_array(model.w, (self.prob.n, 1)) - w = np.asmatrix(w) - # When all labels are -1, we must flip the sign of the weights - # because LIBLINEAR treats the first label as positive, which - # is -1 in this case. But for our usage we need them to be negative. - # For labels with both +1 and -1, LIBLINEAR guarantees that +1 - # is always the first label. - if model.get_labels()[0] == -1: - return -w - else: - # The memory is freed on model deletion so we make a copy. - return w.copy() - - def run(self): - while True: - try: - label_idx = self.queue.get_nowait() - except queue.Empty: - break - - yi = self.y[:, label_idx].toarray().reshape(-1) - self.weights[:, label_idx] = self._do_parallel_train(2 * yi - 1).ravel() - - self.pbar.update() - - -def train_parallel_1vsrest( - y: sparse.csr_matrix, - x: sparse.csr_matrix, - options: str, - verbose: bool, -) -> np.matrix: - """Parallel training on labels when using one-vs-rest strategy. - - Args: - y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. - x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. - options (str): The option string passed to liblinear. - verbose (bool): Output extra progress information. - - Returns: - np.matrix: The weights. - """ - ParallelTrainer.init_trainer(y, x, options, verbose) - num_threads = int(os.cpu_count() / 2) - trainers = [ParallelTrainer() for _ in range(num_threads)] - - for trainer in trainers: - trainer.start() - for trainer in trainers: - trainer.join() - - ParallelTrainer.pbar.close() - return ParallelTrainer.weights From c305199be54e1a1f5b219c40b878223126f7c6a5 Mon Sep 17 00:00:00 2001 From: chcwww Date: Wed, 11 Jun 2025 17:32:21 +0800 Subject: [PATCH 09/11] correct the grammar --- libmultilabel/linear/linear.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index 2b3157ca..a4e00abc 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -124,7 +124,7 @@ def init_trainer( cls.queue = queue.SimpleQueue() if verbose: - logging.info(f"Training an one-vs-rest model on {num_classes} labels") + logging.info(f"Training a one-vs-rest model on {num_classes} labels") for i in range(num_classes): cls.queue.put(i) cls.pbar = tqdm(total=num_classes, disable=not verbose) From c5ac599cf5ffa5c3cc600ad0ebbf3578478cebca Mon Sep 17 00:00:00 2001 From: chcwww Date: Wed, 16 Jul 2025 18:19:34 +0800 Subject: [PATCH 10/11] update doc string --- libmultilabel/linear/linear.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index a4e00abc..f0357e81 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -96,7 +96,7 @@ def __init__(self): @classmethod def init_trainer( cls, - y: sparse.csc_matrix, + y: sparse.csr_matrix, x: sparse.csr_matrix, options: str, verbose: bool, @@ -105,7 +105,7 @@ def init_trainer( variables as class variables of ParallelOVRTrainer. Args: - y (sparse.csc_matrix): A 0/1 matrix with dimensions number of instances * number of classes. + y (sparse.csr_matrix): A 0/1 matrix with dimensions number of instances * number of classes. x (sparse.csr_matrix): A matrix with dimensions number of instances * number of features. options (str): The option string passed to liblinear. verbose (bool): Output extra progress information. From faa72932091c6c22944bcd737a0fad3ea4588666 Mon Sep 17 00:00:00 2001 From: chcwww Date: Sat, 30 Aug 2025 18:29:57 +0800 Subject: [PATCH 11/11] add comment for removing nr_thread in liblinear-multicore options --- libmultilabel/linear/linear.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/libmultilabel/linear/linear.py b/libmultilabel/linear/linear.py index 15655e9a..04d25a21 100644 --- a/libmultilabel/linear/linear.py +++ b/libmultilabel/linear/linear.py @@ -131,6 +131,8 @@ def init_trainer( num_instances, num_classes = cls.y.shape num_features = cls.x.shape[1] cls.prob = problem(np.ones((num_instances,)), cls.x) + + # remove "-m nr_thread" from options to prevent nested multi-threading cls.param = parameter(re.sub(r"-m\s+\d+", "", options)) if cls.param.solver_type in [solver_names.L2R_L1LOSS_SVC_DUAL, solver_names.L2R_L2LOSS_SVC_DUAL]: cls.param.w_recalc = True # only works for solving L1/L2-SVM dual