From 132f9dc6b60cca28d38f118d965550c2735d1936 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 14 Apr 2025 11:11:13 -0400 Subject: [PATCH 01/42] feat: Added pgvector extension --- .../migrations/0060_add_pgvector_extension.py | 17 +++++++++++++ compose/local/postgres/Dockerfile | 24 +++++++++++++++---- config/settings/base.py | 1 + processing_services/example/api/schemas.py | 4 ++++ requirements/base.txt | 2 +- 5 files changed, 43 insertions(+), 5 deletions(-) create mode 100644 ami/main/migrations/0060_add_pgvector_extension.py diff --git a/ami/main/migrations/0060_add_pgvector_extension.py b/ami/main/migrations/0060_add_pgvector_extension.py new file mode 100644 index 000000000..61c773461 --- /dev/null +++ b/ami/main/migrations/0060_add_pgvector_extension.py @@ -0,0 +1,17 @@ +# Generated by Django 4.2.10 on 2025-04-14 10:09 + +from django.db import migrations + + +class Migration(migrations.Migration): + + dependencies = [ + ("main", "0059_alter_project_options"), + ] + + operations = [ + migrations.RunSQL( + sql="CREATE EXTENSION IF NOT EXISTS vector;", + reverse_sql="DROP EXTENSION IF EXISTS vector;", + ), + ] diff --git a/compose/local/postgres/Dockerfile b/compose/local/postgres/Dockerfile index 5f864a4a0..89e17d292 100644 --- a/compose/local/postgres/Dockerfile +++ b/compose/local/postgres/Dockerfile @@ -1,7 +1,23 @@ FROM postgres:16 -# FROM esgn/pgtuned:latest + +# Install build dependencies and pgvector +RUN apt-get update && apt-get install -y \ + postgresql-server-dev-16 \ + make \ + g++ \ + git \ +&& git clone --branch v0.5.1 https://github.com/pgvector/pgvector.git \ +&& cd pgvector \ +&& make && make install \ +&& cd .. \ +&& rm -rf pgvector \ +&& apt-get remove -y git g++ make \ +&& apt-get autoremove -y && apt-get clean + + +# Copy maintenance scripts COPY ./compose/local/postgres/maintenance /usr/local/bin/maintenance -RUN chmod +x /usr/local/bin/maintenance/* -RUN mv /usr/local/bin/maintenance/* /usr/local/bin \ - && rmdir /usr/local/bin/maintenance +RUN chmod +x /usr/local/bin/maintenance/* \ +&& mv /usr/local/bin/maintenance/* /usr/local/bin \ +&& rmdir /usr/local/bin/maintenance diff --git a/config/settings/base.py b/config/settings/base.py index 79a063f81..7db0f3157 100644 --- a/config/settings/base.py +++ b/config/settings/base.py @@ -92,6 +92,7 @@ "anymail", "cachalot", "guardian", + "pgvector.django", ] LOCAL_APPS = [ diff --git a/processing_services/example/api/schemas.py b/processing_services/example/api/schemas.py index d64690432..e8b71ae76 100644 --- a/processing_services/example/api/schemas.py +++ b/processing_services/example/api/schemas.py @@ -91,6 +91,10 @@ class ClassificationResponse(pydantic.BaseModel): default_factory=list, description="The raw logits output by the model, before any calibration or normalization.", ) + features: list[float] = pydantic.Field( + default_factory=list, + description="The feature embedding vector from the model's backbone.", + ) inference_time: float | None = None algorithm: AlgorithmReference terminal: bool = True diff --git a/requirements/base.txt b/requirements/base.txt index dd9de69d5..6257f4a2b 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -88,7 +88,7 @@ django-debug-toolbar==4.1.0 # https://github.com/jazzband/django-debug-toolbar django-extensions==3.2.3 # https://github.com/django-extensions/django-extensions django-coverage-plugin==3.0.0 # https://github.com/nedbat/django_coverage_plugin pytest-django==4.5.2 # https://github.com/pytest-dev/pytest-django - +pgvector ## Formerly 
production-only dependencies # @TODO move to Python Poetry or pyproject.toml for dependencies From dc48ccc0213fdf493b4e90ace2e8dd0f7d1eb857 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 14 Apr 2025 11:12:09 -0400 Subject: [PATCH 02/42] feat: Added features field to the Classification model --- .../0061_classification_features.py | 20 +++++++++++++++++++ ami/main/models.py | 2 ++ 2 files changed, 22 insertions(+) create mode 100644 ami/main/migrations/0061_classification_features.py diff --git a/ami/main/migrations/0061_classification_features.py b/ami/main/migrations/0061_classification_features.py new file mode 100644 index 000000000..49cbffbb4 --- /dev/null +++ b/ami/main/migrations/0061_classification_features.py @@ -0,0 +1,20 @@ +# Generated by Django 4.2.10 on 2025-04-14 10:14 + +from django.db import migrations +import pgvector.django.vector + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0060_add_pgvector_extension"), + ] + + operations = [ + migrations.AddField( + model_name="classification", + name="features", + field=pgvector.django.vector.VectorField( + dimensions=2048, help_text="Feature embedding from the model backbone", null=True + ), + ), + ] diff --git a/ami/main/models.py b/ami/main/models.py index a6eb728f6..2e7e04237 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -23,6 +23,7 @@ from django.template.defaultfilters import filesizeformat from django.utils import timezone from django_pydantic_field import SchemaField +from pgvector.django import VectorField import ami.tasks import ami.utils @@ -1919,6 +1920,7 @@ class Classification(BaseModel): logits = ArrayField( models.FloatField(), null=True, help_text="The raw output of the last fully connected layer of the model" ) + features = VectorField(dimensions=2048, null=True, help_text="Feature embedding from the model backbone") scores = ArrayField( models.FloatField(), null=True, From 8dc0c00a1656e4510fe87143bf35bc7bfcf62946 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:28:10 -0400 Subject: [PATCH 03/42] changed taxon and detection to autocomplete fields in the ClassificationAdmin model --- ami/main/admin.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ami/main/admin.py b/ami/main/admin.py index e658c657f..dcfa58241 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -315,6 +315,7 @@ class DetectionAdmin(admin.ModelAdmin[Detection]): ) autocomplete_fields = ("source_image", "occurrence") + search_fields = ("id",) def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: qs = super().get_queryset(request) @@ -405,6 +406,7 @@ class ClassificationAdmin(admin.ModelAdmin[Classification]): "detection__source_image__project", "taxon__rank", ) + autocomplete_fields = ("taxon", "detection") def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: qs = super().get_queryset(request) From 4bf07b3db71bf498e5990d12bb3b8933b2c099cf Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:31:05 -0400 Subject: [PATCH 04/42] feat: added similar action to the ClassificationViewset --- ami/main/api/serializers.py | 19 +++++++++++++++++++ ami/main/api/views.py | 12 ++++++++++++ ami/main/models.py | 36 ++++++++++++++++++++++++++++++++++-- 3 files changed, 65 insertions(+), 2 deletions(-) diff --git a/ami/main/api/serializers.py b/ami/main/api/serializers.py index cc6e3a0c4..6741dd5e9 100644 --- a/ami/main/api/serializers.py +++ b/ami/main/api/serializers.py @@ -758,6 +758,7 @@ class 
ClassificationSerializer(DefaultSerializer): taxon = TaxonNestedSerializer(read_only=True) algorithm = AlgorithmSerializer(read_only=True) top_n = ClassificationPredictionItemSerializer(many=True, read_only=True) + features_2048 = serializers.ListField(child=serializers.FloatField(), read_only=True) class Meta: model = Classification @@ -769,6 +770,7 @@ class Meta: "algorithm", "scores", "logits", + "features_2048", "top_n", "created_at", "updated_at", @@ -886,6 +888,23 @@ class Meta: ] +class ClassificationSimilaritySerializer(ClassificationSerializer): + distance = serializers.FloatField(read_only=True) + detection = DetectionNestedSerializer(read_only=True) + + class Meta(ClassificationSerializer.Meta): + fields = [ + "id", + "details", + "distance", + "detection", + "taxon", + "algorithm", + "created_at", + "updated_at", + ] + + class DetectionListSerializer(DefaultSerializer): class Meta: model = Detection diff --git a/ami/main/api/views.py b/ami/main/api/views.py index 5ef252b3f..3a8ba9741 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -69,6 +69,7 @@ from .serializers import ( ClassificationListSerializer, ClassificationSerializer, + ClassificationSimilaritySerializer, ClassificationWithTaxaSerializer, DeploymentListSerializer, DeploymentSerializer, @@ -1409,6 +1410,17 @@ def get_serializer_class(self): else: return ClassificationSerializer + @action(detail=True, methods=["get"]) + def similar(self, request, pk=None): + try: + ref_classification = self.get_object() + similar_qs = ref_classification.get_similar_classifications(distance_metric="cosine") + serializer = ClassificationSimilaritySerializer(similar_qs, many=True, context={"request": request}) + return Response(serializer.data) + + except ValueError as e: + return Response({"error": str(e)}, status=status.HTTP_400_BAD_REQUEST) + class SummaryView(GenericAPIView, ProjectMixin): permission_classes = [IsActiveStaffOrReadOnly] diff --git a/ami/main/models.py b/ami/main/models.py index 2e7e04237..1bf4c7c84 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -23,7 +23,7 @@ from django.template.defaultfilters import filesizeformat from django.utils import timezone from django_pydantic_field import SchemaField -from pgvector.django import VectorField +from pgvector.django import CosineDistance, L2Distance, VectorField import ami.tasks import ami.utils @@ -1920,7 +1920,10 @@ class Classification(BaseModel): logits = ArrayField( models.FloatField(), null=True, help_text="The raw output of the last fully connected layer of the model" ) - features = VectorField(dimensions=2048, null=True, help_text="Feature embedding from the model backbone") + features_2048 = VectorField( + dimensions=2048, null=True, default=None, help_text="Feature embedding from the model backbone" + ) + scores = ArrayField( models.FloatField(), null=True, @@ -2028,6 +2031,35 @@ def top_n(self, n: int = 3) -> list[dict[str, "Taxon | float | None"]]: for i, s in top_scored ] + def get_similar_classifications(self, distance_metric="cosine") -> models.QuerySet: + """ + Return most similar classifications based on feature_2048 embeddings. 
+ Supports: 'cosine' and 'l2' distances + """ + if self.features_2048 is None: + raise ValueError("This classification does not have a feature vector.") + + if not self.algorithm: + raise ValueError("This classification is not associated with an algorithm.") + distance_metrics = { + "cosine": CosineDistance, + "l2": L2Distance, + } + distance_fn = distance_metrics.get(distance_metric) + + if distance_fn is None: + raise ValueError( + f"""Unsupported distance metric: {distance_metric}. + Supported metrics are : {','.join(list(distance_metrics.keys()))}""" + ) + + return ( + Classification.objects.exclude(features_2048=None) + .filter(algorithm=self.algorithm) + .annotate(distance=distance_fn("features_2048", self.features_2048)) + .order_by("distance") + ) + def save(self, *args, **kwargs): """ Set the category map based on the algorithm. From b258c9b7c8aa308195534bf38e91379d2e04e76e Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:32:21 -0400 Subject: [PATCH 05/42] chore: changed features vector field name to features_2048 --- .../0061_classification_features.py | 20 ------------------- 1 file changed, 20 deletions(-) delete mode 100644 ami/main/migrations/0061_classification_features.py diff --git a/ami/main/migrations/0061_classification_features.py b/ami/main/migrations/0061_classification_features.py deleted file mode 100644 index 49cbffbb4..000000000 --- a/ami/main/migrations/0061_classification_features.py +++ /dev/null @@ -1,20 +0,0 @@ -# Generated by Django 4.2.10 on 2025-04-14 10:14 - -from django.db import migrations -import pgvector.django.vector - - -class Migration(migrations.Migration): - dependencies = [ - ("main", "0060_add_pgvector_extension"), - ] - - operations = [ - migrations.AddField( - model_name="classification", - name="features", - field=pgvector.django.vector.VectorField( - dimensions=2048, help_text="Feature embedding from the model backbone", null=True - ), - ), - ] From 9490045d0662f5cb0eda565acc7314a3dbd22330 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:33:17 -0400 Subject: [PATCH 06/42] chore: changed features vector field name to features_2048 --- ...remove_classification_features_and_more.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) create mode 100644 ami/main/migrations/0061_classification_features_squashed_0062_remove_classification_features_and_more.py diff --git a/ami/main/migrations/0061_classification_features_squashed_0062_remove_classification_features_and_more.py b/ami/main/migrations/0061_classification_features_squashed_0062_remove_classification_features_and_more.py new file mode 100644 index 000000000..9351a7487 --- /dev/null +++ b/ami/main/migrations/0061_classification_features_squashed_0062_remove_classification_features_and_more.py @@ -0,0 +1,22 @@ +# Generated by Django 4.2.10 on 2025-04-16 12:27 + +from django.db import migrations +import pgvector.django.vector + + +class Migration(migrations.Migration): + replaces = [("main", "0061_classification_features"), ("main", "0062_remove_classification_features_and_more")] + + dependencies = [ + ("main", "0060_add_pgvector_extension"), + ] + + operations = [ + migrations.AddField( + model_name="classification", + name="features_2048", + field=pgvector.django.vector.VectorField( + default=None, dimensions=2048, help_text="Feature embedding from the model backbone", null=True + ), + ), + ] From 0ff569fbaf40d3322fd48c116c8bf1255717bec7 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:35:06 -0400 
Subject: [PATCH 07/42] feat: read features vector from processing service ClassificationResponse and save it to Classification object --- ami/ml/models/pipeline.py | 1 + ami/ml/schemas.py | 1 + processing_services/example/api/pipelines.py | 3 +++ 3 files changed, 5 insertions(+) diff --git a/ami/ml/models/pipeline.py b/ami/ml/models/pipeline.py index 7ce09b134..364818c64 100644 --- a/ami/ml/models/pipeline.py +++ b/ami/ml/models/pipeline.py @@ -615,6 +615,7 @@ def create_classification( score=max(classification_resp.scores), timestamp=classification_resp.timestamp or now(), logits=classification_resp.logits, + features_2048=classification_resp.features, scores=classification_resp.scores, terminal=classification_resp.terminal, category_map=classification_algo.category_map, diff --git a/ami/ml/schemas.py b/ami/ml/schemas.py index 4a653d970..76fbdc63a 100644 --- a/ami/ml/schemas.py +++ b/ami/ml/schemas.py @@ -106,6 +106,7 @@ class ClassificationResponse(pydantic.BaseModel): ) scores: list[float] = [] logits: list[float] | None = None + features: list[float] | None = None inference_time: float | None = None algorithm: AlgorithmReference terminal: bool = True diff --git a/processing_services/example/api/pipelines.py b/processing_services/example/api/pipelines.py index 0d955b417..2c9247e33 100644 --- a/processing_services/example/api/pipelines.py +++ b/processing_services/example/api/pipelines.py @@ -72,6 +72,7 @@ def make_random_prediction( assert algorithm.category_map is not None category_labels = algorithm.category_map.labels logits = [random.random() for _ in category_labels] + features = [random.random() for _ in range(2048)] softmax = [math.exp(logit) / sum([math.exp(logit) for logit in logits]) for logit in logits] top_class = category_labels[softmax.index(max(softmax))] return ClassificationResponse( @@ -79,6 +80,7 @@ def make_random_prediction( labels=category_labels if len(category_labels) <= max_labels else None, scores=softmax, logits=logits, + features=features, timestamp=datetime.datetime.now(), algorithm=AlgorithmReference(name=algorithm.name, key=algorithm.key), terminal=terminal, @@ -140,6 +142,7 @@ def make_constant_detections(source_image: SourceImage, num_detections: int = 10 labels=labels, scores=[0.9], # Constant score for each detection timestamp=timestamp, + features=[0.5] * 2048, # Dummy feature vector algorithm=AlgorithmReference( name=algorithms.CONSTANT_CLASSIFIER.name, key=algorithms.CONSTANT_CLASSIFIER.key ), From 89a3b6cc88a916f5dfef9bcd16a4c985faea1eaf Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 16 Apr 2025 19:36:12 -0400 Subject: [PATCH 08/42] test: added tests for PGVector distance metrics --- ami/ml/tests.py | 57 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 566f489ce..30b32d1a0 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -1,7 +1,9 @@ import datetime import unittest +import numpy as np from django.test import TestCase +from pgvector.django import CosineDistance, L2Distance from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params @@ -628,3 +630,58 @@ def test_algorithm_category_maps(self): # Ensure the full labels in the data match the simple, ordered list of labels sorted_data = sorted(algorithm.category_map.data, key=lambda x: x["index"]) assert [category["label"] for category in sorted_data] == algorithm.category_map.labels + + +class ClassificationFeatureVectorTests(TestCase): + def 
setUp(self): + self.dim = 2048 + self.classifications = [] + # Create a base vector pointing along first axis + base_vec = np.zeros(self.dim) + # Create 10 normalized vectors as lists + base_vec[0] = 1.0 # Unit vector + for i in range(10): + # Generate a perturbation in a new random direction + noise = np.random.randn(self.dim) + noise[0] = 0 # Ensure it's orthogonal to base vector + noise = noise / np.linalg.norm(noise) + + # Blend with the base vector using different weights + alpha = 1 - (i * 0.1) # Decreasing similarity + perturbed_vec = alpha * base_vec + (1 - alpha) * noise + perturbed_vec = perturbed_vec / np.linalg.norm(perturbed_vec) + + classification = Classification.objects.create( + algorithm=Algorithm.objects.get(key="random-species-classifier"), + taxon=None, + score=0.5, + features_2048=perturbed_vec.tolist(), + timestamp=datetime.datetime.now(), + detection=None, + ) + self.classifications.append(classification) + + def test_cosine_distance(self): + ref_cls = self.classifications[5] + ref_vector = ref_cls.features_2048 + + qs = ( + Classification.objects.exclude(features_2048=None) + .annotate(cosine_distance=CosineDistance("features_2048", ref_vector)) + .order_by("cosine_distance") + ) + most_similar = qs.first() + self.assertEqual(most_similar.pk, ref_cls.pk, "Most similar classification should be itself") + + def test_l2_distance(self): + ref_cls = self.classifications[5] + ref_vector = ref_cls.features_2048 + + qs = ( + Classification.objects.exclude(features_2048=None) + .annotate(l2_distance=L2Distance("features_2048", ref_vector)) + .order_by("l2_distance") + ) + + most_similar = qs.first() + self.assertEqual(most_similar.pk, ref_cls.pk, "Most similar classification should be itself") From 9e13cc445a4be54b310dd226ad76aa12f73df4b5 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Thu, 17 Apr 2025 10:00:16 -0400 Subject: [PATCH 09/42] updated docker-compose.ci.yml to use the same postgres image --- docker-compose.ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index 5f3dbd3e0..50c7d3c82 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -15,7 +15,7 @@ services: command: /start postgres: - image: postgres:13 + image: postgres:16 env_file: - ./.envs/.ci/.postgres From 5a51593e414b0c2e87f94ef9467ae20e2691f4fa Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Thu, 17 Apr 2025 10:04:47 -0400 Subject: [PATCH 10/42] updated docker-compose.ci.yml to use the same postgres image as docker-compose.yml --- docker-compose.ci.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index 50c7d3c82..adfca156b 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -15,7 +15,9 @@ services: command: /start postgres: - image: postgres:16 + context: . + dockerfile: ./compose/local/postgres/Dockerfile + env_file: - ./.envs/.ci/.postgres From 9efff5f904c7bc4de9590ac36f8594e892689bb8 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Thu, 17 Apr 2025 10:07:25 -0400 Subject: [PATCH 11/42] updated docker-compose.ci.yml to use the same postgres image as docker-compose.yml --- docker-compose.ci.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docker-compose.ci.yml b/docker-compose.ci.yml index adfca156b..8205a8102 100644 --- a/docker-compose.ci.yml +++ b/docker-compose.ci.yml @@ -15,8 +15,9 @@ services: command: /start postgres: - context: . 
- dockerfile: ./compose/local/postgres/Dockerfile + build: + context: . + dockerfile: ./compose/local/postgres/Dockerfile env_file: - ./.envs/.ci/.postgres From 1c66f34b8a578975c46500e492fda2271e73c298 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Tue, 29 Apr 2025 11:56:01 -0400 Subject: [PATCH 12/42] feat: Added support for clustering detections for source image collections --- .../migrations/0017_alter_job_job_type_key.py | 29 +++++ .../migrations/0018_alter_job_job_type_key.py | 29 +++++ ami/jobs/models.py | 38 ++++++- ami/main/api/views.py | 23 ++++ .../migrations/0063_taxon_unknown_species.py | 17 +++ ami/main/models.py | 105 +++++++++++++++++- requirements/base.txt | 2 + 7 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 ami/jobs/migrations/0017_alter_job_job_type_key.py create mode 100644 ami/jobs/migrations/0018_alter_job_job_type_key.py create mode 100644 ami/main/migrations/0063_taxon_unknown_species.py diff --git a/ami/jobs/migrations/0017_alter_job_job_type_key.py b/ami/jobs/migrations/0017_alter_job_job_type_key.py new file mode 100644 index 000000000..a1b74f46d --- /dev/null +++ b/ami/jobs/migrations/0017_alter_job_job_type_key.py @@ -0,0 +1,29 @@ +# Generated by Django 4.2.10 on 2025-04-24 16:25 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0016_job_data_export_job_params_alter_job_job_type_key"), + ] + + operations = [ + migrations.AlterField( + model_name="job", + name="job_type_key", + field=models.CharField( + choices=[ + ("ml", "ML pipeline"), + ("populate_captures_collection", "Populate captures collection"), + ("data_storage_sync", "Data storage sync"), + ("unknown", "Unknown"), + ("data_export", "Data Export"), + ("occurrence_clustering", "Occurrence Feature Clustering"), + ], + default="unknown", + max_length=255, + verbose_name="Job Type", + ), + ), + ] diff --git a/ami/jobs/migrations/0018_alter_job_job_type_key.py b/ami/jobs/migrations/0018_alter_job_job_type_key.py new file mode 100644 index 000000000..b1a4e664f --- /dev/null +++ b/ami/jobs/migrations/0018_alter_job_job_type_key.py @@ -0,0 +1,29 @@ +# Generated by Django 4.2.10 on 2025-04-28 11:06 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("jobs", "0017_alter_job_job_type_key"), + ] + + operations = [ + migrations.AlterField( + model_name="job", + name="job_type_key", + field=models.CharField( + choices=[ + ("ml", "ML pipeline"), + ("populate_captures_collection", "Populate captures collection"), + ("data_storage_sync", "Data storage sync"), + ("unknown", "Unknown"), + ("data_export", "Data Export"), + ("detection_clustering", "Detection Feature Clustering"), + ], + default="unknown", + max_length=255, + verbose_name="Job Type", + ), + ), + ] diff --git a/ami/jobs/models.py b/ami/jobs/models.py index 1166f2c74..a078ba0cd 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -639,6 +639,35 @@ def run(cls, job: "Job"): job.update_status(JobState.SUCCESS, save=True) +class DetectionClusteringJob(JobType): + name = "Detection Feature Clustering" + key = "detection_clustering" + + @classmethod + def run(cls, job: "Job"): + job.update_status(JobState.STARTED) + job.started_at = datetime.datetime.now() + job.finished_at = None + job.save() + + if not job.source_image_collection: + raise ValueError("No source image collection provided") + + job.logger.info(f"Clustering detections for collection {job.source_image_collection}") + 
job.update_status(JobState.STARTED) + job.started_at = datetime.datetime.now() + job.finished_at = None + job.save() + + # Call the clustering method + job.source_image_collection.cluster_detections(job=job) + job.logger.info(f"Finished clustering detections for collection {job.source_image_collection}") + + job.finished_at = datetime.datetime.now() + job.update_status(JobState.SUCCESS, save=False) + job.save() + + class UnknownJobType(JobType): name = "Unknown" key = "unknown" @@ -648,7 +677,14 @@ def run(cls, job: "Job"): raise ValueError(f"Unknown job type '{job.job_type()}'") -VALID_JOB_TYPES = [MLJob, SourceImageCollectionPopulateJob, DataStorageSyncJob, UnknownJobType, DataExportJob] +VALID_JOB_TYPES = [ + MLJob, + SourceImageCollectionPopulateJob, + DataStorageSyncJob, + UnknownJobType, + DataExportJob, + DetectionClusteringJob, +] def get_job_type_by_key(key: str) -> type[JobType] | None: diff --git a/ami/main/api/views.py b/ami/main/api/views.py index 3a8ba9741..882e914b5 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -42,6 +42,7 @@ ) from ami.base.serializers import FilterParamsSerializer, SingleParamSerializer from ami.base.views import ProjectMixin +from ami.jobs.models import DetectionClusteringJob, Job from ami.utils.requests import get_active_classification_threshold, project_id_doc_param from ami.utils.storages import ConnectionTestResult @@ -744,6 +745,28 @@ def remove(self, request, pk=None): } ) + @action(detail=True, methods=["post"], name="cluster detections") + def cluster_detections(self, request, pk=None): + """ + Trigger a background job to cluster detections from this collection. + """ + collection: SourceImageCollection = self.get_object() + job = Job.objects.create( + name=f"Clustering detections for collection {collection.pk}", + project=collection.project, + source_image_collection=collection, + job_type_key=DetectionClusteringJob.key, + params={ + "threshold": request.data.get("threshold", 1), + "algorithm": request.data.get("algorithm", "kmeans"), + "algorithm_kwargs": request.data.get("algorithm_kwargs", {"n_clusters": 5}), + "pca_dim": request.data.get("pca_dim", 10), + }, + ) + job.enqueue() + logger.info(f"Triggered clustering job for collection {collection.pk}") + return Response({"job_id": job.pk, "project_id": collection.project.pk}) + @extend_schema(parameters=[project_id_doc_param]) def list(self, request, *args, **kwargs): return super().list(request, *args, **kwargs) diff --git a/ami/main/migrations/0063_taxon_unknown_species.py b/ami/main/migrations/0063_taxon_unknown_species.py new file mode 100644 index 000000000..1f3af523a --- /dev/null +++ b/ami/main/migrations/0063_taxon_unknown_species.py @@ -0,0 +1,17 @@ +# Generated by Django 4.2.10 on 2025-04-28 11:11 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("main", "0061_classification_features_squashed_0062_remove_classification_features_and_more"), + ] + + operations = [ + migrations.AddField( + model_name="taxon", + name="unknown_species", + field=models.BooleanField(default=False, help_text="Is this a clustering-generated taxon"), + ), + ] diff --git a/ami/main/models.py b/ami/main/models.py index 1bf4c7c84..f1df4cbfd 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -9,6 +9,7 @@ import urllib.parse from typing import Final, final # noqa: F401 +import numpy as np import pydantic from django.apps import apps from django.conf import settings @@ -22,8 +23,11 @@ from django.dispatch import receiver from 
django.template.defaultfilters import filesizeformat from django.utils import timezone +from django.utils.timezone import now from django_pydantic_field import SchemaField from pgvector.django import CosineDistance, L2Distance, VectorField +from sklearn.cluster import KMeans +from sklearn.decomposition import PCA import ami.tasks import ami.utils @@ -2782,7 +2786,7 @@ class Taxon(BaseModel): authorship_date = models.DateField(null=True, blank=True, help_text="The date the taxon was described.") ordering = models.IntegerField(null=True, blank=True) sort_phylogeny = models.BigIntegerField(blank=True, null=True) - + unknown_species = models.BooleanField(default=False, help_text="Is this a clustering-generated taxon") objects: TaxonManager = TaxonManager() # Type hints for auto-generated fields @@ -3203,6 +3207,105 @@ def populate_sample(self, job: "Job | None" = None): self.save() task_logger.info(f"Done sampling and saving captures to {self}") + def cluster_detections(self, job: "Job | None" = None): + from ami.jobs.models import JobState + + if job: + task_logger = job.logger + else: + task_logger = logger + params = job.params or {} + threshold = params.get("threshold", 1) + pca_dim = params.get("pca_dim", 10) + algorithm = params.get("algorithm", "kmeans") + algorithm_kwargs = params.get("algorithm_kwargs", {}) + feature_collection_stage = job.progress.add_stage("Collecting Features") + clusering_stage = job.progress.add_stage("Clustering") + create_unknow_taxa_stage = job.progress.add_stage("Creating Unknown Taxa") + job.save() + detections = Detection.objects.filter( + classifications__features_2048__isnull=False, + source_image__collections=self, + occurrence__determination_score__lte=threshold, + ) + + task_logger.info(f"Found {detections.count()} detections to process for clustering") + + features = [] + valid_detections = [] + job.progress.update_stage(feature_collection_stage.key, status=JobState.STARTED) + # Collecting features for detections + for idx, detection in enumerate(detections): + classification = detection.classifications.filter(features_2048__isnull=False).first() + if classification: + features.append(classification.features_2048) + valid_detections.append(detection) + job.progress.update_stage(feature_collection_stage.key, progress=(idx + 1) / (len(detections))) + job.save() + job.progress.update_stage(feature_collection_stage.key, status=JobState.SUCCESS, progress=1.0) + job.save() + logger.info(f"Clustering {len(features)} features from {len(valid_detections)} detections") + + if not features: + task_logger.info("No feature vectors found") + job.update_status(JobState.FAILURE, save=True) + return + + features_np = np.array(features) + + # PCA Reduction + task_logger.info(f"Reducing features from {features_np.shape[1]} to {pca_dim} dimensions") + features_reduced = PCA(n_components=pca_dim).fit_transform(features_np) + + # Clustering + clustering_algorithms = {"kmeans": KMeans} + clustering_algorithm = clustering_algorithms.get(algorithm, KMeans) + n_clusters = algorithm_kwargs.get("n_clusters", 5) + cluster_ids = clustering_algorithm(random_state=42, **algorithm_kwargs).fit_predict(features_reduced) + + task_logger.info(f"Clustering completed with {n_clusters} clusters") + + # Clustering Detections + job.progress.update_stage(clusering_stage.key, status=JobState.STARTED) + clusters = {} + for idx, (cluster_id, detection) in enumerate(zip(cluster_ids, valid_detections)): + clusters.setdefault(cluster_id, []).append(detection) + 
job.progress.update_stage(clusering_stage.key, progress=(idx + 1) / (len(cluster_ids))) + job.save() + job.progress.update_stage(clusering_stage.key, status=JobState.SUCCESS, progress=1.0) + job.save() + + # Creating Unknown Taxa + job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.STARTED) + for idx, (cluster_id, detections_list) in enumerate(clusters.items()): + taxon = Taxon.objects.create( + name=f"Cluster {cluster_id} (Job {job.pk})", + rank="SPECIES", + notes=f"Auto-created cluster {cluster_id} from clustering job {job.pk}", + unknown_species=True, + ) + + for idx, detection in enumerate(detections_list): + # Create a new Classification linking the detection to the new taxon + + Classification.objects.create( + detection=detection, + taxon=taxon, + algorithm=None, + score=1.0, + timestamp=now(), + logits=None, + features_2048=None, + scores=None, + terminal=True, + category_map=None, + ) + job.progress.update_stage(create_unknow_taxa_stage.key, progress=(idx + 1) / (len(clusters.keys()))) + job.save() + task_logger.info(f"Created {len(clusters)} clusters and updated {len(valid_detections)} detections") + job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.SUCCESS, progress=1.0) + job.save() + def sample_random(self, size: int = 100): """Create a random sample of source images""" diff --git a/requirements/base.txt b/requirements/base.txt index 6257f4a2b..c0276e673 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -97,3 +97,5 @@ pgvector newrelic==9.6.0 gunicorn==20.1.0 # https://github.com/benoitc/gunicorn # psycopg[c]==3.1.9 # https://github.com/psycopg/psycopg +# ML +scikit-learn From 99a7f3f0446515443f6cb8b3750374ef9c2edd25 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Tue, 29 Apr 2025 11:56:36 -0400 Subject: [PATCH 13/42] feat: Allowed triggering collection detections clustering from admin page --- ami/main/admin.py | 22 +++++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index dcfa58241..f1b511aba 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -594,7 +594,27 @@ def populate_collection_async(self, request: HttpRequest, queryset: QuerySet[Sou f"Populating {len(queued_tasks)} collection(s) background tasks: {queued_tasks}.", ) - actions = [populate_collection, populate_collection_async] + @admin.action() + def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: + for collection in queryset: + from ami.jobs.models import DetectionClusteringJob, Job + + job = Job.objects.create( + name=f"Cluster detections for collection {collection.pk}", + project=collection.project, + source_image_collection=collection, + job_type_key=DetectionClusteringJob.key, + params={ + "threshold": 1, + "algorithm": "kmeans", + "algorithm_kwargs": {"n_clusters": 5}, + }, + ) + job.enqueue() + + self.message_user(request, f"Clustered {queryset.count()} collection(s).") + + actions = [populate_collection, populate_collection_async, cluster_detections] # Hide images many-to-many field from form. This would list all source images in the database. 
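For reference, a minimal sketch of how a client could trigger the new clustering action on a collection, using DRF's test client. The URL prefix, the collection id, and the authenticated staff user are assumptions for illustration only; the request body keys mirror the params read by the cluster_detections view action as it stands at this point in the series.

    from rest_framework.test import APIClient

    client = APIClient()
    # client.force_authenticate(user=staff_user)  # hypothetical user; the endpoint requires write access
    response = client.post(
        "/api/v2/captures/collections/42/cluster_detections/",  # assumed route and collection id
        data={
            "threshold": 1,
            "algorithm": "kmeans",
            "algorithm_kwargs": {"n_clusters": 5},
            "pca_dim": 10,
        },
        format="json",
    )
    print(response.json())  # expected to contain "job_id" and "project_id" per the view above
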
exclude = ("images",) From 83f2c08408de74de8855dbd3d66a12367c748a97 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 29 Apr 2025 13:21:16 -0700 Subject: [PATCH 14/42] fix: show unobserved Taxa in view for now --- ami/main/api/views.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/main/api/views.py b/ami/main/api/views.py index 882e914b5..6ffe05361 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -1297,7 +1297,7 @@ def get_queryset(self) -> QuerySet: # Allow showing detail views for unobserved taxa include_unobserved = True if self.action == "list": - include_unobserved = self.request.query_params.get("include_unobserved", False) + include_unobserved = self.request.query_params.get("include_unobserved", True) qs = self.get_taxa_observed(qs, project, include_unobserved=include_unobserved) if self.action == "retrieve": qs = qs.prefetch_related( From 5420f85a7c01084700ae6725cece72966a1e5499 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 29 Apr 2025 13:21:57 -0700 Subject: [PATCH 15/42] fix: create & update occurrence determinations after clustering --- ami/main/models.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ami/main/models.py b/ami/main/models.py index f1df4cbfd..919bfd621 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -34,6 +34,7 @@ from ami.base.fields import DateStringField from ami.base.models import BaseModel from ami.main import charts +from ami.ml.models.pipeline import create_and_update_occurrences_for_detections from ami.users.models import User from ami.utils.schemas import OrderedEnum @@ -3305,6 +3306,8 @@ def cluster_detections(self, job: "Job | None" = None): task_logger.info(f"Created {len(clusters)} clusters and updated {len(valid_detections)} detections") job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.SUCCESS, progress=1.0) job.save() + create_and_update_occurrences_for_detections(detections=detections_list, logger=task_logger) + job.save() def sample_random(self, size: int = 100): """Create a random sample of source images""" From 6b0020d6d06dd1f922f3d95ace376e45c880a29c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 29 Apr 2025 13:22:55 -0700 Subject: [PATCH 16/42] feat: add unknown species filter to admin --- ami/main/admin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index f1b511aba..bf46a0deb 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -461,7 +461,7 @@ class TaxonAdmin(admin.ModelAdmin[Taxon]): "created_at", "updated_at", ) - list_filter = ("lists", "rank", TaxonParentFilter) + list_filter = ("unknown_species", "lists", "rank", TaxonParentFilter) search_fields = ("name",) autocomplete_fields = ( "parent", From 4f8b09b523d81750f27999a4838cb34dce3d17b4 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 29 Apr 2025 23:13:44 -0700 Subject: [PATCH 17/42] fix: circular import --- ami/main/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ami/main/models.py b/ami/main/models.py index ba1649c06..f6c53d75d 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -34,7 +34,6 @@ from ami.base.fields import DateStringField from ami.base.models import BaseModel from ami.main import charts -from ami.ml.models.pipeline import create_and_update_occurrences_for_detections from ami.users.models import User from ami.utils.schemas import OrderedEnum @@ -3336,6 +3335,8 @@ def cluster_detections(self, job: "Job | None" = None): task_logger.info(f"Created 
{len(clusters)} clusters and updated {len(valid_detections)} detections") job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.SUCCESS, progress=1.0) job.save() + from ami.ml.models.pipeline import create_and_update_occurrences_for_detections + create_and_update_occurrences_for_detections(detections=detections_list, logger=task_logger) job.save() From d255085d36d4b7f9eb08584a7c7a3f83af8babda Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 29 Apr 2025 23:14:47 -0700 Subject: [PATCH 18/42] fix: update migration ordering --- ami/main/migrations/0063_taxon_unknown_species.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/main/migrations/0063_taxon_unknown_species.py b/ami/main/migrations/0063_taxon_unknown_species.py index 1f3af523a..2ba04d0b2 100644 --- a/ami/main/migrations/0063_taxon_unknown_species.py +++ b/ami/main/migrations/0063_taxon_unknown_species.py @@ -5,7 +5,7 @@ class Migration(migrations.Migration): dependencies = [ - ("main", "0061_classification_features_squashed_0062_remove_classification_features_and_more"), + ("main", "0062_classification_ood_score_and_more"), ] operations = [ From 2e12b561c4760ceeaf4c11824f5d6c74b2b1c962 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 30 Apr 2025 15:24:43 -0400 Subject: [PATCH 19/42] Integrated Agglomerative clustering --- ami/jobs/models.py | 3 + ami/main/admin.py | 5 +- ami/main/api/views.py | 8 +- ami/main/models.py | 110 ++-------------- ami/ml/clustering_algorithms/__init__.py | 0 ami/ml/clustering_algorithms/agglomerative.py | 85 +++++++++++++ .../clustering_algorithms/base_clusterer.py | 43 +++++++ .../cluster_detections.py | 120 ++++++++++++++++++ .../clustering_metrics.py | 78 ++++++++++++ .../preprocessing_features.py | 16 +++ ami/ml/clustering_algorithms/utils.py | 15 +++ requirements/base.txt | 1 + 12 files changed, 378 insertions(+), 106 deletions(-) create mode 100644 ami/ml/clustering_algorithms/__init__.py create mode 100644 ami/ml/clustering_algorithms/agglomerative.py create mode 100644 ami/ml/clustering_algorithms/base_clusterer.py create mode 100644 ami/ml/clustering_algorithms/cluster_detections.py create mode 100644 ami/ml/clustering_algorithms/clustering_metrics.py create mode 100644 ami/ml/clustering_algorithms/preprocessing_features.py create mode 100644 ami/ml/clustering_algorithms/utils.py diff --git a/ami/jobs/models.py b/ami/jobs/models.py index a078ba0cd..ace21cea0 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -648,6 +648,9 @@ def run(cls, job: "Job"): job.update_status(JobState.STARTED) job.started_at = datetime.datetime.now() job.finished_at = None + job.progress.add_stage(name="Collecting Features", key="feature_collection") + job.progress.add_stage("Clustering", key="clustering") + job.progress.add_stage("Creating Unknown Taxa", key="create_unknown_taxa") job.save() if not job.source_image_collection: diff --git a/ami/main/admin.py b/ami/main/admin.py index bf46a0deb..48dd60f31 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -605,9 +605,8 @@ def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImag source_image_collection=collection, job_type_key=DetectionClusteringJob.key, params={ - "threshold": 1, - "algorithm": "kmeans", - "algorithm_kwargs": {"n_clusters": 5}, + "algorithm": "agglomerative", + "algorithm_kwargs": {"distance_threshold": 0.5}, }, ) job.enqueue() diff --git a/ami/main/api/views.py b/ami/main/api/views.py index 6ffe05361..2ddf3b6f1 100644 --- a/ami/main/api/views.py +++ 
b/ami/main/api/views.py @@ -757,10 +757,10 @@ def cluster_detections(self, request, pk=None): source_image_collection=collection, job_type_key=DetectionClusteringJob.key, params={ - "threshold": request.data.get("threshold", 1), - "algorithm": request.data.get("algorithm", "kmeans"), - "algorithm_kwargs": request.data.get("algorithm_kwargs", {"n_clusters": 5}), - "pca_dim": request.data.get("pca_dim", 10), + "ood_threshold": request.data.get("ood_threshold", 1), + "algorithm": request.data.get("algorithm", "agglomerative"), + "algorithm_kwargs": request.data.get("algorithm_kwargs", {"distance_threshold": 0.5}), + "pca_dim": request.data.get("pca", 10), }, ) job.enqueue() diff --git a/ami/main/models.py b/ami/main/models.py index 919bfd621..5339e308b 100644 --- a/ami/main/models.py +++ b/ami/main/models.py @@ -9,7 +9,6 @@ import urllib.parse from typing import Final, final # noqa: F401 -import numpy as np import pydantic from django.apps import apps from django.conf import settings @@ -23,18 +22,15 @@ from django.dispatch import receiver from django.template.defaultfilters import filesizeformat from django.utils import timezone -from django.utils.timezone import now from django_pydantic_field import SchemaField from pgvector.django import CosineDistance, L2Distance, VectorField -from sklearn.cluster import KMeans -from sklearn.decomposition import PCA import ami.tasks import ami.utils from ami.base.fields import DateStringField from ami.base.models import BaseModel from ami.main import charts -from ami.ml.models.pipeline import create_and_update_occurrences_for_detections +from ami.ml.clustering_algorithms.cluster_detections import cluster_detections from ami.users.models import User from ami.utils.schemas import OrderedEnum @@ -3209,105 +3205,21 @@ def populate_sample(self, job: "Job | None" = None): task_logger.info(f"Done sampling and saving captures to {self}") def cluster_detections(self, job: "Job | None" = None): - from ami.jobs.models import JobState - if job: task_logger = job.logger + params = job.params else: task_logger = logger - params = job.params or {} - threshold = params.get("threshold", 1) - pca_dim = params.get("pca_dim", 10) - algorithm = params.get("algorithm", "kmeans") - algorithm_kwargs = params.get("algorithm_kwargs", {}) - feature_collection_stage = job.progress.add_stage("Collecting Features") - clusering_stage = job.progress.add_stage("Clustering") - create_unknow_taxa_stage = job.progress.add_stage("Creating Unknown Taxa") - job.save() - detections = Detection.objects.filter( - classifications__features_2048__isnull=False, - source_image__collections=self, - occurrence__determination_score__lte=threshold, - ) - - task_logger.info(f"Found {detections.count()} detections to process for clustering") - - features = [] - valid_detections = [] - job.progress.update_stage(feature_collection_stage.key, status=JobState.STARTED) - # Collecting features for detections - for idx, detection in enumerate(detections): - classification = detection.classifications.filter(features_2048__isnull=False).first() - if classification: - features.append(classification.features_2048) - valid_detections.append(detection) - job.progress.update_stage(feature_collection_stage.key, progress=(idx + 1) / (len(detections))) - job.save() - job.progress.update_stage(feature_collection_stage.key, status=JobState.SUCCESS, progress=1.0) - job.save() - logger.info(f"Clustering {len(features)} features from {len(valid_detections)} detections") - - if not features: - task_logger.info("No feature 
vectors found") - job.update_status(JobState.FAILURE, save=True) - return - - features_np = np.array(features) - - # PCA Reduction - task_logger.info(f"Reducing features from {features_np.shape[1]} to {pca_dim} dimensions") - features_reduced = PCA(n_components=pca_dim).fit_transform(features_np) - - # Clustering - clustering_algorithms = {"kmeans": KMeans} - clustering_algorithm = clustering_algorithms.get(algorithm, KMeans) - n_clusters = algorithm_kwargs.get("n_clusters", 5) - cluster_ids = clustering_algorithm(random_state=42, **algorithm_kwargs).fit_predict(features_reduced) - - task_logger.info(f"Clustering completed with {n_clusters} clusters") - - # Clustering Detections - job.progress.update_stage(clusering_stage.key, status=JobState.STARTED) - clusters = {} - for idx, (cluster_id, detection) in enumerate(zip(cluster_ids, valid_detections)): - clusters.setdefault(cluster_id, []).append(detection) - job.progress.update_stage(clusering_stage.key, progress=(idx + 1) / (len(cluster_ids))) - job.save() - job.progress.update_stage(clusering_stage.key, status=JobState.SUCCESS, progress=1.0) - job.save() - - # Creating Unknown Taxa - job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.STARTED) - for idx, (cluster_id, detections_list) in enumerate(clusters.items()): - taxon = Taxon.objects.create( - name=f"Cluster {cluster_id} (Job {job.pk})", - rank="SPECIES", - notes=f"Auto-created cluster {cluster_id} from clustering job {job.pk}", - unknown_species=True, - ) + params = { + "algorithm": "agglomerative", + "ood_threshold": 0.5, + "algorithm_kwargs": { + "distance_threshold": 0.5, + }, + "pca": {"n_components": 384}, + } - for idx, detection in enumerate(detections_list): - # Create a new Classification linking the detection to the new taxon - - Classification.objects.create( - detection=detection, - taxon=taxon, - algorithm=None, - score=1.0, - timestamp=now(), - logits=None, - features_2048=None, - scores=None, - terminal=True, - category_map=None, - ) - job.progress.update_stage(create_unknow_taxa_stage.key, progress=(idx + 1) / (len(clusters.keys()))) - job.save() - task_logger.info(f"Created {len(clusters)} clusters and updated {len(valid_detections)} detections") - job.progress.update_stage(create_unknow_taxa_stage.key, status=JobState.SUCCESS, progress=1.0) - job.save() - create_and_update_occurrences_for_detections(detections=detections_list, logger=task_logger) - job.save() + cluster_detections(collection=self, params=params, job=job, task_logger=task_logger) def sample_random(self, size: int = 100): """Create a random sample of source images""" diff --git a/ami/ml/clustering_algorithms/__init__.py b/ami/ml/clustering_algorithms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/ami/ml/clustering_algorithms/agglomerative.py b/ami/ml/clustering_algorithms/agglomerative.py new file mode 100644 index 000000000..76dcd867a --- /dev/null +++ b/ami/ml/clustering_algorithms/agglomerative.py @@ -0,0 +1,85 @@ +import logging +import os + +import numpy as np +from scipy.spatial.distance import pdist, squareform +from sklearn.cluster import AgglomerativeClustering + +from .base_clusterer import BaseClusterer +from .preprocessing_features import dimension_reduction, standardize + +logger = logging.getLogger(__name__) + + +def get_distance_threshold(features, labels): + distance_matrix = squareform(pdist(features)) + intra_cluster_distances = [] + inter_cluster_distances = [] + for i in range(len(features)): + for j in range(i + 1, len(features)): + if 
labels[i] == labels[j]: + intra_cluster_distances.append(distance_matrix[i, j]) + else: + inter_cluster_distances.append(distance_matrix[i, j]) + # choose the 95th percentile of intra-cluster distances + threshold = np.percentile(intra_cluster_distances, 95) + return threshold + + +class AgglomerativeClusterer(BaseClusterer): + def __init__(self, config: dict): + self.config = config + self.setup_flag = False + self.data_dict = None + # Access from dictionary instead of attribute + self.distance_threshold = config.get("algorithm_kwargs", {}).get("distance_threshold", 0.5) + + def setup(self, data_dict): + # estimate the distance threshold + new_data_dict = {} + # Get output_dir from dictionary + save_dir = self.config.get("output_dir") + + if not self.setup_flag: + for data_type in data_dict: + new_data_dict[data_type] = {} + features = data_dict[data_type]["feat_list"] + # Get n_components from dictionary + features = dimension_reduction(standardize(features), self.config.get("pca", {}).get("n_components")) + labels = data_dict[data_type]["label_list"] + new_data_dict[data_type]["feat_list"] = features + new_data_dict[data_type]["label_list"] = labels + + np.savez( + os.path.join( + save_dir, + f"{data_type}_processed_pca_{self.config.get('pca', {}).get('n_components')}", + ), + feat_list=features, + label_list=labels, + ) + + self.data_dict = new_data_dict + self.setup_flag = True + + # Auto-calculate threshold if not provided + if not self.distance_threshold: + self.distance_threshold = get_distance_threshold( + data_dict["val"]["feat_list"], data_dict["val"]["label_list"] + ) + + def cluster(self, features): + logger.info(f"distance threshold: {self.distance_threshold}") + + # Get n_components and linkage from dictionary + features = dimension_reduction(standardize(features), self.config.get("pca", {}).get("n_components")) + + # Get linkage parameter from config + linkage = self.config.get("algorithm_kwargs", {}).get("linkage", "ward") + + clusterer = AgglomerativeClustering( + n_clusters=None, distance_threshold=self.distance_threshold, linkage=linkage + ).fit(features) + + preds = clusterer.labels_ + return preds diff --git a/ami/ml/clustering_algorithms/base_clusterer.py b/ami/ml/clustering_algorithms/base_clusterer.py new file mode 100644 index 000000000..586a50d82 --- /dev/null +++ b/ami/ml/clustering_algorithms/base_clusterer.py @@ -0,0 +1,43 @@ +import os + +import numpy as np + +from .preprocessing_features import dimension_reduction, standardize + + +class BaseClusterer: + def __init__(self, config): + self.config = config + self.setup_flag = False + self.data_dict = None + + def setup(self, data_dict): + new_data_dict = {} + save_dir = self.config.output_dir + if not self.setup_flag: + for data_type in data_dict: + new_data_dict[data_type] = {} + features = data_dict[data_type]["feat_list"] + features = dimension_reduction(standardize(features), self.config.pca.n_components) + labels = data_dict[data_type]["label_list"] + new_data_dict[data_type]["feat_list"] = features + new_data_dict[data_type]["label_list"] = labels + + np.savez( + os.path.join( + save_dir, + f"{data_type}_processed_pca_{self.config.pca.n_components}", + ), + feat_list=features, + label_list=labels, + ) + self.data_dict = new_data_dict + self.setup_flag = True + else: + pass + + def clustering(self, data_dict): + pass + + def cluster_detections(self, data_dict): + pass diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py new file mode 100644 
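For reference, a minimal, illustrative sketch of running the AgglomerativeClusterer defined above on a bare feature matrix, outside of a clustering job. The random features and the config values are placeholders; in the pipeline the config comes from the job params and the features from Classification.features_2048.

    import numpy as np

    from ami.ml.clustering_algorithms.agglomerative import AgglomerativeClusterer

    features = np.random.rand(200, 2048)  # stand-in for 200 detection embeddings
    config = {
        "algorithm_kwargs": {"distance_threshold": 0.5, "linkage": "ward"},
        "pca": {"n_components": 50},
    }
    labels = AgglomerativeClusterer(config).cluster(features)  # one cluster id per row of features
    print(f"{len(set(labels))} clusters for {len(labels)} detections")
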
index 000000000..ba38a9ec5 --- /dev/null +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -0,0 +1,120 @@ +import logging + +import numpy as np +from django.utils.timezone import now + +from ami.ml.clustering_algorithms.utils import get_clusterer + +logger = logging.getLogger(__name__) + + +def update_job_progress(job, stage_key, status, progress): + if job: + job.progress.update_stage(stage_key, status=status, progress=progress) + job.save() + + +def job_save(job): + if job: + job.save() + + +def cluster_detections(collection, params: dict, task_logger: logging.Logger = logger, job=None): + from ami.jobs.models import JobState + from ami.main.models import Classification, Detection, Taxon + from ami.ml.models.pipeline import create_and_update_occurrences_for_detections + + ood_threshold = params.get("ood_threshold", 1) + algorithm = params.get("algorithm", "agglomerative") + task_logger.info(f"Clustering Parameters: {params}") + job_save(job) + detections = Detection.objects.filter( + classifications__features_2048__isnull=False, + source_image__collections=collection, + occurrence__determination_score__lte=ood_threshold, + ) + + task_logger.info(f"Found {detections.count()} detections to process for clustering") + + features = [] + valid_detections = [] + update_job_progress(job, stage_key="feature_collection", status=JobState.STARTED, progress=0.0) + # Collecting features for detections + for idx, detection in enumerate(detections): + classification = detection.classifications.filter(features_2048__isnull=False).first() + if classification: + features.append(classification.features_2048) + valid_detections.append(detection) + update_job_progress( + job, + stage_key="feature_collection", + status=JobState.STARTED, + progress=(idx + 1) / detections.count(), + ) + update_job_progress(job, stage_key="feature_collection", status=JobState.SUCCESS, progress=1.0) + logger.info(f"Clustering {len(features)} features from {len(valid_detections)} detections") + + if not features: + raise ValueError("No feature vectors found") + + features_np = np.array(features) + + update_job_progress(job, stage_key="clustering", status=JobState.STARTED, progress=0.0) + # Clustering Detections + ClusteringAlgorithm = get_clusterer(algorithm) + if not ClusteringAlgorithm: + raise ValueError(f"Unsupported clustering algorithm: {algorithm}") + + cluster_ids = ClusteringAlgorithm(params).cluster(features_np) + + task_logger.info(f"Clustering completed with {len(set(cluster_ids))} clusters") + clusters = {} + for idx, (cluster_id, detection) in enumerate(zip(cluster_ids, valid_detections)): + clusters.setdefault(cluster_id, []).append(detection) + update_job_progress( + job, + stage_key="clustering", + status=JobState.STARTED, + progress=(idx + 1) / len(valid_detections), + ) + update_job_progress(job, stage_key="clustering", status=JobState.SUCCESS, progress=1.0) + + # Creating Unknown Taxa + update_job_progress(job, stage_key="create_unknown_taxa", status=JobState.STARTED, progress=0.0) + for idx, (cluster_id, detections_list) in enumerate(clusters.items()): + taxon = Taxon.objects.create( + name=f"Cluster {cluster_id} (Collection {collection.pk}) (Job {job.pk if job else 'unknown'})", + rank="SPECIES", + notes=f"Auto-created cluster {cluster_id} for collection {collection.pk}", + unknown_species=True, + ) + taxon.projects.add(collection.project) + + for idx, detection in enumerate(detections_list): + # Create a new Classification linking the detection to the new taxon + + Classification.objects.create( + 
detection=detection, + taxon=taxon, + algorithm=None, + score=1.0, + timestamp=now(), + logits=None, + features_2048=None, + scores=None, + terminal=True, + category_map=None, + ) + update_job_progress( + job, + stage_key="create_unknown_taxa", + status=JobState.STARTED, + progress=(idx + 1) / len(clusters), + ) + task_logger.info(f"Created {len(clusters)} clusters and updated {len(valid_detections)} detections") + update_job_progress(job, stage_key="create_unknown_taxa", status=JobState.SUCCESS, progress=1.0) + + # Updating Occurrences + create_and_update_occurrences_for_detections(detections=detections_list, logger=task_logger) + job_save(job) + return clusters diff --git a/ami/ml/clustering_algorithms/clustering_metrics.py b/ami/ml/clustering_algorithms/clustering_metrics.py new file mode 100644 index 000000000..41ce3da9f --- /dev/null +++ b/ami/ml/clustering_algorithms/clustering_metrics.py @@ -0,0 +1,78 @@ +import numpy as np +from sklearn.metrics import adjusted_mutual_info_score as ami_score +from sklearn.metrics import adjusted_rand_score as ari_score +from sklearn.metrics.cluster import normalized_mutual_info_score as nmi_score + +from .estimate_k import cluster_acc + + +def pairwise_cost(y_true, y_pred, split_cost=1, merge_cost=2): + true_match = y_true[:, None] == y_true + pred_match = y_pred[:, None] == y_pred + + split = true_match & ~pred_match # true labels same, but cluster labels different + merge = ~true_match & pred_match # true labels different, but cluster labels same + + cost = np.sum(np.triu(split * split_cost | merge * merge_cost, k=1)) + + return cost + + +def get_clustering_metrics(labels, preds, old_mask, split_cost, merge_cost): + all_acc, ind, _ = cluster_acc(labels.astype(int), preds.astype(int), return_ind=True) + + cluster_mapping = {pair[0]: pair[1] for pair in ind} + + preds = np.array([cluster_mapping[c] for c in preds]) + + all_nmi, all_ari, all_ami = ( + nmi_score(labels, preds), + ari_score(labels, preds), + ami_score(labels, preds), + ) + + all_pw_cost = pairwise_cost(labels, preds, split_cost, merge_cost) + + old_preds = preds[old_mask] + new_preds = preds[~old_mask] + + old_gt = labels[old_mask] + new_gt = labels[~old_mask] + + old_acc, old_nmi, old_ari, old_ami = ( + cluster_acc(old_gt.astype(int), old_preds.astype(int)), + nmi_score(old_gt, old_preds), + ari_score(old_gt, old_preds), + ami_score(old_gt, old_preds), + ) + + old_pw_cost = pairwise_cost(old_gt, old_preds, split_cost, merge_cost) + + new_acc, new_nmi, new_ari, new_ami = ( + cluster_acc(new_gt.astype(int), new_preds.astype(int)), + nmi_score(new_gt, new_preds), + ari_score(new_gt, new_preds), + ami_score(new_gt, new_preds), + ) + + new_pw_cost = pairwise_cost(new_gt, new_preds, split_cost, merge_cost) + + metrics = { + "ACC_all": all_acc, + "NMI_all": all_nmi, + "ARI_all": all_ari, + "AMI_all": all_ami, + "pw_cost_all": all_pw_cost, + "ACC_old": old_acc, + "NMI_old": old_nmi, + "ARI_old": old_ari, + "AMI_old": old_ami, + "pw_cost_old": old_pw_cost, + "ACC_new": new_acc, + "NMI_new": new_nmi, + "ARI_new": new_ari, + "AMI_new": new_ami, + "pw_cost_new": new_pw_cost, + } + + return metrics diff --git a/ami/ml/clustering_algorithms/preprocessing_features.py b/ami/ml/clustering_algorithms/preprocessing_features.py new file mode 100644 index 000000000..70fde61d7 --- /dev/null +++ b/ami/ml/clustering_algorithms/preprocessing_features.py @@ -0,0 +1,16 @@ +from sklearn import preprocessing +from sklearn.decomposition import PCA + + +def standardize(features): + scaler = 
preprocessing.StandardScaler().fit(features) + features = scaler.transform(features) + print("standardized features") + return features + + +def dimension_reduction(features, n_components): + pca = PCA(n_components=n_components) + features = pca.fit_transform(features) + print("PCA performed") + return features diff --git a/ami/ml/clustering_algorithms/utils.py b/ami/ml/clustering_algorithms/utils.py new file mode 100644 index 000000000..3ca70886d --- /dev/null +++ b/ami/ml/clustering_algorithms/utils.py @@ -0,0 +1,15 @@ +from .agglomerative import AgglomerativeClusterer +from .dbscan import DBSCANClusterer +from .kmeans import KMeansClusterer +from .mean_shift import MeanShiftClusterer + + +def get_clusterer(clustering_algorithm: str): + clusterers = { + "kmeans": KMeansClusterer, + "agglomerative": AgglomerativeClusterer, + "mean_shift": MeanShiftClusterer, + "dbscan": DBSCANClusterer, + } + + return clusterers.get(clustering_algorithm, None) diff --git a/requirements/base.txt b/requirements/base.txt index c0276e673..ebfbeaa32 100644 --- a/requirements/base.txt +++ b/requirements/base.txt @@ -99,3 +99,4 @@ gunicorn==20.1.0 # https://github.com/benoitc/gunicorn # psycopg[c]==3.1.9 # https://github.com/psycopg/psycopg # ML scikit-learn +scipy From 10820bb452f93c62b927037bb11912425c385bd9 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 30 Apr 2025 15:31:36 -0400 Subject: [PATCH 20/42] updated clustering request params --- ami/main/api/views.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/ami/main/api/views.py b/ami/main/api/views.py index c6d21d421..a8ab56047 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -760,7 +760,7 @@ def cluster_detections(self, request, pk=None): "ood_threshold": request.data.get("ood_threshold", 1), "algorithm": request.data.get("algorithm", "agglomerative"), "algorithm_kwargs": request.data.get("algorithm_kwargs", {"distance_threshold": 0.5}), - "pca_dim": request.data.get("pca", 10), + "pca_dim": request.data.get("pca", {}).get("n_components", 384), }, ) job.enqueue() @@ -1070,7 +1070,6 @@ class OccurrenceViewSet(DefaultViewSet, ProjectMixin): "event", "deployment", "determination__rank", - "determination_ood_score", ] ordering_fields = [ "created_at", @@ -1083,7 +1082,6 @@ class OccurrenceViewSet(DefaultViewSet, ProjectMixin): "determination", "determination__name", "determination_score", - "determination_ood_score", "event", "detections_count", "created_at", From 225529e790e6fc5bb70f9a56505cf5350fa29a5e Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Wed, 30 Apr 2025 17:55:25 -0400 Subject: [PATCH 21/42] fixed Agglomerative clustering --- ami/main/admin.py | 4 +++- ami/ml/clustering_algorithms/agglomerative.py | 14 +++++++---- .../cluster_detections.py | 23 +++++++++++++------ ami/ml/models/algorithm.py | 1 + docker-compose.yml | 1 - .../example/docker-compose.yml | 1 + .../minimal/docker-compose.yml | 1 + 7 files changed, 31 insertions(+), 14 deletions(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index 48dd60f31..9032b81c9 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -605,8 +605,10 @@ def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImag source_image_collection=collection, job_type_key=DetectionClusteringJob.key, params={ + "ood_threshold": 0.3, "algorithm": "agglomerative", - "algorithm_kwargs": {"distance_threshold": 0.5}, + "algorithm_kwargs": {"distance_threshold": 80}, + "pca": {"n_components": 384}, }, ) job.enqueue() diff --git 
a/ami/ml/clustering_algorithms/agglomerative.py b/ami/ml/clustering_algorithms/agglomerative.py index 76dcd867a..37b9589ff 100644 --- a/ami/ml/clustering_algorithms/agglomerative.py +++ b/ami/ml/clustering_algorithms/agglomerative.py @@ -33,6 +33,7 @@ def __init__(self, config: dict): self.data_dict = None # Access from dictionary instead of attribute self.distance_threshold = config.get("algorithm_kwargs", {}).get("distance_threshold", 0.5) + self.n_components = config.get("pca", {}).get("n_components", 0) def setup(self, data_dict): # estimate the distance threshold @@ -72,14 +73,17 @@ def cluster(self, features): logger.info(f"distance threshold: {self.distance_threshold}") # Get n_components and linkage from dictionary - features = dimension_reduction(standardize(features), self.config.get("pca", {}).get("n_components")) + if self.n_components <= min(features.shape[0], features.shape[1]): + features = dimension_reduction(standardize(features), self.n_components) + else: + features = standardize(features) + logger.info(f"Skipping PCA, n_components { self.n_components} is larger than features shape ") # Get linkage parameter from config linkage = self.config.get("algorithm_kwargs", {}).get("linkage", "ward") - clusterer = AgglomerativeClustering( + clusters = AgglomerativeClustering( n_clusters=None, distance_threshold=self.distance_threshold, linkage=linkage - ).fit(features) + ).fit_predict(features) - preds = clusterer.labels_ - return preds + return clusters diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index ba38a9ec5..75766e21a 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -21,7 +21,8 @@ def job_save(job): def cluster_detections(collection, params: dict, task_logger: logging.Logger = logger, job=None): from ami.jobs.models import JobState - from ami.main.models import Classification, Detection, Taxon + from ami.main.models import Classification, Detection, TaxaList, Taxon + from ami.ml.models import Algorithm from ami.ml.models.pipeline import create_and_update_occurrences_for_detections ood_threshold = params.get("ood_threshold", 1) @@ -31,7 +32,7 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l detections = Detection.objects.filter( classifications__features_2048__isnull=False, source_image__collections=collection, - occurrence__determination_score__lte=ood_threshold, + occurrence__determination_ood_score__gte=ood_threshold, ) task_logger.info(f"Found {detections.count()} detections to process for clustering") @@ -78,10 +79,16 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l progress=(idx + 1) / len(valid_detections), ) update_job_progress(job, stage_key="clustering", status=JobState.SUCCESS, progress=1.0) - + taxa_list = TaxaList.objects.create(name=f"Clusters from (Job {job.pk if job else 'unknown'})") + taxa_list.projects.add(collection.project) + taxa_to_add = [] + clustering_algorithm = Algorithm.objects.get_or_create( + name=algorithm, + task_type="clustering", + ) # Creating Unknown Taxa update_job_progress(job, stage_key="create_unknown_taxa", status=JobState.STARTED, progress=0.0) - for idx, (cluster_id, detections_list) in enumerate(clusters.items()): + for idx, (cluster_id, cluster_detections) in enumerate(clusters.items()): taxon = Taxon.objects.create( name=f"Cluster {cluster_id} (Collection {collection.pk}) (Job {job.pk if job else 'unknown'})", 
rank="SPECIES", @@ -89,14 +96,15 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l unknown_species=True, ) taxon.projects.add(collection.project) + taxa_to_add.append(taxon) - for idx, detection in enumerate(detections_list): + for idx, detection in enumerate(cluster_detections): # Create a new Classification linking the detection to the new taxon Classification.objects.create( detection=detection, taxon=taxon, - algorithm=None, + algorithm=clustering_algorithm, score=1.0, timestamp=now(), logits=None, @@ -111,10 +119,11 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l status=JobState.STARTED, progress=(idx + 1) / len(clusters), ) + taxa_list.taxa.add(*taxa_to_add) task_logger.info(f"Created {len(clusters)} clusters and updated {len(valid_detections)} detections") update_job_progress(job, stage_key="create_unknown_taxa", status=JobState.SUCCESS, progress=1.0) # Updating Occurrences - create_and_update_occurrences_for_detections(detections=detections_list, logger=task_logger) + create_and_update_occurrences_for_detections(detections=valid_detections, logger=task_logger) job_save(job) return clusters diff --git a/ami/ml/models/algorithm.py b/ami/ml/models/algorithm.py index 82753aac4..5a05d5352 100644 --- a/ami/ml/models/algorithm.py +++ b/ami/ml/models/algorithm.py @@ -137,6 +137,7 @@ class Algorithm(BaseModel): ("depth_estimation", "Depth Estimation"), ("pose_estimation", "Pose Estimation"), ("size_estimation", "Size Estimation"), + ("clustering", "Clustering"), ("other", "Other"), ("unknown", "Unknown"), ], diff --git a/docker-compose.yml b/docker-compose.yml index 20e0462df..09ec5eba2 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -158,5 +158,4 @@ services: networks: antenna_network: - external: true name: antenna_network diff --git a/processing_services/example/docker-compose.yml b/processing_services/example/docker-compose.yml index 44e3531f4..714224967 100644 --- a/processing_services/example/docker-compose.yml +++ b/processing_services/example/docker-compose.yml @@ -20,4 +20,5 @@ services: networks: antenna_network: + external: true name: antenna_network diff --git a/processing_services/minimal/docker-compose.yml b/processing_services/minimal/docker-compose.yml index a35695531..2bdef9c1a 100644 --- a/processing_services/minimal/docker-compose.yml +++ b/processing_services/minimal/docker-compose.yml @@ -13,4 +13,5 @@ services: networks: antenna_network: + external: true name: antenna_network From 04235239a6953d9e5d1c3ac68b17b17f6456c8f4 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 17:01:30 -0700 Subject: [PATCH 22/42] fix: disable missing clustering algorithms --- ami/ml/clustering_algorithms/utils.py | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/ami/ml/clustering_algorithms/utils.py b/ami/ml/clustering_algorithms/utils.py index 3ca70886d..03af31a7e 100644 --- a/ami/ml/clustering_algorithms/utils.py +++ b/ami/ml/clustering_algorithms/utils.py @@ -1,15 +1,16 @@ from .agglomerative import AgglomerativeClusterer -from .dbscan import DBSCANClusterer -from .kmeans import KMeansClusterer -from .mean_shift import MeanShiftClusterer + +# from .dbscan import DBSCANClusterer +# from .kmeans import KMeansClusterer +# from .mean_shift import MeanShiftClusterer def get_clusterer(clustering_algorithm: str): clusterers = { - "kmeans": KMeansClusterer, + # "kmeans": KMeansClusterer, "agglomerative": AgglomerativeClusterer, - "mean_shift": MeanShiftClusterer, - 
"dbscan": DBSCANClusterer, + # "mean_shift": MeanShiftClusterer, + # "dbscan": DBSCANClusterer, } return clusterers.get(clustering_algorithm, None) From cb894f473aacf0554e94cb075278ce782d101b2e Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 17:43:18 -0700 Subject: [PATCH 23/42] fix: syntax when creating algorithm entry --- ami/ml/clustering_algorithms/cluster_detections.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index 75766e21a..23107bc74 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -82,14 +82,15 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l taxa_list = TaxaList.objects.create(name=f"Clusters from (Job {job.pk if job else 'unknown'})") taxa_list.projects.add(collection.project) taxa_to_add = [] - clustering_algorithm = Algorithm.objects.get_or_create( - name=algorithm, + clustering_algorithm, _created = Algorithm.objects.get_or_create( + name=str(ClusteringAlgorithm), task_type="clustering", ) + logging.info(f"Using clustering algorithm: {clustering_algorithm}") # Creating Unknown Taxa update_job_progress(job, stage_key="create_unknown_taxa", status=JobState.STARTED, progress=0.0) for idx, (cluster_id, cluster_detections) in enumerate(clusters.items()): - taxon = Taxon.objects.create( + taxon, _created = Taxon.objects.get_or_create( name=f"Cluster {cluster_id} (Collection {collection.pk}) (Job {job.pk if job else 'unknown'})", rank="SPECIES", notes=f"Auto-created cluster {cluster_id} for collection {collection.pk}", From 39d9b6c271ddd45db53f9079637cf24ca8c6f613 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 17:43:45 -0700 Subject: [PATCH 24/42] feat: command to create clustering job without starting it --- ami/main/admin.py | 24 ++++++++++++++++++++++-- 1 file changed, 22 insertions(+), 2 deletions(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index 9032b81c9..8a74ad5fb 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -594,13 +594,33 @@ def populate_collection_async(self, request: HttpRequest, queryset: QuerySet[Sou f"Populating {len(queued_tasks)} collection(s) background tasks: {queued_tasks}.", ) + @admin.action(description="Create clustering job (but don't run it)") + @admin.action() + def create_clustering_job(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: + from ami.jobs.models import DetectionClusteringJob, Job + + for collection in queryset: + job = Job.objects.create( + name=f"Clustering detections for collection {collection.pk}", + project=collection.project, + source_image_collection=collection, + job_type_key=DetectionClusteringJob.key, + params={ + "ood_threshold": 0.3, + "algorithm": "agglomerative", + "algorithm_kwargs": {"distance_threshold": 80}, + "pca": {"n_components": 384}, + }, + ) + self.message_user(request, f"Created clustering job #{job.pk} for collection #{collection.pk}") + @admin.action() def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: for collection in queryset: from ami.jobs.models import DetectionClusteringJob, Job job = Job.objects.create( - name=f"Cluster detections for collection {collection.pk}", + name=f"Clustering detections for collection {collection.pk}", project=collection.project, source_image_collection=collection, 
job_type_key=DetectionClusteringJob.key, @@ -615,7 +635,7 @@ def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImag self.message_user(request, f"Clustered {queryset.count()} collection(s).") - actions = [populate_collection, populate_collection_async, cluster_detections] + actions = [populate_collection, populate_collection_async, cluster_detections, create_clustering_job] # Hide images many-to-many field from form. This would list all source images in the database. exclude = ("images",) From abd9cf1fbf77444e84de47fd72ca7a659415910c Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 22:37:22 -0700 Subject: [PATCH 25/42] feat: increase default batch size --- ami/jobs/models.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ami/jobs/models.py b/ami/jobs/models.py index ace21cea0..189112661 100644 --- a/ami/jobs/models.py +++ b/ami/jobs/models.py @@ -400,7 +400,8 @@ def run(cls, job: "Job"): total_classifications = 0 config = job.pipeline.get_config(project_id=job.project.pk) - chunk_size = config.get("request_source_image_batch_size", 1) + chunk_size = config.get("request_source_image_batch_size", 2) + # @TODO Ensure only images of the same dimensions are processed in a batch chunks = [images[i : i + chunk_size] for i in range(0, image_count, chunk_size)] # noqa request_failed_images = [] From b2a7b3fcf43585566f7e14713e12326ac5f49ce6 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 22:37:55 -0700 Subject: [PATCH 26/42] fix: better algorithm name --- ami/ml/clustering_algorithms/cluster_detections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index 23107bc74..4d402e219 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -83,7 +83,7 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l taxa_list.projects.add(collection.project) taxa_to_add = [] clustering_algorithm, _created = Algorithm.objects.get_or_create( - name=str(ClusteringAlgorithm), + name=ClusteringAlgorithm.__name__, task_type="clustering", ) logging.info(f"Using clustering algorithm: {clustering_algorithm}") From bf67d060e4f0c72e302a42bafb1cd81651b2d021 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Wed, 30 Apr 2025 23:28:50 -0700 Subject: [PATCH 27/42] feat: allow sorting by OOD score --- ami/main/api/views.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ami/main/api/views.py b/ami/main/api/views.py index a8ab56047..ccc8540ff 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -1070,6 +1070,7 @@ class OccurrenceViewSet(DefaultViewSet, ProjectMixin): "event", "deployment", "determination__rank", + "determination_ood_score", ] ordering_fields = [ "created_at", @@ -1082,6 +1083,7 @@ class OccurrenceViewSet(DefaultViewSet, ProjectMixin): "determination", "determination__name", "determination_score", + "determination_ood_score", "event", "detections_count", "created_at", From 853b69d4fd756f16892b88bcb36c92c4c83e0e79 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 1 May 2025 08:29:45 -0700 Subject: [PATCH 28/42] feat: add unknown species and other fields to Taxon serializer --- ami/main/api/serializers.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/ami/main/api/serializers.py b/ami/main/api/serializers.py index 11533ed2b..b55fe2c56 100644 --- a/ami/main/api/serializers.py +++ 
b/ami/main/api/serializers.py @@ -518,6 +518,7 @@ class Meta: "last_detected", "best_determination_score", "cover_image_url", + "unknown_species", "created_at", "updated_at", ] @@ -710,6 +711,7 @@ class Meta: "duration_label", "first_appearance_timestamp", "last_appearance_timestamp", + "unknown_species", # "first_appearance", # "last_appearance", ] @@ -739,6 +741,8 @@ class Meta: "fieldguide_id", "cover_image_url", "cover_image_credit", + "unknown_species", + "last_detected", # @TODO this has performance impact, review ] From 65868722a02333f49a61446590708d1baca12590 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 1 May 2025 08:32:44 -0700 Subject: [PATCH 29/42] fix: remove missing field --- ami/main/api/serializers.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ami/main/api/serializers.py b/ami/main/api/serializers.py index b55fe2c56..de1c6a340 100644 --- a/ami/main/api/serializers.py +++ b/ami/main/api/serializers.py @@ -711,7 +711,6 @@ class Meta: "duration_label", "first_appearance_timestamp", "last_appearance_timestamp", - "unknown_species", # "first_appearance", # "last_appearance", ] From b242079d360c33e67e4f06936c648c55f2fafe48 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 1 May 2025 08:37:25 -0700 Subject: [PATCH 30/42] fix: migration conflicts --- ..._taxon_cover_image_credit_taxon_cover_image_url_and_more.py} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename ami/main/migrations/{0060_taxon_cover_image_credit_taxon_cover_image_url_and_more.py => 0064_taxon_cover_image_credit_taxon_cover_image_url_and_more.py} (93%) diff --git a/ami/main/migrations/0060_taxon_cover_image_credit_taxon_cover_image_url_and_more.py b/ami/main/migrations/0064_taxon_cover_image_credit_taxon_cover_image_url_and_more.py similarity index 93% rename from ami/main/migrations/0060_taxon_cover_image_credit_taxon_cover_image_url_and_more.py rename to ami/main/migrations/0064_taxon_cover_image_credit_taxon_cover_image_url_and_more.py index 4c74608f2..ca93c19e5 100644 --- a/ami/main/migrations/0060_taxon_cover_image_credit_taxon_cover_image_url_and_more.py +++ b/ami/main/migrations/0064_taxon_cover_image_credit_taxon_cover_image_url_and_more.py @@ -5,7 +5,7 @@ class Migration(migrations.Migration): dependencies = [ - ("main", "0059_alter_project_options"), + ("main", "0063_taxon_unknown_species"), ] operations = [ From fe744f0d4ad986176272cc961eb6d9492e3ce146 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Thu, 1 May 2025 18:30:53 -0700 Subject: [PATCH 31/42] feat: fields for investigating occurrence classifications in admin --- ami/main/admin.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/ami/main/admin.py b/ami/main/admin.py index 8a74ad5fb..101e408e7 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -6,6 +6,7 @@ from django.http.request import HttpRequest from django.template.defaultfilters import filesizeformat from django.utils.formats import number_format +from django.utils.html import format_html from guardian.admin import GuardedModelAdmin import ami.utils @@ -262,6 +263,7 @@ class ClassificationInline(admin.TabularInline): model = Classification extra = 0 fields = ( + "view_classification", "taxon", "algorithm", "timestamp", @@ -269,6 +271,7 @@ class ClassificationInline(admin.TabularInline): "created_at", ) readonly_fields = ( + "view_classification", "taxon", "algorithm", "timestamp", @@ -276,6 +279,11 @@ class ClassificationInline(admin.TabularInline): "created_at", ) + @admin.display(description="Classification") + def 
view_classification(self, obj): + url = f"/admin/main/classification/{obj.pk}/change/" + return format_html('{}', url, obj.pk) + def get_queryset(self, request: HttpRequest) -> QuerySet[Any]: qs = super().get_queryset(request) return qs.select_related("taxon", "algorithm", "detection") @@ -285,6 +293,7 @@ class DetectionInline(admin.TabularInline): model = Detection extra = 0 fields = ( + "view_detection", "detection_algorithm", "source_image", "timestamp", @@ -292,6 +301,7 @@ class DetectionInline(admin.TabularInline): "occurrence", ) readonly_fields = ( + "view_detection", "detection_algorithm", "source_image", "timestamp", @@ -299,6 +309,11 @@ class DetectionInline(admin.TabularInline): "occurrence", ) + @admin.display(description="Detection") + def view_detection(self, obj): + url = f"/admin/main/detection/{obj.pk}/change/" + return format_html('{}', url, obj.pk) + @admin.register(Detection) class DetectionAdmin(admin.ModelAdmin[Detection]): From 4ac88da6f3c754bf2abdd18110471aafbb3f71dc Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 5 May 2025 09:19:34 -0400 Subject: [PATCH 32/42] fix: filter by feature extraction algorithm --- ami/main/admin.py | 24 +++- ami/main/api/views.py | 5 +- ami/ml/clustering_algorithms/agglomerative.py | 6 +- .../cluster_detections.py | 29 ++++- ami/ml/tests.py | 112 +++++++++++++++++- ami/tests/fixtures/main.py | 4 +- 6 files changed, 169 insertions(+), 11 deletions(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index 101e408e7..a2007558a 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -221,7 +221,29 @@ def update_calculated_fields(self, request: HttpRequest, queryset: QuerySet[Even self.message_user(request, f"Updated {queryset.count()} events.") list_filter = ("deployment", "project", "start") - actions = [update_calculated_fields] + + @admin.action() + def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: + for collection in queryset: + from ami.jobs.models import DetectionClusteringJob, Job + + job = Job.objects.create( + name=f"Cluster detections for collection {collection.pk}", + project=collection.project, + source_image_collection=collection, + job_type_key=DetectionClusteringJob.key, + params={ + "ood_threshold": 0.3, + "algorithm": "agglomerative", + "algorithm_kwargs": {"distance_threshold": 80}, + "pca": {"n_components": 384}, + }, + ) + job.enqueue() + + self.message_user(request, f"Clustered {queryset.count()} collection(s).") + + actions = [update_calculated_fields, cluster_detections] @admin.register(SourceImage) diff --git a/ami/main/api/views.py b/ami/main/api/views.py index ccc8540ff..197ca892e 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -757,10 +757,11 @@ def cluster_detections(self, request, pk=None): source_image_collection=collection, job_type_key=DetectionClusteringJob.key, params={ - "ood_threshold": request.data.get("ood_threshold", 1), + "ood_threshold": request.data.get("ood_threshold", 0.3), + "feature_extraction_algorithm": request.data.get("feature_extraction_algorithm", None), "algorithm": request.data.get("algorithm", "agglomerative"), "algorithm_kwargs": request.data.get("algorithm_kwargs", {"distance_threshold": 0.5}), - "pca_dim": request.data.get("pca", {}).get("n_components", 384), + "pca": request.data.get("pca", {"n_components": 384}), }, ) job.enqueue() diff --git a/ami/ml/clustering_algorithms/agglomerative.py b/ami/ml/clustering_algorithms/agglomerative.py index 37b9589ff..23801b32b 100644 --- 
a/ami/ml/clustering_algorithms/agglomerative.py +++ b/ami/ml/clustering_algorithms/agglomerative.py @@ -33,7 +33,7 @@ def __init__(self, config: dict): self.data_dict = None # Access from dictionary instead of attribute self.distance_threshold = config.get("algorithm_kwargs", {}).get("distance_threshold", 0.5) - self.n_components = config.get("pca", {}).get("n_components", 0) + self.n_components = config.get("pca", {}).get("n_components", 384) def setup(self, data_dict): # estimate the distance threshold @@ -71,7 +71,8 @@ def setup(self, data_dict): def cluster(self, features): logger.info(f"distance threshold: {self.distance_threshold}") - + logger.info("features shape: %s", features.shape) + logger.info(f"self.n_components: {self.n_components}") # Get n_components and linkage from dictionary if self.n_components <= min(features.shape[0], features.shape[1]): features = dimension_reduction(standardize(features), self.n_components) @@ -81,6 +82,7 @@ def cluster(self, features): # Get linkage parameter from config linkage = self.config.get("algorithm_kwargs", {}).get("linkage", "ward") + logger.info(f" features shape after PCA: {features.shape}") clusters = AgglomerativeClustering( n_clusters=None, distance_threshold=self.distance_threshold, linkage=linkage diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index 4d402e219..beb752a12 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -1,6 +1,7 @@ import logging import numpy as np +from django.db.models import Count from django.utils.timezone import now from ami.ml.clustering_algorithms.utils import get_clusterer @@ -26,13 +27,34 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l from ami.ml.models.pipeline import create_and_update_occurrences_for_detections ood_threshold = params.get("ood_threshold", 1) - algorithm = params.get("algorithm", "agglomerative") + feature_extraction_algorithm = params.get("feature_extraction_algorithm", None) + algorithm = params.get("clustering_algorithm", "agglomerative") task_logger.info(f"Clustering Parameters: {params}") job_save(job) + if feature_extraction_algorithm: + task_logger.info(f"Feature Extraction Algorithm: {feature_extraction_algorithm}") + # Check if the feature extraction algorithm is valid + if not Algorithm.objects.filter(key=feature_extraction_algorithm).exists(): + raise ValueError(f"Invalid feature extraction algorithm key: {feature_extraction_algorithm}") + else: + # Fallback to the most used feature extraction algorithm in this collection + feature_extraction_algorithm_id = ( + Classification.objects.filter(features_2048__isnull=False, detection__source_image__collections=collection) + .values("algorithm") + .annotate(count=Count("id")) + .order_by("-count") + .values_list("algorithm", flat=True) + .first() + ) + if feature_extraction_algorithm_id: + feature_extraction_algorithm = Algorithm.objects.get(pk=feature_extraction_algorithm_id) + task_logger.info(f"Using fallback feature extraction algorithm: {feature_extraction_algorithm.name}") + detections = Detection.objects.filter( classifications__features_2048__isnull=False, + classifications__algorithm=feature_extraction_algorithm, source_image__collections=collection, - occurrence__determination_ood_score__gte=ood_threshold, + occurrence__determination_ood_score__gt=ood_threshold, ) task_logger.info(f"Found {detections.count()} detections to process for clustering") 
@@ -59,7 +81,8 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l raise ValueError("No feature vectors found") features_np = np.array(features) - + task_logger.info(f"Feature vectors shape: {features_np.shape}") + logger.info(f"First feature vector: {features_np[0]}, shape: {features_np[0].shape}") update_job_progress(job, stage_key="clustering", status=JobState.STARTED, progress=0.0) # Clustering Detections ClusteringAlgorithm = get_clusterer(algorithm) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index 30b32d1a0..f0d539134 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -7,7 +7,8 @@ from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params -from ami.main.models import Classification, Detection, Project, SourceImage, SourceImageCollection +from ami.main.models import Classification, Deployment, Detection, Project, SourceImage, SourceImageCollection, Taxon +from ami.ml.clustering_algorithms.cluster_detections import cluster_detections from ami.ml.models import Algorithm, Pipeline, ProcessingService from ami.ml.models.pipeline import collect_images, get_or_create_algorithm_and_category_map, save_results from ami.ml.schemas import ( @@ -19,7 +20,13 @@ PipelineResultsResponse, SourceImageResponse, ) -from ami.tests.fixtures.main import create_captures_from_files, create_processing_service, setup_test_project +from ami.tests.fixtures.main import ( + create_captures, + create_captures_from_files, + create_processing_service, + group_images_into_events, + setup_test_project, +) from ami.tests.fixtures.ml import ALGORITHM_CHOICES from ami.users.models import User @@ -685,3 +692,104 @@ def test_l2_distance(self): most_similar = qs.first() self.assertEqual(most_similar.pk, ref_cls.pk, "Most similar classification should be itself") + + +class TestClustering(TestCase): + def setUp(self): + self.project = Project.objects.create(name="Test Clustering Project") + self.deployment = Deployment.objects.create(name="Test Deployment", project=self.project) + + create_captures(deployment=self.deployment, num_nights=2, images_per_night=10, interval_minutes=1) + group_images_into_events(deployment=self.deployment) + sample_size = 10 + self.collection = SourceImageCollection.objects.create( + name="Test Random Source Image Collection", + project=self.project, + method="random", + kwargs={"size": sample_size}, + ) + self.collection.save() + self.collection.populate_sample() + assert self.collection.images.count() == sample_size + self.populate_collection_with_detections() + self.collection.save() + self.detections = Detection.objects.filter(source_image__collections=self.collection) + self.assertGreater(len(self.detections), 0, "No detections found in the collection") + self._populate_detection_features() + for detection in self.detections: + assert detection.classifications.last().features_2048 is not None, "No features found in the detection" + + def populate_collection_with_detections(self): + """Populate the collection with random detections.""" + for image in self.collection.images.all(): + # Create a random detection for each image + Detection.objects.create( + source_image=image, + detection_algorithm=Algorithm.objects.get(key="random-detector"), + bbox=[0.0, 0.0, 1.0, 1.0], + timestamp=datetime.datetime.now(), + ) + + def _populate_detection_features(self): + """Populate detection features with random values.""" + for detection in self.detections: + # Create a random feature vector + feature_vector = 
np.random.rand(2048).tolist() + # Assign the feature vector to the detection + detection.classifications.create( + algorithm=Algorithm.objects.get(key="random-species-classifier"), + taxon=None, + score=1, + features_2048=feature_vector, + timestamp=datetime.datetime.now(), + ) + detection.save() + + def test_agglomerative_clustering(self): + """Test agglomerative clustering with real implementation.""" + # Call with agglomerative clustering parameters + params = { + "algorithm": "agglomerative", + "ood_threshold": 1, + "agglomerative": {"distance_threshold": 0.5, "linkage": "ward"}, + "pca": {"n_components": 5}, # Use fewer components for test performance + } + # Execute the clustering function + clusters = cluster_detections(self.collection, params) + + # The exact number could vary based on the random features and threshold + self.assertGreaterEqual(len(clusters), 1, "Should create at least 1 cluster") + self.assertLessEqual(len(clusters), 5, "Should not create more than 5 clusters") + + # Verify all detections are assigned to clusters + total_detections = sum(len(detections) for detections in clusters.values()) + self.assertEqual( + total_detections, + len(self.detections), + f"All {len(self.detections)} detections should be assigned to clusters", + ) + + # Check if detections with similar features are in the same cluster + # Create a map of detection to cluster_id + detection_to_cluster = {} + for cluster_id, detections_list in clusters.items(): + for detection in detections_list: + detection_to_cluster[detection.id] = cluster_id + + # Get all classifications + all_classifications = Classification.objects.filter(detection__in=self.detections, terminal=True) + + # Verify that each detection has a new classification linking it to a taxon + self.assertEqual( + all_classifications.count(), len(self.detections), "Each detection should have a new classification" + ) + + # Verify that each cluster has a corresponding taxon + taxa = Taxon.objects.filter(unknown_species=True) + self.assertEqual(taxa.count(), len(clusters), f"Should create {len(clusters)} taxa for the clusters") + + # Verify that each taxon is associated with the project + for taxon in taxa: + self.assertIn( + self.project, taxon.projects.all(), f"Taxon {taxon.name} should be associated with the project" + ) diff --git a/ami/tests/fixtures/main.py b/ami/tests/fixtures/main.py index 812153ff9..31a7a599d 100644 --- a/ami/tests/fixtures/main.py +++ b/ami/tests/fixtures/main.py @@ -202,7 +202,9 @@ def create_taxa(project: Project) -> TaxaList: taxa_list.projects.add(project) root, _created = Taxon.objects.get_or_create(name="Lepidoptera", rank=TaxonRank.ORDER.name) root.projects.add(project) - family_taxon, _ = Taxon.objects.get_or_create(name="Nymphalidae", parent=root, rank=TaxonRank.FAMILY.name) + family_taxon, _ = Taxon.objects.get_or_create( + name="Nymphalidae", defaults={"parent": root, "rank": TaxonRank.FAMILY.name} + ) family_taxon.projects.add(project) genus_taxon, _ = Taxon.objects.get_or_create(name="Vanessa", parent=family_taxon, rank=TaxonRank.GENUS.name) genus_taxon.projects.add(project) From 6d44bdb57fac39a2745a3eb0e8733c08b2e867ea Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 5 May 2025 10:36:06 -0400 Subject: [PATCH 33/42] chore: Used a serializer to handle job params instead of reading them directly from the request objects --- ami/main/api/serializers.py | 8 ++++++++ ami/main/api/views.py | 13 ++++++------- 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ami/main/api/serializers.py 
b/ami/main/api/serializers.py index de1c6a340..06b6b3686 100644 --- a/ami/main/api/serializers.py +++ b/ami/main/api/serializers.py @@ -1550,3 +1550,11 @@ class Meta: "total_size", "last_checked", ] + + +class ClusterDetectionsSerializer(serializers.Serializer): + ood_threshold = serializers.FloatField(required=False, default=0.3) + feature_extraction_algorithm = serializers.CharField(required=False, allow_null=True) + algorithm = serializers.CharField(required=False, default="agglomerative") + algorithm_kwargs = serializers.DictField(required=False, default={"distance_threshold": 0.5}) + pca = serializers.DictField(required=False, default={"n_components": 384}) diff --git a/ami/main/api/views.py b/ami/main/api/views.py index 197ca892e..12937bbd7 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -43,6 +43,7 @@ from ami.base.serializers import FilterParamsSerializer, SingleParamSerializer from ami.base.views import ProjectMixin from ami.jobs.models import DetectionClusteringJob, Job +from ami.main.api.serializers import ClusterDetectionsSerializer from ami.utils.requests import get_active_classification_threshold, project_id_doc_param from ami.utils.storages import ConnectionTestResult @@ -750,19 +751,17 @@ def cluster_detections(self, request, pk=None): """ Trigger a background job to cluster detections from this collection. """ + collection: SourceImageCollection = self.get_object() + serializer = ClusterDetectionsSerializer(data=request.data) + serializer.is_valid(raise_exception=True) + params = serializer.validated_data job = Job.objects.create( name=f"Clustering detections for collection {collection.pk}", project=collection.project, source_image_collection=collection, job_type_key=DetectionClusteringJob.key, - params={ - "ood_threshold": request.data.get("ood_threshold", 0.3), - "feature_extraction_algorithm": request.data.get("feature_extraction_algorithm", None), - "algorithm": request.data.get("algorithm", "agglomerative"), - "algorithm_kwargs": request.data.get("algorithm_kwargs", {"distance_threshold": 0.5}), - "pca": request.data.get("pca", {"n_components": 384}), - }, + params=params, ) job.enqueue() logger.info(f"Triggered clustering job for collection {collection.pk}") From 12b4ee4817cf0ec7c1c94d91cb3f3a149b536139 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 5 May 2025 15:34:45 -0400 Subject: [PATCH 34/42] set default ood threshold to 0.0 --- ami/main/api/serializers.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ami/main/api/serializers.py b/ami/main/api/serializers.py index 06b6b3686..1f97fa0bc 100644 --- a/ami/main/api/serializers.py +++ b/ami/main/api/serializers.py @@ -1553,7 +1553,7 @@ class Meta: class ClusterDetectionsSerializer(serializers.Serializer): - ood_threshold = serializers.FloatField(required=False, default=0.3) + ood_threshold = serializers.FloatField(required=False, default=0.0) feature_extraction_algorithm = serializers.CharField(required=False, allow_null=True) algorithm = serializers.CharField(required=False, default="agglomerative") algorithm_kwargs = serializers.DictField(required=False, default={"distance_threshold": 0.5}) From 2c7379534a6af90688b5950d81fcc5a5343b8dc3 Mon Sep 17 00:00:00 2001 From: mohamedelabbas1996 Date: Mon, 5 May 2025 15:36:12 -0400 Subject: [PATCH 35/42] test: added tests for clustering --- ami/ml/tests.py | 41 +++++++++++++++++++---------------------- 1 file changed, 19 insertions(+), 22 deletions(-) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index f0d539134..acec00f7f 
100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -7,7 +7,7 @@ from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params -from ami.main.models import Classification, Deployment, Detection, Project, SourceImage, SourceImageCollection, Taxon +from ami.main.models import Classification, Detection, Project, SourceImage, SourceImageCollection, Taxon from ami.ml.clustering_algorithms.cluster_detections import cluster_detections from ami.ml.models import Algorithm, Pipeline, ProcessingService from ami.ml.models.pipeline import collect_images, get_or_create_algorithm_and_category_map, save_results @@ -23,7 +23,9 @@ from ami.tests.fixtures.main import ( create_captures, create_captures_from_files, + create_detections, create_processing_service, + create_taxa, group_images_into_events, setup_test_project, ) @@ -696,11 +698,11 @@ def test_l2_distance(self): class TestClustering(TestCase): def setUp(self): - self.project = Project.objects.create(name="Test Clustering Project") - self.deployment = Deployment.objects.create(name="Test Deployment", project=self.project) - + self.project, self.deployment = setup_test_project() + create_taxa(project=self.project) create_captures(deployment=self.deployment, num_nights=2, images_per_night=10, interval_minutes=1) group_images_into_events(deployment=self.deployment) + sample_size = 10 self.collection = SourceImageCollection.objects.create( name="Test Random Source Image Collection", @@ -713,36 +715,39 @@ def setUp(self): assert self.collection.images.count() == sample_size self.populate_collection_with_detections() self.collection.save() + # create_occurrences(deployment=self.deployment) + self.detections = Detection.objects.filter(source_image__collections=self.collection) self.assertGreater(len(self.detections), 0, "No detections found in the collection") self._populate_detection_features() - for detection in self.detections: - assert detection.classifications.last().features_2048 is not None, "No features found in the detection" def populate_collection_with_detections(self): """Populate the collection with random detections.""" for image in self.collection.images.all(): # Create a random detection for each image - Detection.objects.create( - source_image=image, - detection_algorithm=Algorithm.objects.get(key="random-detector"), - bbox=[0.0, 0.0, 1.0, 1.0], - timestamp=datetime.datetime.now(), + create_detections( + source_image=image, bboxes=[(0.0, 0.0, 1.0, 1.0), (0.1, 0.1, 0.9, 0.9), (0.2, 0.2, 0.8, 0.8)] ) def _populate_detection_features(self): """Populate detection features with random values.""" for detection in self.detections: + detection.associate_new_occurrence() # Create a random feature vector feature_vector = np.random.rand(2048).tolist() # Assign the feature vector to the detection - detection.classifications.create( + classification = Classification.objects.create( + detection=detection, algorithm=Algorithm.objects.get(key="random-species-classifier"), taxon=None, - score=1, + score=0.5, features_2048=feature_vector, timestamp=datetime.datetime.now(), ) + detection.classifications.add(classification) + assert classification.features_2048 is not None, "No features found for the detection" + assert detection.occurrence is not None, "No occurrence found for the detection" + detection.occurrence.determination_ood_score = 0.5 detection.save() def test_agglomerative_clustering(self): @@ -751,6 +756,7 @@ def test_agglomerative_clustering(self): params = { "algorithm": "agglomerative", 
"ood_threshold": 1, + "feature_extraction_algorithm": None, "agglomerative": {"distance_threshold": 0.5, "linkage": "ward"}, "pca": {"n_components": 5}, # Use fewer components for test performance } @@ -759,7 +765,6 @@ def test_agglomerative_clustering(self): # The exact number could vary based on the random features and threshold self.assertGreaterEqual(len(clusters), 1, "Should create at least 1 cluster") - self.assertLessEqual(len(clusters), 5, "Should not create more than 5 clusters") # Verify all detections are assigned to clusters total_detections = sum(len(detections) for detections in clusters.values()) @@ -776,14 +781,6 @@ def test_agglomerative_clustering(self): for detection in detections_list: detection_to_cluster[detection.id] = cluster_id - # Get all classifications - all_classifications = Classification.objects.filter(detection__in=self.detections, terminal=True) - - # Verify that each detection has a new classification linking it to a taxon - self.assertEqual( - all_classifications.count(), len(self.detections), "Each detection should have a new classification" - ) - # Verify that each cluster has a corresponding taxon taxa = Taxon.objects.filter(unknown_species=True) self.assertEqual(taxa.count(), len(clusters), f"Should create {len(clusters)} taxa for the clusters") From e5d7ff01b0e7c6debbf032c35030200072ed23a0 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 15:10:40 -0700 Subject: [PATCH 36/42] chore: migration for new algorithm type --- .../0023_alter_algorithm_task_type.py | 42 +++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 ami/ml/migrations/0023_alter_algorithm_task_type.py diff --git a/ami/ml/migrations/0023_alter_algorithm_task_type.py b/ami/ml/migrations/0023_alter_algorithm_task_type.py new file mode 100644 index 000000000..2e8fa3975 --- /dev/null +++ b/ami/ml/migrations/0023_alter_algorithm_task_type.py @@ -0,0 +1,42 @@ +# Generated by Django 4.2.10 on 2025-05-06 18:10 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + dependencies = [ + ("ml", "0022_alter_pipeline_default_config"), + ] + + operations = [ + migrations.AlterField( + model_name="algorithm", + name="task_type", + field=models.CharField( + choices=[ + ("detection", "Detection"), + ("localization", "Localization"), + ("segmentation", "Segmentation"), + ("classification", "Classification"), + ("embedding", "Embedding"), + ("tracking", "Tracking"), + ("tagging", "Tagging"), + ("regression", "Regression"), + ("captioning", "Captioning"), + ("generation", "Generation"), + ("translation", "Translation"), + ("summarization", "Summarization"), + ("question_answering", "Question Answering"), + ("depth_estimation", "Depth Estimation"), + ("pose_estimation", "Pose Estimation"), + ("size_estimation", "Size Estimation"), + ("clustering", "Clustering"), + ("other", "Other"), + ("unknown", "Unknown"), + ], + default="unknown", + max_length=255, + null=True, + ), + ), + ] From fdbbf75b62e1005b5d63d79ae043ff6af3c549b9 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 15:21:53 -0700 Subject: [PATCH 37/42] fix: remove cluster action in Event admin until its ready --- ami/main/admin.py | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/ami/main/admin.py b/ami/main/admin.py index a2007558a..a7b533b98 100644 --- a/ami/main/admin.py +++ b/ami/main/admin.py @@ -222,29 +222,6 @@ def update_calculated_fields(self, request: HttpRequest, queryset: QuerySet[Even list_filter = ("deployment", "project", 
"start") - @admin.action() - def cluster_detections(self, request: HttpRequest, queryset: QuerySet[SourceImageCollection]) -> None: - for collection in queryset: - from ami.jobs.models import DetectionClusteringJob, Job - - job = Job.objects.create( - name=f"Cluster detections for collection {collection.pk}", - project=collection.project, - source_image_collection=collection, - job_type_key=DetectionClusteringJob.key, - params={ - "ood_threshold": 0.3, - "algorithm": "agglomerative", - "algorithm_kwargs": {"distance_threshold": 80}, - "pca": {"n_components": 384}, - }, - ) - job.enqueue() - - self.message_user(request, f"Clustered {queryset.count()} collection(s).") - - actions = [update_calculated_fields, cluster_detections] - @admin.register(SourceImage) class SourceImageAdmin(AdminBase): From 0e92904fb2b08af61b6cd067345adca8de743563 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 17:05:57 -0700 Subject: [PATCH 38/42] chore: move algorithm selection to dedicated function --- .../cluster_detections.py | 49 ++++++++++++++----- 1 file changed, 38 insertions(+), 11 deletions(-) diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index beb752a12..056eb3150 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -1,4 +1,5 @@ import logging +import typing import numpy as np from django.db.models import Count @@ -6,6 +7,10 @@ from ami.ml.clustering_algorithms.utils import get_clusterer +if typing.TYPE_CHECKING: + from ami.main.models import SourceImageCollection + from ami.ml.models import Algorithm + logger = logging.getLogger(__name__) @@ -20,6 +25,38 @@ def job_save(job): job.save() +def get_most_used_algorithm(collection: "SourceImageCollection", task_logger=None) -> "Algorithm | None": + from ami.main.models import Classification + from ami.ml.models import Algorithm + + task_logger = task_logger or logger + + qs = Classification.objects.filter( + features_2048__isnull=False, + detection__source_image__collections=collection, + # @TODO if we have a dedicated task type for feature extraction, we can filter by that + # task_type="feature_extraction", + ) + + # Log the number of classifications per algorithm, if debug is enabled + if task_logger.isEnabledFor(logging.DEBUG): + algorithm_stats = qs.values("algorithm__pk", "algorithm__name").annotate(count=Count("id")).order_by("-count") + task_logger.debug(f"Algorithm stats: {algorithm_stats}") + + feature_extraction_algorithm_id = ( + qs.values("algorithm") + .annotate(count=Count("id")) + .order_by("-count") + .values_list("algorithm", flat=True) + .first() + ) + if feature_extraction_algorithm_id: + algorithm = Algorithm.objects.get(pk=feature_extraction_algorithm_id) + task_logger.info(f"Using feature extraction algorithm: {algorithm.name}") + return algorithm + return None + + def cluster_detections(collection, params: dict, task_logger: logging.Logger = logger, job=None): from ami.jobs.models import JobState from ami.main.models import Classification, Detection, TaxaList, Taxon @@ -38,17 +75,7 @@ def cluster_detections(collection, params: dict, task_logger: logging.Logger = l raise ValueError(f"Invalid feature extraction algorithm key: {feature_extraction_algorithm}") else: # Fallback to the most used feature extraction algorithm in this collection - feature_extraction_algorithm_id = ( - Classification.objects.filter(features_2048__isnull=False, 
detection__source_image__collections=collection) - .values("algorithm") - .annotate(count=Count("id")) - .order_by("-count") - .values_list("algorithm", flat=True) - .first() - ) - if feature_extraction_algorithm_id: - feature_extraction_algorithm = Algorithm.objects.get(pk=feature_extraction_algorithm_id) - task_logger.info(f"Using fallback feature extraction algorithm: {feature_extraction_algorithm.name}") + feature_extraction_algorithm = get_most_used_algorithm(collection, task_logger=task_logger) detections = Detection.objects.filter( classifications__features_2048__isnull=False, From b26fbe09745eb925424d8f6d4668336164db8cad Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 18:10:41 -0700 Subject: [PATCH 39/42] fix: update clustering tests and types --- .../clustering_algorithms/cluster_detections.py | 5 ++++- ami/ml/tests.py | 16 ++++++++++------ 2 files changed, 14 insertions(+), 7 deletions(-) diff --git a/ami/ml/clustering_algorithms/cluster_detections.py b/ami/ml/clustering_algorithms/cluster_detections.py index 056eb3150..d06f8a194 100644 --- a/ami/ml/clustering_algorithms/cluster_detections.py +++ b/ami/ml/clustering_algorithms/cluster_detections.py @@ -25,7 +25,9 @@ def job_save(job): job.save() -def get_most_used_algorithm(collection: "SourceImageCollection", task_logger=None) -> "Algorithm | None": +def get_most_used_algorithm( + collection: "SourceImageCollection", task_logger: logging.Logger | None = None +) -> "Algorithm | None": from ami.main.models import Classification from ami.ml.models import Algorithm @@ -34,6 +36,7 @@ def get_most_used_algorithm(collection: "SourceImageCollection", task_logger=Non qs = Classification.objects.filter( features_2048__isnull=False, detection__source_image__collections=collection, + algorithm__isnull=False, # @TODO if we have a dedicated task type for feature extraction, we can filter by that # task_type="feature_extraction", ) diff --git a/ami/ml/tests.py b/ami/ml/tests.py index acec00f7f..eba76a1d4 100644 --- a/ami/ml/tests.py +++ b/ami/ml/tests.py @@ -7,7 +7,7 @@ from rest_framework.test import APIRequestFactory, APITestCase from ami.base.serializers import reverse_with_params -from ami.main.models import Classification, Detection, Project, SourceImage, SourceImageCollection, Taxon +from ami.main.models import Classification, Detection, Occurrence, Project, SourceImage, SourceImageCollection, Taxon from ami.ml.clustering_algorithms.cluster_detections import cluster_detections from ami.ml.models import Algorithm, Pipeline, ProcessingService from ami.ml.models.pipeline import collect_images, get_or_create_algorithm_and_category_map, save_results @@ -731,6 +731,7 @@ def populate_collection_with_detections(self): def _populate_detection_features(self): """Populate detection features with random values.""" + classifier = Algorithm.objects.get(key="random-species-classifier") for detection in self.detections: detection.associate_new_occurrence() # Create a random feature vector @@ -738,25 +739,28 @@ def _populate_detection_features(self): # Assign the feature vector to the detection classification = Classification.objects.create( detection=detection, - algorithm=Algorithm.objects.get(key="random-species-classifier"), + algorithm=classifier, taxon=None, score=0.5, + ood_score=0.5, features_2048=feature_vector, timestamp=datetime.datetime.now(), ) - detection.classifications.add(classification) + detection.classifications.add(classification) # type: ignore assert classification.features_2048 is not None, "No features found 
for the detection" assert detection.occurrence is not None, "No occurrence found for the detection" - detection.occurrence.determination_ood_score = 0.5 detection.save() + # Call save once on all occurrences + for occurrence in Occurrence.objects.filter(detections__in=self.detections).distinct(): + occurrence.save() def test_agglomerative_clustering(self): """Test agglomerative clustering with real implementation.""" # Call with agglomerative clustering parameters params = { "algorithm": "agglomerative", - "ood_threshold": 1, - "feature_extraction_algorithm": None, + "ood_threshold": 0.4, + "feature_extraction_algorithm": None, # None will select most used algorithm "agglomerative": {"distance_threshold": 0.5, "linkage": "ward"}, "pca": {"n_components": 5}, # Use fewer components for test performance } From 4032aff4c8996910e321da6657ca25b18b51b23b Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 18:11:24 -0700 Subject: [PATCH 40/42] chore: remove external network config in processing services --- processing_services/example/docker-compose.yml | 1 - processing_services/minimal/docker-compose.yml | 1 - 2 files changed, 2 deletions(-) diff --git a/processing_services/example/docker-compose.yml b/processing_services/example/docker-compose.yml index 714224967..44e3531f4 100644 --- a/processing_services/example/docker-compose.yml +++ b/processing_services/example/docker-compose.yml @@ -20,5 +20,4 @@ services: networks: antenna_network: - external: true name: antenna_network diff --git a/processing_services/minimal/docker-compose.yml b/processing_services/minimal/docker-compose.yml index 2bdef9c1a..a35695531 100644 --- a/processing_services/minimal/docker-compose.yml +++ b/processing_services/minimal/docker-compose.yml @@ -13,5 +13,4 @@ services: networks: antenna_network: - external: true name: antenna_network From 0f8c544a63b599bceac0a5b97e2849ba339acd78 Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 18:11:32 -0700 Subject: [PATCH 41/42] feat: update GitHub workflows to run tests on other branches --- .github/workflows/test.backend.yml | 4 ++-- .github/workflows/test.frontend.yml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.backend.yml b/.github/workflows/test.backend.yml index 28ca1cf53..e6d08e2c1 100644 --- a/.github/workflows/test.backend.yml +++ b/.github/workflows/test.backend.yml @@ -7,11 +7,11 @@ env: on: pull_request: - branches: ["master", "main"] + branches: ["main", "deployments/*", "releases/*"] paths-ignore: ["docs/**", "ui/**"] push: - branches: ["master", "main"] + branches: ["main", "deployments/*", "releases/*"] paths-ignore: ["docs/**", "ui/**"] concurrency: diff --git a/.github/workflows/test.frontend.yml b/.github/workflows/test.frontend.yml index 449a565f8..e93ed7b49 100644 --- a/.github/workflows/test.frontend.yml +++ b/.github/workflows/test.frontend.yml @@ -7,13 +7,13 @@ env: on: pull_request: - branches: ["master", "main"] + branches: ["main", "deployments/*", "releases/*"] paths: - "!./**" - "ui/**" push: - branches: ["master", "main"] + branches: ["main", "deployments/*", "releases/*"] paths: - "!./**" - "ui/**" From 5fb5c431d415cfddb1f88c00ef15212ce565f83a Mon Sep 17 00:00:00 2001 From: Michael Bunsen Date: Tue, 6 May 2025 18:28:15 -0700 Subject: [PATCH 42/42] fix: hide unobserved taxa by default todo: add frontend filter to toggle this --- ami/main/api/views.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ami/main/api/views.py b/ami/main/api/views.py 
index 12937bbd7..88a06ffd0 100644 --- a/ami/main/api/views.py +++ b/ami/main/api/views.py @@ -1296,10 +1296,9 @@ def get_queryset(self) -> QuerySet: project = self.get_active_project() if project: - # Allow showing detail views for unobserved taxa - include_unobserved = True + include_unobserved = True # Show detail views for unobserved taxa instead of 404 if self.action == "list": - include_unobserved = self.request.query_params.get("include_unobserved", True) + include_unobserved = self.request.query_params.get("include_unobserved", False) qs = self.get_taxa_observed(qs, project, include_unobserved=include_unobserved) if self.action == "retrieve": qs = qs.prefetch_related(