Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions src/utils/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@ def compute_space_complexity_quantize(qlayer: QuantizeWrapperV2) -> float:
total_layer_size = 0.0
qconfig = qlayer.quantize_config

# Assumption: order is the same for layer.weights and get_weights_and_quantizers
weights_and_quantizers = qconfig.get_weights_and_quantizers(qlayer.layer)
weights = qlayer.weights[: len(weights_and_quantizers)]

for weight, weight_and_quantizer in zip(weights, weights_and_quantizers):
quantizer = weight_and_quantizer[1]
for weight, quantizer in weights_and_quantizers:
if isinstance(quantizer, UniformQuantizer):
weight_size = weight.shape.num_elements() * quantizer.bits
elif isinstance(quantizer, FlexQuantizer):
Expand Down
80 changes: 80 additions & 0 deletions src/utils/metrics_integration_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
#!/usr/bin/env python3
# Integration tests for model serialization (save/load) and space-complexity metrics.

import tempfile
import unittest

import numpy as np
import tensorflow as tf

from configs.qmodel import apply_quantization
from configs.serialization.serialization import load_qmodel, save_qmodel
from quantizers.uniform_quantizer import UniformQuantizer
from utils.metrics import compute_space_complexity_model


class TestIntegrationMetricsSerialization(unittest.TestCase):
    """Integration tests: a quantized model must survive a save/load round
    trip with identical weights and identical space-complexity metrics."""

    def test_save_and_load_model(self):
        """Save a quantized model, reload it, and verify that weights and
        the computed space complexity are unchanged."""
        model = tf.keras.Sequential(
            [
                tf.keras.layers.Dense(10, input_shape=(5,), name="dense_1"),
                tf.keras.layers.Dense(5, name="dense_2"),
            ]
        )

        # 4-bit signed uniform quantization on kernel and bias of each layer.
        # Separate quantizer instances per layer/weight, in case quantizers
        # carry per-weight state.
        qconfig = {
            "dense_1": {
                "weights": {
                    "kernel": UniformQuantizer(bits=4, signed=True),
                    "bias": UniformQuantizer(bits=4, signed=True),
                },
            },
            "dense_2": {
                "weights": {
                    "kernel": UniformQuantizer(bits=4, signed=True),
                    "bias": UniformQuantizer(bits=4, signed=True),
                },
            },
        }
        qmodel = apply_quantization(model, qconfig)
        qmodel.build((None, 5))

        # TemporaryDirectory guarantees cleanup even when an assertion
        # below fails (the previous mkdtemp() leaked the directory).
        with tempfile.TemporaryDirectory() as tmpdir:
            save_qmodel(qmodel, tmpdir)
            loaded_model = load_qmodel(tmpdir)
            # Run one inference to ensure the loaded model is functional.
            loaded_model(tf.random.normal((1, 5)))

            original_weights = {w.name: w.numpy() for w in qmodel.weights}
            loaded_weights = {w.name: w.numpy() for w in loaded_model.weights}

            # First, check that the set of weight names is identical.
            self.assertEqual(
                set(original_weights.keys()),
                set(loaded_weights.keys()),
                "Models have different sets of weight names.",
            )

            # Now, compare each weight tensor by name.
            for name, orig_w in original_weights.items():
                np.testing.assert_allclose(
                    orig_w,
                    loaded_weights[name],
                    rtol=1e-6,
                    atol=1e-6,
                    err_msg=f"Weight tensor '{name}' differs.",
                )

            # The metric must be invariant under serialization.
            self.assertEqual(
                compute_space_complexity_model(qmodel),
                compute_space_complexity_model(loaded_model),
            )


# Allow running this test module directly: `python metrics_integration_test.py`.
if __name__ == "__main__":
    unittest.main()
82 changes: 50 additions & 32 deletions src/utils/metrics_lenet_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, models
from tensorflow_model_optimization.python.core.quantization.keras.quantize_wrapper import (
QuantizeWrapperV2,
)

from configs.qmodel import apply_quantization
from quantizers.flex_quantizer import FlexQuantizer
Expand All @@ -14,43 +17,57 @@


def apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict):
"""TODO(Colo): This function will is implemented in branch
colo/model_evalution in QTensor/src/examples/functions.py.

When merged, import that functions insted of redefining it here.
"""
"""Sets the internal state (alpha, levels, thresholds) of FlexQuantizers
within a quantized model by directly finding and assigning to the live
tf.Variable objects."""
for layer in qmodel.layers:
orig_layer_name = layer.name
if orig_layer_name.startswith("quant_"):
orig_layer_name = orig_layer_name[len("quant_") :]
if not isinstance(layer, QuantizeWrapperV2):
continue

orig_layer_name = layer.layer.name
if orig_layer_name in alpha_dict:
for alpha_type in ["kernel", "bias", "activation"]:
new_alpha = alpha_dict[orig_layer_name].get(alpha_type, None)
new_levels = levels_dict[orig_layer_name].get(alpha_type, None)
new_thresholds = thresholds_dict[orig_layer_name].get(
alpha_type, None
# Find all variables in the wrapper layer and create a map by name
var_map = {v.name: v for v in layer.weights}

# Iterate through the types ('kernel', 'bias') we might want to change
for attr_type in ["kernel", "bias"]:
new_alpha = alpha_dict.get(orig_layer_name, {}).get(attr_type)
new_levels = levels_dict.get(orig_layer_name, {}).get(
attr_type
)
new_thresholds = thresholds_dict.get(orig_layer_name, {}).get(
attr_type
)
if new_alpha is not None:
for v in layer.weights:
if "alpha" in v.name and alpha_type in v.name:
v.assign(new_alpha)
# print(f"Updated {v.name} ({alpha_type}) with new alpha value {new_alpha}")
elif (
alpha_type == "activation"
and "post_activation" in v.name
and "alpha" in v.name
):
v.assign(new_alpha)
# print(f"Updated {v.name} (activation) with new alpha value {new_alpha}")
if "levels" in v.name and alpha_type in v.name:
v.assign(new_levels)
# print(f"Updated {v.name} ({alpha_type}) with new levels value {new_levels}")
if "thresholds" in v.name and alpha_type in v.name:
v.assign(new_thresholds)
# print(f"Updated {v.name} ({alpha_type}) with new thresholds value {new_thresholds}")

return qmodel
# Construct the expected variable names and assign if they exist
if new_alpha is not None:
# Note: TFMOT might name variables slightly differently.
# This searches for common patterns.
for name_pattern in [
f"/{attr_type}_alpha:0",
f"_{attr_type}_alpha:0",
]:
var_name = layer.name + name_pattern
if var_name in var_map:
var_map[var_name].assign(new_alpha)

if new_levels is not None:
for name_pattern in [
f"/{attr_type}_levels:0",
f"_{attr_type}_levels:0",
]:
var_name = layer.name + name_pattern
if var_name in var_map:
var_map[var_name].assign(new_levels)

if new_thresholds is not None:
for name_pattern in [
f"/{attr_type}_thresholds:0",
f"_{attr_type}_thresholds:0",
]:
var_name = layer.name + name_pattern
if var_name in var_map:
var_map[var_name].assign(new_thresholds)


def apply_alpha_dict(qmodel, alpha_dict):
Expand Down Expand Up @@ -418,6 +435,7 @@ def base_flex_quantizer_space_complexity(
"bias": bthresholds,
}
apply_flex_dict(qmodel, alpha_dict, levels_dict, thresholds_dict)
qmodel(tf.random.normal(input_shape))

# 6) compare to your implementation
computed_bits = compute_space_complexity_model(qmodel)
Expand Down
Loading
Loading