diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index ab5c28c..02c2f48 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -22,7 +22,6 @@ repos:
     hooks:
       - id: isort
        name: isort (python)
-        args: ["--profile", "black"]

   - repo: https://github.com/myint/docformatter
@@ -35,4 +34,3 @@ repos:
     rev: 25.1.0
     hooks:
       - id: black
-        args: ["--line-length", "79"]
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..514f2cf
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,7 @@
+# pyproject.toml
+[tool.black]
+line-length = 79
+
+[tool.isort]
+profile = "black"
+line_length = 79
diff --git a/src/configs/generate_config.py b/src/configs/generate_config.py
index e6f98b2..de7315f 100644
--- a/src/configs/generate_config.py
+++ b/src/configs/generate_config.py
@@ -79,7 +79,6 @@ def get_activations_and_quantizers(
         activations_and_quantizers.append(
             (get_nested_attribute(layer, activation_attribute), quantizer)
         )
-        print(f"AQ: {activations_and_quantizers}")
         return activations_and_quantizers

     def set_quantize_activations(
@@ -88,7 +87,6 @@ def set_quantize_activations(
         for attribute, quantized_activation in zip(
             self.activations.keys(), quantize_activations
         ):
-            print(f"SA: {attribute} {quantized_activation}")
             set_nested_attribute(layer, attribute, quantized_activation)

     def get_output_quantizers(self, layer):
diff --git a/src/examples/data_analysis/generate_plots.py b/src/examples/data_analysis/generate_plots.py
index 07692c5..59d1817 100755
--- a/src/examples/data_analysis/generate_plots.py
+++ b/src/examples/data_analysis/generate_plots.py
@@ -1,4 +1,4 @@
-#!/usr/bin/python3
+#!/usr/bin/env python3

 import argparse

diff --git a/src/examples/mnist.py b/src/examples/mnist.py
index 3695336..135c3f5 100755
--- a/src/examples/mnist.py
+++ b/src/examples/mnist.py
@@ -12,7 +12,7 @@
 from configs.qmodel import apply_quantization
 from quantizers.flex_quantizer import FlexQuantizer
 from quantizers.uniform_quantizer import UniformQuantizer
-from utils.utils import VariableHistoryCallback, plot_snapshot
+from utils.plot import VariableHistoryCallback, plot_snapshot


 def generate_dataset():
diff --git a/src/examples/models/mlp.py b/src/examples/models/mlp.py
index 908c306..00f6750 100644
--- a/src/examples/models/mlp.py
+++ b/src/examples/models/mlp.py
@@ -17,4 +17,14 @@
     }
 }

-qconfigs = {"qconfig": simple_qconfig}
+uniform_qconfig = {
+    "hidden": {
+        "weights": {"kernel": UniformQuantizer(bits=4, signed=True)},
+        "activations": {"activation": UniformQuantizer(bits=4, signed=False)},
+    }
+}
+
+qconfigs = {
+    "simple": simple_qconfig,
+    "uniform": uniform_qconfig,
+}
diff --git a/src/examples/run.py b/src/examples/run.py
index a2bc433..93f18f8 100755
--- a/src/examples/run.py
+++ b/src/examples/run.py
@@ -9,6 +9,7 @@
 from tensorflow.keras.optimizers import Adam

 from configs.qmodel import apply_quantization
+from utils.metrics import compute_space_complexity_model


 def main(args):
@@ -49,9 +50,6 @@ def main(args):
         loss="categorical_crossentropy",
         metrics=["accuracy"],
     )
-    print(qmodel.summary())
-    print(f"qweights: {[w.name for w in qmodel.layers[1].weights]}")
-    # print(f"qactivations: {[w.name for w in qmodel.layers[1].weights]}")

     callback_tuples = [
         (CaptureWeightCallback(qlayer), qconfig[layer.name])
@@ -69,6 +67,14 @@ def main(args):
         callbacks=[callback for callback, _ in callback_tuples],
     )

+    qmodel(next(iter(test_dataset))[0])
+    space_complexity = compute_space_complexity_model(qmodel)
+    print(f"Space complexity: {space_complexity / 8 * 1/1024} kB")
+
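+    # Note: compute_space_complexity_model returns bits; dividing by 8 and
+    # then by 1024 reports kilobytes.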
+    original_space_complexity = compute_space_complexity_model(model)
+    print(
+        f"Original space complexity: {original_space_complexity / 8 * 1/1024} kB"
+    )
+
     output_dict = {}
     output_dict["global"] = hist.history
     for callback, qconfig in callback_tuples:
diff --git a/src/quantizers/flex_quantizer.py b/src/quantizers/flex_quantizer.py
index 7763c73..ea2883b 100644
--- a/src/quantizers/flex_quantizer.py
+++ b/src/quantizers/flex_quantizer.py
@@ -31,6 +31,7 @@ def __init__(
         bits: int,
         n_levels: int,
         signed: bool = True,
+        name_suffix: str = "",
     ):
         """Constructor.

@@ -55,10 +56,12 @@ def __init__(
         self.levels = None  # possible output values
         self.thresholds = None  # boundaries between levels

+        self.name_suffix = name_suffix
+
     def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer):
         alpha = layer.add_weight(
-            "alpha",
+            name=f"{name}{self.name_suffix}_alpha",
             initializer=tf.keras.initializers.Constant(0.1),
             trainable=True,
             dtype=tf.float32,
@@ -68,7 +71,7 @@ def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer):
         self.alpha = alpha

         levels = layer.add_weight(
-            "levels",
+            name=f"{name}{self.name_suffix}_levels",
             initializer=tf.keras.initializers.Constant(
                 np.linspace(
                     min_value(self.alpha, self.signed),
@@ -84,7 +87,7 @@ def build(self, tensor_shape, name: str, layer: tf.keras.layers.Layer):
         self.levels = levels

         thresholds = layer.add_weight(
-            "thresholds",
+            name=f"{name}{self.name_suffix}_thresholds",
             initializer=tf.keras.initializers.Constant(
                 np.linspace(
                     min_value(self.alpha, self.signed),
@@ -112,15 +115,18 @@ def range(self):
     def delta(self):
         return self.range() / self.m_levels

-    @tf.custom_gradient
-    def quantize(self, x, alpha, levels, thresholds):
-        # Capture the values of the parameters
-        self.alpha = alpha
-        self.levels = levels
-        self.thresholds = thresholds
-
+    def quantize_op(self, x):
         # Quantize levels (uniform quantization)
         qlevels = self.delta() * tf.math.floor(self.levels / self.delta())
+        # TODO(Colo): I think we can replace
+        # `qlevels = self.delta() * tf.math.floor(self.levels / self.delta())`
+        # with
+        # `qlevels = self.qlevels`
+        # and compute
+        # `self.qlevels = self.delta() * tf.math.floor(self.levels / self.delta())`
+        # before
+        # `q = self.quantize_op(x)`
+        # in the `quantize` function.
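+        # A rough sketch of that refactor (illustrative only, not applied in
+        # this change):
+        #
+        #     @tf.custom_gradient
+        #     def quantize(self, x, alpha, levels, thresholds):
+        #         self.alpha, self.levels, self.thresholds = alpha, levels, thresholds
+        #         self.qlevels = self.delta() * tf.math.floor(
+        #             self.levels / self.delta()
+        #         )
+        #         q = self.quantize_op(x)  # quantize_op would then read self.qlevels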
         # Quantize input
         q = tf.zeros_like(x)
@@ -134,6 +140,19 @@ def quantize(self, x, alpha, levels, thresholds):
                 q,
             )
+        return q
+
+    @tf.custom_gradient
+    def quantize(self, x, alpha, levels, thresholds):
+        # Capture the values of the parameters
+        self.alpha = alpha
+        self.levels = levels
+        self.thresholds = thresholds
+
+        q = self.quantize_op(x)
+
+        qlevels = self.delta() * tf.math.floor(self.levels / self.delta())
+
         def grad(upstream):
             ##### dq_dx uses STE #####
             dq_dx = tf.where(
diff --git a/src/quantizers/uniform_quantizer.py b/src/quantizers/uniform_quantizer.py
index 5ae73c0..1467119 100755
--- a/src/quantizers/uniform_quantizer.py
+++ b/src/quantizers/uniform_quantizer.py
@@ -1,4 +1,4 @@
-#!/usr/bin/env python
+#!/usr/bin/env python3

 """This module implements a uniform quantizer for quantizing weights and
 activations."""
@@ -12,6 +12,8 @@
     _QuantizeHelper,
 )

+from quantizers.common import delta, max_value, min_value, span
+

 class UniformQuantizer(_QuantizeHelper, Quantizer):
     """A uniform quantizer supporting both signed and unsigned
@@ -65,30 +67,45 @@ def __call__(self, w):
         return tf.clip_by_value(w, tf.keras.backend.epsilon(), np.inf)

         alpha = layer.add_weight(
-            name.join("_alpha"),
+            name=f"{name}{self.name_suffix}_alpha",
             initializer=self.initializer,
             trainable=True,
             dtype=tf.float32,
             regularizer=self.regularizer,
             constraint=PositiveConstraint(),
         )
+        levels = layer.add_weight(
+            name=f"{name}{self.name_suffix}_levels",
+            trainable=False,
+            shape=(self.m_levels,),
+            dtype=tf.float32,
+        )

         self.alpha = alpha
-        return {"alpha": alpha}
+        self.levels = levels
+
+        return {"alpha": alpha, "levels": levels}

     def __call__(self, inputs, training, weights, **kwargs):
         return self.quantize(inputs, weights["alpha"])

     def range(self):
-        return 2 * self.alpha if self.signed else self.alpha
+        return span(self.alpha, self.signed)

     def delta(self):
-        return self.range() / self.m_levels
+        return delta(self.alpha, self.m_levels, self.signed)

-    def levels(self):
+    def compute_levels(self):
         """Compute the quantization levels."""
-        start = -self.alpha if self.signed else 0
+        start = min_value(self.alpha, self.signed)
         return tf.range(start, start + self.range(), self.delta())

+    def quantize_op(self, x):
+        clipped_x = tf.clip_by_value(x, self.levels[0], self.levels[-1])
+        delta_v = (
+            2 * self.alpha if self.signed else self.alpha
+        ) / self.m_levels
+        return delta_v * tf.math.floor(clipped_x / delta_v)
+
     @tf.custom_gradient
     def quantize(self, x, alpha):
         """Uniform quantization.
@@ -97,25 +114,22 @@ def quantize(self, x, alpha):
         :param alpha: alpha parameter
         :returns: quantized input tensor
         """
-        # Capture alpha
+        # Store alpha for other methods to use
         self.alpha = alpha

-        # Compute quantization levels
-        levels = self.levels()
-
-        # Clip input values between min and max levels (function is zero outside the range)
-        clipped_x = tf.clip_by_value(x, levels[0], levels[-1])
+        self.levels = self.compute_levels()

-        # Quantize input values
-        q = self.delta() * tf.math.floor(clipped_x / self.delta())
+        # Use direct parameter passing to avoid graph scope issues
+        q = self.quantize_op(x)

         def grad(upstream):
             # Gradient only flows through if the input is within range
-            ## Use STE to estimate the gradient
             dq_dx = tf.where(
                 tf.logical_and(
-                    tf.greater_equal(x, levels[0]),
-                    tf.less_equal(x, levels[-1]),
+                    tf.greater_equal(x, min_value(alpha, self.signed)),
+                    tf.less_equal(
+                        x, max_value(alpha, self.m_levels, self.signed)
+                    ),
                 ),
                 upstream,
                 tf.zeros_like(x),
diff --git a/src/quantizers/uniform_quantizer_test.py b/src/quantizers/uniform_quantizer_test.py
index f6cb5f6..3527bf7 100755
--- a/src/quantizers/uniform_quantizer_test.py
+++ b/src/quantizers/uniform_quantizer_test.py
@@ -52,7 +52,9 @@ def test_can_build_weights(self):
             name_suffix="_test",
         )
         weights = quantizer.build(self.input_shape, "test", self.mock_layer)
-        self.assertDictEqual(weights, {"alpha": weights["alpha"]})
+        self.assertDictEqual(
+            weights, {"alpha": weights["alpha"], "levels": weights["levels"]}
+        )

     # TODO(Fran): Consider using a fixture here?
     def assert_weights_within_limits(self, bits, signed):
@@ -71,7 +73,7 @@ def assert_weights_within_limits(self, bits, signed):
         output = quantizer(self.input_tensor, training=True, weights=weights)

         # Check that all output values are within the range determined by alpha
-        quantizer_levels = quantizer.levels()
+        quantizer_levels = quantizer.compute_levels()
         min = quantizer_levels[0]
         max = quantizer_levels[-1]

@@ -142,9 +144,9 @@ def test_expected_levels(self):

         quantizer.build(self.input_shape, "test", self.mock_layer)

-        levels = quantizer.levels()
+        levels = quantizer.compute_levels()
         expected_n_levels = 2**3
-        self.assertEqual(len(levels), expected_n_levels)
+        self.assertEqual(levels.shape.num_elements(), expected_n_levels)

         expected_levels = [-1.0, -0.75, -0.5, -0.25, 0.0, 0.25, 0.5, 0.75]
         self.assertListEqual(list(levels), expected_levels)
@@ -161,7 +163,7 @@ def test_quantizer_levels_getitem(self):

         quantizer.build(self.input_shape, "test", self.mock_layer)

-        levels = quantizer.levels()
+        levels = quantizer.compute_levels()
         self.assertEqual(levels[0], -1.0)
         self.assertEqual(levels[2], -0.5)
         self.assertEqual(levels[7], 0.75)
@@ -192,7 +194,7 @@ def test_expected_levels_reflects_in_output_signed(self):
         # Call the quantizer
         output = quantizer(self.input_tensor, training=True, weights=weights)
         output_set = sorted(set(output.numpy().flatten()))
-        expected_set = list(quantizer.levels())
+        expected_set = list(quantizer.compute_levels())

         self.assertListEqual(output_set, expected_set)

@@ -221,7 +223,7 @@ def test_expected_levels_reflects_in_output_unsigned(self):
         # Call the quantizer
         output = quantizer(self.input_tensor, training=True, weights=weights)
         output_set = sorted(set(output.numpy().flatten()))
-        expected_set = list(quantizer.levels())
+        expected_set = list(quantizer.compute_levels())

         self.assertListEqual(output_set, expected_set)

diff --git a/src/utils/__init__.py b/src/utils/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/utils/huffman.py b/src/utils/huffman.py
new file mode 100644
index 0000000..053d54a
--- /dev/null
+++ b/src/utils/huffman.py
@@ -0,0 +1,14 @@
+from collections import Counter
+
+import numpy as np
+
+
+def compute_huffman_nominal_complexity(qweights):
+    """Compute the nominal complexity of a Huffman coding of the quantized
+    weights."""
+    N = qweights.shape.num_elements()
+    counter = Counter(qweights.numpy().flatten())
+    total = sum(counter.values())
+    probabilities = np.array([freq / total for freq in counter.values()])
+    entropy = -np.sum(probabilities * np.log2(probabilities))
+    return N * entropy
diff --git a/src/utils/huffman_test.py b/src/utils/huffman_test.py
new file mode 100755
index 0000000..486b0d5
--- /dev/null
+++ b/src/utils/huffman_test.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python3
+
+import unittest
+
+import numpy as np
+import tensorflow as tf
+
+from utils.huffman import compute_huffman_nominal_complexity
+
+
+class TestComputeHuffmanNominalComplexity(unittest.TestCase):
+    """Unit tests for `compute_huffman_nominal_complexity`."""
+
+    def test_all_identical(self):
+        """Entropy should be zero when every symbol is the same."""
+        q = tf.constant([7, 7, 7, 7], dtype=tf.int32)
+        expected = 0.0  # N=4 => 0 bits
+        self.assertAlmostEqual(
+            compute_huffman_nominal_complexity(q), expected, places=6
+        )
+
+    def test_balanced_two_symbols(self):
+        """A 50%/50% distribution => entropy = 1 bit per symbol."""
+        q = tf.constant([0, 1, 0, 1], dtype=tf.int32)
+        expected = 4.0  # N=4 => 4 bits
+        self.assertAlmostEqual(
+            compute_huffman_nominal_complexity(q), expected, places=6
+        )
+
+    def test_three_to_one_ratio(self):
+        """A 75%/25% distribution => entropy ~0.811278 bits per symbol."""
+        q = tf.constant([0, 0, 0, 1], dtype=tf.int32)
+        entropy = -(
+            0.75 * np.log2(0.75) + 0.25 * np.log2(0.25)
+        )  # ~0.811278 bits
+        expected = 4 * entropy  # N=4 => ~3.2451 bits
+        self.assertAlmostEqual(
+            compute_huffman_nominal_complexity(q), expected, places=6
+        )
+
+    def test_larger_vector_distribution(self):
+        """100 elements (50*0, 30*1, 20*2) => entropy ~1.485475 bits per
+        symbol."""
+        # build the vector
+        vals = [0] * 50 + [1] * 30 + [2] * 20
+        q = tf.constant(vals, dtype=tf.int32)
+
+        # compute expected: N=100, p0=0.5, p1=0.3, p2=0.2
+        ps = np.array([0.5, 0.3, 0.2])
+        entropy = -np.sum(ps * np.log2(ps))  # ~1.485475
+        expected = 100 * entropy  # N=100 => ~148.5475
+
+        self.assertAlmostEqual(
+            compute_huffman_nominal_complexity(q), expected, places=6
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/utils/metrics.py b/src/utils/metrics.py
new file mode 100644
index 0000000..99816af
--- /dev/null
+++ b/src/utils/metrics.py
@@ -0,0 +1,73 @@
+import tensorflow as tf
+from tensorflow_model_optimization.python.core.quantization.keras.quantize_layer import (
+    QuantizeLayer,
+)
+from tensorflow_model_optimization.python.core.quantization.keras.quantize_wrapper import (
+    QuantizeWrapperV2,
+)
+
+from quantizers.flex_quantizer import FlexQuantizer
+from quantizers.uniform_quantizer import UniformQuantizer
+from utils.huffman import compute_huffman_nominal_complexity
+
+
+def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float:
+    """Compute the space complexity of a quantized layer based on its
+    quantization configuration.
+
+    Returns:
+        The space complexity of the layer in bits.
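+
+    Example (illustrative): for a Dense(10) layer on 5 inputs whose kernel
+    and bias both use UniformQuantizer(bits=4), this returns
+    (50 + 10) * 4 = 240 bits.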
+    """
+    if not isinstance(qlayer, QuantizeWrapperV2):
+        raise ValueError("Layer is not a QuantizeWrapperV2")
+
+    total_layer_size = 0.0
+    qconfig = qlayer.quantize_config
+
+    # Assumption: order is the same for layer.weights and get_weights_and_quantizers
+    weights_and_quantizers = qconfig.get_weights_and_quantizers(qlayer.layer)
+    weights = qlayer.weights[: len(weights_and_quantizers)]
+
+    for weight, weight_and_quantizer in zip(weights, weights_and_quantizers):
+        quantizer = weight_and_quantizer[1]
+        if isinstance(quantizer, UniformQuantizer):
+            weight_size = weight.shape.num_elements() * quantizer.bits
+        elif isinstance(quantizer, FlexQuantizer):
+            qweight = quantizer.quantize_op(weight)
+            weight_size = compute_huffman_nominal_complexity(qweight)
+            weight_size += quantizer.n_levels * quantizer.bits
+        else:
+            raise ValueError(f"Unknown quantizer type: {type(quantizer)}")
+        total_layer_size += weight_size
+
+    return total_layer_size
+
+
+def compute_space_complexity(layer):
+    """Compute the space complexity for a normal layer."""
+    total_layer_size = 0
+    for weight in layer.weights:
+        weight_size = (
+            8 * weight.dtype.size * weight.shape.num_elements()
+        )  # bits
+        total_layer_size += weight_size
+
+    return total_layer_size
+
+
+def compute_space_complexity_model(model: tf.keras.Model) -> float:
+    """Compute the space complexity of a model based on its quantization
+    configuration."""
+    total_space_complexity = 0
+
+    for layer in model.layers:
+        if isinstance(layer, QuantizeWrapperV2):
+            layer_size = compute_space_complexity_quantize(layer)
+        elif isinstance(layer, QuantizeLayer):
+            # TODO: verify that no layer of this type needs to be counted.
+            continue
+        else:
+            layer_size = compute_space_complexity(layer)
+        total_space_complexity += layer_size
+
+    return total_space_complexity
diff --git a/src/utils/metrics_lenet_test.py b/src/utils/metrics_lenet_test.py
new file mode 100755
index 0000000..37cb1cf
--- /dev/null
+++ b/src/utils/metrics_lenet_test.py
@@ -0,0 +1,888 @@
+#!/usr/bin/env python3
+
+import unittest
+from collections import Counter
+
+import numpy as np
+import tensorflow as tf
+from tensorflow.keras import layers, models
+
+from configs.qmodel import apply_quantization
+from quantizers.flex_quantizer import FlexQuantizer
+from quantizers.uniform_quantizer import UniformQuantizer
+from utils.metrics import compute_space_complexity_model
+
+
+def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict):
+    """TODO(Colo): This function is implemented in branch
+    colo/model_evalution in QTensor/src/examples/functions.py.
+
+    When merged, import that function instead of redefining it here.
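+
+    The dict layout assumed here (inferred from the loop below): each dict
+    maps an original layer name to per-attribute values, e.g.
+    `alpha_dict = {"dense": {"kernel": 1.0, "bias": 1.0}}`.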
+    """
+    for layer in qmodel.layers:
+        orig_layer_name = layer.name
+        if orig_layer_name.startswith("quant_"):
+            orig_layer_name = orig_layer_name[len("quant_") :]
+
+        if orig_layer_name in alpha_dict:
+            for alpha_type in ["kernel", "bias", "activation"]:
+                new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None)
+                new_levels = levels_dict[orig_layer_name].get(alpha_type, None)
+                new_thresholds = thresholds_dict[orig_layer_name].get(
+                    alpha_type, None
+                )
+                if new_alpha is not None:
+                    for v in layer.weights:
+                        if "alpha" in v.name and alpha_type in v.name:
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}")
+                        elif (
+                            alpha_type == "activation"
+                            and "post_activation" in v.name
+                            and "alpha" in v.name
+                        ):
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}")
+                        if "levels" in v.name and alpha_type in v.name:
+                            v.assign(new_levels)
+                            # print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}")
+                        if "thresholds" in v.name and alpha_type in v.name:
+                            v.assign(new_thresholds)
+                            # print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}")
+
+    return qmodel
+
+
+def apply_alpha_dict(qmodel, alpha_dict):
+    """TODO(Colo): This function is implemented in branch
+    colo/model_evalution in QTensor/src/examples/functions.py.
+
+    When merged, import that function instead of redefining it here.
+    """
+    for layer in qmodel.layers:
+        orig_layer_name = layer.name
+        if orig_layer_name.startswith("quant_"):
+            orig_layer_name = orig_layer_name[len("quant_") :]
+
+        if orig_layer_name in alpha_dict:
+            for alpha_type in ["kernel", "bias", "activation"]:
+                new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None)
+                if new_alpha is not None:
+                    for v in layer.weights:
+                        if "alpha" in v.name and alpha_type in v.name:
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}")
+                        elif (
+                            alpha_type == "activation"
+                            and "post_activation" in v.name
+                            and "alpha" in v.name
+                        ):
+                            v.assign(new_alpha)
+                            # print(f"Updated {v.name} (activation) with new alpha value {new_alpha}")
+
+    return qmodel
+
+
+def create_lenet_model(categories):
+    model = models.Sequential(
+        [
+            layers.Conv2D(
+                6,
+                kernel_size=5,
+                activation="relu",
+                padding="same",
+                name="conv2d",
+            ),
+            layers.AveragePooling2D(name="pool1"),
+            layers.Conv2D(
+                16, kernel_size=5, activation="relu", name="conv2d_1"
+            ),
+            layers.AveragePooling2D(name="pool2"),
+            layers.Flatten(name="flatten"),
+            layers.Dense(120, activation="relu", name="dense"),
+            layers.Dense(84, activation="relu", name="dense_1"),
+            layers.Dense(categories, activation="softmax", name="dense_2"),
+        ]
+    )
+    return model
+
+
+def create_single_conv2d_model():
+    model = models.Sequential(
+        [
+            layers.Conv2D(
+                32,
+                kernel_size=5,
+                activation="relu",
+                padding="same",
+                name="conv2d",
+            ),
+        ]
+    )
+    return model
+
+
+class TestLeNetQuantizedComplexity(unittest.TestCase):
+    def setUp(self):
+        # build a small LeNet-5 for 10 classes over 28×28×1 inputs
+        categories = 10
+        input_shape = (None, 28, 28, 1)
+        self.model_lenet = create_lenet_model(categories)
+        self.model_lenet.build(input_shape)
+
+        input_shape = (None, 28, 28, 1)
+        self.model_single_conv2d = create_single_conv2d_model()
+        self.model_single_conv2d.build(input_shape)
+
+        input_shape = (None, 10)
+        self.model_single_dense = models.Sequential(
+            [
+                layers.Dense(20, activation="relu", name="dense"),
+            ]
+        )
+        self.model_single_dense.build(input_shape)
+
+    def setup_model(self, model):
+        self.model = model
+        self.model.compile(
+            loss="categorical_crossentropy", metrics=["accuracy"]
+        )
+        # run one dummy inference so any lazy weights are created
+        input_shape = self.model.input_shape
+        input_shape = (1,) + input_shape[1:]
+        self.model(tf.random.normal(input_shape))
+        self.kernel_shape = list()
+        self.bias_shape = list()
+        self.kernel_size = list()
+        self.bias_size = list()
+        for layer in self.model.layers:
+            if hasattr(layer, "kernel"):
+                shape = layer.kernel.shape
+                self.kernel_shape.append(shape)
+                self.kernel_size.append(shape.num_elements())
+            if hasattr(layer, "bias"):
+                shape = layer.bias.shape
+                self.bias_shape.append(shape)
+                self.bias_size.append(shape.num_elements())
+        # self.model.summary()
+
+    def gen_qconfig(
+        self,
+        qtype,
+        layer_names,
+        kernel_bits,
+        bias_bits,
+        kernel_n_levels=None,
+        bias_n_levels=None,
+    ):
+        qconfig = {}
+        for i, layer in enumerate(layer_names):
+            qconfig[layer] = dict()
+            qconfig[layer]["weights"] = dict()
+        if qtype == "uniform":
+            # 1) define a uniform quantizer on every kernel and bias
+            for i, layer in enumerate(layer_names):
+                qconfig[layer]["weights"]["kernel"] = UniformQuantizer(
+                    bits=kernel_bits[i], signed=True
+                )
+                qconfig[layer]["weights"]["bias"] = UniformQuantizer(
+                    bits=bias_bits[i], signed=True
+                )
+
+        elif qtype == "flexible":
+            # 1) build a qconfig where every layer's kernel & bias uses a FlexQuantizer
+            for i, layer in enumerate(layer_names):
+                qconfig[layer]["weights"]["kernel"] = FlexQuantizer(
+                    bits=kernel_bits[i],
+                    n_levels=kernel_n_levels[i],
+                    signed=True,
+                )
+                qconfig[layer]["weights"]["bias"] = FlexQuantizer(
+                    bits=bias_bits[i], n_levels=bias_n_levels[i], signed=True
+                )
+
+        else:
+            raise ValueError(f"Invalid qtype ({qtype})")
+
+        # DEBUG: Print qconfig
+        # for key in qconfig:
+        #     print(f'{key}: {qconfig[key]}')
+        return qconfig
+
+    def random_probability_vector(self, n, epsilon=1e-8):
+        vec = np.random.rand(n) + epsilon
+        return vec / vec.sum()
+
+    def equal_probability_vector(self, n):
+        vec = np.ones(n)
+        return vec / vec.sum()
+
+    def increasing_probability_vector(self, n):
+        vec = np.arange(1, n + 1)
+        return vec / vec.sum()
+
+    def base_uniform_quantizer_space_complexity(
+        self,
+        model,
+        layer_names,
+        kernel_bits,
+        bias_bits,
+        kernel_alphas,
+        bias_alphas,
+    ):
+        """All weights quantized using the uniform quantizer."""
+
+        self.setup_model(model)
+
+        qconfig = self.gen_qconfig(
+            "uniform", layer_names, kernel_bits, bias_bits
+        )
+
+        # 2) compute expected size
+        expected_bits = 0
+        for kb, ks, bb, bs in zip(
+            kernel_bits, self.kernel_size, bias_bits, self.bias_size
+        ):
+            expected_bits += kb * ks + bb * bs
+
+        # 3) apply quantization and build
+        input_shape = self.model.input_shape
+        qmodel = apply_quantization(self.model, qconfig)
+        qmodel.build(input_shape)
+        input_shape = (1,) + input_shape[1:]
+        qmodel(tf.random.normal(input_shape))
+
+        # 4) set alphas
+        alpha_dict = {
+            layer_name: {"kernel": kalpha, "bias": balpha}
+            for layer_name, kalpha, balpha in zip(
+                qconfig, kernel_alphas, bias_alphas
+            )
+        }
+        apply_alpha_dict(qmodel, alpha_dict)
+
+        # 5) check result
+        computed_bits = compute_space_complexity_model(qmodel)
+        self.assertEqual(computed_bits, expected_bits)
+
+    def base_flex_quantizer_space_complexity(
+        self,
+        model,
+        layer_names,
+        kernel_bits,
+        bias_bits,
+        kernel_n_levels,
+        bias_n_levels,
+        kernel_probabilities,
+        bias_probabilities,
+        kernel_alphas,
+        bias_alphas,
+    ):
+        """All weights quantized using the flexible quantizer."""
+
+        self.setup_model(model)
+
+        qconfig = self.gen_qconfig(
+            "flexible",
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+        )
+
+        # 2) compute expected total bits
+        expected_bits = 0.0
+        kernels = []
+        biases = []
+        kvvalues = []
+        bvvalues = []
+        # pack both "kernel" and "bias" data into a single list of groups
+        groups = [
+            (
+                self.kernel_shape,
+                self.kernel_size,
+                kernel_bits,
+                kernel_n_levels,
+                kernel_alphas,
+                kernel_probabilities,
+                kernels,
+                kvvalues,
+            ),
+            (
+                self.bias_shape,
+                self.bias_size,
+                bias_bits,
+                bias_n_levels,
+                bias_alphas,
+                bias_probabilities,
+                biases,
+                bvvalues,
+            ),
+        ]
+        for (
+            shape_list,
+            size_list,
+            bits_list,
+            levels_list,
+            alphas_list,
+            probs_list,
+            container,
+            vvalues,
+        ) in groups:
+            for shape, size, bits, n_levels, alpha, probs in zip(
+                shape_list,
+                size_list,
+                bits_list,
+                levels_list,
+                alphas_list,
+                probs_list,
+            ):
+                # 1) build the set of valid quantized values
+                valid_values = np.linspace(-alpha, alpha, num=2**bits + 1)[:-1]
+
+                # 2) pick exactly `n_levels` of them, then sample the weight vector
+                values = np.sort(
+                    np.random.choice(
+                        valid_values, size=n_levels, replace=False
+                    )
+                )
+                vvalues.append(values)
+                vector = np.random.choice(
+                    values, size=size, replace=True, p=probs
+                )
+                weight = vector.reshape(shape)
+
+                # store the weight tensor
+                container.append(weight)
+
+                # 3) recompute empirical probabilities from the sampled weights
+                counter = Counter(weight.flatten())
+                sorted_items = sorted(counter.items())
+                counter_keys, counter_values = zip(*sorted_items)
+                emp_probs = np.array(counter_values) / sum(counter_values)
+
+                # 4) entropy and Huffman bits
+                entropy = -np.sum(emp_probs * np.log2(emp_probs))
+                expected_bits += size * entropy
+                expected_bits += n_levels * bits
+
+                # 5) sanity checks
+                # a) all entries are in the valid set
+                mask = np.isin(weight, valid_values)
+                assert np.all(
+                    mask
+                ), f"These values are not valid: {weight[~mask]}"
+                # b) no more unique levels than n_levels
+                unique_vals = np.unique(weight)
+                assert unique_vals.size <= n_levels, (
+                    f"Expected <= {n_levels} unique values, but found "
+                    f"{unique_vals.size}: {unique_vals}"
+                )
+
+        # 3) Set weights to the model
+        weights = list()
+        for k, b in zip(kernels, biases):
+            weights.append(k)
+            weights.append(b)
+        self.model.set_weights(weights)
+
+        # 4) apply quantization & init everything
+        input_shape = self.model.input_shape
+        qmodel = apply_quantization(self.model, qconfig)
+        qmodel.build(input_shape)
+        input_shape = (1,) + input_shape[1:]
+        qmodel(tf.random.normal(input_shape))
+
+        # 5) set alphas, levels, and thresholds
+        alpha_dict = {}
+        levels_dict = {}
+        thresholds_dict = {}
+        for layer_name, kalpha, balpha, k, b in zip(
+            qconfig, kernel_alphas, bias_alphas, kvvalues, bvvalues
+        ):
+            klevels = k
+            blevels = b
+            kthresholds = [-kalpha] + list((k[1:] + k[:-1]) / 2) + [kalpha]
+            bthresholds = [-balpha] + list((b[1:] + b[:-1]) / 2) + [balpha]
+            alpha_dict[layer_name] = {"kernel": kalpha, "bias": balpha}
+            levels_dict[layer_name] = {"kernel": klevels, "bias": blevels}
+            thresholds_dict[layer_name] = {
+                "kernel": kthresholds,
+                "bias": bthresholds,
+            }
+        apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict)
+
+        # 6) compare against compute_space_complexity_model
+        computed_bits = compute_space_complexity_model(qmodel)
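+        # The expected size includes floating-point entropy terms, so compare
+        # with a tolerance rather than exactly.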
+        self.assertAlmostEqual(computed_bits, expected_bits, places=6)
+
+    def test_uniform_quantizer_space_complexity_single_dense(self):
+        model = self.model_single_dense
+        layer_names = [
+            "dense",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        self.base_uniform_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_uniform_quantizer_space_complexity_single_conv2d(self):
+        model = self.model_single_conv2d
+        layer_names = [
+            "conv2d",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        self.base_uniform_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_uniform_quantizer_space_complexity_lenet(self):
+        model = self.model_lenet
+        layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"]
+        kernel_bits = [7, 6, 5, 4, 3]
+        bias_bits = [3, 4, 5, 6, 7]
+        kernel_alphas = [1.0] * 5
+        bias_alphas = [1.0] * 5
+        self.base_uniform_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_dense_1(self):
+        model = self.model_single_dense
+        layer_names = [
+            "dense",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [2] * 1  # TEST: for levels = 2
+        bias_n_levels = [2] * 1  # TEST: for levels = 2
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_dense_2(self):
+        model = self.model_single_dense
+        layer_names = [
+            "dense",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_dense_3(self):
+        model = self.model_single_dense
+        layer_names = [
+            "dense",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(self.increasing_probability_vector(kl))
+            bias_probabilities.append(self.increasing_probability_vector(bl))
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_dense_4(self):
+        model = self.model_single_dense
+        layer_names = [
+            "dense",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(self.random_probability_vector(kl))
+            bias_probabilities.append(self.random_probability_vector(bl))
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_conv2d_1(self):
+        model = self.model_single_conv2d
+        layer_names = [
+            "conv2d",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [2] * 1  # TEST: for levels = 2
+        bias_n_levels = [2] * 1  # TEST: for levels = 2
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_conv2d_2(self):
+        model = self.model_single_conv2d
+        layer_names = [
+            "conv2d",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_conv2d_3(self):
+        model = self.model_single_conv2d
+        layer_names = [
+            "conv2d",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(self.increasing_probability_vector(kl))
+            bias_probabilities.append(self.increasing_probability_vector(bl))
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_single_conv2d_4(self):
+        model = self.model_single_conv2d
+        layer_names = [
+            "conv2d",
+        ]
+        kernel_bits = [
+            6,
+        ]
+        bias_bits = [
+            4,
+        ]
+        kernel_n_levels = [13] * 1
+        bias_n_levels = [8] * 1
+        kernel_alphas = [1.0] * 1
+        bias_alphas = [1.0] * 1
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(self.random_probability_vector(kl))
+            bias_probabilities.append(self.random_probability_vector(bl))
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_lenet_1(self):
+        model = self.model_lenet
+        layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"]
+        kernel_bits = [7, 6, 5, 4, 3]
+        bias_bits = [3, 4, 5, 6, 7]
+        kernel_n_levels = [2] * 5  # TEST: for levels = 2
+        bias_n_levels = [2] * 5  # TEST: for levels = 2
+        kernel_alphas = [1.0] * 5
+        bias_alphas = [1.0] * 5
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_lenet_2(self):
+        model = self.model_lenet
+        layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"]
+        kernel_bits = [7, 6, 5, 4, 3]
+        bias_bits = [3, 4, 5, 6, 7]
+        kernel_n_levels = [25, 12, 13, 5, 2]
+        bias_n_levels = [3, 7, 15, 7, 14]
+        kernel_alphas = [1.0] * 5
+        bias_alphas = [1.0] * 5
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.equal_probability_vector(kl)
+            )  # TEST: equiprobabilities
+            bias_probabilities.append(
+                self.equal_probability_vector(bl)
+            )  # TEST: equiprobabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_lenet_3(self):
+        model = self.model_lenet
+        layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"]
+        kernel_bits = [7, 6, 5, 4, 3]
+        bias_bits = [3, 4, 5, 6, 7]
+        kernel_n_levels = [25, 12, 13, 5, 2]
+        bias_n_levels = [3, 7, 15, 7, 14]
+        kernel_alphas = [1.0] * 5
+        bias_alphas = [1.0] * 5
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(
+                self.increasing_probability_vector(kl)
+            )  # TEST: increasing probabilities
+            bias_probabilities.append(
+                self.increasing_probability_vector(bl)
+            )  # TEST: increasing probabilities
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+    def test_flex_quantizer_space_complexity_lenet_4(self):
+        model = self.model_lenet
+        layer_names = ["conv2d", "conv2d_1", "dense", "dense_1", "dense_2"]
+        kernel_bits = [7, 6, 5, 4, 3]
+        bias_bits = [3, 4, 5, 6, 7]
+        kernel_n_levels = [25, 12, 13, 5, 2]
+        bias_n_levels = [3, 7, 15, 7, 14]
+        kernel_alphas = [1.0] * 5
+        bias_alphas = [1.0] * 5
+        kernel_probabilities = []
+        bias_probabilities = []
+        for kl, bl in zip(kernel_n_levels, bias_n_levels):
+            kernel_probabilities.append(self.random_probability_vector(kl))
+            bias_probabilities.append(self.random_probability_vector(bl))
+        self.base_flex_quantizer_space_complexity(
+            model,
+            layer_names,
+            kernel_bits,
+            bias_bits,
+            kernel_n_levels,
+            bias_n_levels,
+            kernel_probabilities,
+            bias_probabilities,
+            kernel_alphas,
+            bias_alphas,
+        )
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/utils/metrics_test.py b/src/utils/metrics_test.py
new file mode 100755
index 0000000..760be67
--- /dev/null
+++ b/src/utils/metrics_test.py
@@ -0,0 +1,113 @@
+#!/usr/bin/env python3
+
+import unittest
+
+import tensorflow as tf
+
+from configs.qmodel import apply_quantization
+from quantizers.uniform_quantizer import UniformQuantizer
+from utils.metrics import (
+    compute_space_complexity_model,
+    compute_space_complexity_quantize,
+)
+
+
+# Adapted from TensorFlow internal code
+def _compute_memory_size(weight):
+    weight_counts = weight.shape.num_elements()
+    per_param_size = weight.dtype.size
+    return weight_counts * per_param_size
+
+
+def weight_memory_size(weights):
+    """Compute the memory footprint for weights based on their dtypes.
+
+    Args:
+        weights: An iterable containing the weights to compute weight size.
+
+    Returns:
+        The total memory size (in bytes) of the weights.
+    """
+    unique_weights = {id(w): w for w in weights}.values()
+    total_memory_size = 0
+    for w in unique_weights:
+        total_memory_size += _compute_memory_size(w)
+    return total_memory_size
+
+
+class TestMetrics(unittest.TestCase):
+    def test_compute_space_complexity_uniform_only(self):
+        """Verify that for a uniform configuration a layer size is as
+        expected."""
+        layer = tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1")
+        layer.build((None, 5))  # Build the layer to initialize weights
+        qconfig = {
+            "dense_1": {
+                "weights": {
+                    "kernel": UniformQuantizer(bits=4, signed=True),
+                    "bias": UniformQuantizer(bits=4, signed=True),
+                },
+            },
+        }
+        model = tf.keras.Sequential([layer])
+
+        qmodel = apply_quantization(model, qconfig)
+        qmodel.build((None, 5))
+        # Run an inference to have access to the variables.
+        qmodel(tf.random.normal((1, 5)))
+        # Compute quantized size
+        quantized_size = compute_space_complexity_quantize(qmodel.layers[1])
+
+        kernel_expected_size = (
+            layer.kernel.shape.num_elements() * 4
+        )  # 4 bits for kernel
+        bias_expected_size = layer.bias.shape.num_elements() * 4
+        expected_size = kernel_expected_size + bias_expected_size
+
+        self.assertEqual(quantized_size, expected_size)
+
+    def test_compute_non_quantized_model(self):
+        """Verify that the computed size of a non-quantized model matches
+        TensorFlow's own accounting."""
+        layer = tf.keras.layers.Dense(30, input_shape=(5,), name="dense_1")
+        layer.build((None, 5))
+        model = tf.keras.Sequential([layer])
+
+        size = compute_space_complexity_model(model) / 8  # To bytes
+        size_according_to_tensorflow = weight_memory_size(model.weights)
+        self.assertEqual(size, size_according_to_tensorflow)
+
+    def test_compare(self):
+        def test_verify_proportional_to_base_size(bits):
+            layer = tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1")
+            layer.build((None, 5))  # Build the layer to initialize weights
+            qconfig = {
+                "dense_1": {
+                    "weights": {
+                        "kernel": UniformQuantizer(bits=bits, signed=True),
+                        "bias": UniformQuantizer(bits=bits, signed=True),
+                    },
+                },
+            }
+            model = tf.keras.Sequential([layer])
+            model.build((None, 5))
+            model(tf.random.normal((1, 5)))
+
+            qmodel = apply_quantization(model, qconfig)
+            qmodel.build((None, 5))
+            qmodel(tf.random.normal((1, 5)))
+            non_quantized_size = compute_space_complexity_model(model)
+            quantized_size = compute_space_complexity_model(qmodel)
+
+            # We expect the quantized size to shrink in proportion to the bit width
+            size_scale_factor = bits / 32
+            self.assertEqual(
+                quantized_size, non_quantized_size * size_scale_factor
+            )
+
+        for bits in [2, 4, 6, 8, 10, 12, 16]:
+            with self.subTest(val=bits):
+                test_verify_proportional_to_base_size(bits)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/src/utils/utils.py b/src/utils/plot.py
similarity index 100%
rename from src/utils/utils.py
rename to src/utils/plot.py
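Usage sketch (illustrative only; it mirrors what src/examples/run.py does
with these changes, and names like `qmodel` and `sample_batch` are
assumptions, not part of the diff):

    from utils.metrics import compute_space_complexity_model

    qmodel(sample_batch)  # run once so every quantizer variable exists
    bits = compute_space_complexity_model(qmodel)
    print(f"Space complexity: {bits / 8 / 1024} kB")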