From 23e806a813d1604aa491bca7b40664a82a28b0d8 Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Fri, 13 Jun 2025 19:16:55 -0300 Subject: [PATCH] Fix metrics computation --- src/utils/metrics.py | 5 +- src/utils/metrics_integration_test.py | 80 +++++++++ src/utils/metrics_lenet_test.py | 82 ++++++---- src/utils/metrics_test.py | 225 ++++++++++++++++++++++++++ 4 files changed, 356 insertions(+), 36 deletions(-) create mode 100755 src/utils/metrics_integration_test.py diff --git a/src/utils/metrics.py b/src/utils/metrics.py index 99816af..f7ebcbd 100644 --- a/src/utils/metrics.py +++ b/src/utils/metrics.py @@ -24,12 +24,9 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: total_layer_size = 0.0 qconfig = qlayer.quantize_config - # Assumption: order is the same for layer.weights and get_weights_and_quantizers weights_and_quantizers = qconfig.get_weights_and_quantizers(qlayer.layer) - weights = qlayer.weights[: len(weights_and_quantizers)] - for weight, weight_and_quantizer in zip(weights, weights_and_quantizers): - quantizer = weight_and_quantizer[1] + for weight, quantizer in weights_and_quantizers: if isinstance(quantizer, UniformQuantizer): weight_size = weight.shape.num_elements() * quantizer.bits elif isinstance(quantizer, FlexQuantizer): diff --git a/src/utils/metrics_integration_test.py b/src/utils/metrics_integration_test.py new file mode 100755 index 0000000..a631980 --- /dev/null +++ b/src/utils/metrics_integration_test.py @@ -0,0 +1,80 @@ +#!/usr/bin/env python3 +# Integration of saving models and metrics + +import tempfile +import unittest + +import numpy as np +import tensorflow as tf + +from configs.qmodel import apply_quantization +from configs.serialization.serialization import load_qmodel, save_qmodel +from quantizers.uniform_quantizer import UniformQuantizer +from utils.metrics import compute_space_complexity_model + + +class TestIntegrationMetricsSerialization(unittest.TestCase): + def test_save_and_load_model(self): + 
"""Test saving and loading a model with metrics.""" + model = tf.keras.Sequential( + [ + tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1"), + tf.keras.layers.Dense(5, name="dense_2"), + ] + ) + + qconfig = { + "dense_1": { + "weights": { + "kernel": UniformQuantizer(bits=4, signed=True), + "bias": UniformQuantizer(bits=4, signed=True), + }, + }, + "dense_2": { + "weights": { + "kernel": UniformQuantizer(bits=4, signed=True), + "bias": UniformQuantizer(bits=4, signed=True), + }, + }, + } + qmodel = apply_quantization(model, qconfig) + qmodel.build((None, 5)) + + tmpdir = tempfile.mkdtemp() + save_qmodel(qmodel, tmpdir) + loaded_model = load_qmodel(tmpdir) + # make an inference to ensure the model is loaded correctly + loaded_model(tf.random.normal((1, 5))) + + original_weights = {w.name: w.numpy() for w in qmodel.weights} + loaded_weights = {w.name: w.numpy() for w in loaded_model.weights} + + # First, check that the set of weight names is identical + self.assertEqual( + set(original_weights.keys()), + set(loaded_weights.keys()), + "Models have different sets of weight names.", + ) + + # Now, compare each weight tensor by name + for name, orig_w in original_weights.items(): + loaded_w = loaded_weights[name] + # print(f"Comparing weight tensor: {name}") + # print(f"Weights: {orig_w}") + # print(f"Loaded: {loaded_w}") + np.testing.assert_allclose( + orig_w, + loaded_w, + rtol=1e-6, + atol=1e-6, + err_msg=f"Weight tensor '{name}' differs.", + ) + + self.assertEqual( + compute_space_complexity_model(qmodel), + compute_space_complexity_model(loaded_model), + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/utils/metrics_lenet_test.py b/src/utils/metrics_lenet_test.py index 37cb1cf..c72aec0 100755 --- a/src/utils/metrics_lenet_test.py +++ b/src/utils/metrics_lenet_test.py @@ -6,6 +6,9 @@ import numpy as np import tensorflow as tf from tensorflow.keras import layers, models +from 
tensorflow_model_optimization.python.core.quantization.keras.quantize_wrapper import ( + QuantizeWrapperV2, +) from configs.qmodel import apply_quantization from quantizers.flex_quantizer import FlexQuantizer @@ -14,43 +17,57 @@ def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict): - """TODO(Colo): This function will is implemented in branch - colo/model_evalution in QTensor/src/examples/functions.py. - - When merged, import that functions insted of redefining it here. - """ + """Sets the internal state (alpha, levels, thresholds) of FlexQuantizers + within a quantized model by directly finding and assigning to the live + tf.Variable objects.""" for layer in qmodel.layers: - orig_layer_name = layer.name - if orig_layer_name.startswith("quant_"): - orig_layer_name = orig_layer_name[len("quant_") :] + if not isinstance(layer, QuantizeWrapperV2): + continue + orig_layer_name = layer.layer.name if orig_layer_name in alpha_dict: - for alpha_type in ["kernel", "bias", "activation"]: - new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None) - new_levels = levels_dict[orig_layer_name].get(alpha_type, None) - new_thresholds = thresholds_dict[orig_layer_name].get( - alpha_type, None + # Find all variables in the wrapper layer and create a map by name + var_map = {v.name: v for v in layer.weights} + + # Iterate through the types ('kernel', 'bias') we might want to change + for attr_type in ["kernel", "bias"]: + new_alpha = alpha_dict.get(orig_layer_name, {}).get(attr_type) + new_levels = levels_dict.get(orig_layer_name, {}).get( + attr_type + ) + new_thresholds = thresholds_dict.get(orig_layer_name, {}).get( + attr_type ) - if new_alpha is not None: - for v in layer.weights: - if "alpha" in v.name and alpha_type in v.name: - v.assign(new_alpha) - # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}") - elif ( - alpha_type == "activation" - and "post_activation" in v.name - and "alpha" in v.name - ): - v.assign(new_alpha) - # 
print(f"Updated {v.name} (activation) with new alpha value {new_alpha}") - if "levels" in v.name and alpha_type in v.name: - v.assign(new_levels) - # print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}") - if "thresholds" in v.name and alpha_type in v.name: - v.assign(new_thresholds) - # print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}") - return qmodel + # Construct the expected variable names and assign if they exist + if new_alpha is not None: + # Note: TFMOT might name variables slightly differently. + # This searches for common patterns. + for name_pattern in [ + f"/{attr_type}_alpha:0", + f"_{attr_type}_alpha:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_alpha) + + if new_levels is not None: + for name_pattern in [ + f"/{attr_type}_levels:0", + f"_{attr_type}_levels:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_levels) + + if new_thresholds is not None: + for name_pattern in [ + f"/{attr_type}_thresholds:0", + f"_{attr_type}_thresholds:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_thresholds) def apply_alpha_dict(qmodel, alpha_dict): @@ -418,6 +435,7 @@ def base_flex_quantizer_space_complexity( "bias": bthresholds, } apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict) + qmodel(tf.random.normal(input_shape)) # 6) compare to your implementation computed_bits = compute_space_complexity_model(qmodel) diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py index 760be67..5a3368b 100755 --- a/src/utils/metrics_test.py +++ b/src/utils/metrics_test.py @@ -1,10 +1,16 @@ #!/usr/bin/env python3 import unittest +from collections import Counter +import numpy as np import tensorflow as tf +from tensorflow_model_optimization.python.core.quantization.keras.quantize_wrapper import ( + QuantizeWrapperV2, +) from 
configs.qmodel import apply_quantization +from quantizers.flex_quantizer import FlexQuantizer from quantizers.uniform_quantizer import UniformQuantizer from utils.metrics import ( compute_space_complexity_model, @@ -12,6 +18,103 @@ ) +def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict): + """Sets the internal state (alpha, levels, thresholds) of FlexQuantizers + within a quantized model by directly finding and assigning to the live + tf.Variable objects.""" + for layer in qmodel.layers: + if not isinstance(layer, QuantizeWrapperV2): + continue + + orig_layer_name = layer.layer.name + if orig_layer_name in alpha_dict: + # Find all variables in the wrapper layer and create a map by name + var_map = {v.name: v for v in layer.weights} + + # Iterate through the types ('kernel', 'bias') we might want to change + for attr_type in ["kernel", "bias"]: + new_alpha = alpha_dict.get(orig_layer_name, {}).get(attr_type) + new_levels = levels_dict.get(orig_layer_name, {}).get( + attr_type + ) + new_thresholds = thresholds_dict.get(orig_layer_name, {}).get( + attr_type + ) + + # Construct the expected variable names and assign if they exist + if new_alpha is not None: + # Note: TFMOT might name variables slightly differently. + # This searches for common patterns. 
+ for name_pattern in [ + f"/{attr_type}_alpha:0", + f"_{attr_type}_alpha:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_alpha) + + if new_levels is not None: + for name_pattern in [ + f"/{attr_type}_levels:0", + f"_{attr_type}_levels:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_levels) + + if new_thresholds is not None: + for name_pattern in [ + f"/{attr_type}_thresholds:0", + f"_{attr_type}_thresholds:0", + ]: + var_name = layer.name + name_pattern + if var_name in var_map: + var_map[var_name].assign(new_thresholds) + + +def check_weights(qlayer): + qlayer_weights = qlayer.get_weights() + qconfig = qlayer.quantize_config + weights_and_quantizers = qconfig.get_weights_and_quantizers(qlayer.layer) + weights_from_config = [ + weight_and_quantizer[0] + for weight_and_quantizer in weights_and_quantizers + ] + quantizers = [ + weight_and_quantizer[1] + for weight_and_quantizer in weights_and_quantizers + ] + + qlayer_weights = qlayer.get_weights() + + # Original weights from the quantized layer + weights_in_layer = qlayer_weights[0] + print("Non-quantized weights in layer:") + print(weights_in_layer) + print() + + weights_quantized_from_config = weights_from_config[0].numpy() + print("Weights from config:") + print(weights_quantized_from_config) + print() + + quantizer = quantizers[0] if quantizers else None + print("Quantizer levels:") + print(quantizer.levels.numpy()) + print() + + weights_manually_quantized = quantizer.quantize_op( + qlayer_weights[0] + ).numpy() + print("Manually quantized weights:") + print(weights_manually_quantized) + print() + + assert np.array_equal( + weights_quantized_from_config, weights_manually_quantized + ) + + # From tensorflow internal code def _compute_memory_size(weight): weight_counts = weight.shape.num_elements() @@ -66,6 +169,128 @@ def test_compute_space_complexity_uniform_only(self): 
self.assertEqual(quantized_size, expected_size) + def test_understanding_weights(self): + """Verify that we can access the weights of a quantized layer.""" + layer = tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1") + layer.build((None, 5)) + qconfig = { + "dense_1": { + "weights": { + "kernel": FlexQuantizer(bits=2, n_levels=4, signed=True), + "bias": FlexQuantizer(bits=2, n_levels=4, signed=True), + }, + }, + } + model = tf.keras.Sequential([layer]) + qmodel = apply_quantization(model, qconfig) + qmodel.build((None, 5)) + # Run an inference to have access to the variables. + qmodel(tf.random.normal((1, 5))) + # Access the quantized layer + qlayer = qmodel.get_layer("quant_dense_1") + + check_weights(qlayer) + # Define the quantizer parameters based on our ideal state + alpha = 1.0 + # 2. Calculate the expected complexity based on a known data distribution + ideal_levels = np.array([-0.8, -0.2, 0.3, 0.9], dtype=np.float32) + midpoints = (ideal_levels[:-1] + ideal_levels[1:]) / 2.0 + thresholds = np.concatenate(([-alpha], midpoints, [alpha])).astype( + np.float32 + ) + + apply_flex_dict( + qmodel, + alpha_dict={"dense_1": {"kernel": alpha}}, + levels_dict={"dense_1": {"kernel": ideal_levels}}, + thresholds_dict={"dense_1": {"kernel": thresholds}}, + ) + qmodel(tf.random.normal((1, 5))) # Force weight creation + check_weights(qlayer) + + def test_compute_space_complexity_flex_only(self): + """Verify that for a flex configuration a layer size is as expected.""" + # 1. Setup the initial layer and model + layer = tf.keras.layers.Dense( + 10, input_shape=(5,), name="dense_1", use_bias=False + ) + model = tf.keras.Sequential([layer]) + model.build((None, 5)) + + # Define the FlexQuantizer configuration + qconfig = { + "dense_1": { + "weights": { + "kernel": FlexQuantizer(bits=4, n_levels=4, signed=True), + }, + }, + } + + # 2. 
Calculate the expected complexity based on a known data distribution + ideal_levels = np.array([-0.8, -0.2, 0.3, 0.9], dtype=np.float32) + ideal_weight_data = np.random.choice( + ideal_levels, size=(5, 10), replace=True + ) + + counter = Counter(ideal_weight_data.flatten()) + total_elements = sum(counter.values()) + emp_probs = np.array(list(counter.values())) / total_elements + entropy = -np.sum(emp_probs * np.log2(emp_probs)) + + huffman_size = ideal_weight_data.size * entropy + levels_size = len(ideal_levels) * 4 # n_levels * bits + expected_size = huffman_size + levels_size + + # 3. Apply quantization to get the qmodel structure + qmodel = apply_quantization(model, qconfig) + + # 4. Force weight creation by calling the model with a dummy input. + dummy_input_shape = (1,) + model.input_shape[1:] + qmodel(tf.random.normal(dummy_input_shape)) + + # 5. Inject the known state into the qmodel + q_layer = qmodel.get_layer("quant_dense_1") + + # Find the actual tf.Variable for the kernel. + kernel_var = None + for v in q_layer.trainable_weights: + if v.name.endswith("kernel:0"): + kernel_var = v + break + + self.assertIsNotNone( + kernel_var, "Could not find the kernel variable to assign." + ) + kernel_var.assign(ideal_weight_data) + + kernel_weights = qmodel.get_layer("quant_dense_1").get_weights() + print("Kernel Weights before assignment:") + print(kernel_weights) + + # Define the quantizer parameters based on our ideal state + alpha = 1.0 + # KEY CHANGE: Calculate thresholds correctly to match the quantizer's expected variable shape. + # The shape should be (n_levels + 1) to include outer bounds. 
+ midpoints = (ideal_levels[:-1] + ideal_levels[1:]) / 2.0 + thresholds = np.concatenate(([-alpha], midpoints, [alpha])).astype( + np.float32 + ) + + # Use our helper to set the FlexQuantizer's internal state + apply_flex_dict( + qmodel, + alpha_dict={"dense_1": {"kernel": alpha}}, + levels_dict={"dense_1": {"kernel": ideal_levels}}, + thresholds_dict={"dense_1": {"kernel": thresholds}}, + ) + qmodel(tf.random.normal(dummy_input_shape)) # Force assignment + + # 6. Compute the quantized size using the metric function + quantized_size = compute_space_complexity_quantize(q_layer) + + # 7. Assert that the computed size matches the theoretical expected size + self.assertAlmostEqual(quantized_size, expected_size, places=6) + def test_compute_non_quantized_model(self): """Verify that computing the size of the model.""" layer = tf.keras.layers.Dense(30, input_shape=(5,), name="dense_1")