diff --git a/python/tests/detail/distance_helper.py b/python/tests/detail/distance_helper.py index cf2815cfd..cda79cfb8 100644 --- a/python/tests/detail/distance_helper.py +++ b/python/tests/detail/distance_helper.py @@ -15,7 +15,7 @@ from typing import Dict -def is_float_equal(actual, expected, rel_tol=1e-5, abs_tol=1e-8): +def is_float_equal(actual, expected, rel_tol=1e-3, abs_tol=1e-5): if actual is None and expected is None: return True return math.isclose(actual, expected, rel_tol=rel_tol, abs_tol=abs_tol) @@ -63,6 +63,7 @@ def cosine_distance_dense( ): if dtype == DataType.VECTOR_FP16 or quantize_type == QuantizeType.FP16: # More stable conversion to float16 to avoid numerical issues + # Convert to numpy float16 and back to float for consistent handling vec1 = [float(np.float16(a)) for a in vec1] vec2 = [float(np.float16(b)) for b in vec2] elif dtype == DataType.VECTOR_INT8: @@ -74,10 +75,16 @@ def cosine_distance_dense( int(round(min(max(val, -128), 127))) for val in vec2 ] # Clamp to valid INT8 range - dot_product = sum(a * b for a, b in zip(vec1, vec2)) - - magnitude1 = math.sqrt(sum(a * a for a in vec1)) - magnitude2 = math.sqrt(sum(b * b for b in vec2)) + # Calculate dot product and magnitudes with higher precision for FP16 + if dtype == DataType.VECTOR_FP16 or quantize_type == QuantizeType.FP16: + # Use more precise calculation for FP16 to handle precision issues + dot_product = sum(np.float32(a) * np.float32(b) for a, b in zip(vec1, vec2)) + magnitude1 = math.sqrt(sum(np.float32(a) * np.float32(a) for a in vec1)) + magnitude2 = math.sqrt(sum(np.float32(b) * np.float32(b) for b in vec2)) + else: + dot_product = sum(a * b for a, b in zip(vec1, vec2)) + magnitude1 = math.sqrt(sum(a * a for a in vec1)) + magnitude2 = math.sqrt(sum(b * b for b in vec2)) if magnitude1 == 0 or magnitude2 == 0: return 1.0 # Zero vector case - maximum distance @@ -112,6 +119,7 @@ def dp_distance_dense( ): if dtype == DataType.VECTOR_FP16 or quantize_type == QuantizeType.FP16: # More stable computation to avoid numerical issues + # Convert to numpy float16 and back to float for consistent handling products = [ float(np.float16(a)) * float(np.float16(b)) for a, b in zip(vec1, vec2) ] @@ -319,22 +327,25 @@ def is_field_equal(field1, field2, schema: FieldSchema) -> bool: def is_vector_equal(vec1, vec2, schema: VectorSchema) -> bool: - if ( - schema.data_type == DataType.SPARSE_VECTOR_FP16 - or schema.data_type == DataType.VECTOR_FP16 - ): - # skip fp16 vector equal - return True - is_sparse = ( schema.data_type == DataType.SPARSE_VECTOR_FP32 or schema.data_type == DataType.SPARSE_VECTOR_FP16 ) if is_sparse: - return is_sparse_vector_equal(vec1, vec2) + # For SPARSE_VECTOR_FP16, use higher tolerance + if schema.data_type == DataType.SPARSE_VECTOR_FP16: + return is_sparse_vector_equal(vec1, vec2, rtol=1e-2, atol=1e-2) + else: + return is_sparse_vector_equal(vec1, vec2) else: - return is_dense_vector_equal(vec1, vec2) + # For FP16 and INT8 vectors, use appropriate tolerance for comparison + if schema.data_type == DataType.VECTOR_FP16: + return is_dense_vector_equal(vec1, vec2, rtol=1e-2, atol=1e-2) + elif schema.data_type == DataType.VECTOR_INT8: + return is_dense_vector_equal(vec1, vec2, rtol=1e-1, atol=1e-1) + else: + return is_dense_vector_equal(vec1, vec2) def is_doc_equal( diff --git a/python/tests/detail/params_helper.py b/python/tests/detail/params_helper.py index e373005e0..998878674 100644 --- a/python/tests/detail/params_helper.py +++ b/python/tests/detail/params_helper.py @@ -36,9 +36,9 @@ quantize_type=QuantizeType.FP16, ), FlatIndexParam(), - FlatIndexParam(metric_type=MetricType.IP, quantize_type=QuantizeType.INT4), - FlatIndexParam(metric_type=MetricType.L2, quantize_type=QuantizeType.INT8), - FlatIndexParam(metric_type=MetricType.COSINE, quantize_type=QuantizeType.FP16), + FlatIndexParam(metric_type=MetricType.IP), + FlatIndexParam(metric_type=MetricType.L2), + FlatIndexParam(metric_type=MetricType.COSINE), IVFIndexParam(), IVFIndexParam( metric_type=MetricType.IP, @@ -64,13 +64,26 @@ ], DataType.VECTOR_FP16: [ HnswIndexParam(), + HnswIndexParam(metric_type=MetricType.IP, m=16, ef_construction=100), + HnswIndexParam(metric_type=MetricType.COSINE, m=24, ef_construction=150), + HnswIndexParam(metric_type=MetricType.L2, m=32, ef_construction=200), FlatIndexParam(), - # IVFIndexParam(), + FlatIndexParam(metric_type=MetricType.IP), + FlatIndexParam(metric_type=MetricType.L2), + FlatIndexParam(metric_type=MetricType.COSINE), + IVFIndexParam(), + IVFIndexParam(metric_type=MetricType.IP), + IVFIndexParam(metric_type=MetricType.L2), + IVFIndexParam(metric_type=MetricType.COSINE), ], DataType.VECTOR_INT8: [ HnswIndexParam(), FlatIndexParam(), - # IVFIndexParam(), + HnswIndexParam(metric_type=MetricType.IP, m=16, ef_construction=100), + FlatIndexParam(metric_type=MetricType.IP), + IVFIndexParam(metric_type=MetricType.L2), + HnswIndexParam(metric_type=MetricType.L2), + FlatIndexParam(metric_type=MetricType.L2), ], DataType.SPARSE_VECTOR_FP32: [ HnswIndexParam(), @@ -108,6 +121,10 @@ ], DataType.VECTOR_INT8: [ InvertIndexParam(), + IVFIndexParam(metric_type=MetricType.IP), + FlatIndexParam(metric_type=MetricType.COSINE), + IVFIndexParam(metric_type=MetricType.COSINE), + HnswIndexParam(metric_type=MetricType.COSINE), ], DataType.SPARSE_VECTOR_FP32: [ HnswIndexParam(metric_type=MetricType.L2), diff --git a/python/tests/detail/test_collection_ddl.py b/python/tests/detail/test_collection_ddl.py index 6fba8cb2e..9b8dc4b83 100644 --- a/python/tests/detail/test_collection_ddl.py +++ b/python/tests/detail/test_collection_ddl.py @@ -16,6 +16,128 @@ from fixture_helper import * from doc_helper import * from params_helper import * +import threading, time + +indextest_collection_schema = zvec.CollectionSchema( + name="test_collection", + fields=[ + FieldSchema( + "id", + DataType.INT64, + nullable=False, + index_param=InvertIndexParam(enable_range_optimization=True), + ), + FieldSchema( + "name", + DataType.STRING, + nullable=False, + index_param=InvertIndexParam(), + ), + ], + vectors=[ + VectorSchema( + "vector_fp32_field", + DataType.VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ), + VectorSchema( + "vector_fp16_field", + DataType.VECTOR_FP16, + dimension=128, + index_param=HnswIndexParam(), + ), + VectorSchema( + "vector_int8_field", + DataType.VECTOR_INT8, + dimension=128, + index_param=HnswIndexParam(), + ), + VectorSchema( + "sparse_vector_fp32_field", + DataType.SPARSE_VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ), + VectorSchema( + "sparse_vector_fp16_field", + DataType.SPARSE_VECTOR_FP16, + dimension=128, + index_param=HnswIndexParam(), + ), + ], +) +columntest_collection_schema = zvec.CollectionSchema( + name="test_collection", + fields=[ + FieldSchema( + "id", + DataType.INT64, + nullable=False, + index_param=InvertIndexParam(enable_range_optimization=True), + ), + FieldSchema( + "name", + DataType.STRING, + nullable=False, + index_param=InvertIndexParam(), + ), + ], + vectors=[ + VectorSchema( + "dense_fp32_field", + DataType.VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ), + VectorSchema( + "sparse_fp32_field", + DataType.SPARSE_VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ), + ], +) + + +def batchdoc_and_check(collection: Collection, multiple_docs, operator="insert"): + if operator == "insert": + result = collection.insert(multiple_docs) + elif operator == "upsert": + result = collection.upsert(multiple_docs) + + elif operator == "update": + result = collection.update(multiple_docs) + else: + logging.error("operator value is error!") + + assert len(result) == len(multiple_docs) + for item in result: + assert item.ok(), ( + f"result={result},Insert operation failed with code {item.code()}" + ) + + stats = collection.stats + assert stats is not None, "Collection stats should not be None" + + doc_ids = [doc.id for doc in multiple_docs] + fetched_docs = collection.fetch(doc_ids) + assert len(fetched_docs) == len(multiple_docs), ( + f"fetched_docs={fetched_docs},Expected {len(multiple_docs)} fetched documents, but got {len(fetched_docs)}" + ) + + for original_doc in multiple_docs: + assert original_doc.id in fetched_docs, ( + f"Expected document ID {original_doc.id} in fetched documents" + ) + fetched_doc = fetched_docs[original_doc.id] + + assert is_doc_equal(fetched_doc, original_doc, collection.schema) + + assert hasattr(fetched_doc, "score"), "Document should have a score attribute" + assert fetched_doc.score == 0.0, ( + "Fetch operation should return default score of 0.0" + ) class TestDDL: @@ -63,25 +185,877 @@ def test_collection_flush(self, basic_collection: Collection): assert "1" in fetched_docs assert fetched_docs["1"].id == "1" + def test_collection_flush_with_reopen(self, tmp_path_factory): + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=columntest_collection_schema, + option=collection_option, + ) + assert coll1 is not None, "Failed to create and open collection" + + # Insert some data + doc1 = Doc( + id="1", + fields={"id": 1, "name": "test1"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {1: 1.0, 2: 2.0}, + }, + ) + + result = coll1.insert(doc1) + assert result.ok() + + coll1.flush() + + fetched_docs = coll1.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + +class TestOptimize: + def test_optimize(self, full_collection_new: Collection): + docs = [generate_doc(i, full_collection_new.schema) for i in range(10)] + + result = full_collection_new.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection_new.stats + assert stats is not None + assert stats.doc_count == 10 + + full_collection_new.optimize(option=OptimizeOption()) + + fetched_docs = full_collection_new.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + def test_optimize_with_reopen(self, tmp_path_factory): + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=columntest_collection_schema, + option=collection_option, + ) + assert coll1 is not None, "Failed to create and open collection" + + # Insert some data + doc1 = Doc( + id="1", + fields={"id": 1, "name": "test1"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {1: 1.0, 2: 2.0}, + }, + ) + + result = coll1.insert(doc1) + assert result.ok() + + coll1.optimize(option=OptimizeOption()) + + fetched_docs = coll1.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + @pytest.mark.parametrize("concurrency", [0, 1, 4, 8]) + @pytest.mark.parametrize( + "full_schema_new", + [ + (True, True, HnswIndexParam()), + (False, True, IVFIndexParam()), + (False, True, FlatIndexParam()), + ( + True, + True, + HnswIndexParam( + metric_type=MetricType.IP, + m=16, + ef_construction=100, + ), + ), + ( + True, + True, + HnswIndexParam( + metric_type=MetricType.COSINE, + m=24, + ef_construction=150, + ), + ), + ( + True, + True, + HnswIndexParam( + metric_type=MetricType.L2, + m=32, + ef_construction=200, + ), + ), + ( + False, + True, + FlatIndexParam( + metric_type=MetricType.IP, + ), + ), + ( + True, + True, + FlatIndexParam( + metric_type=MetricType.COSINE, + ), + ), + ( + True, + True, + FlatIndexParam( + metric_type=MetricType.L2, + ), + ), + ( + True, + True, + IVFIndexParam( + metric_type=MetricType.IP, + n_list=100, + n_iters=10, + use_soar=False, + ), + ), + ( + True, + True, + IVFIndexParam( + metric_type=MetricType.L2, + n_list=200, + n_iters=20, + use_soar=True, + ), + ), + ( + True, + True, + IVFIndexParam( + metric_type=MetricType.COSINE, + n_list=150, + n_iters=15, + use_soar=False, + ), + ), + ], + indirect=True, + ) + @pytest.mark.parametrize("doc_num", [500]) + def test_optimize_with_valid_concurrency_values( + self, + full_collection_new: Collection, + full_schema_new, + doc_num, + concurrency, + queries=None, + ): + """Test valid values for concurrency parameter""" + """ + Verify index consistency before and after optimization + + Args: + collection: zvec collection object + queries: Optional query list, use default queries if not provided + """ + multiple_docs = [ + generate_doc_recall(i, full_collection_new.schema) for i in range(doc_num) + ] + + for i in range(10): + batchdoc_and_check( + full_collection_new, + multiple_docs[i * 1000 : 1000 * (i + 1)], + operator="insert", + ) + + stats = full_collection_new.stats + assert stats.doc_count == len(multiple_docs) + # Build some default queries if none provided + if queries is None: + queries = [] + + # Get schema info to build appropriate queries + schema = full_collection_new + + # Build queries for each scalar field + for field in full_schema_new.fields: + if field.data_type == DataType.STRING: + queries.append({"filter": f"{field.name} >= ''", "topk": 10}) + elif field.data_type in [ + DataType.INT32, + DataType.INT64, + DataType.UINT32, + DataType.UINT64, + ]: + queries.append({"filter": f"{field.name} >= 0", "topk": 10}) + elif field.data_type in [DataType.FLOAT, DataType.DOUBLE]: + queries.append({"filter": f"{field.name} >= 0.0", "topk": 10}) + elif field.data_type == DataType.BOOL: + queries.append({"filter": f"{field.name} = true", "topk": 10}) + + # Build queries for each vector field + for vector in full_schema_new.vectors: + # Build random query vectors + import numpy as np + + if vector.data_type == DataType.VECTOR_FP32: + query_vector = np.random.random(vector.dimension).tolist() + elif vector.data_type == DataType.VECTOR_FP16: + query_vector = np.random.random(vector.dimension).tolist() + elif vector.data_type in [ + DataType.SPARSE_VECTOR_FP32, + DataType.SPARSE_VECTOR_FP16, + ]: + query_vector = { + i: float(np.random.random()) + for i in range(min(10, vector.dimension)) + } + else: + continue + + queries.append( + { + "vector_query": { + "field_name": vector.name, + "vector": query_vector, + }, + "topk": 10, + } + ) + + # Store query results before optimization + results_before_optimize = [] + + print("Executing queries before optimization...") + for i, query in enumerate(queries): + if "vector_query" in query: + result = full_collection_new.query( + VectorQuery( + field_name=query["vector_query"]["field_name"], + vector=query["vector_query"]["vector"], + ), + topk=query["topk"], + ) + else: + result = full_collection_new.query( + filter=query["filter"], topk=query["topk"] + ) + + results_before_optimize.append( + { + "query": query, + "result_count": len(result), + "result_ids": set(doc.id for doc in result), + "result_scores": [doc.score for doc in result], + } + ) + print(f"Query {i + 1}: Found {len(result)} results") + + # Record statistics before optimization + stats_before = full_collection_new.stats + print(f"Documents before optimization: {stats_before.doc_count}") + print( + f"Index completeness before optimization: {stats_before.index_completeness}" + ) + + # Execute optimization + print("Executing optimization...") + # Use valid concurrency values for optimization + full_collection_new.optimize(option=OptimizeOption(concurrency=concurrency)) + + stats = full_collection_new.stats + assert stats.doc_count == len(multiple_docs) + + for i in range(doc_num): + fetched_docs = full_collection_new.fetch([str(i)]) + assert str(i) in fetched_docs + assert fetched_docs[str(i)].id == str(i) + + # Store query results after optimization + results_after_optimize = [] + + print("Executing queries after optimization...") + for i, query in enumerate(queries): + if "vector_query" in query: + result = full_collection_new.query( + VectorQuery( + field_name=query["vector_query"]["field_name"], + vector=query["vector_query"]["vector"], + ), + topk=query["topk"], + ) + else: + result = full_collection_new.query( + filter=query["filter"], topk=query["topk"] + ) + + results_after_optimize.append( + { + "query": query, + "result_count": len(result), + "result_ids": set(doc.id for doc in result), + "result_scores": [doc.score for doc in result], + } + ) + print(f"Query {i + 1}: Found {len(result)} results") + + # Record statistics after optimization + stats_after = full_collection_new.stats + print(f"Documents after optimization: {stats_after.doc_count}") + print( + f"Index completeness after optimization: {stats_after.index_completeness}" + ) + + # Verify consistency + print("\nVerifying index consistency before and after optimization...") + all_consistent = True + + for i, (before, after) in enumerate( + zip(results_before_optimize, results_after_optimize) + ): + query_info = before["query"] + + # Check if result counts are consistent + if before["result_count"] != after["result_count"]: + print( + f"Query {i + 1} result count inconsistent: before {before['result_count']}, after {after['result_count']}" + ) + all_consistent = False + continue + + # Check if result ID sets are consistent + if before["result_ids"] != after["result_ids"]: + print(f"Query {i + 1} result ID set inconsistent") + print(f" Before IDs: {sorted(list(before['result_ids']))}") + print(f" After IDs: {sorted(list(after['result_ids']))}") + all_consistent = False + continue + + # Check if scores are basically consistent (allowing minor differences) + import math + + scores_match = True + for b_score, a_score in zip( + before["result_scores"], after["result_scores"] + ): + if not math.isclose(b_score, a_score, rel_tol=1e-2): + scores_match = False + break + + if not scores_match: + print(f"Query {i + 1} result scores inconsistent") + all_consistent = False + continue + + print(f"Query {i + 1}: Consistent") + + # Verify statistics + if stats_before.doc_count != stats_after.doc_count: + print( + f"Document count inconsistent: before {stats_before.doc_count}, after {stats_after.doc_count}" + ) + all_consistent = False + + if all_consistent: + print( + "\n✓ All verifications passed, indexes remain consistent before and after optimization" + ) + else: + print("\n✗ Inconsistencies found, please check index status") + + assert all_consistent == True + + @pytest.mark.parametrize( + "concurrency", + [ + # -1, -5, # Negative values + 1.5, + 2.7, # Float values + "2", + "8", + "auto", # String values + # True, False # Boolean values + ], + ) + def test_optimize_with_invalid_concurrency_values( + self, full_collection_new: Collection, concurrency + ): + """Test various invalid values for concurrency parameter""" + docs = [generate_doc(i, full_collection_new.schema) for i in range(10)] + + result = full_collection_new.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection_new.stats + assert stats is not None + assert stats.doc_count == 10 + + # Using invalid concurrency values should raise an exception + with pytest.raises(Exception) as exc_info: + full_collection_new.optimize(option=OptimizeOption(concurrency=concurrency)) + + # Depending on the implementation, there may be different error messages + assert any( + msg in str(exc_info.value) + for msg in ["invalid", "concurrency", "parameter", "value", "type"] + ) + + def test_optimize_with_none_concurrency_value( + self, full_collection_new: Collection + ): + """Test concurrency parameter with None value (invalid value)""" + docs = [generate_doc(i, full_collection_new.schema) for i in range(10)] + + result = full_collection_new.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection_new.stats + assert stats is not None + assert stats.doc_count == 10 + + # Using None as concurrency value should raise an exception + with pytest.raises(Exception) as exc_info: + full_collection_new.optimize(option=OptimizeOption(concurrency=None)) + + assert any( + msg in str(exc_info.value) + for msg in ["invalid", "concurrency", "parameter", "value"] + ) + + @pytest.mark.parametrize( + "concurrency", [999999, 1000000] + ) # Assume these are too large values + def test_optimize_with_too_large_concurrency_values( + self, full_collection_new: Collection, concurrency + ): + """Test too large values for concurrency parameter""" + docs = [generate_doc(i, full_collection_new.schema) for i in range(10)] + + result = full_collection_new.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection_new.stats + assert stats is not None + assert stats.doc_count == 10 + + # Using too large concurrency values may not raise an exception, but will try to use the maximum available threads + # Or may raise an exception in some implementations + try: + full_collection_new.optimize(option=OptimizeOption(concurrency=concurrency)) + + # Verify data is still accessible after optimization + fetched_docs = full_collection_new.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + except Exception as e: + # If an exception is raised, ensure it's a reasonable error message + assert any( + msg in str(e) + for msg in [ + "invalid", + "concurrency", + "thread", + "parameter", + "value", + "exceeds", + ] + ) + + def test_optimize_in_read_only_mode(self, tmp_path_factory): + collection_schema = zvec.CollectionSchema( + name="test_collection", + fields=[ + FieldSchema( + "id", + DataType.INT64, + nullable=False, + index_param=InvertIndexParam(enable_range_optimization=True), + ), + FieldSchema( + "name", + DataType.STRING, + nullable=False, + index_param=InvertIndexParam(), + ), + ], + vectors=[ + VectorSchema( + "dense", + DataType.VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ) + ], + ) + collection_option = CollectionOption(read_only=False, enable_mmap=True) + + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=collection_schema, + option=collection_option, + ) + + assert coll1 is not None, "Failed to create and open collection" + doc = Doc( + id="1", + fields={"id": 1, "name": "test"}, + vectors={"dense": np.random.random(128).tolist()}, + ) + result = coll1.insert(doc) + assert result.ok() + del coll1 + + collection_option_reopen = CollectionOption(read_only=True, enable_mmap=True) + coll2 = zvec.open(path=str(collection_path), option=collection_option_reopen) + + assert coll2 is not None, "Failed to reopen collection" + assert coll2.path == str(collection_path) + assert coll2.schema.name == collection_schema.name + + with pytest.raises(Exception) as exc_info: + coll2.optimize(option=OptimizeOption()) + + assert any( + msg in str(exc_info.value).lower() + for msg in ["read", "only", "readonly", "permission", "access", "mode"] + ) + + fetched_docs = coll2.fetch(["1"]) + assert "1" in fetched_docs + fetched_doc = fetched_docs["1"] + assert fetched_doc.id == "1" + assert fetched_doc.field("name") == "test" + + if hasattr(coll2, "destroy") and coll2 is not None: + try: + coll2.destroy() + except Exception as e: + print(f"Warning: failed to destroy collection: {e}") + + def test_optimize_on_destroyed_collection( + self, collection_temp_dir, collection_option: CollectionOption + ): + schema = CollectionSchema( + name="test_optimize_destroyed", + fields=[ + FieldSchema("id", DataType.INT64, nullable=False), + FieldSchema("name", DataType.STRING, nullable=True), + ], + vectors=[ + VectorSchema( + "dense", + DataType.VECTOR_FP32, + dimension=128, + index_param=HnswIndexParam(), + ), + ], + ) + collection = zvec.create_and_open( + path=collection_temp_dir, schema=schema, option=collection_option + ) + + docs = [generate_doc(i, collection.schema) for i in range(3)] + result = collection.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + collection.destroy() + + with pytest.raises(Exception) as exc_info: + collection.optimize(option=OptimizeOption()) + + assert any( + msg in str(exc_info.value) + for msg in ["destroyed", "access", "collection", "path", "exist"] + ) + + def test_concurrent_optimize_calls(self, full_collection: Collection): + docs = [generate_doc(i, full_collection.schema) for i in range(5)] + result = full_collection.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection.stats + assert stats is not None + assert stats.doc_count == 5 + + exceptions = [] + + def optimize_worker(): + try: + for i in range(3): + full_collection.optimize(option=OptimizeOption()) + time.sleep(0.01) + except Exception as e: + exceptions.append(e) + + threads = [] + for i in range(3): + thread = threading.Thread(target=optimize_worker) + threads.append(thread) + thread.start() + + for thread in threads: + thread.join() + + for exc in exceptions: + assert any( + msg in str(exc).lower() + for msg in ["concurrent", "lock", "thread", "access", "conflict"] + ), f"Unexpected exception: {exc}" + + fetched_docs = full_collection.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + def test_multi_thread_optimize_with_operations(self, full_collection: Collection): + docs = [generate_doc(i, full_collection.schema) for i in range(10)] + result = full_collection.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection.stats + assert stats is not None + assert stats.doc_count == 10 + + results = { + "insert_success": 0, + "query_success": 0, + "update_success": 0, + "delete_success": 0, + "optimize_success": 0, + "exceptions": [], + } + results_lock = threading.Lock() + + def insert_worker(): + for i in range(10, 15): + try: + doc = generate_doc(i, full_collection.schema) + result = full_collection.insert(doc) + if result and result.ok(): + with results_lock: + results["insert_success"] += 1 + except Exception as e: + with results_lock: + results["exceptions"].append(f"Insert error: {e}") + + def query_worker(): + for _ in range(10): + try: + query_result = full_collection.query(filter="id >= 0", topk=5) + with results_lock: + results["query_success"] += len(query_result) + except Exception as e: + with results_lock: + results["exceptions"].append(f"Query error: {e}") + + def update_worker(): + for i in range(3): + try: + doc = generate_doc(i, full_collection.schema) + result = full_collection.update(doc) + if result and result.ok(): + with results_lock: + results["update_success"] += 1 + except Exception as e: + with results_lock: + results["exceptions"].append(f"Update error: {e}") + + def delete_worker(): + for i in range(15, 18): + try: + result = full_collection.delete([str(i)]) + if result: + with results_lock: + results["delete_success"] += 1 + except Exception as e: + with results_lock: + results["exceptions"].append(f"Delete error: {e}") + + def optimize_worker(): + for _ in range(2): + try: + full_collection.optimize(option=OptimizeOption()) + with results_lock: + results["optimize_success"] += 1 + time.sleep(0.05) + except Exception as e: + with results_lock: + results["exceptions"].append(f"Optimize error: {e}") + + threads = [] + threads.append(threading.Thread(target=insert_worker)) + threads.append(threading.Thread(target=query_worker)) + threads.append(threading.Thread(target=update_worker)) + threads.append(threading.Thread(target=delete_worker)) + threads.append(threading.Thread(target=optimize_worker)) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + assert results["insert_success"] >= 0, ( + f"Expected some inserts to succeed, got {results['insert_success']}" + ) + assert results["query_success"] >= 0, ( + f"Expected some queries to succeed, got {results['query_success']}" + ) + assert results["update_success"] >= 0, ( + f"Expected some updates to succeed, got {results['update_success']}" + ) + assert results["optimize_success"] >= 0, ( + f"Expected some optimize calls to succeed, got {results['optimize_success']}" + ) + + if results["exceptions"]: + print( + f"Exceptions occurred during concurrent operations: {results['exceptions']}" + ) + + final_stats = full_collection.stats + assert final_stats is not None + assert final_stats.doc_count >= 10 + + fetched_docs = full_collection.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + def test_optimize_empty_collection(self, basic_collection: Collection): + stats = basic_collection.stats + assert stats is not None + assert stats.doc_count == 0 + + basic_collection.optimize(option=OptimizeOption()) + + stats_after = basic_collection.stats + assert stats_after is not None + assert stats_after.doc_count == 0 + + doc = generate_doc(1, basic_collection.schema) + result = basic_collection.insert(doc) + assert bool(result) + assert result.ok() + + fetched_docs = basic_collection.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + stats_final = basic_collection.stats + assert stats_final.doc_count == 1 + + def test_optimize_single_record_collection(self, basic_collection: Collection): + doc = generate_doc(1, basic_collection.schema) + result = basic_collection.insert(doc) + assert bool(result) + assert result.ok() + + stats = basic_collection.stats + assert stats is not None + assert stats.doc_count == 1 + + basic_collection.optimize(option=OptimizeOption()) + + fetched_docs = basic_collection.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + stats_after = basic_collection.stats + assert stats_after.doc_count == 1 + + doc2 = generate_doc(2, basic_collection.schema) + result2 = basic_collection.insert(doc2) + assert bool(result2) + assert result2.ok() + + fetched_docs = basic_collection.fetch(["1", "2"]) + assert len(fetched_docs) == 2 + assert fetched_docs["1"].id == "1" + assert fetched_docs["2"].id == "2" + + def test_optimize_already_optimized_collection(self, full_collection: Collection): + docs = [generate_doc(i, full_collection.schema) for i in range(5)] + result = full_collection.insert(docs) + assert bool(result) + for item in result: + assert item.ok() + + stats = full_collection.stats + assert stats is not None + assert stats.doc_count == 5 + + full_collection.optimize(option=OptimizeOption()) + + fetched_docs = full_collection.fetch(["1"]) + assert "1" in fetched_docs + assert fetched_docs["1"].id == "1" + + full_collection.optimize(option=OptimizeOption()) + + fetched_docs = full_collection.fetch(["1", "2"]) + assert len(fetched_docs) >= 2 + assert fetched_docs["1"].id == "1" + + full_collection.optimize(option=OptimizeOption()) + + query_result = full_collection.query(filter="int32_field >= 0", topk=10) + assert len(query_result) == 5 + + final_stats = full_collection.stats + assert final_stats.doc_count == 5 + class TestIndexDDL: @pytest.mark.parametrize("field_name", DEFAULT_SCALAR_FIELD_NAME.values()) @pytest.mark.parametrize("index_type", SUPPORT_SCALAR_INDEX_TYPES) def test_scalar_index_operation( self, - full_collection: Collection, + full_collection_new: Collection, field_name: str, index_type: IndexType, ): # INSERT 0~5 Doc - docs = [generate_doc(i, full_collection.schema) for i in range(5)] + docs = [generate_doc(i, full_collection_new.schema) for i in range(5)] - result = full_collection.insert(docs) + result = full_collection_new.insert(docs) assert len(result) == 5 for item in result: assert item.ok() - stats = full_collection.stats + stats = full_collection_new.stats assert stats is not None assert stats.doc_count == 5 @@ -114,20 +1088,20 @@ def test_scalar_index_operation( else: assert False, f"Unsupported field type for index creation: {field_name}" - query_result_before = full_collection.query(filter=query_filter, topk=10) + query_result_before = full_collection_new.query(filter=query_filter, topk=10) if index_type not in DEFAULT_INDEX_PARAMS: pytest.fail(f"Unsupported index type for index creation: {index_type}") index_param = DEFAULT_INDEX_PARAMS[index_type] - full_collection.create_index( + full_collection_new.create_index( field_name=field_name, index_param=index_param, option=IndexOption() ) - stats_after_create = full_collection.stats + stats_after_create = full_collection_new.stats assert stats_after_create is not None assert stats_after_create.doc_count == 5 - query_result_after = full_collection.query(filter=query_filter, topk=10) + query_result_after = full_collection_new.query(filter=query_filter, topk=10) assert len(query_result_before) == len(query_result_after), ( f"Query result count mismatch for {field_name} with index type {index_type}: before={len(query_result_before)}, after={len(query_result_after)}" @@ -140,69 +1114,69 @@ def test_scalar_index_operation( ) # INSERT 5~8 Doc - new_docs = [generate_doc(i, full_collection.schema) for i in range(5, 8)] + new_docs = [generate_doc(i, full_collection_new.schema) for i in range(5, 8)] - result = full_collection.insert(new_docs) + result = full_collection_new.insert(new_docs) assert len(result) == 3 for item in result: assert item.ok() - stats_after_insert1 = full_collection.stats + stats_after_insert1 = full_collection_new.stats assert stats_after_insert1 is not None assert stats_after_insert1.doc_count == 8 - fetched_docs = full_collection.fetch([f"{i}" for i in range(5, 8)]) + fetched_docs = full_collection_new.fetch([f"{i}" for i in range(5, 8)]) assert len(fetched_docs) == 3 for i in range(5, 8): doc_id = f"{i}" assert doc_id in fetched_docs - query_result = full_collection.query(filter=query_filter, topk=20) + query_result = full_collection_new.query(filter=query_filter, topk=20) assert len(query_result) >= len(query_result_before) - full_collection.drop_index(field_name=field_name) + full_collection_new.drop_index(field_name=field_name) # Insert 8~10 Doc - more_docs = [generate_doc(i, full_collection.schema) for i in range(8, 10)] + more_docs = [generate_doc(i, full_collection_new.schema) for i in range(8, 10)] - result = full_collection.insert(more_docs) + result = full_collection_new.insert(more_docs) assert len(result) == 2 for item in result: assert item.ok() - stats_after_insert2 = full_collection.stats + stats_after_insert2 = full_collection_new.stats assert stats_after_insert2 is not None assert stats_after_insert2.doc_count == 10 - fetched_docs = full_collection.fetch([f"{i}" for i in range(8, 10)]) + fetched_docs = full_collection_new.fetch([f"{i}" for i in range(8, 10)]) assert len(fetched_docs) == 2 for i in range(8, 10): doc_id = f"{i}" assert doc_id in fetched_docs - query_result = full_collection.query(filter=query_filter, topk=20) + query_result = full_collection_new.query(filter=query_filter, topk=20) assert len(query_result) >= len(query_result_before) - final_stats = full_collection.stats + final_stats = full_collection_new.stats assert final_stats is not None assert final_stats.doc_count == 10 - full_collection.destroy() + full_collection_new.destroy() @pytest.mark.parametrize("field_name", DEFAULT_SCALAR_FIELD_NAME.values()) @pytest.mark.parametrize("index_type", SUPPORT_SCALAR_INDEX_TYPES) def test_duplicate_create_index( - self, full_collection: Collection, field_name: str, index_type: IndexType + self, full_collection_new: Collection, field_name: str, index_type: IndexType ): - docs = [generate_doc(i, full_collection.schema) for i in range(10)] + docs = [generate_doc(i, full_collection_new.schema) for i in range(10)] - result = full_collection.insert(docs) + result = full_collection_new.insert(docs) assert bool(result) for item in result: assert item.ok() - stats = full_collection.stats + stats = full_collection_new.stats assert stats is not None assert stats.doc_count == 10 @@ -235,64 +1209,184 @@ def test_duplicate_create_index( else: assert False, f"Unsupported field type for index creation: {field_name}" - query_result_before = full_collection.query(filter=query_filter, topk=5) + query_result_before = full_collection_new.query(filter=query_filter, topk=5) if index_type not in DEFAULT_INDEX_PARAMS: pytest.fail(f"Unsupported index type for index creation: {index_type}") index_param = DEFAULT_INDEX_PARAMS[index_type] - full_collection.create_index( - field_name=field_name, index_param=index_param, option=IndexOption() + full_collection_new.create_index( + field_name=field_name, index_param=index_param, option=IndexOption() + ) + + query_result_after = full_collection_new.query(filter=query_filter, topk=5) + + assert len(query_result_before) == len(query_result_after), ( + f"Query result count mismatch: before={len(query_result_before)}, after={len(query_result_after)}" + ) + + before_ids = set(doc.id for doc in query_result_before) + after_ids = set(doc.id for doc in query_result_after) + assert before_ids == after_ids, ( + f"Query result IDs mismatch: before={before_ids}, after={after_ids}" + ) + + full_collection_new.create_index( + field_name=field_name, index_param=index_param, option=IndexOption() + ) + + @pytest.mark.parametrize( + "vector_type, index_type", SUPPORT_VECTOR_DATA_TYPE_INDEX_MAP_PARAMS + ) + def test_vector_index_operation( + self, + full_collection_new: Collection, + vector_type: DataType, + index_type: IndexType, + ): + vector_field_name = DEFAULT_VECTOR_FIELD_NAME[vector_type] + + docs = [generate_doc(i, full_collection_new.schema) for i in range(5)] + + result = full_collection_new.insert(docs) + assert len(result) == 5, ( + f"Expected 5 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" + ) + for i, item in enumerate(result): + assert item.ok(), ( + f"Before create_index,result={result},Insertion result {i} is not OK for vector type {vector_type} and index type {index_type} and result={result}" + ) + + stats = full_collection_new.stats + assert stats is not None, ( + f"stats is None for vector type {vector_type} and index type {index_type}" + ) + assert stats.doc_count == 5, ( + f"doc_count!=5 for vector type {vector_type} and index type {index_type}" + ) + + if index_type not in DEFAULT_INDEX_PARAMS: + pytest.fail( + f"Unsupported index type {index_type} for vector type {vector_type} in test_vector_all_data_types_index_create_drop_validation" + ) + index_param = DEFAULT_INDEX_PARAMS[index_type] + + full_collection_new.create_index( + field_name=vector_field_name, + index_param=index_param, + option=IndexOption(), + ) + + stats_after_create = full_collection_new.stats + assert stats_after_create is not None, ( + f"stats_after_create_index is None for vector type {vector_type} and index type {index_type}" ) - query_result_after = full_collection.query(filter=query_filter, topk=5) + new_docs = [generate_doc(i, full_collection_new.schema) for i in range(5, 8)] - assert len(query_result_before) == len(query_result_after), ( - f"Query result count mismatch: before={len(query_result_before)}, after={len(query_result_after)}" + result = full_collection_new.insert(new_docs) + assert len(result) == 3, ( + f"Expected 3 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" ) + for i, item in enumerate(result): + assert item.ok(), ( + f"Before drop_index,result={result},BInsertion result {i} is not OK for vector type {vector_type} and index type {index_type} and " + ) - before_ids = set(doc.id for doc in query_result_before) - after_ids = set(doc.id for doc in query_result_after) - assert before_ids == after_ids, ( - f"Query result IDs mismatch: before={before_ids}, after={after_ids}" + stats_after_insert1 = full_collection_new.stats + assert stats_after_insert1 is not None, ( + f"stats_after_insert1 is None for vector type {vector_type} and index type {index_type}" + ) + assert stats_after_insert1.doc_count == 8, ( + f"Expected 8 documents, got {stats_after_insert1.doc_count} for vector type {vector_type} and index type {index_type}" ) - full_collection.create_index( - field_name=field_name, index_param=index_param, option=IndexOption() + fetched_docs = full_collection_new.fetch([f"{i}" for i in range(5, 8)]) + assert len(fetched_docs) == 3, ( + f"Expected 3 fetched documents, got {len(fetched_docs)} for vector type {vector_type} and index type {index_type}" ) - def test_optimize(self, full_collection: Collection): - docs = [generate_doc(i, full_collection.schema) for i in range(10)] + for i in range(5, 8): + doc_id = f"{i}" + assert doc_id in fetched_docs, ( + f"Document ID {doc_id} not found in fetched results for vector type {vector_type} and index type {index_type}" + ) + assert fetched_docs[doc_id].id == doc_id, ( + f"Document {doc_id} has incorrect ID field value for vector type {vector_type} and index type {index_type}" + ) - result = full_collection.insert(docs) - assert bool(result) - for item in result: - assert item.ok() + full_collection_new.drop_index(field_name=vector_field_name) - stats = full_collection.stats - assert stats is not None - assert stats.doc_count == 10 + more_docs = [generate_doc(i, full_collection_new.schema) for i in range(8, 10)] + result = full_collection_new.insert(more_docs) + assert len(result) == 2, ( + f"Expected 2 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" + ) + for i, item in enumerate(result): + assert item.ok(), ( + f"After drop_index,Insertion result {i} is not OK for vector type {vector_type} and index type {index_type} and result={result}" + ) - full_collection.optimize(option=OptimizeOption()) + # Verify document count after second insertion + stats_after_insert2 = full_collection_new.stats + assert stats_after_insert2 is not None, ( + f"stats_after_insert2 is None for vector type {vector_type} and index type {index_type}" + ) + assert stats_after_insert2.doc_count == 10, ( + f"Expected 10 documents, got {stats_after_insert2.doc_count} for vector type {vector_type} and index type {index_type}" + ) - fetched_docs = full_collection.fetch(["1"]) - assert "1" in fetched_docs - assert fetched_docs["1"].id == "1" + # Fetch data + fetched_docs = full_collection_new.fetch([f"{i}" for i in range(8, 10)]) + assert len(fetched_docs) == 2, ( + f"Expected 2 fetched documents, got {len(fetched_docs)} for vector type {vector_type} and index type {index_type}" + ) + + # Verify fetched documents have correct data + for i in range(8, 10): + doc_id = f"{i}" + assert doc_id in fetched_docs, ( + f"Document ID {doc_id} not found in fetched results for vector type {vector_type} and index type {index_type}" + ) + assert fetched_docs[doc_id].id == doc_id, ( + f"Document {doc_id} has incorrect ID field value for vector type {vector_type} and index type {index_type}" + ) + + # Final verification + final_stats = full_collection_new.stats + assert final_stats is not None, ( + f"final_stats is None for vector type {vector_type} and index type {index_type}" + ) + assert final_stats.doc_count == 10, ( + f"Expected 10 documents, got {final_stats.doc_count} for vector type {vector_type} and index type {index_type}" + ) + full_collection_new.destroy() @pytest.mark.parametrize( "vector_type, index_type", SUPPORT_VECTOR_DATA_TYPE_INDEX_MAP_PARAMS ) - def test_vector_index_operation( - self, - full_collection: Collection, - vector_type: DataType, - index_type: IndexType, + def test_vector_index_operation_with_reopen( + self, tmp_path_factory, vector_type, index_type ): vector_field_name = DEFAULT_VECTOR_FIELD_NAME[vector_type] - docs = [generate_doc(i, full_collection.schema) for i in range(5)] + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" - result = full_collection.insert(docs) + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=indextest_collection_schema, + option=collection_option, + ) + + assert coll1 is not None, "Failed to create and open collection" + + docs = [generate_doc(i, coll1.schema) for i in range(5)] + + result = coll1.insert(docs) assert len(result) == 5, ( f"Expected 5 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" ) @@ -301,7 +1395,7 @@ def test_vector_index_operation( f"Before create_index,result={result},Insertion result {i} is not OK for vector type {vector_type} and index type {index_type} and result={result}" ) - stats = full_collection.stats + stats = coll1.stats assert stats is not None, ( f"stats is None for vector type {vector_type} and index type {index_type}" ) @@ -315,20 +1409,25 @@ def test_vector_index_operation( ) index_param = DEFAULT_INDEX_PARAMS[index_type] - full_collection.create_index( + coll1.create_index( field_name=vector_field_name, index_param=index_param, option=IndexOption(), ) - stats_after_create = full_collection.stats + # Close the first collection (delete reference) + del coll1 + # Reopen the collection + coll2 = zvec.open(path=str(collection_path), option=collection_option) + + stats_after_create = coll2.stats assert stats_after_create is not None, ( f"stats_after_create_index is None for vector type {vector_type} and index type {index_type}" ) - new_docs = [generate_doc(i, full_collection.schema) for i in range(5, 8)] + new_docs = [generate_doc(i, coll2.schema) for i in range(5, 8)] - result = full_collection.insert(new_docs) + result = coll2.insert(new_docs) assert len(result) == 3, ( f"Expected 3 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" ) @@ -337,7 +1436,7 @@ def test_vector_index_operation( f"Before drop_index,result={result},BInsertion result {i} is not OK for vector type {vector_type} and index type {index_type} and " ) - stats_after_insert1 = full_collection.stats + stats_after_insert1 = coll2.stats assert stats_after_insert1 is not None, ( f"stats_after_insert1 is None for vector type {vector_type} and index type {index_type}" ) @@ -345,7 +1444,7 @@ def test_vector_index_operation( f"Expected 8 documents, got {stats_after_insert1.doc_count} for vector type {vector_type} and index type {index_type}" ) - fetched_docs = full_collection.fetch([f"{i}" for i in range(5, 8)]) + fetched_docs = coll2.fetch([f"{i}" for i in range(5, 8)]) assert len(fetched_docs) == 3, ( f"Expected 3 fetched documents, got {len(fetched_docs)} for vector type {vector_type} and index type {index_type}" ) @@ -359,10 +1458,15 @@ def test_vector_index_operation( f"Document {doc_id} has incorrect ID field value for vector type {vector_type} and index type {index_type}" ) - full_collection.drop_index(field_name=vector_field_name) + coll2.drop_index(field_name=vector_field_name) + + del coll2 - more_docs = [generate_doc(i, full_collection.schema) for i in range(8, 10)] - result = full_collection.insert(more_docs) + # Reopen the collection + coll3 = zvec.open(path=str(collection_path), option=collection_option) + + more_docs = [generate_doc(i, coll3.schema) for i in range(8, 10)] + result = coll3.insert(more_docs) assert len(result) == 2, ( f"Expected 2 insertion results, got {len(result)} for vector type {vector_type} and index type {index_type}" ) @@ -372,7 +1476,7 @@ def test_vector_index_operation( ) # Verify document count after second insertion - stats_after_insert2 = full_collection.stats + stats_after_insert2 = coll3.stats assert stats_after_insert2 is not None, ( f"stats_after_insert2 is None for vector type {vector_type} and index type {index_type}" ) @@ -381,7 +1485,7 @@ def test_vector_index_operation( ) # Fetch data - fetched_docs = full_collection.fetch([f"{i}" for i in range(8, 10)]) + fetched_docs = coll3.fetch([f"{i}" for i in range(8, 10)]) assert len(fetched_docs) == 2, ( f"Expected 2 fetched documents, got {len(fetched_docs)} for vector type {vector_type} and index type {index_type}" ) @@ -397,14 +1501,14 @@ def test_vector_index_operation( ) # Final verification - final_stats = full_collection.stats + final_stats = coll3.stats assert final_stats is not None, ( f"final_stats is None for vector type {vector_type} and index type {index_type}" ) assert final_stats.doc_count == 10, ( f"Expected 10 documents, got {final_stats.doc_count} for vector type {vector_type} and index type {index_type}" ) - full_collection.destroy() + coll3.destroy() @staticmethod def create_collection( @@ -448,9 +1552,17 @@ def check_error_message(exc_info, invalid_name): "Error message is unreasonable: e=" + str(exc_info.value) ) else: - assert INCOMPATIBLE_FUNCTION_ERROR_MSG in str(exc_info.value), ( - "Error message is unreasonable: e=" + str(exc_info.value) - ) + # For non-string values like None, int, float, etc., we may get either + # INCOMPATIBLE_FUNCTION_ERROR_MSG, SCHEMA_VALIDATE_ERROR_MSG, INCOMPATIBLE_CONSTRUCTOR_ERROR_MSG + error_str = str(exc_info.value) + # Check if the error contains expected patterns + expected_patterns = [ + INCOMPATIBLE_FUNCTION_ERROR_MSG, + SCHEMA_VALIDATE_ERROR_MSG, + INCOMPATIBLE_CONSTRUCTOR_ERROR_MSG, + ] + if not any(pattern in error_str for pattern in expected_patterns): + assert False, "Error message is unreasonable: e=" + error_str @pytest.mark.parametrize( "invalid_field_name,invalid_vector_name", @@ -781,7 +1893,7 @@ def test_compicated_workflow( @pytest.mark.parametrize( "data_type, index_param", VALID_VECTOR_DATA_TYPE_INDEX_PARAM_MAP_PARAMS ) - def test_vector_index_params( + def test_valid_vector_index_params( self, collection_temp_dir, collection_option: CollectionOption, @@ -911,6 +2023,39 @@ def check_result( ) coll.destroy() + @pytest.mark.parametrize( + "data_type, index_param", INVALID_VECTOR_DATA_TYPE_INDEX_PARAM_MAP_PARAMS + ) + def test_invalid_vector_index_params( + self, + collection_temp_dir, + collection_option: CollectionOption, + data_type: DataType, + index_param, + single_vector_schema, + ): + vector_name = DEFAULT_VECTOR_FIELD_NAME[data_type] + dimension = DEFAULT_VECTOR_DIMENSION + + coll = zvec.create_and_open( + path=collection_temp_dir, + schema=single_vector_schema, + option=collection_option, + ) + + assert coll is not None, ( + f"Failed to create and open collection, {data_type}, {index_param}" + ) + + with pytest.raises(Exception) as exc_info: + # create index + coll.create_index( + field_name=vector_name, + index_param=index_param, + option=IndexOption(), + ) + self.check_error_message(exc_info, index_param) + class TestColumnDDL: def test_add_column(self, basic_collection: Collection): @@ -937,6 +2082,290 @@ def test_add_column(self, basic_collection: Collection): assert stats is not None assert stats.doc_count == 1 + def test_add_column_with_reopen(self, tmp_path_factory): + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=columntest_collection_schema, + option=collection_option, + ) + + assert coll1 is not None, "Failed to create and open collection" + + # Insert some data + doc1 = Doc( + id="1", + fields={"id": 1, "name": "test1"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {1: 1.0, 2: 2.0}, + }, + ) + + result = coll1.insert(doc1) + assert result.ok() + + coll1.add_column( + field_schema=FieldSchema("income", DataType.INT32), + expression="200", # Simple expression + ) + doc2 = Doc( + id="2", + fields={"id": 2, "name": "test2", "income": 12}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {3: 1.1, 4: 2.1}, + }, + ) + + result = coll1.insert(doc2) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll1.stats + assert stats is not None + assert stats.doc_count == 2 + + collection_schema_new = coll1.schema + + assert collection_schema_new.fields != columntest_collection_schema.fields + + # Close the first collection (delete reference) + del coll1 + + # Reopen the collection + coll2 = zvec.open(path=str(collection_path), option=collection_option) + + assert coll2 is not None, "Failed to reopen collection" + assert coll2.path == str(collection_path) + assert coll2.schema.name == collection_schema_new.name + assert coll2.schema.fields == collection_schema_new.fields + + doc3 = Doc( + id="3", + fields={"id": 3, "name": "test3", "income": 13}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {5: 11.0, 6: 13.0}, + }, + ) + result = coll2.insert(doc3) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll2.stats + assert stats is not None + assert stats.doc_count == 3 + + # Verify data is still there + fetched_docs = coll2.fetch(["1", "2", "3"]) + for id in ["1", "2", "3"]: + assert id in fetched_docs + fetched_doc = fetched_docs[id] + assert fetched_doc.id == id + assert fetched_doc.field("name") == "test" + id + + if hasattr(coll2, "destroy") and coll2 is not None: + try: + coll2.destroy() + except Exception as e: + print(f"Warning: failed to destroy collection: {e}") + + def test_alter_column_with_reopen(self, tmp_path_factory): + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=columntest_collection_schema, + option=collection_option, + ) + assert coll1 is not None, "Failed to create and open collection" + + # Insert some data + doc1 = Doc( + id="1", + fields={"id": 1, "name": "test1"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {1: 1.0, 2: 2.0}, + }, + ) + + result = coll1.insert(doc1) + assert result.ok() + + coll1.alter_column( + old_name="id", + new_name="id_new", + option=AlterColumnOption(), + ) + doc2 = Doc( + id="2", + fields={"id_new": 2, "name": "test2"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {3: 1.1, 4: 2.1}, + }, + ) + + result = coll1.insert(doc2) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll1.stats + assert stats is not None + assert stats.doc_count == 2 + + collection_schema_new = coll1.schema + + assert collection_schema_new.fields != columntest_collection_schema.fields + + # Close the first collection (delete reference) + del coll1 + + # Reopen the collection + coll2 = zvec.open(path=str(collection_path), option=collection_option) + + assert coll2 is not None, "Failed to reopen collection" + assert coll2.path == str(collection_path) + assert coll2.schema.name == collection_schema_new.name + assert coll2.schema.fields == collection_schema_new.fields + + doc3 = Doc( + id="3", + fields={"id_new": 3, "name": "test3"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {5: 11.0, 6: 13.0}, + }, + ) + result = coll2.insert(doc3) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll2.stats + assert stats is not None + assert stats.doc_count == 3 + + # Verify data is still there + fetched_docs = coll2.fetch(["1", "2", "3"]) + for id in ["1", "2", "3"]: + assert id in fetched_docs + fetched_doc = fetched_docs[id] + assert fetched_doc.id == id + assert fetched_doc.field("name") == "test" + id + + if hasattr(coll2, "destroy") and coll2 is not None: + try: + coll2.destroy() + except Exception as e: + print(f"Warning: failed to destroy collection: {e}") + + def test_drop_column_with_reopen(self, tmp_path_factory): + # Create collection + temp_dir = tmp_path_factory.mktemp("zvec") + collection_path = temp_dir / "test_collection" + + collection_option = CollectionOption(read_only=False, enable_mmap=True) + # Create and open collection + coll1 = zvec.create_and_open( + path=str(collection_path), + schema=columntest_collection_schema, + option=collection_option, + ) + + assert coll1 is not None, "Failed to create and open collection" + + # Insert some data + doc1 = Doc( + id="1", + fields={"id": 1, "name": "test1"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {1: 1.0, 2: 2.0}, + }, + ) + + result = coll1.insert(doc1) + assert result.ok() + + coll1.drop_column("id") + doc2 = Doc( + id="2", + fields={"name": "test2"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {3: 1.1, 4: 2.1}, + }, + ) + + result = coll1.insert(doc2) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll1.stats + assert stats is not None + assert stats.doc_count == 2 + + collection_schema_new = coll1.schema + + assert collection_schema_new.fields != columntest_collection_schema.fields + + # Close the first collection (delete reference) + del coll1 + + # Reopen the collection + coll2 = zvec.open(path=str(collection_path), option=collection_option) + + assert coll2 is not None, "Failed to reopen collection" + assert coll2.path == str(collection_path) + assert coll2.schema.name == collection_schema_new.name + assert coll2.schema.fields == collection_schema_new.fields + + doc3 = Doc( + id="3", + fields={"name": "test3"}, + vectors={ + "dense_fp32_field": np.random.random(128).tolist(), + "sparse_fp32_field": {5: 11.0, 6: 13.0}, + }, + ) + result = coll2.insert(doc3) + assert bool(result), f"Expected 1 result, but got {len(result)}" + assert result.ok(), ( + f"result={result},Insert operation failed with code = {result.code()}" + ) + stats = coll2.stats + assert stats is not None + assert stats.doc_count == 3 + + # Verify data is still there + fetched_docs = coll2.fetch(["1", "2", "3"]) + for id in ["1", "2", "3"]: + assert id in fetched_docs + fetched_doc = fetched_docs[id] + assert fetched_doc.id == id + + if hasattr(coll2, "destroy") and coll2 is not None: + try: + coll2.destroy() + except Exception as e: + print(f"Warning: failed to destroy collection: {e}") + def test_add_column_with_default_option(self, basic_collection: Collection): # Add a new column with default option basic_collection.add_column(