From 138b98a22af93f8e7606e01ded4b1a0fca24c063 Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 12:08:51 +0000
Subject: [PATCH 1/8] feat: allow optional sampling for iter_terms

If the exact counts aren't important, we can use a random sample to get a
decent estimate of the fields and their usage significantly faster. The
default leaves the behaviour as before (no sampling).
---
 splitgill/utils.py | 60 +++++++++++++++++++++++++++++++++++++---------
 1 file changed, 49 insertions(+), 11 deletions(-)

diff --git a/splitgill/utils.py b/splitgill/utils.py
index ccfde05..c7f368a 100644
--- a/splitgill/utils.py
+++ b/splitgill/utils.py
@@ -2,7 +2,7 @@
 from datetime import date, datetime, timezone
 from itertools import islice
 from time import time
-from typing import Iterable, Union
+from typing import Iterable, Optional, Union

 from cytoolz import get_in
 from elasticsearch_dsl import A, Search
@@ -84,7 +84,13 @@ class Term:
     count: int


-def iter_terms(search: Search, field: str, chunk_size: int = 50) -> Iterable[Term]:
+def iter_terms(
+    search: Search,
+    field: str,
+    chunk_size: int = 50,
+    sample_probability: float = 1.0,
+    seed: Optional[int] = None,
+) -> Iterable[Term]:
     """
     Yields Term objects, each representing a value and the number of documents which
     contain that value in the given field. The Terms are yielded in descending order of
@@ -93,6 +99,10 @@ def iter_terms(search: Search, field: str, chunk_size: int = 50) -> Iterable[Ter
     :param search: a Search instance to use to run the aggregation
     :param field: the name of the field to get the terms for
     :param chunk_size: the number of buckets to retrieve per request
+    :param sample_probability: the probability that a given record will be included in a
+        random sample; set to 1 to use all records (default 1)
+    :param seed: sets the sampling seed manually (if None, defaults to the timestamp of
+        today's midnight divided by 3600)
     :return: yields Term objects
     """
     after = None
@@ -101,19 +111,47 @@ def iter_terms(search: Search, field: str, chunk_size: int = 50) -> Iterable[Ter
         # when we don't need them, and it ensures we get a fresh copy of the
         # search to work with
         agg_search = search[:0]
-        agg_search.aggs.bucket(
-            'values',
-            'composite',
-            size=chunk_size,
-            sources={'value': A('terms', field=field)},
+
+        # this is the core aggregation
+        composite_agg = A(
+            'composite', size=chunk_size, sources={'value': A('terms', field=field)}
         )
-        if after is not None:
-            agg_search.aggs['values'].after = after
+        result_keys = ['values', 'buckets']
+        after_keys = ['values', 'after_key']
+
+        if sample_probability < 1:
+            # the seed should stay relatively constant for caching purposes, but
+            # changing it once a day is fine.
+            # dividing by 3600 keeps it under the ES seed max (2147483647) for
+            # longer - otherwise this stops working in 2038. The actual number
+            # isn't important.
+            seed = (
+                seed
+                if seed is not None
+                else int(
+                    datetime.now()
+                    .replace(hour=0, minute=0, second=0, microsecond=0)
+                    .timestamp()
+                    / 3600
+                )
+            )
+            # if we're sampling, the core agg gets nested underneath the sampler
+            agg_search.aggs.bucket(
+                'sampling', 'random_sampler', probability=sample_probability, seed=seed
+            ).bucket('values', composite_agg)
+            if after is not None:
+                agg_search.aggs['sampling'].aggs['values'].after = after
+            result_keys = ['sampling'] + result_keys
+            after_keys = ['sampling'] + after_keys
+        else:
+            agg_search.aggs.bucket('values', composite_agg)
+            if after is not None:
+                agg_search.aggs['values'].after = after

         result = agg_search.execute().aggs.to_dict()
-        buckets = get_in(('values', 'buckets'), result, [])
-        after = get_in(('values', 'after_key'), result, None)
+        buckets = get_in(result_keys, result, [])
+        after = get_in(after_keys, result, None)
         if not buckets:
             break
         else:
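A quick usage sketch of the new parameters (a minimal example; the client address, index name, and field are hypothetical, purely for illustration):

    from elasticsearch import Elasticsearch
    from elasticsearch_dsl import Search

    from splitgill.utils import iter_terms

    # hypothetical connection and index, purely for illustration
    search = Search(using=Elasticsearch('http://localhost:9200'), index='records')

    # default behaviour: exact counts, every document is aggregated
    for term in iter_terms(search, 'data.name'):
        print(term.value, term.count)

    # sample roughly 10% of documents: counts become estimates, but the
    # aggregation runs significantly faster on large indices
    for term in iter_terms(search, 'data.name', sample_probability=0.1, seed=42):
        print(term.value, term.count)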
From eb1e5debeec1dfa560835ea2bd5994a040503c39 Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 10:34:20 +0000
Subject: [PATCH 2/8] feat: allow passing kwargs to iter_terms via get fields methods

---
 splitgill/manager.py | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/splitgill/manager.py b/splitgill/manager.py
index 7e0a418..696a534 100644
--- a/splitgill/manager.py
+++ b/splitgill/manager.py
@@ -637,7 +637,10 @@ def get_versions(self) -> List[int]:
         return sorted(self.get_version_changed_counts().keys())

     def get_data_fields(
-        self, version: Optional[int] = None, query: Optional[Query] = None
+        self,
+        version: Optional[int] = None,
+        query: Optional[Query] = None,
+        **iter_terms_kwargs,
     ) -> List[DataField]:
         """
         Retrieves the available data fields for this database, optionally at the given
@@ -647,6 +650,8 @@
             searched
         :param query: the query to filter records with before finding the data fields,
             if None, all record data is considered
+        :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
+            sample_probability)
         :return: a list of DataField objects with the most frequent field first
         """
         search = self.search(version if version is not None else SearchVersion.latest)
@@ -656,7 +661,7 @@
         fields: Dict[str, DataField] = {}

         # create the basic field objects and add type counts
-        for term in iter_terms(search, DocumentField.DATA_TYPES):
+        for term in iter_terms(search, DocumentField.DATA_TYPES, **iter_terms_kwargs):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
                 fields[path] = DataField(path)
@@ -688,7 +693,10 @@
         return data_fields

     def get_parsed_fields(
-        self, version: Optional[int] = None, query: Optional[Query] = None
+        self,
+        version: Optional[int] = None,
+        query: Optional[Query] = None,
+        **iter_terms_kwargs,
     ) -> List[ParsedField]:
         """
         Retrieves the available parsed fields for this database, optionally at the given
@@ -698,6 +706,8 @@
             is searched
         :param query: the query to filter records with before finding the parsed fields,
             if None, all record data is considered
+        :param iter_terms_kwargs: kwargs passed directly to iter_terms (e.g. chunk_size,
+            sample_probability)
         :return: a list of ParsedField objects with the most frequent field first
         """
         search = self.search(version if version is not None else SearchVersion.latest)
@@ -707,7 +717,7 @@
         fields: Dict[str, ParsedField] = {}

         # create the basic field objects and add type counts
-        for term in iter_terms(search, DocumentField.PARSED_TYPES):
+        for term in iter_terms(search, DocumentField.PARSED_TYPES, **iter_terms_kwargs):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
                 fields[path] = ParsedField(path)

From ffc5160cf378d297fa861cd065c2bebb32c68864 Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 12:13:20 +0000
Subject: [PATCH 3/8] test: add test for iter_terms sampling

Tests field ranking over a larger number of random records.
---
 tests/test_manager.py | 50 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 50 insertions(+)

diff --git a/tests/test_manager.py b/tests/test_manager.py
index 5289c01..01ef853 100644
--- a/tests/test_manager.py
+++ b/tests/test_manager.py
@@ -1,6 +1,8 @@
+import random
 import time
 from collections import Counter
 from datetime import datetime, timezone
+from operator import itemgetter
 from typing import List
 from unittest.mock import MagicMock, patch
@@ -1270,6 +1272,54 @@
         check_data_fields(data_fields[10].children, [f__h_i])
         assert all(field.parent.path == f__h.path for field in data_fields[9].children)

+    def test_random_sample(self, database: SplitgillDatabase):
+        # I can't get consistent records returned with this even with seeds and
+        # explicitly set versions and record IDs, so unfortunately there's a small
+        # element of randomness to this one
+        random.seed(1)
+        records = [
+            Record.new({field_name: 1})
+            for field_name in random.choices(
+                ['a', 'b', 'c', 'd'], weights=[1, 3, 2, 4], k=1000
+            )
+        ]
+        database.ingest(records, commit=True)
+        database.sync()
+
+        field_counts = sorted(
+            Counter(
+                (['_id'] * 1000) + [list(r.data.keys())[0] for r in records]
+            ).items(),
+            key=itemgetter(1),
+            reverse=True,
+        )
+
+        data_fields = database.get_data_fields(sample_probability=0.5, seed=1)
+        assert len(data_fields) == 5
+        id_field = next(f for f in data_fields if f.path == '_id')
+        assert 900 < id_field.count < 1100
+        exact_counts = 0
+        for data_field, field in zip(data_fields, field_counts):
+            field_name, field_count = field
+            assert data_field.path == field_name
+            if data_field.count == field_count:
+                exact_counts += 1
+        # this is *technically* possible but very unlikely unless it's not sampling
+        assert exact_counts != len(data_fields)
+
+        parsed_fields = database.get_parsed_fields(sample_probability=0.5, seed=1)
+        assert len(parsed_fields) == 5
+        id_field = next(f for f in parsed_fields if f.path == '_id')
+        assert 900 < id_field.count < 1100
+        exact_counts = 0
+        for parsed_field, field in zip(parsed_fields, field_counts):
+            field_name, field_count = field
+            assert parsed_field.path == field_name
+            if parsed_field.count == field_count:
+                exact_counts += 1
+        # this is *technically* possible but very unlikely unless it's not sampling
+        assert exact_counts != len(parsed_fields)
+

 def test_get_rounded_version(splitgill: SplitgillClient):
     database = splitgill.get_database('test')
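To make the effect of the new kwargs concrete, here is a sketch of the manager-level calls the test above exercises (assuming a synced SplitgillDatabase instance named database, as in the test fixtures):

    # exact counts, as before (no sampling)
    exact = database.get_data_fields()

    # estimated counts from a 50% sample with a fixed seed; roughly half the
    # documents are aggregated, so counts are approximate but the field
    # ranking should be stable
    sampled = database.get_data_fields(sample_probability=0.5, seed=1)
    for field in sampled[:5]:
        print(field.path, field.count)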
From b8fad4f052aecedff49dd9d63a0546c12212d3d9 Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 12:40:10 +0000
Subject: [PATCH 4/8] chore: remove docker version

---
 docker-compose.yml | 2 --
 1 file changed, 2 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index e421cac..fe6b5fd 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -1,5 +1,3 @@
-version: "3"
-
 services:
   # runs the tests
   test:

From a75c550ff300c0ea89da9a80bb3340415b21039f Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 12:41:28 +0000
Subject: [PATCH 5/8] ci: mount source folder into the test container

Avoids having to constantly rebuild containers when developing and running
tests repeatedly.
---
 docker-compose.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker-compose.yml b/docker-compose.yml
index fe6b5fd..febaf9f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -10,6 +10,8 @@ services:
     depends_on:
       - es
       - mongo
+    volumes:
+      - '.:/usr/src/app'

   # serves the docs locally with realtime auto-reloading
   docs:

From eb237fd02f22dbc8904a2e48cb56f06d767eb60a Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 13:51:54 +0000
Subject: [PATCH 6/8] feat: add method to get field names on latest index

Useful for very quickly retrieving a basic list of all fields from the
latest index. Returns ParsedField objects with type information, but the
type counts are all set to 1.
---
 splitgill/manager.py  | 26 +++++++++++++++++++++++++-
 tests/test_manager.py | 26 ++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 1 deletion(-)

diff --git a/splitgill/manager.py b/splitgill/manager.py
index 696a534..fd6ec63 100644
--- a/splitgill/manager.py
+++ b/splitgill/manager.py
@@ -4,7 +4,7 @@
 from cytoolz.dicttoolz import get_in
 from elasticsearch import Elasticsearch
-from elasticsearch_dsl import Search
+from elasticsearch_dsl import Index, Search
 from elasticsearch_dsl.query import Query
 from pymongo import ASCENDING, DESCENDING, IndexModel, MongoClient
 from pymongo.collection import Collection
@@ -735,3 +735,27 @@
         # descending frequency (so most frequent fields first)
         parsed_fields.sort(key=lambda f: f.count, reverse=True)
         return parsed_fields
+
+    def get_field_names(self) -> List[ParsedField]:
+        """
+        Retrieves a list of fields from the latest index mapping.
+
+        Does not take any version or query parameters; simply returns all the "data."
+        fields available on the index as ParsedField objects, along with their
+        available types. All relevant type counts are set to 1 to enable use of e.g.
+        .is_text(). Use get_data_fields or get_parsed_fields if you need accurate
+        counts, or to filter by version or query.
+ """ + latest_index = Index(self.indices.latest, using=self._client.elasticsearch) + mapping = latest_index.get_mapping() + parsed_fields = [] + for field_path, field_props in get_in( + [self.indices.latest, 'mappings', 'properties', 'data', 'properties'], + mapping.body, + default={}, + ).items(): + parsed_field = ParsedField(field_path) + for type_name in field_props['properties'].keys(): + parsed_field.add(type_name, 1) + parsed_fields.append(parsed_field) + return parsed_fields diff --git a/tests/test_manager.py b/tests/test_manager.py index 01ef853..2357486 100644 --- a/tests/test_manager.py +++ b/tests/test_manager.py @@ -1476,3 +1476,29 @@ def test_resync_arcs(splitgill: SplitgillClient): assert count == 2400 assert r_5_count == 3 assert r_780_count == 2 + + +def test_get_field_names(splitgill: SplitgillClient): + database = splitgill.get_database('test') + records = [ + Record.new({'a': 1}), + Record.new({'a': 2}), + Record.new({'b': 3}), + Record.new({'b': 'x'}), + Record.new({'b': 5}), + Record.new({'c': 'y'}), + ] + database.ingest(records, commit=True) + database.sync() + + field_names = database.get_field_names() + assert len(field_names) == 4 + expected_fields = [ + pf('_id', 3, t=1), + pf('a', 4, t=1, n=1), + pf('b', 4, t=1, n=1), + pf('c', 3, t=1), + ] + for f in expected_fields: + f.type_counts[ParsedType.UNPARSED] = 1 + assert field_names == expected_fields From 35fdac14956279964c49325f4061dd3a8326a9cd Mon Sep 17 00:00:00 2001 From: Ginger Burns Date: Tue, 30 Dec 2025 15:13:07 +0000 Subject: [PATCH 7/8] fix: remove search and field from iter_terms kwargs --- splitgill/manager.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/splitgill/manager.py b/splitgill/manager.py index fd6ec63..f45a8cd 100644 --- a/splitgill/manager.py +++ b/splitgill/manager.py @@ -661,7 +661,11 @@ def get_data_fields( fields: Dict[str, DataField] = {} # create the basic field objects and add type counts - for term in iter_terms(search, DocumentField.DATA_TYPES, **iter_terms_kwargs): + for term in iter_terms( + search, + DocumentField.DATA_TYPES, + **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']}, + ): path, raw_types = term.value.rsplit('.', 1) if path not in fields: fields[path] = DataField(path) @@ -717,7 +721,11 @@ def get_parsed_fields( fields: Dict[str, ParsedField] = {} # create the basic field objects and add type counts - for term in iter_terms(search, DocumentField.PARSED_TYPES, **iter_terms_kwargs): + for term in iter_terms( + search, + DocumentField.PARSED_TYPES, + **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']}, + ): path, raw_types = term.value.rsplit('.', 1) if path not in fields: fields[path] = ParsedField(path) From 46ed3fbf8e0774a8345462de73688e4c5806418e Mon Sep 17 00:00:00 2001 From: Ginger Burns Date: Tue, 30 Dec 2025 15:22:47 +0000 Subject: [PATCH 8/8] fix: iterate on dict items --- splitgill/manager.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/splitgill/manager.py b/splitgill/manager.py index f45a8cd..cfda998 100644 --- a/splitgill/manager.py +++ b/splitgill/manager.py @@ -664,7 +664,11 @@ def get_data_fields( for term in iter_terms( search, DocumentField.DATA_TYPES, - **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']}, + **{ + k: v + for k, v in iter_terms_kwargs.items() + if k not in ['search', 'field'] + }, ): path, raw_types = term.value.rsplit('.', 1) if path not in fields: @@ -724,7 +728,11 @@ def get_parsed_fields( for term in 
From 35fdac14956279964c49325f4061dd3a8326a9cd Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 15:13:07 +0000
Subject: [PATCH 7/8] fix: remove search and field from iter_terms kwargs

---
 splitgill/manager.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/splitgill/manager.py b/splitgill/manager.py
index fd6ec63..f45a8cd 100644
--- a/splitgill/manager.py
+++ b/splitgill/manager.py
@@ -661,7 +661,11 @@
         fields: Dict[str, DataField] = {}

         # create the basic field objects and add type counts
-        for term in iter_terms(search, DocumentField.DATA_TYPES, **iter_terms_kwargs):
+        for term in iter_terms(
+            search,
+            DocumentField.DATA_TYPES,
+            **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']},
+        ):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
                 fields[path] = DataField(path)
@@ -717,7 +721,11 @@
         fields: Dict[str, ParsedField] = {}

         # create the basic field objects and add type counts
-        for term in iter_terms(search, DocumentField.PARSED_TYPES, **iter_terms_kwargs):
+        for term in iter_terms(
+            search,
+            DocumentField.PARSED_TYPES,
+            **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']},
+        ):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
                 fields[path] = ParsedField(path)

From 46ed3fbf8e0774a8345462de73688e4c5806418e Mon Sep 17 00:00:00 2001
From: Ginger Burns
Date: Tue, 30 Dec 2025 15:22:47 +0000
Subject: [PATCH 8/8] fix: iterate on dict items

---
 splitgill/manager.py | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/splitgill/manager.py b/splitgill/manager.py
index f45a8cd..cfda998 100644
--- a/splitgill/manager.py
+++ b/splitgill/manager.py
@@ -664,7 +664,11 @@
         for term in iter_terms(
             search,
             DocumentField.DATA_TYPES,
-            **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']},
+            **{
+                k: v
+                for k, v in iter_terms_kwargs.items()
+                if k not in ['search', 'field']
+            },
         ):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
                 fields[path] = DataField(path)
@@ -724,7 +728,11 @@
         for term in iter_terms(
             search,
             DocumentField.PARSED_TYPES,
-            **{k: v for k, v in iter_terms_kwargs if k not in ['search', 'field']},
+            **{
+                k: v
+                for k, v in iter_terms_kwargs.items()
+                if k not in ['search', 'field']
+            },
         ):
             path, raw_types = term.value.rsplit('.', 1)
             if path not in fields:
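As a footnote to the final fix, a minimal standalone illustration of the bug patch 7 introduced and patch 8 corrects (the kwargs dict here is made up):

    kwargs = {'chunk_size': 100, 'sample_probability': 0.5}

    # iterating a dict directly yields its keys; unpacking the ten-character
    # key 'chunk_size' into (k, v) raises "too many values to unpack"
    try:
        filtered = {k: v for k, v in kwargs if k not in ['search', 'field']}
    except ValueError as error:
        print(error)

    # iterating .items() yields (key, value) pairs, which is what patch 8 does
    filtered = {k: v for k, v in kwargs.items() if k not in ['search', 'field']}
    print(filtered)  # {'chunk_size': 100, 'sample_probability': 0.5}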