@@ -39,15 +39,80 @@ def recreate(self, dataset: Dataset, collection_params):
3939 self .conn .execute ("ALTER TABLE items ALTER COLUMN embedding SET STORAGE PLAIN" )
4040
4141 try :
42- hnsw_distance_type = self .DISTANCE_MAPPING [dataset .config .distance ]
42+ distance_type = self .DISTANCE_MAPPING [dataset .config .distance ]
4343 except KeyError :
4444 raise IncompatibilityError (
4545 f"Unsupported distance metric: { dataset .config .distance } "
4646 )
4747
48- self .conn .execute (
49- f"CREATE INDEX on items USING hnsw(embedding { hnsw_distance_type } ) WITH (m = { collection_params ['hnsw_config' ]['m' ]} , ef_construction = { collection_params ['hnsw_config' ]['ef_construct' ]} )"
50- )
48+ # Check if we should create HNSW index or use FLAT (no index for full scan)
49+ if "hnsw_config" in collection_params :
50+ # Auto-detect the server's worker limits and set parallel workers for faster index builds (pgvector 0.7.0+)
51+ max_parallel_workers = collection_params ['hnsw_config' ].get ('max_parallel_workers' , 'auto' )
52+
53+ if max_parallel_workers == 'auto' :
54+ # Derive a worker count from PostgreSQL's configured worker limits (these are settings, not a CPU core count)
55+ try :
56+ # Get max_worker_processes setting as baseline
57+ worker_result = self .conn .execute ("SELECT setting FROM pg_settings WHERE name = 'max_worker_processes'" ).fetchone ()
58+ available_workers = int (worker_result [0 ]) if worker_result else 8
59+
60+ # Cap the baseline by the max_parallel_workers setting, if it can be read (a configured limit, not a CPU core count)
61+ try :
62+ cpu_cores_result = self .conn .execute ("SELECT setting FROM pg_settings WHERE name = 'max_parallel_workers'" ).fetchone ()
63+ if cpu_cores_result :
64+ available_workers = min (available_workers , int (cpu_cores_result [0 ]))
65+ except :
66+ pass # Fallback to max_worker_processes
67+
68+ # Follow the AWS "total cores - 2" recommendation, applied here to the detected worker limit (but at least 1)
69+ max_parallel_workers = max (1 , available_workers - 2 )
70+ print (f"Auto-detected { available_workers } worker processes, using { max_parallel_workers } parallel workers" )
71+
72+ except Exception as e :
73+ print (f"Failed to auto-detect workers, using default of 4: { e } " )
74+ max_parallel_workers = 8
75+
76+ if max_parallel_workers > 0 :
77+ self .conn .execute (f"SET max_parallel_workers = { max_parallel_workers } " )
78+ self .conn .execute (f"SET max_parallel_workers_per_gather = { max_parallel_workers } " )
79+ self .conn .execute (f"SET max_parallel_maintenance_workers = { max_parallel_workers } " )
80+
81+ # Create HNSW index with optimized parameters
82+ self .conn .execute (
83+ f"CREATE INDEX on items USING hnsw(embedding { distance_type } ) WITH (m = { collection_params ['hnsw_config' ]['m' ]} , ef_construction = { collection_params ['hnsw_config' ]['ef_construct' ]} )"
84+ )
85+ elif "flat_config" in collection_params :
86+ # For FLAT, configure parallel workers for faster query execution during full scans
87+ max_parallel_workers = collection_params ['flat_config' ].get ('max_parallel_workers' , 'auto' )
88+
89+ if max_parallel_workers == 'auto' :
90+ # Derive a worker count from PostgreSQL's configured worker limits (these are settings, not a CPU core count)
91+ try :
92+ # Get max_worker_processes setting as baseline
93+ worker_result = self .conn .execute ("SELECT setting FROM pg_settings WHERE name = 'max_worker_processes'" ).fetchone ()
94+ available_workers = int (worker_result [0 ]) if worker_result else 8
95+
96+ # Cap the baseline by the max_parallel_workers setting, if it can be read (a configured limit, not a CPU core count)
97+ try :
98+ cpu_cores_result = self .conn .execute ("SELECT setting FROM pg_settings WHERE name = 'max_parallel_workers'" ).fetchone ()
99+ if cpu_cores_result :
100+ available_workers = min (available_workers , int (cpu_cores_result [0 ]))
101+ except :
102+ pass # Fallback to max_worker_processes
103+
104+ # Follow the AWS "total cores - 2" recommendation, applied here to the detected worker limit (but at least 1)
105+ max_parallel_workers = max (1 , available_workers - 2 )
106+ print (f"Auto-detected { available_workers } worker processes, using { max_parallel_workers } parallel workers for FLAT queries" )
107+
108+ except Exception as e :
109+ print (f"Failed to auto-detect workers for FLAT, using default of 8: { e } " )
110+ max_parallel_workers = 8
111+
112+ if max_parallel_workers > 0 :
113+ self .conn .execute (f"SET max_parallel_workers = { max_parallel_workers } " )
114+ self .conn .execute (f"SET max_parallel_workers_per_gather = { max_parallel_workers } " )
115+ # For FLAT, we don't create any index - PostgreSQL will do a full table scan with parallel workers
51116
52117 self .conn .close ()
53118
0 commit comments