From d7c78c1bd758e08fcd1e963ee90288fac7ebd02e Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 13:40:16 -0300 Subject: [PATCH 1/8] General cleanup and coherence between quantizers --- src/configs/generate_config.py | 2 -- src/examples/models/mlp.py | 12 +++++++- src/quantizers/flex_quantizer.py | 30 ++++++++++++------ src/quantizers/uniform_quantizer.py | 48 +++++++++++++++++++---------- 4 files changed, 62 insertions(+), 30 deletions(-) diff --git a/src/configs/generate_config.py b/src/configs/generate_config.py index e6f98b2..de7315f 100644 --- a/src/configs/generate_config.py +++ b/src/configs/generate_config.py @@ -79,7 +79,6 @@ def get_activations_and_quantizers( activations_and_quantizers.append( (get_nested_attribute(layer, activation_attribute), quantizer) ) - print(f"AQ: {activations_and_quantizers}") return activations_and_quantizers def set_quantize_activations( @@ -88,7 +87,6 @@ def set_quantize_activations( for attribute, quantized_activation in zip( self.activations.keys(), quantize_activations ): - print(f"SA: {attribute} {quantized_activation}") set_nested_attribute(layer, attribute, quantized_activation) def get_output_quantizers(self, layer): diff --git a/src/examples/models/mlp.py b/src/examples/models/mlp.py index 908c306..00f6750 100644 --- a/src/examples/models/mlp.py +++ b/src/examples/models/mlp.py @@ -17,4 +17,14 @@ } } -qconfigs = {"qconfig": simple_qconfig} +uniform_qconfig = { + "hidden": { + "weights": {"kernel": UniformQuantizer(bits=4, signed=True)}, + "activations": {"activation": UniformQuantizer(bits=4, signed=False)}, + } +} + +qconfigs = { + "simple": simple_qconfig, + "uniform": uniform_qconfig, +} diff --git a/src/quantizers/flex_quantizer.py b/src/quantizers/flex_quantizer.py index 7763c73..b5d2715 100644 --- a/src/quantizers/flex_quantizer.py +++ b/src/quantizers/flex_quantizer.py @@ -31,6 +31,7 @@ def __init__( bits: int, n_levels: int, signed: bool = True, + name_suffix: str = "", ): """Constructor. 
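As a usage sketch of the two changes above (the per-layer qconfig dictionaries in mlp.py and the new `name_suffix` constructor argument), the snippet below shows how a flexible configuration might be written so that the variables created in `build()` get distinct names when one layer carries both a kernel and an activation quantizer. The bit-widths, level counts and suffix strings are illustrative assumptions, not values taken from this patch.

    from quantizers.flex_quantizer import FlexQuantizer

    # Hypothetical flexible counterpart of `uniform_qconfig`: each quantizer is
    # suffixed so that build() creates "<name>_kernel_alpha" / "<name>_act_alpha"
    # (and the matching "_levels" / "_thresholds") instead of colliding names.
    flex_qconfig = {
        "hidden": {
            "weights": {
                "kernel": FlexQuantizer(
                    bits=4, n_levels=8, signed=True, name_suffix="_kernel"
                ),
            },
            "activations": {
                "activation": FlexQuantizer(
                    bits=4, n_levels=8, signed=False, name_suffix="_act"
                ),
            },
        }
    }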
@@ -55,10 +56,12 @@ def __init__( self.levels = None # possible output values self.thresholds = None # boundaries between levels + self.name_suffix = name_suffix + def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer): alpha = layer.add_weight( - "alpha", + name=f"{name}{self.name_suffix}_alpha", initializer=tf.keras.initializers.Constant(0.1), trainable=True, dtype=tf.float32, @@ -68,7 +71,7 @@ def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer): self.alpha = alpha levels = layer.add_weight( - "levels", + name=f"{name}{self.name_suffix}_levels", initializer=tf.keras.initializers.Constant( np.linspace( min_value(self.alpha, self.signed), @@ -84,7 +87,7 @@ def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer): self.levels = levels thresholds = layer.add_weight( - "thresholds", + name=f"{name}{self.name_suffix}_thresholds", initializer=tf.keras.initializers.Constant( np.linspace( min_value(self.alpha, self.signed), @@ -112,13 +115,7 @@ def range(self): def delta(self): return self.range() / self.m_levels - @tf.custom_gradient - def quantize(self, x, alpha, levels, thresholds): - # Capture the values of the parameters - self.alpha = alpha - self.levels = levels - self.thresholds = thresholds - + def quantize_op(self, x): # Quantize levels (uniform quantization) qlevels = self.delta() * tf.math.floor(self.levels / self.delta()) @@ -134,6 +131,19 @@ def quantize(self, x, alpha, levels, thresholds): q, ) + return q + + @tf.custom_gradient + def quantize(self, x, alpha, levels, thresholds): + # Capture the values of the parameters + self.alpha = alpha + self.levels = levels + self.thresholds = thresholds + + q = self.quantize_op(x) + + qlevels = self.delta() * tf.math.floor(self.levels / self.delta()) + def grad(upstream): ##### dq_dx uses STE ##### dq_dx = tf.where( diff --git a/src/quantizers/uniform_quantizer.py b/src/quantizers/uniform_quantizer.py index 5ae73c0..33a2c34 100755 --- a/src/quantizers/uniform_quantizer.py +++ b/src/quantizers/uniform_quantizer.py @@ -12,6 +12,8 @@ _QuantizeHelper, ) +from quantizers.common import delta, max_value, min_value, span + class UniformQuantizer(_QuantizeHelper, Quantizer): """An uniform quantizer algorithm support both signed and unsigned @@ -65,30 +67,45 @@ def __call__(self, w): return tf.clip_by_value(w, tf.keras.backend.epsilon(), np.inf) alpha = layer.add_weight( - name.join("_alpha"), + name=f"{name}{self.name_suffix}_alpha", initializer=self.initializer, trainable=True, dtype=tf.float32, regularizer=self.regularizer, constraint=PositiveConstraint(), ) + levels = layer.add_weight( + name=f"{name}{self.name_suffix}_levels", + trainable=False, + shape=(self.m_levels,), + dtype=tf.float32, + ) self.alpha = alpha - return {"alpha": alpha} + self.levels = levels + + return {"alpha": alpha, "levels": levels} def __call__(self, inputs, training, weights, **kwargs): return self.quantize(inputs, weights["alpha"]) def range(self): - return 2 * self.alpha if self.signed else self.alpha + return span(self.alpha, self.signed) def delta(self): - return self.range() / self.m_levels + return delta(self.alpha, self.m_levels, self.signed) - def levels(self): + def compute_levels(self): """Compute the quantization levels.""" - start = -self.alpha if self.signed else 0 + start = min_value(self.alpha, self.signed) return tf.range(start, start + self.range(), self.delta()) + def quantize_op(self, x): + clipped_x = tf.clip_by_value(x, self.levels[0], self.levels[-1]) + delta_v = ( + 2 * self.alpha if self.signed else 
self.alpha + ) / self.m_levels + return delta_v * tf.math.floor(clipped_x / delta_v) + @tf.custom_gradient def quantize(self, x, alpha): """Uniform quantization. @@ -97,25 +114,22 @@ def quantize(self, x, alpha): :param alpha: alpha parameter :returns: quantized input tensor """ - # Capture alpha + # Store alpha for other methods to use self.alpha = alpha - # Compute quantization levels - levels = self.levels() - - # Clip input values between min and max levels (function is zero outside the range) - clipped_x = tf.clip_by_value(x, levels[0], levels[-1]) + self.levels = self.compute_levels() - # Quantize input values - q = self.delta() * tf.math.floor(clipped_x / self.delta()) + # Use direct parameter passing to avoid graph scope issues + q = self.quantize_op(x) def grad(upstream): # Gradient only flows through if the input is within range - ## Use STE to estimate the gradient dq_dx = tf.where( tf.logical_and( - tf.greater_equal(x, levels[0]), - tf.less_equal(x, levels[-1]), + tf.greater_equal(x, min_value(alpha, self.signed)), + tf.less_equal( + x, max_value(alpha, self.m_levels, self.signed) + ), ), upstream, tf.zeros_like(x), From 269e04fa8fb7f7d87c1b8480738470448c22e3e6 Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 13:49:33 -0300 Subject: [PATCH 2/8] Add space complexity computation code --- src/examples/run.py | 12 ++++-- src/utils/__init__.py | 0 src/utils/huffman.py | 39 ++++++++++++++++++ src/utils/metrics.py | 94 +++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 src/utils/__init__.py create mode 100644 src/utils/huffman.py create mode 100644 src/utils/metrics.py diff --git a/src/examples/run.py b/src/examples/run.py index a2bc433..93f18f8 100755 --- a/src/examples/run.py +++ b/src/examples/run.py @@ -9,6 +9,7 @@ from tensorflow.keras.optimizers import Adam from configs.qmodel import apply_quantization +from utils.metrics import compute_space_complexity_model def main(args): @@ -49,9 +50,6 @@ def main(args): loss="categorical_crossentropy", metrics=["accuracy"], ) - print(qmodel.summary()) - print(f"qweights: {[w.name for w in qmodel.layers[1].weights]}") - # print(f"qactivations: {[w.name for w in qmodel.layers[1].weights]}") callback_tuples = [ (CaptureWeightCallback(qlayer), qconfig[layer.name]) @@ -69,6 +67,14 @@ def main(args): callbacks=[callback for callback, _ in callback_tuples], ) + qmodel(next(iter(test_dataset))[0]) + space_complexity = compute_space_complexity_model(qmodel) + print(f"Space complexity: {space_complexity / 8 * 1/1024} kB") + original_space_complexity = compute_space_complexity_model(model) + print( + f"Original space complexity: {original_space_complexity / 8 * 1/1024} kB" + ) + output_dict = {} output_dict["global"] = hist.history for callback, qconfig in callback_tuples: diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/huffman.py b/src/utils/huffman.py new file mode 100644 index 0000000..5093e0f --- /dev/null +++ b/src/utils/huffman.py @@ -0,0 +1,39 @@ +import heapq +from collections import Counter, namedtuple + + +# A simple Node for the Huffman tree +class Node(namedtuple("Node", ["freq", "value", "left", "right"])): + def __lt__(self, other): + return self.freq < other.freq + + +def build_huffman_tree(weights): + counter = Counter(weights) + heap = [Node(freq, value, None, None) for value, freq in counter.items()] + heapq.heapify(heap) + + while len(heap) > 1: + left = heapq.heappop(heap) 
+ right = heapq.heappop(heap) + new_node = Node(left.freq + right.freq, None, left, right) + heapq.heappush(heap, new_node) + + return heap[0] + + +def assign_codes(node, prefix="", codebook={}): + if node.value is not None: + codebook[node.value] = prefix + else: + assign_codes(node.left, prefix + "0", codebook) + assign_codes(node.right, prefix + "1", codebook) + return codebook + + +# TODO(frneer): Add tests for the Huffman tree +# test_weights = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4] +# huffman_tree = build_huffman_tree(test_weights) +# # print(huffman_tree) +# huffman_codes = assign_codes(huffman_tree) +# # print("Huffman Codes:", [len(code) for idx, code in huffman_codes.items()]) diff --git a/src/utils/metrics.py b/src/utils/metrics.py new file mode 100644 index 0000000..96dd4ce --- /dev/null +++ b/src/utils/metrics.py @@ -0,0 +1,94 @@ +from collections import Counter + +import numpy as np +import tensorflow as tf +from tensorflow_model_optimization.python.core.quantization.keras.quantize_layer import ( + QuantizeLayer, +) +from tensorflow_model_optimization.python.core.quantization.keras.quantize_wrapper import ( + QuantizeWrapperV2, +) + +from quantizers.flex_quantizer import FlexQuantizer +from quantizers.uniform_quantizer import UniformQuantizer + + +def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: + """Compute the uniform space complexity of a layer based on its + quantization configuration. + + returns: + The space complexity of the layer in bits. + """ + if not isinstance(qlayer, QuantizeWrapperV2): + raise ValueError("Layer is not a QuantizeWrapperV2") + + total_layer_size = 0 + qconfig = qlayer.quantize_config + + # Assumption: order is the same for layer.weights and get_weights_and_quantizers + weights_and_quantizers = qconfig.get_weights_and_quantizers(qlayer.layer) + weights = qlayer.weights[: len(weights_and_quantizers)] + + for weight, weight_and_quantizer in zip(weights, weights_and_quantizers): + quantizer = weight_and_quantizer[1] + if isinstance(quantizer, UniformQuantizer): + weight_size = weight.shape.num_elements() * quantizer.bits + elif isinstance(quantizer, FlexQuantizer): + qweight = quantizer.quantize_op(weight) + counter = Counter(qweight.numpy().flatten()) + total = sum(counter.values()) + probabilities = np.array( + [freq / total for freq in counter.values()] + ) + entropy = -np.sum(probabilities * np.log2(probabilities)) + weight_size = weight.shape.num_elements() * entropy + else: + raise ValueError(f"Unknown quantizer type: {type(quantizer)}") + total_layer_size += weight_size + + return total_layer_size + + +def compute_huffman_nominal_complexity(qweights): + """Compute the nominal complexity of a huffman codification of the + quantized weights.""" + N = qweights.shape.num_elements() + counter = Counter(qweights.numpy().flatten()) + total = sum(counter.values()) + probabilities = np.array([freq / total for freq in counter.values()]) + entropy = -np.sum(probabilities * np.log2(probabilities)) + return N * entropy + + +def compute_space_complexity(layer): + """Compute the space complexity for a normal layer.""" + total_layer_size = 0 + for weight in layer.weights: + weight_size = ( + 8 * weight.dtype.size * weight.shape.num_elements() + ) # bits + total_layer_size += weight_size + + return total_layer_size + + +def compute_space_complexity_model(model: tf.keras.Model) -> float: + """Compute the uniform space complexity of a model based on its + quantization configuration.""" + total_space_complexity = 0 + + for layer in model.layers: 
+ print(f"Layer {layer}") + if isinstance(layer, QuantizeWrapperV2): + layer_size = compute_space_complexity_quantize(layer) + elif isinstance(layer, QuantizeLayer): + # Verify if there's no layer we need to keep of this type. + print(f"Skipping {layer.name}") + continue + else: + layer_size = compute_space_complexity(layer) + print(f"Layer: {layer.name}, Size: {layer_size} bits") + total_space_complexity += layer_size + + return total_space_complexity From 8b21f081924d34e59532842c103b628d6aa4d5e6 Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 14:01:20 -0300 Subject: [PATCH 3/8] Add pyproject toml --- .pre-commit-config.yaml | 2 -- pyproject.toml | 7 +++++++ 2 files changed, 7 insertions(+), 2 deletions(-) create mode 100644 pyproject.toml diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index ab5c28c..02c2f48 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -22,7 +22,6 @@ repos: hooks: - id: isort name: isort (python) - args: ["--profile", "black"] - repo: https://github.com/myint/docformatter @@ -35,4 +34,3 @@ repos: rev: 25.1.0 hooks: - id: black - args: ["--line-length", "79"] diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..514f2cf --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,7 @@ +# pyproject.toml +[tool.black] +line-length = 79 + +[tool.isort] +profile = "black" +line_length = 79 From a7a7ca861e578967676714fda62afc19b2b68a2b Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 14:01:44 -0300 Subject: [PATCH 4/8] Add space complexity tests --- src/utils/metrics_test.py | 113 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100755 src/utils/metrics_test.py diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py new file mode 100755 index 0000000..3040585 --- /dev/null +++ b/src/utils/metrics_test.py @@ -0,0 +1,113 @@ +#!/usr/bin/env python3 + +import unittest + +import tensorflow as tf +from metrics import ( + compute_space_complexity_model, + compute_space_complexity_quantize, +) + +from configs.qmodel import apply_quantization +from quantizers.uniform_quantizer import UniformQuantizer + + +# From tensorflow internal code +def _compute_memory_size(weight): + weight_counts = weight.shape.num_elements() + per_param_size = weight.dtype.size + return weight_counts * per_param_size + + +def weight_memory_size(weights): + """Compute the memory footprint for weights based on their dtypes. + + Args: + weights: An iterable contains the weights to compute weight size. + + Returns: + The total memory size (in bits) of the weights. + """ + unique_weights = {id(w): w for w in weights}.values() + total_memory_size = 0 + for w in unique_weights: + total_memory_size += _compute_memory_size(w) + return total_memory_size + + +class TestMetrics(unittest.TestCase): + def test_compute_space_complexity_uniform_only(self): + """Verify that for a uniform configuration a layer size is as + expected.""" + layer = tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1") + layer.build((None, 5)) # Build the layer to initialize weights + qconfig = { + "dense_1": { + "weights": { + "kernel": UniformQuantizer(bits=4, signed=True), + "bias": UniformQuantizer(bits=4, signed=True), + }, + }, + } + model = tf.keras.Sequential([layer]) + + qmodel = apply_quantization(model, qconfig) + qmodel.build((None, 5)) + # Run an inference to have access to the variables. 
+ qmodel(tf.random.normal((1, 5))) + # Compute quantized size + quantized_size = compute_space_complexity_quantize(qmodel.layers[1]) + + kernel_expected_size = ( + layer.kernel.shape.num_elements() * 4 + ) # 4 bits for kernel + bias_expected_size = layer.bias.shape.num_elements() * 4 + expected_size = kernel_expected_size + bias_expected_size + + self.assertEqual(quantized_size, expected_size) + + def test_compute_non_quantized_model(self): + """Verify that computing the size of the model.""" + layer = tf.keras.layers.Dense(30, input_shape=(5,), name="dense_1") + layer.build((None, 5)) + model = tf.keras.Sequential([layer]) + + size = compute_space_complexity_model(model) / 8 # To bytes + size_according_to_tensorflow = weight_memory_size(model.weights) + self.assertEqual(size, size_according_to_tensorflow) + + def test_compare(self): + def test_verify_proportional_to_base_size(bits): + layer = tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1") + layer.build((None, 5)) # Build the layer to initialize weights + qconfig = { + "dense_1": { + "weights": { + "kernel": UniformQuantizer(bits=bits, signed=True), + "bias": UniformQuantizer(bits=bits, signed=True), + }, + }, + } + model = tf.keras.Sequential([layer]) + model.build((None, 5)) + model(tf.random.normal((1, 5))) + + qmodel = apply_quantization(model, qconfig) + qmodel.build((None, 5)) + qmodel(tf.random.normal((1, 5))) + non_quantized_size = compute_space_complexity_model(model) + quantized_size = compute_space_complexity_model(qmodel) + + # We expect weights size proportionally smaller + size_scale_factor = bits / 32 + self.assertEqual( + quantized_size, non_quantized_size * size_scale_factor + ) + + for bits in [2, 4, 6, 8, 10, 12, 16]: + with self.subTest(val=bits): + test_verify_proportional_to_base_size(bits) + + +if __name__ == "__main__": + unittest.main() From 57f33f5eb3c14800ade288c24e29107a9607a59a Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 14:02:01 -0300 Subject: [PATCH 5/8] Fix shebang --- src/examples/data_analysis/generate_plots.py | 2 +- src/quantizers/uniform_quantizer.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/examples/data_analysis/generate_plots.py b/src/examples/data_analysis/generate_plots.py index 07692c5..59d1817 100755 --- a/src/examples/data_analysis/generate_plots.py +++ b/src/examples/data_analysis/generate_plots.py @@ -1,4 +1,4 @@ -#!/usr/bin/python3 +#!/usr/bin/env python3 import argparse diff --git a/src/quantizers/uniform_quantizer.py b/src/quantizers/uniform_quantizer.py index 33a2c34..1467119 100755 --- a/src/quantizers/uniform_quantizer.py +++ b/src/quantizers/uniform_quantizer.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """This module implements a uniform quantizer for quantizing weights and activations.""" From 06320ab7f9b14acfbda6e837ca85e76b2edd03dc Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Sat, 24 May 2025 16:30:54 -0300 Subject: [PATCH 6/8] Fix broken tests --- src/quantizers/uniform_quantizer_test.py | 16 +++++++++------- src/utils/metrics.py | 4 +--- src/utils/metrics_test.py | 8 ++++---- 3 files changed, 14 insertions(+), 14 deletions(-) diff --git a/src/quantizers/uniform_quantizer_test.py b/src/quantizers/uniform_quantizer_test.py index f6cb5f6..3527bf7 100755 --- a/src/quantizers/uniform_quantizer_test.py +++ b/src/quantizers/uniform_quantizer_test.py @@ -52,7 +52,9 @@ def test_can_build_weights(self): name_suffix="_test", ) weights = quantizer.build(self.input_shape, "test", self.mock_layer) 
- self.assertDictEqual(weights, {"alpha": weights["alpha"]}) + self.assertDictEqual( + weights, {"alpha": weights["alpha"], "levels": weights["levels"]} + ) # TODO(Fran): Consider using a fixture here? def assert_weights_within_limits(self, bits, signed): @@ -71,7 +73,7 @@ def assert_weights_within_limits(self, bits, signed): output = quantizer(self.input_tensor, training=True, weights=weights) # Check that all output values are within the range determined by alpha - quantizer_levels = quantizer.levels() + quantizer_levels = quantizer.compute_levels() min = quantizer_levels[0] max = quantizer_levels[-1] @@ -142,9 +144,9 @@ def test_expected_levels(self): quantizer.build(self.input_shape, "test", self.mock_layer) - levels = quantizer.levels() + levels = quantizer.compute_levels() expected_n_levels = 2**3 - self.assertEqual(len(levels), expected_n_levels) + self.assertEqual(levels.shape.num_elements(), expected_n_levels) expected_levels = [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75] self.assertListEqual(list(levels), expected_levels) @@ -161,7 +163,7 @@ def test_quantizer_levels_getitem(self): quantizer.build(self.input_shape, "test", self.mock_layer) - levels = quantizer.levels() + levels = quantizer.compute_levels() self.assertEqual(levels[0], -1.0) self.assertEqual(levels[2], -0.5) self.assertEqual(levels[7], 0.75) @@ -192,7 +194,7 @@ def test_expected_levels_reflects_in_output_signed(self): # Call the quantizer output = quantizer(self.input_tensor, training=True, weights=weights) output_set = sorted(set(output.numpy().flatten())) - expected_set = list(quantizer.levels()) + expected_set = list(quantizer.compute_levels()) self.assertListEqual(output_set, expected_set) @@ -221,7 +223,7 @@ def test_expected_levels_reflects_in_output_unsigned(self): # Call the quantizer output = quantizer(self.input_tensor, training=True, weights=weights) output_set = sorted(set(output.numpy().flatten())) - expected_set = list(quantizer.levels()) + expected_set = list(quantizer.compute_levels()) self.assertListEqual(output_set, expected_set) diff --git a/src/utils/metrics.py b/src/utils/metrics.py index 96dd4ce..25cfd8d 100644 --- a/src/utils/metrics.py +++ b/src/utils/metrics.py @@ -77,18 +77,16 @@ def compute_space_complexity_model(model: tf.keras.Model) -> float: """Compute the uniform space complexity of a model based on its quantization configuration.""" total_space_complexity = 0 + print(model.summary()) for layer in model.layers: - print(f"Layer {layer}") if isinstance(layer, QuantizeWrapperV2): layer_size = compute_space_complexity_quantize(layer) elif isinstance(layer, QuantizeLayer): # Verify if there's no layer we need to keep of this type. 
- print(f"Skipping {layer.name}") continue else: layer_size = compute_space_complexity(layer) - print(f"Layer: {layer.name}, Size: {layer_size} bits") total_space_complexity += layer_size return total_space_complexity diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py index 3040585..760be67 100755 --- a/src/utils/metrics_test.py +++ b/src/utils/metrics_test.py @@ -3,13 +3,13 @@ import unittest import tensorflow as tf -from metrics import ( - compute_space_complexity_model, - compute_space_complexity_quantize, -) from configs.qmodel import apply_quantization from quantizers.uniform_quantizer import UniformQuantizer +from utils.metrics import ( + compute_space_complexity_model, + compute_space_complexity_quantize, +) # From tensorflow internal code From ca372c8b1f68a3890cbb106cd8638be0b3666a12 Mon Sep 17 00:00:00 2001 From: colorete87 Date: Tue, 27 May 2025 21:10:44 -0300 Subject: [PATCH 7/8] Add more tests --- src/quantizers/flex_quantizer.py | 9 + src/utils/metrics.py | 20 +- src/utils/metrics_test.py | 912 +++++++++++++++++++++++++++++++ 3 files changed, 932 insertions(+), 9 deletions(-) diff --git a/src/quantizers/flex_quantizer.py b/src/quantizers/flex_quantizer.py index b5d2715..ea2883b 100644 --- a/src/quantizers/flex_quantizer.py +++ b/src/quantizers/flex_quantizer.py @@ -118,6 +118,15 @@ def delta(self): def quantize_op(self, x): # Quantize levels (uniform quantization) qlevels = self.delta() * tf.math.floor(self.levels / self.delta()) + # TODO(Colo): I think we can replace + # `qlevels = self.delta() * tf.math.floor(self.levels / self.delta())` + # with + # `qlevels = self.qlevels` + # and compute + # `self.qlevels = self.delta() * tf.math.floor(self.levels / self.delta())` + # before + # `q = self.quantize_op(x)` + # in the `quantize` function. 
# Quantize input q = tf.zeros_like(x) diff --git a/src/utils/metrics.py b/src/utils/metrics.py index 25cfd8d..e40d314 100644 --- a/src/utils/metrics.py +++ b/src/utils/metrics.py @@ -23,7 +23,7 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: if not isinstance(qlayer, QuantizeWrapperV2): raise ValueError("Layer is not a QuantizeWrapperV2") - total_layer_size = 0 + total_layer_size = 0.0 qconfig = qlayer.quantize_config # Assumption: order is the same for layer.weights and get_weights_and_quantizers @@ -36,13 +36,15 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: weight_size = weight.shape.num_elements() * quantizer.bits elif isinstance(quantizer, FlexQuantizer): qweight = quantizer.quantize_op(weight) - counter = Counter(qweight.numpy().flatten()) - total = sum(counter.values()) - probabilities = np.array( - [freq / total for freq in counter.values()] - ) - entropy = -np.sum(probabilities * np.log2(probabilities)) - weight_size = weight.shape.num_elements() * entropy + # counter = Counter(qweight.numpy().flatten()) + # total = sum(counter.values()) + # probabilities = np.array( + # [freq / total for freq in counter.values()] + # ) + # entropy = -np.sum(probabilities * np.log2(probabilities)) + # weight_size = weight.shape.num_elements() * entropy + weight_size = compute_huffman_nominal_complexity(qweight) + weight_size += quantizer.n_levels * quantizer.bits else: raise ValueError(f"Unknown quantizer type: {type(quantizer)}") total_layer_size += weight_size @@ -77,7 +79,7 @@ def compute_space_complexity_model(model: tf.keras.Model) -> float: """Compute the uniform space complexity of a model based on its quantization configuration.""" total_space_complexity = 0 - print(model.summary()) + # print(model.summary()) for layer in model.layers: if isinstance(layer, QuantizeWrapperV2): diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py index 760be67..9b246e5 100755 --- a/src/utils/metrics_test.py +++ b/src/utils/metrics_test.py @@ -1,12 +1,17 @@ #!/usr/bin/env python3 import unittest +from collections import Counter +import numpy as np import tensorflow as tf +from tensorflow.keras import layers, models from configs.qmodel import apply_quantization +from quantizers.flex_quantizer import FlexQuantizer from quantizers.uniform_quantizer import UniformQuantizer from utils.metrics import ( + compute_huffman_nominal_complexity, compute_space_complexity_model, compute_space_complexity_quantize, ) @@ -109,5 +114,912 @@ def test_verify_proportional_to_base_size(bits): test_verify_proportional_to_base_size(bits) +class TestComputeHuffmanNominalComplexity(unittest.TestCase): + """Unit tests for `compute_huffman_nominal_complexity`.""" + + def test_all_identical(self): + """Entropy should be zero when every symbol is the same.""" + q = tf.constant([7, 7, 7, 7], dtype=tf.int32) + expected = 0.0 # N=4 => 0 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_balanced_two_symbols(self): + """50 % / 50 % distribution => entropy = 1 bit per symbol.""" + q = tf.constant([0, 1, 0, 1], dtype=tf.int32) + expected = 4.0 # N=4 => 4 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_three_to_one_ratio(self): + """75 % / 25 % distribution => entropy ~0.811278 bits per symbol.""" + q = tf.constant([0, 0, 0, 1], dtype=tf.int32) + entropy = -( + 0.75 * np.log2(0.75) + 0.25 * np.log2(0.25) + ) # ~0.811278 bits + expected = 4 * entropy # N=4 
=> ~3.2451 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_larger_vector_distribution(self): + """100 elements: 50*0, 30*1, 20*2 =>""" + # build the vector + vals = [0] * 50 + [1] * 30 + [2] * 20 + q = tf.constant(vals, dtype=tf.int32) + + # compute expected: N=100, p0=0.5, p1=0.3, p2=0.2 + ps = np.array([0.5, 0.3, 0.2]) + entropy = -np.sum(ps * np.log2(ps)) # ~1.485475 + expected = 100 * entropy # N=100 => ~148.5475 + + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + +def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict): + """TODO(Colo): This function will is implemented in branch + colo/model_evalution in QTensor/src/examples/functions.py. + + When merged, import that functions insted of redefining it here. + """ + for layer in qmodel.layers: + orig_layer_name = layer.name + if orig_layer_name.startswith("quant_"): + orig_layer_name = orig_layer_name[len("quant_") :] + + if orig_layer_name in alpha_dict: + for alpha_type in ["kernel", "bias", "activation"]: + new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None) + new_levels = levels_dict[orig_layer_name].get(alpha_type, None) + new_thresholds = thresholds_dict[orig_layer_name].get( + alpha_type, None + ) + if new_alpha is not None: + for v in layer.weights: + if "alpha" in v.name and alpha_type in v.name: + v.assign(new_alpha) + # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}") + elif ( + alpha_type == "activation" + and "post_activation" in v.name + and "alpha" in v.name + ): + v.assign(new_alpha) + # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}") + if "levels" in v.name and alpha_type in v.name: + v.assign(new_levels) + # print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}") + if "thresholds" in v.name and alpha_type in v.name: + v.assign(new_thresholds) + # print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}") + + return qmodel + + +def apply_alpha_dict(qmodel, alpha_dict): + """TODO(Colo): This function will is implemented in branch + colo/model_evalution in QTensor/src/examples/functions.py. + + When merged, import that functions insted of redefining it here. 
+ """ + for layer in qmodel.layers: + orig_layer_name = layer.name + if orig_layer_name.startswith("quant_"): + orig_layer_name = orig_layer_name[len("quant_") :] + + if orig_layer_name in alpha_dict: + for alpha_type in ["kernel", "bias", "activation"]: + new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None) + if new_alpha is not None: + for v in layer.weights: + if "alpha" in v.name and alpha_type in v.name: + v.assign(new_alpha) + # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}") + elif ( + alpha_type == "activation" + and "post_activation" in v.name + and "alpha" in v.name + ): + v.assign(new_alpha) + # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}") + + return qmodel + + +class TestLeNetQuantizedComplexity(unittest.TestCase): + def setUp(self): + # build a small LeNet-5 for, say, 10 classes over 28×28×1 inputs + categories = 10 + input_shape = (None, 28, 28, 1) + self.model_lenet = models.Sequential( + [ + layers.Conv2D( + 6, + kernel_size=5, + activation="relu", + padding="same", + name="conv2d", + ), + layers.AveragePooling2D(name="pool1"), + layers.Conv2D( + 16, kernel_size=5, activation="relu", name="conv2d_1" + ), + layers.AveragePooling2D(name="pool2"), + layers.Flatten(name="flatten"), + layers.Dense(120, activation="relu", name="dense"), + layers.Dense(84, activation="relu", name="dense_1"), + layers.Dense(categories, activation="softmax", name="dense_2"), + ] + ) + self.model_lenet.build(input_shape) + + input_shape = (None, 28, 28, 1) + self.model_single_conv2d = models.Sequential( + [ + layers.Conv2D( + 32, + kernel_size=5, + activation="relu", + padding="same", + name="conv2d", + ), + ] + ) + self.model_single_conv2d.build(input_shape) + + input_shape = (None, 10) + self.model_single_dense = models.Sequential( + [ + layers.Dense(20, activation="relu", name="dense"), + ] + ) + self.model_single_dense.build(input_shape) + + def setup_model(self, model): + self.model = model + self.model.compile( + loss="categorical_crossentropy", metrics=["accuracy"] + ) + # run one dummy inference so any lazy weights are created + input_shape = self.model.input_shape + input_shape = (1,) + input_shape[1:] + self.model(tf.random.normal(input_shape)) + self.kernel_shape = list() + self.bias_shape = list() + self.kernel_size = list() + self.bias_size = list() + for layer in self.model.layers: + if hasattr(layer, "kernel"): + shape = layer.kernel.shape + self.kernel_shape.append(shape) + self.kernel_size.append(shape.num_elements()) + if hasattr(layer, "bias"): + shape = layer.bias.shape + self.bias_shape.append(shape) + self.bias_size.append(shape.num_elements()) + # self.model.summary() + + def gen_qconfig( + self, + qtype, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels=None, + bias_n_levels=None, + ): + qconfig = {} + for i, layer in enumerate(layer_names): + qconfig[layer] = dict() + qconfig[layer]["weights"] = dict() + if qtype == "uniform": + # 1) define an 8‐bit uniform quantizer on every kernel + for i, layer in enumerate(layer_names): + qconfig[layer]["weights"]["kernel"] = UniformQuantizer( + bits=kernel_bits[i], signed=True + ) + qconfig[layer]["weights"]["bias"] = UniformQuantizer( + bits=bias_bits[i], signed=True + ) + + elif qtype == "flexible": + # 1) build a qconfig where every layer's kernel & bias uses a FlexQuantizer + for i, layer in enumerate(layer_names): + qconfig[layer]["weights"]["kernel"] = FlexQuantizer( + bits=kernel_bits[i], + n_levels=kernel_n_levels[i], + signed=True, + ) + 
qconfig[layer]["weights"]["bias"] = FlexQuantizer( + bits=bias_bits[i], n_levels=bias_n_levels[i], signed=True + ) + + else: + raise ValueError(f"Invalid qtype ({qtype})") + + # DEBUG: Print qconfig + # for key in qconfig: + # print(f'{key}: {qconfig[key]}') + return qconfig + + def random_probability_vector(self, n, epsilon=1e-8): + vec = np.random.rand(n) + epsilon + return vec / vec.sum() + + def equal_probability_vector(self, n): + vec = np.ones(n) + return vec / vec.sum() + + def increasing_probability_vector(self, n): + vec = np.arange(1, n + 1) + return vec / vec.sum() + + def base_uniform_quantizer_space_complexity( + self, + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ): + """All weights quantized using uniform quantizer.""" + + self.setup_model(model) + + qconfig = self.gen_qconfig( + "uniform", layer_names, kernel_bits, bias_bits + ) + + # 4) compute expected size + expected_bits = 0 + for kb, ks, bb, bs in zip( + kernel_bits, self.kernel_size, bias_bits, self.bias_size + ): + expected_bits += kb * ks + bb * bs + + # 2) apply quantization and build + input_shape = self.model.input_shape + qmodel = apply_quantization(self.model, qconfig) + qmodel.build(input_shape) + input_shape = (1,) + input_shape[1:] + qmodel(tf.random.normal(input_shape)) + + # 3) set alphas + alpha_dict = { + layer_name: {"kernel": kalpha, "bias": balpha} + for layer_name, kalpha, balpha in zip( + qconfig, kernel_alphas, bias_alphas + ) + } + apply_alpha_dict(qmodel, alpha_dict) + + # 5) Check result + computed_bits = compute_space_complexity_model(qmodel) + self.assertEqual(computed_bits, expected_bits) + + def base_flex_quantizer_space_complexity( + self, + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ): + """All weights quantized using flexible quantizer.""" + + self.setup_model(model) + + qconfig = self.gen_qconfig( + "flexible", + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + ) + + # 2) compute expected total bits + expected_bits = 0.0 + kernels = [] + biases = [] + kvvalues = [] + bvvalues = [] + # pack both “kernel” and “bias” data into a single list of groups + groups = [ + ( + self.kernel_shape, + self.kernel_size, + kernel_bits, + kernel_n_levels, + kernel_alphas, + kernel_probabilities, + kernels, + kvvalues, + ), + ( + self.bias_shape, + self.bias_size, + bias_bits, + bias_n_levels, + bias_alphas, + bias_probabilities, + biases, + bvvalues, + ), + ] + for ( + shape_list, + size_list, + bits_list, + levels_list, + alphas_list, + probs_list, + container, + vvalues, + ) in groups: + for shape, size, bits, n_levels, alpha, probs in zip( + shape_list, + size_list, + bits_list, + levels_list, + alphas_list, + probs_list, + ): + # 1) build the set of valid quantized values + valid_values = np.linspace(-alpha, alpha, num=2**bits + 1)[:-1] + + # 2) pick exactly `n_levels` of them, then sample your weight‐vector + values = np.sort( + np.random.choice( + valid_values, size=n_levels, replace=False + ) + ) + vvalues.append(values) + vector = np.random.choice( + values, size=size, replace=True, p=probs + ) + weight = vector.reshape(shape) + + # store the weight‐tensor + container.append(weight) + + # 3) recompute empirical probabilities from the sampled weights + counter = Counter(weight.flatten()) + sorted_items = sorted(counter.items()) + counter_keys, counter_values = zip(*sorted_items) + emp_probs = 
np.array(counter_values) / sum(counter_values) + + # 4) entropy and Huffman bits + entropy = -np.sum(emp_probs * np.log2(emp_probs)) + expected_bits += size * entropy + expected_bits += n_levels * bits + + # 5) sanity checks + # a) all entries are in the valid set + mask = np.isin(weight, valid_values) + assert np.all( + mask + ), f"These values are not valid: {weight[~mask]}" + # b) no more unique levels than n_levels + unique_vals = np.unique(weight) + assert unique_vals.size <= n_levels, ( + f"Expected <= {n_levels} unique values, but found " + f"{unique_vals.size}: {unique_vals}" + ) + + # 3) Set weights to the model + weights = list() + for k, b in zip(kernels, biases): + weights.append(k) + weights.append(b) + self.model.set_weights(weights) + + # 4) apply quantization & init everything + input_shape = self.model.input_shape + qmodel = apply_quantization(self.model, qconfig) + qmodel.build(input_shape) + input_shape = (1,) + input_shape[1:] + qmodel(tf.random.normal(input_shape)) + + # 5) set alphats + # alpha_dict = { + # layer_name: {"kernel": kalpha, "bias": balpha} + # for layer_name, kalpha, balpha in zip( + # qconfig, kernel_alphas, bias_alphas + # ) + # } + alpha_dict = {} + levels_dict = {} + thresholds_dict = {} + for layer_name, kalpha, balpha, k, b in zip( + qconfig, kernel_alphas, bias_alphas, kvvalues, bvvalues + ): + klevels = k + blevels = b + kthresholds = [-kalpha] + list((k[1:] + k[:-1]) / 2) + [kalpha] + bthresholds = [-balpha] + list((b[1:] + b[:-1]) / 2) + [balpha] + alpha_dict[layer_name] = {"kernel": kalpha, "bias": balpha} + levels_dict[layer_name] = {"kernel": klevels, "bias": blevels} + thresholds_dict[layer_name] = { + "kernel": kthresholds, + "bias": bthresholds, + } + apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict) + + # 6) compare to your implementation + computed_bits = compute_space_complexity_model(qmodel) + self.assertAlmostEqual(computed_bits, expected_bits, places=6) + + def test_uniform_quantizer_space_complexity_single_dense(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_uniform_quantizer_space_complexity_single_conv2d(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_uniform_quantizer_space_complexity_lenet(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_1(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [2] * 1 # TEST: for levels = 2 + bias_n_levels = [2] * 1 # TEST: for levels = 2 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in 
zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_2(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_3(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.increasing_probability_vector(kl)) + bias_probabilities.append(self.increasing_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_4(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_1(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [2] * 1 # TEST: for levels = 2 + bias_n_levels = [2] * 1 # TEST: for levels = 2 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, 
+ bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_2(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_3(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.increasing_probability_vector(kl)) + bias_probabilities.append(self.increasing_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_4(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_1(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [2] * 5 # TEST: for levels = 2 + bias_n_levels = [2] * 5 # TEST: for levels = 2 + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_2(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 
14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_3(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.increasing_probability_vector(kl) + ) # TEST: increasing probabilities + bias_probabilities.append( + self.increasing_probability_vector(bl) + ) # TEST: increasing probabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_4(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + if __name__ == "__main__": unittest.main() From ded44d538c99925c2b211dcedcd344ebcf72ba5c Mon Sep 17 00:00:00 2001 From: Francisco Rossi Date: Fri, 30 May 2025 12:44:04 -0300 Subject: [PATCH 8/8] Split metrics test --- src/examples/mnist.py | 2 +- src/utils/huffman.py | 47 +- src/utils/huffman_test.py | 58 ++ src/utils/metrics.py | 23 +- src/utils/metrics_lenet_test.py | 888 +++++++++++++++++++++++++++++++ src/utils/metrics_test.py | 912 -------------------------------- src/utils/{utils.py => plot.py} | 0 7 files changed, 959 insertions(+), 971 deletions(-) create mode 100755 src/utils/huffman_test.py create mode 100755 src/utils/metrics_lenet_test.py rename src/utils/{utils.py => plot.py} (100%) diff --git a/src/examples/mnist.py b/src/examples/mnist.py index 3695336..135c3f5 100755 --- a/src/examples/mnist.py +++ b/src/examples/mnist.py @@ -12,7 +12,7 @@ from configs.qmodel import apply_quantization from quantizers.flex_quantizer import FlexQuantizer from quantizers.uniform_quantizer import UniformQuantizer -from utils.utils import VariableHistoryCallback, plot_snapshot +from utils.plot import VariableHistoryCallback, plot_snapshot def generate_dataset(): diff --git a/src/utils/huffman.py b/src/utils/huffman.py index 5093e0f..053d54a 100644 --- a/src/utils/huffman.py +++ 
b/src/utils/huffman.py @@ -1,39 +1,14 @@ -import heapq -from collections import Counter, namedtuple +from collections import Counter +import numpy as np -# A simple Node for the Huffman tree -class Node(namedtuple("Node", ["freq", "value", "left", "right"])): - def __lt__(self, other): - return self.freq < other.freq - -def build_huffman_tree(weights): - counter = Counter(weights) - heap = [Node(freq, value, None, None) for value, freq in counter.items()] - heapq.heapify(heap) - - while len(heap) > 1: - left = heapq.heappop(heap) - right = heapq.heappop(heap) - new_node = Node(left.freq + right.freq, None, left, right) - heapq.heappush(heap, new_node) - - return heap[0] - - -def assign_codes(node, prefix="", codebook={}): - if node.value is not None: - codebook[node.value] = prefix - else: - assign_codes(node.left, prefix + "0", codebook) - assign_codes(node.right, prefix + "1", codebook) - return codebook - - -# TODO(frneer): Add tests for the Huffman tree -# test_weights = [1, 2, 2, 3, 3, 3, 4, 4, 4, 4] -# huffman_tree = build_huffman_tree(test_weights) -# # print(huffman_tree) -# huffman_codes = assign_codes(huffman_tree) -# # print("Huffman Codes:", [len(code) for idx, code in huffman_codes.items()]) +def compute_huffman_nominal_complexity(qweights): + """Compute the nominal complexity of a huffman codification of the + quantized weights.""" + N = qweights.shape.num_elements() + counter = Counter(qweights.numpy().flatten()) + total = sum(counter.values()) + probabilities = np.array([freq / total for freq in counter.values()]) + entropy = -np.sum(probabilities * np.log2(probabilities)) + return N * entropy diff --git a/src/utils/huffman_test.py b/src/utils/huffman_test.py new file mode 100755 index 0000000..486b0d5 --- /dev/null +++ b/src/utils/huffman_test.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3 + +import unittest + +import numpy as np +import tensorflow as tf + +from utils.huffman import compute_huffman_nominal_complexity + + +class TestComputeHuffmanNominalComplexity(unittest.TestCase): + """Unit tests for `compute_huffman_nominal_complexity`.""" + + def test_all_identical(self): + """Entropy should be zero when every symbol is the same.""" + q = tf.constant([7, 7, 7, 7], dtype=tf.int32) + expected = 0.0 # N=4 => 0 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_balanced_two_symbols(self): + """50 % / 50 % distribution => entropy = 1 bit per symbol.""" + q = tf.constant([0, 1, 0, 1], dtype=tf.int32) + expected = 4.0 # N=4 => 4 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_three_to_one_ratio(self): + """75 % / 25 % distribution => entropy ~0.811278 bits per symbol.""" + q = tf.constant([0, 0, 0, 1], dtype=tf.int32) + entropy = -( + 0.75 * np.log2(0.75) + 0.25 * np.log2(0.25) + ) # ~0.811278 bits + expected = 4 * entropy # N=4 => ~3.2451 bits + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + def test_larger_vector_distribution(self): + """100 elements: 50*0, 30*1, 20*2 =>""" + # build the vector + vals = [0] * 50 + [1] * 30 + [2] * 20 + q = tf.constant(vals, dtype=tf.int32) + + # compute expected: N=100, p0=0.5, p1=0.3, p2=0.2 + ps = np.array([0.5, 0.3, 0.2]) + entropy = -np.sum(ps * np.log2(ps)) # ~1.485475 + expected = 100 * entropy # N=100 => ~148.5475 + + self.assertAlmostEqual( + compute_huffman_nominal_complexity(q), expected, places=6 + ) + + +if __name__ == "__main__": + unittest.main() diff --git 
a/src/utils/metrics.py b/src/utils/metrics.py index e40d314..99816af 100644 --- a/src/utils/metrics.py +++ b/src/utils/metrics.py @@ -1,6 +1,3 @@ -from collections import Counter - -import numpy as np import tensorflow as tf from tensorflow_model_optimization.python.core.quantization.keras.quantize_layer import ( QuantizeLayer, @@ -11,6 +8,7 @@ from quantizers.flex_quantizer import FlexQuantizer from quantizers.uniform_quantizer import UniformQuantizer +from utils.huffman import compute_huffman_nominal_complexity def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: @@ -36,13 +34,6 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: weight_size = weight.shape.num_elements() * quantizer.bits elif isinstance(quantizer, FlexQuantizer): qweight = quantizer.quantize_op(weight) - # counter = Counter(qweight.numpy().flatten()) - # total = sum(counter.values()) - # probabilities = np.array( - # [freq / total for freq in counter.values()] - # ) - # entropy = -np.sum(probabilities * np.log2(probabilities)) - # weight_size = weight.shape.num_elements() * entropy weight_size = compute_huffman_nominal_complexity(qweight) weight_size += quantizer.n_levels * quantizer.bits else: @@ -52,17 +43,6 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float: return total_layer_size -def compute_huffman_nominal_complexity(qweights): - """Compute the nominal complexity of a huffman codification of the - quantized weights.""" - N = qweights.shape.num_elements() - counter = Counter(qweights.numpy().flatten()) - total = sum(counter.values()) - probabilities = np.array([freq / total for freq in counter.values()]) - entropy = -np.sum(probabilities * np.log2(probabilities)) - return N * entropy - - def compute_space_complexity(layer): """Compute the space complexity for a normal layer.""" total_layer_size = 0 @@ -79,7 +59,6 @@ def compute_space_complexity_model(model: tf.keras.Model) -> float: """Compute the uniform space complexity of a model based on its quantization configuration.""" total_space_complexity = 0 - # print(model.summary()) for layer in model.layers: if isinstance(layer, QuantizeWrapperV2): diff --git a/src/utils/metrics_lenet_test.py b/src/utils/metrics_lenet_test.py new file mode 100755 index 0000000..37cb1cf --- /dev/null +++ b/src/utils/metrics_lenet_test.py @@ -0,0 +1,888 @@ +#!/usr/bin/env python3 + +import unittest +from collections import Counter + +import numpy as np +import tensorflow as tf +from tensorflow.keras import layers, models + +from configs.qmodel import apply_quantization +from quantizers.flex_quantizer import FlexQuantizer +from quantizers.uniform_quantizer import UniformQuantizer +from utils.metrics import compute_space_complexity_model + + +def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict): + """TODO(Colo): This function will is implemented in branch + colo/model_evalution in QTensor/src/examples/functions.py. + + When merged, import that functions insted of redefining it here. 
+    """
+    for layer in qmodel.layers:
+        orig_layer_name = layer.name
+        if orig_layer_name.startswith("quant_"):
+            orig_layer_name = orig_layer_name[len("quant_") :]
+
+        if orig_layer_name in alpha_dict:
+            for alpha_type in ["kernel", "bias", "activation"]:
+                new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None)
+                new_levels = levels_dict[orig_layer_name].get(alpha_type, None)
+                new_thresholds = thresholds_dict[orig_layer_name].get(
+                    alpha_type, None
+                )
+                if new_alpha is not None:
+                    for v in layer.weights:
+                        if "alpha" in v.name and alpha_type in v.name:
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}")
+                        elif (
+                            alpha_type == "activation"
+                            and "post_activation" in v.name
+                            and "alpha" in v.name
+                        ):
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}")
+                        if "levels" in v.name and alpha_type in v.name:
+                            v.assign(new_levels)
+                            # print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}")
+                        if "thresholds" in v.name and alpha_type in v.name:
+                            v.assign(new_thresholds)
+                            # print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}")
+
+    return qmodel
+
+
+def apply_alpha_dict(qmodel, alpha_dict):
+    """TODO(Colo): This function is implemented in branch
+    colo/model_evalution in QTensor/src/examples/functions.py.
+
+    When merged, import that function instead of redefining it here.
+    """
+    for layer in qmodel.layers:
+        orig_layer_name = layer.name
+        if orig_layer_name.startswith("quant_"):
+            orig_layer_name = orig_layer_name[len("quant_") :]
+
+        if orig_layer_name in alpha_dict:
+            for alpha_type in ["kernel", "bias", "activation"]:
+                new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None)
+                if new_alpha is not None:
+                    for v in layer.weights:
+                        if "alpha" in v.name and alpha_type in v.name:
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}")
+                        elif (
+                            alpha_type == "activation"
+                            and "post_activation" in v.name
+                            and "alpha" in v.name
+                        ):
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}")
+
+    return qmodel
+
+
+def create_lenet_model(categories):
+    model = models.Sequential(
+        [
+            layers.Conv2D(
+                6,
+                kernel_size=5,
+                activation="relu",
+                padding="same",
+                name="conv2d",
+            ),
+            layers.AveragePooling2D(name="pool1"),
+            layers.Conv2D(
+                16, kernel_size=5, activation="relu", name="conv2d_1"
+            ),
+            layers.AveragePooling2D(name="pool2"),
+            layers.Flatten(name="flatten"),
+            layers.Dense(120, activation="relu", name="dense"),
+            layers.Dense(84, activation="relu", name="dense_1"),
+            layers.Dense(categories, activation="softmax", name="dense_2"),
+        ]
+    )
+    return model
+
+
+def create_single_conv2d_model():
+    model = models.Sequential(
+        [
+            layers.Conv2D(
+                32,
+                kernel_size=5,
+                activation="relu",
+                padding="same",
+                name="conv2d",
+            ),
+        ]
+    )
+    return model
+
+
+class TestLeNetQuantizedComplexity(unittest.TestCase):
+    def setUp(self):
+        # build a small LeNet-5 for 10 classes over 28×28×1 inputs
+        categories = 10
+        input_shape = (None, 28, 28, 1)
+        self.model_lenet = create_lenet_model(categories)
+        self.model_lenet.build(input_shape)
+
+        input_shape = (None, 28, 28, 1)
+        self.model_single_conv2d = create_single_conv2d_model()
+        self.model_single_conv2d.build(input_shape)
+
+        input_shape = (None, 10)
+        self.model_single_dense = models.Sequential(
+            [
+                layers.Dense(20, activation="relu", name="dense"),
+            ]
+        )
+
self.model_single_dense.build(input_shape) + + def setup_model(self, model): + self.model = model + self.model.compile( + loss="categorical_crossentropy", metrics=["accuracy"] + ) + # run one dummy inference so any lazy weights are created + input_shape = self.model.input_shape + input_shape = (1,) + input_shape[1:] + self.model(tf.random.normal(input_shape)) + self.kernel_shape = list() + self.bias_shape = list() + self.kernel_size = list() + self.bias_size = list() + for layer in self.model.layers: + if hasattr(layer, "kernel"): + shape = layer.kernel.shape + self.kernel_shape.append(shape) + self.kernel_size.append(shape.num_elements()) + if hasattr(layer, "bias"): + shape = layer.bias.shape + self.bias_shape.append(shape) + self.bias_size.append(shape.num_elements()) + # self.model.summary() + + def gen_qconfig( + self, + qtype, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels=None, + bias_n_levels=None, + ): + qconfig = {} + for i, layer in enumerate(layer_names): + qconfig[layer] = dict() + qconfig[layer]["weights"] = dict() + if qtype == "uniform": + # 1) define an 8‐bit uniform quantizer on every kernel + for i, layer in enumerate(layer_names): + qconfig[layer]["weights"]["kernel"] = UniformQuantizer( + bits=kernel_bits[i], signed=True + ) + qconfig[layer]["weights"]["bias"] = UniformQuantizer( + bits=bias_bits[i], signed=True + ) + + elif qtype == "flexible": + # 1) build a qconfig where every layer's kernel & bias uses a FlexQuantizer + for i, layer in enumerate(layer_names): + qconfig[layer]["weights"]["kernel"] = FlexQuantizer( + bits=kernel_bits[i], + n_levels=kernel_n_levels[i], + signed=True, + ) + qconfig[layer]["weights"]["bias"] = FlexQuantizer( + bits=bias_bits[i], n_levels=bias_n_levels[i], signed=True + ) + + else: + raise ValueError(f"Invalid qtype ({qtype})") + + # DEBUG: Print qconfig + # for key in qconfig: + # print(f'{key}: {qconfig[key]}') + return qconfig + + def random_probability_vector(self, n, epsilon=1e-8): + vec = np.random.rand(n) + epsilon + return vec / vec.sum() + + def equal_probability_vector(self, n): + vec = np.ones(n) + return vec / vec.sum() + + def increasing_probability_vector(self, n): + vec = np.arange(1, n + 1) + return vec / vec.sum() + + def base_uniform_quantizer_space_complexity( + self, + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ): + """All weights quantized using uniform quantizer.""" + + self.setup_model(model) + + qconfig = self.gen_qconfig( + "uniform", layer_names, kernel_bits, bias_bits + ) + + # 4) compute expected size + expected_bits = 0 + for kb, ks, bb, bs in zip( + kernel_bits, self.kernel_size, bias_bits, self.bias_size + ): + expected_bits += kb * ks + bb * bs + + # 2) apply quantization and build + input_shape = self.model.input_shape + qmodel = apply_quantization(self.model, qconfig) + qmodel.build(input_shape) + input_shape = (1,) + input_shape[1:] + qmodel(tf.random.normal(input_shape)) + + # 3) set alphas + alpha_dict = { + layer_name: {"kernel": kalpha, "bias": balpha} + for layer_name, kalpha, balpha in zip( + qconfig, kernel_alphas, bias_alphas + ) + } + apply_alpha_dict(qmodel, alpha_dict) + + # 5) Check result + computed_bits = compute_space_complexity_model(qmodel) + self.assertEqual(computed_bits, expected_bits) + + def base_flex_quantizer_space_complexity( + self, + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ): + """All weights 
quantized using flexible quantizer."""
+
+        self.setup_model(model)
+
+        qconfig = self.gen_qconfig(
+            "flexible",
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+        )
+
+        # 2) compute expected total bits
+        expected_bits = 0.0
+        kernels = []
+        biases = []
+        kvvalues = []
+        bvvalues = []
+        # pack both “kernel” and “bias” data into a single list of groups
+        groups = [
+            (
+                self.kernel_shape,
+                self.kernel_size,
+                kernel_bits,
+                kernel_n_levels,
+                kernel_alphas,
+                kernel_probabilities,
+                kernels,
+                kvvalues,
+            ),
+            (
+                self.bias_shape,
+                self.bias_size,
+                bias_bits,
+                bias_n_levels,
+                bias_alphas,
+                bias_probabilities,
+                biases,
+                bvvalues,
+            ),
+        ]
+        for (
+            shape_list,
+            size_list,
+            bits_list,
+            levels_list,
+            alphas_list,
+            probs_list,
+            container,
+            vvalues,
+        ) in groups:
+            for shape, size, bits, n_levels, alpha, probs in zip(
+                shape_list,
+                size_list,
+                bits_list,
+                levels_list,
+                alphas_list,
+                probs_list,
+            ):
+                # 1) build the set of valid quantized values
+                valid_values = np.linspace(-alpha, alpha, num=2**bits + 1)[:-1]
+
+                # 2) pick exactly `n_levels` of them, then sample the weight vector
+                values = np.sort(
+                    np.random.choice(
+                        valid_values, size=n_levels, replace=False
+                    )
+                )
+                vvalues.append(values)
+                vector = np.random.choice(
+                    values, size=size, replace=True, p=probs
+                )
+                weight = vector.reshape(shape)
+
+                # store the weight tensor
+                container.append(weight)
+
+                # 3) recompute empirical probabilities from the sampled weights
+                counter = Counter(weight.flatten())
+                sorted_items = sorted(counter.items())
+                counter_keys, counter_values = zip(*sorted_items)
+                emp_probs = np.array(counter_values) / sum(counter_values)
+
+                # 4) entropy and Huffman bits
+                entropy = -np.sum(emp_probs * np.log2(emp_probs))
+                expected_bits += size * entropy
+                expected_bits += n_levels * bits
+
+                # 5) sanity checks
+                # a) all entries are in the valid set
+                mask = np.isin(weight, valid_values)
+                assert np.all(
+                    mask
+                ), f"These values are not valid: {weight[~mask]}"
+                # b) no more unique levels than n_levels
+                unique_vals = np.unique(weight)
+                assert unique_vals.size <= n_levels, (
+                    f"Expected <= {n_levels} unique values, but found "
+                    f"{unique_vals.size}: {unique_vals}"
+                )
+
+        # 3) Set the sampled weights on the model
+        weights = list()
+        for k, b in zip(kernels, biases):
+            weights.append(k)
+            weights.append(b)
+        self.model.set_weights(weights)
+
+        # 4) apply quantization & init everything
+        input_shape = self.model.input_shape
+        qmodel = apply_quantization(self.model, qconfig)
+        qmodel.build(input_shape)
+        input_shape = (1,) + input_shape[1:]
+        qmodel(tf.random.normal(input_shape))
+
+        # 5) set alphas, levels and thresholds
+        # alpha_dict = {
+        #     layer_name: {"kernel": kalpha, "bias": balpha}
+        #     for layer_name, kalpha, balpha in zip(
+        #         qconfig, kernel_alphas, bias_alphas
+        #     )
+        # }
+        alpha_dict = {}
+        levels_dict = {}
+        thresholds_dict = {}
+        for layer_name, kalpha, balpha, k, b in zip(
+            qconfig, kernel_alphas, bias_alphas, kvvalues, bvvalues
+        ):
+            klevels = k
+            blevels = b
+            kthresholds = [-kalpha] + list((k[1:] + k[:-1]) / 2) + [kalpha]
+            bthresholds = [-balpha] + list((b[1:] + b[:-1]) / 2) + [balpha]
+            alpha_dict[layer_name] = {"kernel": kalpha, "bias": balpha}
+            levels_dict[layer_name] = {"kernel": klevels, "bias": blevels}
+            thresholds_dict[layer_name] = {
+                "kernel": kthresholds,
+                "bias": bthresholds,
+            }
+        apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict)
+
+        # 6) compare against the implementation under test
+        computed_bits = compute_space_complexity_model(qmodel)
+
self.assertAlmostEqual(computed_bits, expected_bits, places=6) + + def test_uniform_quantizer_space_complexity_single_dense(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_uniform_quantizer_space_complexity_single_conv2d(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_uniform_quantizer_space_complexity_lenet(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + self.base_uniform_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_1(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [2] * 1 # TEST: for levels = 2 + bias_n_levels = [2] * 1 # TEST: for levels = 2 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_2(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_3(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.increasing_probability_vector(kl)) + bias_probabilities.append(self.increasing_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + 
bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_dense_4(self): + model = self.model_single_dense + layer_names = [ + "dense", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_1(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [2] * 1 # TEST: for levels = 2 + bias_n_levels = [2] * 1 # TEST: for levels = 2 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_2(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_3(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.increasing_probability_vector(kl)) + bias_probabilities.append(self.increasing_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_single_conv2d_4(self): + model = self.model_single_conv2d + layer_names = [ + "conv2d", + ] + kernel_bits = [ + 6, + ] + bias_bits = [ + 4, + ] + kernel_n_levels = [13] * 1 + bias_n_levels = [8] * 1 + kernel_alphas = [1.0] * 1 + 
bias_alphas = [1.0] * 1 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_1(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [2] * 5 # TEST: for levels = 2 + bias_n_levels = [2] * 5 # TEST: for levels = 2 + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_2(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.equal_probability_vector(kl) + ) # TEST: equiprobabilities + bias_probabilities.append( + self.equal_probability_vector(bl) + ) # TEST: equiprobabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_3(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, bias_n_levels): + kernel_probabilities.append( + self.increasing_probability_vector(kl) + ) # TEST: increasing probabilities + bias_probabilities.append( + self.increasing_probability_vector(bl) + ) # TEST: increasing probabilities + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + def test_flex_quantizer_space_complexity_lenet_4(self): + model = self.model_lenet + layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] + kernel_bits = [7, 6, 5, 4, 3] + bias_bits = [3, 4, 5, 6, 7] + kernel_n_levels = [25, 12, 13, 5, 2] + bias_n_levels = [3, 7, 15, 7, 14] + kernel_alphas = [1.0] * 5 + bias_alphas = [1.0] * 5 + kernel_probabilities = [] + bias_probabilities = [] + for kl, bl in zip(kernel_n_levels, 
bias_n_levels): + kernel_probabilities.append(self.random_probability_vector(kl)) + bias_probabilities.append(self.random_probability_vector(bl)) + self.base_flex_quantizer_space_complexity( + model, + layer_names, + kernel_bits, + bias_bits, + kernel_n_levels, + bias_n_levels, + kernel_probabilities, + bias_probabilities, + kernel_alphas, + bias_alphas, + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py index 9b246e5..760be67 100755 --- a/src/utils/metrics_test.py +++ b/src/utils/metrics_test.py @@ -1,17 +1,12 @@ #!/usr/bin/env python3 import unittest -from collections import Counter -import numpy as np import tensorflow as tf -from tensorflow.keras import layers, models from configs.qmodel import apply_quantization -from quantizers.flex_quantizer import FlexQuantizer from quantizers.uniform_quantizer import UniformQuantizer from utils.metrics import ( - compute_huffman_nominal_complexity, compute_space_complexity_model, compute_space_complexity_quantize, ) @@ -114,912 +109,5 @@ def test_verify_proportional_to_base_size(bits): test_verify_proportional_to_base_size(bits) -class TestComputeHuffmanNominalComplexity(unittest.TestCase): - """Unit tests for `compute_huffman_nominal_complexity`.""" - - def test_all_identical(self): - """Entropy should be zero when every symbol is the same.""" - q = tf.constant([7, 7, 7, 7], dtype=tf.int32) - expected = 0.0 # N=4 => 0 bits - self.assertAlmostEqual( - compute_huffman_nominal_complexity(q), expected, places=6 - ) - - def test_balanced_two_symbols(self): - """50 % / 50 % distribution => entropy = 1 bit per symbol.""" - q = tf.constant([0, 1, 0, 1], dtype=tf.int32) - expected = 4.0 # N=4 => 4 bits - self.assertAlmostEqual( - compute_huffman_nominal_complexity(q), expected, places=6 - ) - - def test_three_to_one_ratio(self): - """75 % / 25 % distribution => entropy ~0.811278 bits per symbol.""" - q = tf.constant([0, 0, 0, 1], dtype=tf.int32) - entropy = -( - 0.75 * np.log2(0.75) + 0.25 * np.log2(0.25) - ) # ~0.811278 bits - expected = 4 * entropy # N=4 => ~3.2451 bits - self.assertAlmostEqual( - compute_huffman_nominal_complexity(q), expected, places=6 - ) - - def test_larger_vector_distribution(self): - """100 elements: 50*0, 30*1, 20*2 =>""" - # build the vector - vals = [0] * 50 + [1] * 30 + [2] * 20 - q = tf.constant(vals, dtype=tf.int32) - - # compute expected: N=100, p0=0.5, p1=0.3, p2=0.2 - ps = np.array([0.5, 0.3, 0.2]) - entropy = -np.sum(ps * np.log2(ps)) # ~1.485475 - expected = 100 * entropy # N=100 => ~148.5475 - - self.assertAlmostEqual( - compute_huffman_nominal_complexity(q), expected, places=6 - ) - - -def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict): - """TODO(Colo): This function will is implemented in branch - colo/model_evalution in QTensor/src/examples/functions.py. - - When merged, import that functions insted of redefining it here. 
- """ - for layer in qmodel.layers: - orig_layer_name = layer.name - if orig_layer_name.startswith("quant_"): - orig_layer_name = orig_layer_name[len("quant_") :] - - if orig_layer_name in alpha_dict: - for alpha_type in ["kernel", "bias", "activation"]: - new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None) - new_levels = levels_dict[orig_layer_name].get(alpha_type, None) - new_thresholds = thresholds_dict[orig_layer_name].get( - alpha_type, None - ) - if new_alpha is not None: - for v in layer.weights: - if "alpha" in v.name and alpha_type in v.name: - v.assign(new_alpha) - # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}") - elif ( - alpha_type == "activation" - and "post_activation" in v.name - and "alpha" in v.name - ): - v.assign(new_alpha) - # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}") - if "levels" in v.name and alpha_type in v.name: - v.assign(new_levels) - # print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}") - if "thresholds" in v.name and alpha_type in v.name: - v.assign(new_thresholds) - # print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}") - - return qmodel - - -def apply_alpha_dict(qmodel, alpha_dict): - """TODO(Colo): This function will is implemented in branch - colo/model_evalution in QTensor/src/examples/functions.py. - - When merged, import that functions insted of redefining it here. - """ - for layer in qmodel.layers: - orig_layer_name = layer.name - if orig_layer_name.startswith("quant_"): - orig_layer_name = orig_layer_name[len("quant_") :] - - if orig_layer_name in alpha_dict: - for alpha_type in ["kernel", "bias", "activation"]: - new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None) - if new_alpha is not None: - for v in layer.weights: - if "alpha" in v.name and alpha_type in v.name: - v.assign(new_alpha) - # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}") - elif ( - alpha_type == "activation" - and "post_activation" in v.name - and "alpha" in v.name - ): - v.assign(new_alpha) - # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}") - - return qmodel - - -class TestLeNetQuantizedComplexity(unittest.TestCase): - def setUp(self): - # build a small LeNet-5 for, say, 10 classes over 28×28×1 inputs - categories = 10 - input_shape = (None, 28, 28, 1) - self.model_lenet = models.Sequential( - [ - layers.Conv2D( - 6, - kernel_size=5, - activation="relu", - padding="same", - name="conv2d", - ), - layers.AveragePooling2D(name="pool1"), - layers.Conv2D( - 16, kernel_size=5, activation="relu", name="conv2d_1" - ), - layers.AveragePooling2D(name="pool2"), - layers.Flatten(name="flatten"), - layers.Dense(120, activation="relu", name="dense"), - layers.Dense(84, activation="relu", name="dense_1"), - layers.Dense(categories, activation="softmax", name="dense_2"), - ] - ) - self.model_lenet.build(input_shape) - - input_shape = (None, 28, 28, 1) - self.model_single_conv2d = models.Sequential( - [ - layers.Conv2D( - 32, - kernel_size=5, - activation="relu", - padding="same", - name="conv2d", - ), - ] - ) - self.model_single_conv2d.build(input_shape) - - input_shape = (None, 10) - self.model_single_dense = models.Sequential( - [ - layers.Dense(20, activation="relu", name="dense"), - ] - ) - self.model_single_dense.build(input_shape) - - def setup_model(self, model): - self.model = model - self.model.compile( - loss="categorical_crossentropy", metrics=["accuracy"] - ) - # run one dummy inference so any lazy 
weights are created - input_shape = self.model.input_shape - input_shape = (1,) + input_shape[1:] - self.model(tf.random.normal(input_shape)) - self.kernel_shape = list() - self.bias_shape = list() - self.kernel_size = list() - self.bias_size = list() - for layer in self.model.layers: - if hasattr(layer, "kernel"): - shape = layer.kernel.shape - self.kernel_shape.append(shape) - self.kernel_size.append(shape.num_elements()) - if hasattr(layer, "bias"): - shape = layer.bias.shape - self.bias_shape.append(shape) - self.bias_size.append(shape.num_elements()) - # self.model.summary() - - def gen_qconfig( - self, - qtype, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels=None, - bias_n_levels=None, - ): - qconfig = {} - for i, layer in enumerate(layer_names): - qconfig[layer] = dict() - qconfig[layer]["weights"] = dict() - if qtype == "uniform": - # 1) define an 8‐bit uniform quantizer on every kernel - for i, layer in enumerate(layer_names): - qconfig[layer]["weights"]["kernel"] = UniformQuantizer( - bits=kernel_bits[i], signed=True - ) - qconfig[layer]["weights"]["bias"] = UniformQuantizer( - bits=bias_bits[i], signed=True - ) - - elif qtype == "flexible": - # 1) build a qconfig where every layer's kernel & bias uses a FlexQuantizer - for i, layer in enumerate(layer_names): - qconfig[layer]["weights"]["kernel"] = FlexQuantizer( - bits=kernel_bits[i], - n_levels=kernel_n_levels[i], - signed=True, - ) - qconfig[layer]["weights"]["bias"] = FlexQuantizer( - bits=bias_bits[i], n_levels=bias_n_levels[i], signed=True - ) - - else: - raise ValueError(f"Invalid qtype ({qtype})") - - # DEBUG: Print qconfig - # for key in qconfig: - # print(f'{key}: {qconfig[key]}') - return qconfig - - def random_probability_vector(self, n, epsilon=1e-8): - vec = np.random.rand(n) + epsilon - return vec / vec.sum() - - def equal_probability_vector(self, n): - vec = np.ones(n) - return vec / vec.sum() - - def increasing_probability_vector(self, n): - vec = np.arange(1, n + 1) - return vec / vec.sum() - - def base_uniform_quantizer_space_complexity( - self, - model, - layer_names, - kernel_bits, - bias_bits, - kernel_alphas, - bias_alphas, - ): - """All weights quantized using uniform quantizer.""" - - self.setup_model(model) - - qconfig = self.gen_qconfig( - "uniform", layer_names, kernel_bits, bias_bits - ) - - # 4) compute expected size - expected_bits = 0 - for kb, ks, bb, bs in zip( - kernel_bits, self.kernel_size, bias_bits, self.bias_size - ): - expected_bits += kb * ks + bb * bs - - # 2) apply quantization and build - input_shape = self.model.input_shape - qmodel = apply_quantization(self.model, qconfig) - qmodel.build(input_shape) - input_shape = (1,) + input_shape[1:] - qmodel(tf.random.normal(input_shape)) - - # 3) set alphas - alpha_dict = { - layer_name: {"kernel": kalpha, "bias": balpha} - for layer_name, kalpha, balpha in zip( - qconfig, kernel_alphas, bias_alphas - ) - } - apply_alpha_dict(qmodel, alpha_dict) - - # 5) Check result - computed_bits = compute_space_complexity_model(qmodel) - self.assertEqual(computed_bits, expected_bits) - - def base_flex_quantizer_space_complexity( - self, - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ): - """All weights quantized using flexible quantizer.""" - - self.setup_model(model) - - qconfig = self.gen_qconfig( - "flexible", - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - ) - - # 2) compute expected 
total bits - expected_bits = 0.0 - kernels = [] - biases = [] - kvvalues = [] - bvvalues = [] - # pack both “kernel” and “bias” data into a single list of groups - groups = [ - ( - self.kernel_shape, - self.kernel_size, - kernel_bits, - kernel_n_levels, - kernel_alphas, - kernel_probabilities, - kernels, - kvvalues, - ), - ( - self.bias_shape, - self.bias_size, - bias_bits, - bias_n_levels, - bias_alphas, - bias_probabilities, - biases, - bvvalues, - ), - ] - for ( - shape_list, - size_list, - bits_list, - levels_list, - alphas_list, - probs_list, - container, - vvalues, - ) in groups: - for shape, size, bits, n_levels, alpha, probs in zip( - shape_list, - size_list, - bits_list, - levels_list, - alphas_list, - probs_list, - ): - # 1) build the set of valid quantized values - valid_values = np.linspace(-alpha, alpha, num=2**bits + 1)[:-1] - - # 2) pick exactly `n_levels` of them, then sample your weight‐vector - values = np.sort( - np.random.choice( - valid_values, size=n_levels, replace=False - ) - ) - vvalues.append(values) - vector = np.random.choice( - values, size=size, replace=True, p=probs - ) - weight = vector.reshape(shape) - - # store the weight‐tensor - container.append(weight) - - # 3) recompute empirical probabilities from the sampled weights - counter = Counter(weight.flatten()) - sorted_items = sorted(counter.items()) - counter_keys, counter_values = zip(*sorted_items) - emp_probs = np.array(counter_values) / sum(counter_values) - - # 4) entropy and Huffman bits - entropy = -np.sum(emp_probs * np.log2(emp_probs)) - expected_bits += size * entropy - expected_bits += n_levels * bits - - # 5) sanity checks - # a) all entries are in the valid set - mask = np.isin(weight, valid_values) - assert np.all( - mask - ), f"These values are not valid: {weight[~mask]}" - # b) no more unique levels than n_levels - unique_vals = np.unique(weight) - assert unique_vals.size <= n_levels, ( - f"Expected <= {n_levels} unique values, but found " - f"{unique_vals.size}: {unique_vals}" - ) - - # 3) Set weights to the model - weights = list() - for k, b in zip(kernels, biases): - weights.append(k) - weights.append(b) - self.model.set_weights(weights) - - # 4) apply quantization & init everything - input_shape = self.model.input_shape - qmodel = apply_quantization(self.model, qconfig) - qmodel.build(input_shape) - input_shape = (1,) + input_shape[1:] - qmodel(tf.random.normal(input_shape)) - - # 5) set alphats - # alpha_dict = { - # layer_name: {"kernel": kalpha, "bias": balpha} - # for layer_name, kalpha, balpha in zip( - # qconfig, kernel_alphas, bias_alphas - # ) - # } - alpha_dict = {} - levels_dict = {} - thresholds_dict = {} - for layer_name, kalpha, balpha, k, b in zip( - qconfig, kernel_alphas, bias_alphas, kvvalues, bvvalues - ): - klevels = k - blevels = b - kthresholds = [-kalpha] + list((k[1:] + k[:-1]) / 2) + [kalpha] - bthresholds = [-balpha] + list((b[1:] + b[:-1]) / 2) + [balpha] - alpha_dict[layer_name] = {"kernel": kalpha, "bias": balpha} - levels_dict[layer_name] = {"kernel": klevels, "bias": blevels} - thresholds_dict[layer_name] = { - "kernel": kthresholds, - "bias": bthresholds, - } - apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict) - - # 6) compare to your implementation - computed_bits = compute_space_complexity_model(qmodel) - self.assertAlmostEqual(computed_bits, expected_bits, places=6) - - def test_uniform_quantizer_space_complexity_single_dense(self): - model = self.model_single_dense - layer_names = [ - "dense", - ] - kernel_bits = [ - 6, - ] - bias_bits 
= [ - 4, - ] - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - self.base_uniform_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_alphas, - bias_alphas, - ) - - def test_uniform_quantizer_space_complexity_single_conv2d(self): - model = self.model_single_conv2d - layer_names = [ - "conv2d", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - self.base_uniform_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_alphas, - bias_alphas, - ) - - def test_uniform_quantizer_space_complexity_lenet(self): - model = self.model_lenet - layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] - kernel_bits = [7, 6, 5, 4, 3] - bias_bits = [3, 4, 5, 6, 7] - kernel_alphas = [1.0] * 5 - bias_alphas = [1.0] * 5 - self.base_uniform_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_dense_1(self): - model = self.model_single_dense - layer_names = [ - "dense", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [2] * 1 # TEST: for levels = 2 - bias_n_levels = [2] * 1 # TEST: for levels = 2 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_dense_2(self): - model = self.model_single_dense - layer_names = [ - "dense", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_dense_3(self): - model = self.model_single_dense - layer_names = [ - "dense", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append(self.increasing_probability_vector(kl)) - bias_probabilities.append(self.increasing_probability_vector(bl)) - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_dense_4(self): - model = self.model_single_dense - layer_names = [ - "dense", - ] - 
kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append(self.random_probability_vector(kl)) - bias_probabilities.append(self.random_probability_vector(bl)) - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_conv2d_1(self): - model = self.model_single_conv2d - layer_names = [ - "conv2d", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [2] * 1 # TEST: for levels = 2 - bias_n_levels = [2] * 1 # TEST: for levels = 2 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_conv2d_2(self): - model = self.model_single_conv2d - layer_names = [ - "conv2d", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_conv2d_3(self): - model = self.model_single_conv2d - layer_names = [ - "conv2d", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append(self.increasing_probability_vector(kl)) - bias_probabilities.append(self.increasing_probability_vector(bl)) - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_single_conv2d_4(self): - model = self.model_single_conv2d - layer_names = [ - "conv2d", - ] - kernel_bits = [ - 6, - ] - bias_bits = [ - 4, - ] - kernel_n_levels = [13] * 1 - bias_n_levels = [8] * 1 - kernel_alphas = [1.0] * 1 - bias_alphas = [1.0] * 1 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append(self.random_probability_vector(kl)) - 
bias_probabilities.append(self.random_probability_vector(bl)) - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_lenet_1(self): - model = self.model_lenet - layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] - kernel_bits = [7, 6, 5, 4, 3] - bias_bits = [3, 4, 5, 6, 7] - kernel_n_levels = [2] * 5 # TEST: for levels = 2 - bias_n_levels = [2] * 5 # TEST: for levels = 2 - kernel_alphas = [1.0] * 5 - bias_alphas = [1.0] * 5 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_lenet_2(self): - model = self.model_lenet - layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] - kernel_bits = [7, 6, 5, 4, 3] - bias_bits = [3, 4, 5, 6, 7] - kernel_n_levels = [25, 12, 13, 5, 2] - bias_n_levels = [3, 7, 15, 7, 14] - kernel_alphas = [1.0] * 5 - bias_alphas = [1.0] * 5 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.equal_probability_vector(kl) - ) # TEST: equiprobabilities - bias_probabilities.append( - self.equal_probability_vector(bl) - ) # TEST: equiprobabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_lenet_3(self): - model = self.model_lenet - layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] - kernel_bits = [7, 6, 5, 4, 3] - bias_bits = [3, 4, 5, 6, 7] - kernel_n_levels = [25, 12, 13, 5, 2] - bias_n_levels = [3, 7, 15, 7, 14] - kernel_alphas = [1.0] * 5 - bias_alphas = [1.0] * 5 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append( - self.increasing_probability_vector(kl) - ) # TEST: increasing probabilities - bias_probabilities.append( - self.increasing_probability_vector(bl) - ) # TEST: increasing probabilities - self.base_flex_quantizer_space_complexity( - model, - layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - def test_flex_quantizer_space_complexity_lenet_4(self): - model = self.model_lenet - layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"] - kernel_bits = [7, 6, 5, 4, 3] - bias_bits = [3, 4, 5, 6, 7] - kernel_n_levels = [25, 12, 13, 5, 2] - bias_n_levels = [3, 7, 15, 7, 14] - kernel_alphas = [1.0] * 5 - bias_alphas = [1.0] * 5 - kernel_probabilities = [] - bias_probabilities = [] - for kl, bl in zip(kernel_n_levels, bias_n_levels): - kernel_probabilities.append(self.random_probability_vector(kl)) - bias_probabilities.append(self.random_probability_vector(bl)) - self.base_flex_quantizer_space_complexity( - model, - 
layer_names, - kernel_bits, - bias_bits, - kernel_n_levels, - bias_n_levels, - kernel_probabilities, - bias_probabilities, - kernel_alphas, - bias_alphas, - ) - - if __name__ == "__main__": unittest.main() diff --git a/src/utils/utils.py b/src/utils/plot.py similarity index 100% rename from src/utils/utils.py rename to src/utils/plot.py
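
Reviewer note (not part of the patch): a minimal, self-contained sketch of the entropy bound returned by the new `compute_huffman_nominal_complexity` helper, using the same 50/30/20 symbol distribution as the last test above. The tensor values are illustrative only, and the sketch assumes TensorFlow eager execution so `.numpy()` is available.

    from collections import Counter

    import numpy as np
    import tensorflow as tf

    from utils.huffman import compute_huffman_nominal_complexity

    # 100 quantized weights: 50 zeros, 30 ones, 20 twos.
    qweights = tf.constant([0] * 50 + [1] * 30 + [2] * 20, dtype=tf.int32)

    # Nominal Huffman complexity = N * H(p), where p is the empirical
    # symbol distribution of the quantized weights.
    n = qweights.shape.num_elements()
    counts = Counter(qweights.numpy().flatten())
    probs = np.array(list(counts.values())) / n
    expected_bits = n * -np.sum(probs * np.log2(probs))

    print(expected_bits)                                 # ~148.5475 bits
    print(compute_huffman_nominal_complexity(qweights))  # should match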