|
12 | 12 |
|
13 | 13 | import io |
14 | 14 | import logging |
| 15 | +import os |
15 | 16 | import os as os_lib |
16 | 17 | import random |
17 | 18 | import string |
18 | 19 | import time |
19 | 20 | import zipfile |
| 21 | +from pathlib import Path |
20 | 22 |
|
21 | 23 | import boto3 |
22 | 24 | import pytest |
23 | 25 | from assertpy import assert_that |
24 | 26 | from cfn_stacks_factory import CfnStack, CfnVpcStack |
25 | | -from conftest_networking import unmarshal_az_override |
26 | | -from framework.fixture_utils import xdist_session_fixture |
27 | | -from framework.tests_configuration.config_utils import get_all_regions |
| 27 | +from framework.fixture_utils import SharedFixture, xdist_session_fixture |
28 | 28 | from paramiko import Ed25519Key |
29 | 29 | from remote_command_executor import RemoteCommandExecutor |
30 | 30 | from retrying import retry |
31 | 31 | from time_utils import seconds |
32 | 32 | from utils import find_stack_by_tag, generate_stack_name, is_directory_supported, random_alphanumeric |
| 33 | +from xdist import get_xdist_worker_id |
33 | 34 |
|
34 | 35 | from tests.ad_integration.cluster_user import ClusterUser |
35 | 36 | from tests.common.utils import run_system_analyzer |
@@ -228,98 +229,131 @@ def _check_ssm_success(ssm_client, command_id, instance_id): |
228 | 229 | ).is_true() |
229 | 230 |
|
230 | 231 |
|
231 | | -@xdist_session_fixture(autouse=True) |
232 | | -def directory_stacks_shared(cfn_stacks_factory, request, vpc_stacks_shared): # noqa C901 |
| 232 | +def _directory_stack_resource_generator( |
| 233 | + existing_directory_stack_name, |
| 234 | + directory_type, |
| 235 | + region, |
| 236 | + vpc_stack, |
| 237 | + cfn_stacks_factory, |
| 238 | + request, |
| 239 | +): |
233 | 240 | """ |
234 | | - Build and share AD Directory CFN stacks across xdist workers (session-scoped). |
235 | | -
|
236 | | - Returns: |
237 | | - dict with keys as (region, directory_type) and values as CFN stack names. |
238 | | -
|
239 | | - Cleanup: |
240 | | - After yield, delete only the stacks created by this fixture (not the pre-existing ones), |
241 | | - and only if neither --no-delete nor --retain_ad_stack is set. |
| 241 | + Generator used by SharedFixture to provide a shared value (stack info) and run cleanup. |
| 242 | + Yields: {"name": <stack_name>, "managed": <bool>} where managed=True iff we created it here. |
242 | 243 | """ |
243 | | - # Collect regions from CLI (same style as vpc_stacks_shared) |
244 | | - regions = request.config.getoption("regions") or get_all_regions(request.config.getoption("tests_config")) |
245 | | - |
246 | | - # AD types you need to support in tests. If you have a CLI to limit types, you can read it here. |
247 | | - directory_types = ["SimpleAD", "MicrosoftAD"] |
248 | | - |
249 | | - stacks = {} # (region, dir_type) -> stack_name |
250 | | - created_by_this_fixture = set() |
251 | | - |
252 | | - # Build per-region stacks |
253 | | - for region in regions: |
254 | | - # In case region can contain an AZ override, normalize to the region only |
255 | | - region = unmarshal_az_override(region) |
256 | | - |
257 | | - for dir_type in directory_types: |
258 | | - # Respect region capability, keep the test's behavior consistent |
259 | | - if not is_directory_supported(region, dir_type): |
260 | | - logging.info("Directory type %s not supported in region %s, skipping.", dir_type, region) |
261 | | - continue |
262 | | - |
263 | | - existing_name = request.config.getoption("directory_stack_name") |
264 | | - if existing_name: |
265 | | - stacks[(region, dir_type)] = existing_name |
266 | | - logging.info("Using pre-existing directory stack %s for (%s, %s).", existing_name, region, dir_type) |
267 | | - continue |
268 | | - |
269 | | - stack_prefix = f"integ-tests-MultiUserInfraStack{dir_type}" |
270 | | - name = find_stack_by_tag("parallelcluster:integ-tests-ad-stack", region, stack_prefix) |
271 | | - |
272 | | - if not name: |
273 | | - # Create a new stack bound to the region's shared VPC |
274 | | - vpc_stack = vpc_stacks_shared[region] |
275 | | - directory_stack = _create_directory_stack(cfn_stacks_factory, request, dir_type, region, vpc_stack) |
276 | | - name = directory_stack.name |
277 | | - created_by_this_fixture.add((region, dir_type)) |
278 | | - logging.info("Created directory stack %s for (%s, %s).", name, region, dir_type) |
279 | | - else: |
280 | | - logging.info("Reusing existing directory stack %s for (%s, %s).", name, region, dir_type) |
281 | | - |
282 | | - stacks[(region, dir_type)] = name |
283 | | - |
284 | | - yield stacks |
285 | | - |
286 | | - if request.config.getoption("no_delete") or request.config.getoption("retain_ad_stack"): |
287 | | - logging.info("Retain/no-delete set; not deleting directory stacks created by this fixture.") |
288 | | - return |
289 | | - |
290 | | - for region, dir_type in created_by_this_fixture: |
291 | | - name = stacks.get((region, dir_type)) |
| 244 | + # Resolve name and whether we own its lifecycle (managed) |
| 245 | + managed = False |
| 246 | + if existing_directory_stack_name: |
| 247 | + name = existing_directory_stack_name |
| 248 | + else: |
| 249 | + stack_prefix = f"integ-tests-MultiUserInfraStack{directory_type}" |
| 250 | + name = find_stack_by_tag("parallelcluster:integ-tests-ad-stack", region, stack_prefix) |
292 | 251 | if not name: |
293 | | - continue |
294 | | - try: |
295 | | - logging.info("Deleting directory stack %s in region %s (%s).", name, region, dir_type) |
296 | | - cfn_stacks_factory.delete_stack(name, region) |
297 | | - except Exception as e: |
298 | | - logging.warning("Failed to delete directory stack %s: %s", name, e) |
299 | | - |
| 252 | + directory_stack = _create_directory_stack(cfn_stacks_factory, request, directory_type, region, vpc_stack) |
| 253 | + name = directory_stack.name |
| 254 | + managed = True |
300 | 255 |
|
301 | | -@pytest.fixture(scope="class") |
302 | | -def directory_factory(directory_stacks_shared): |
| 256 | + try: |
| 257 | + yield {"name": name, "managed": managed} |
| 258 | + finally: |
| 259 | + # Only delete stacks created by this fixture, and only if not retained/no-delete. |
| 260 | + if managed and not (request.config.getoption("no_delete") or request.config.getoption("retain_ad_stack")): |
| 261 | + try: |
| 262 | + cfn_stacks_factory.delete_stack(name, region) |
| 263 | + logging.info("Deleted directory stack %s in %s", name, region) |
| 264 | + except Exception as e: |
| 265 | + logging.warning("Failed deleting directory stack %s: %s", name, e) |
| 266 | + |
| 267 | + |
| 268 | +class _LazyDirectoryRegistry: |
303 | 269 | """ |
304 | | - Parallel-safe factory for AD Directory CFN stacks using the SharedFixture infra. |
305 | | -
|
306 | | - Behavior: |
307 | | - - If an explicit existing stack name is provided, return it (use-only). |
308 | | - - Otherwise, look up the stack from the session-shared directory_stacks_shared mapping. |
| 270 | + Session-shared registry that lazily creates/acquires per-(region, directory_type) SharedFixture. |
| 271 | + Each worker acquires exactly once per key and caches the stack name. |
309 | 272 | """ |
310 | 273 |
|
311 | | - def _factory(existing_directory_stack_name, directory_type, region): |
312 | | - # Use-only path if explicitly specified by the test/CLI |
| 274 | + def __init__(self, shared_dir: Path, cfn_stacks_factory, request, vpc_stacks_shared): |
| 275 | + self._dir = shared_dir |
| 276 | + self._dir.mkdir(parents=True, exist_ok=True) |
| 277 | + self._cfn = cfn_stacks_factory |
| 278 | + self._request = request |
| 279 | + self._vpcs = vpc_stacks_shared |
| 280 | + self._local = {} # key -> (SharedFixture, stack_name) |
| 281 | + |
| 282 | + def get_stack_name(self, existing_directory_stack_name: str, directory_type: str, region: str) -> str: |
313 | 283 | if existing_directory_stack_name: |
314 | 284 | return existing_directory_stack_name |
315 | 285 |
|
316 | | - key = (region, directory_type) |
317 | | - # Fail fast if the key is not present (e.g., unsupported region/type) |
318 | | - if key not in directory_stacks_shared: |
319 | | - raise RuntimeError( |
320 | | - f"No directory stack available for key={key}. " f"Check is_directory_supported and session setup." |
321 | | - ) |
322 | | - return directory_stacks_shared[key] |
| 286 | + if not is_directory_supported(region, directory_type): |
| 287 | + raise RuntimeError(f"Directory type {directory_type} not supported in {region}") |
| 288 | + |
| 289 | + key = f"{region}:{directory_type}" |
| 290 | + if key in self._local: |
| 291 | + return self._local[key][1] |
| 292 | + |
| 293 | + # Build one SharedFixture per key and acquire exactly once |
| 294 | + xdist_worker_id = get_xdist_worker_id(self._request) |
| 295 | + pid = os.getpid() |
| 296 | + xdist_worker_id_and_pid = f"{xdist_worker_id}: {pid}" |
| 297 | + |
| 298 | + vpc_stack = self._vpcs[region] # one VPC per region (from vpc_stacks_shared) |
| 299 | + |
| 300 | + shared_fixture = SharedFixture( |
| 301 | + name=f"directory_stack_{region}_{directory_type}", |
| 302 | + shared_save_location=self._dir, |
| 303 | + fixture_func=_directory_stack_resource_generator, |
| 304 | + fixture_func_args=( |
| 305 | + existing_directory_stack_name, |
| 306 | + directory_type, |
| 307 | + region, |
| 308 | + vpc_stack, |
| 309 | + self._cfn, |
| 310 | + self._request, |
| 311 | + ), |
| 312 | + fixture_func_kwargs={}, |
| 313 | + xdist_worker_id_and_pid=xdist_worker_id_and_pid, |
| 314 | + log_file=self._request.config.getoption("tests_log_file"), |
| 315 | + ) |
| 316 | + |
| 317 | + data = shared_fixture.acquire() |
| 318 | + payload = data.fixture_return_value or {} |
| 319 | + stack_name = payload.get("name") |
| 320 | + self._local[key] = (shared_fixture, stack_name) |
| 321 | + return stack_name |
| 322 | + |
| 323 | + def release_all(self): |
| 324 | + # Release once per key so the last releaser triggers generator cleanup (deletion if managed=True). |
| 325 | + for shared_fixture, _ in self._local.values(): |
| 326 | + try: |
| 327 | + shared_fixture.release() |
| 328 | + except Exception as e: |
| 329 | + logging.warning("Error releasing shared fixture: %s", e) |
| 330 | + |
| 331 | + |
| 332 | +@xdist_session_fixture(autouse=True) |
| 333 | +def directory_shared_registry(cfn_stacks_factory, request, vpc_stacks_shared): |
| 334 | + """ |
| 335 | + Session-scoped, cross-worker shared registry for directory stacks (lazy). |
| 336 | + Each (region, directory_type) is provisioned on first request. |
| 337 | + """ |
| 338 | + base_dir = Path(f"{request.config.getoption('output_dir', '')}/tmp/shared_fixtures") |
| 339 | + registry = _LazyDirectoryRegistry( |
| 340 | + shared_dir=base_dir, |
| 341 | + cfn_stacks_factory=cfn_stacks_factory, |
| 342 | + request=request, |
| 343 | + vpc_stacks_shared=vpc_stacks_shared, |
| 344 | + ) |
| 345 | + try: |
| 346 | + yield registry |
| 347 | + finally: |
| 348 | + registry.release_all() |
| 349 | + |
| 350 | + |
| 351 | +@pytest.fixture(scope="class") |
| 352 | +def directory_factory(directory_shared_registry): |
| 353 | + """Thin adapter to match existing call signature in tests.""" |
| 354 | + |
| 355 | + def _factory(existing_directory_stack_name: str, directory_type: str, region: str) -> str: |
| 356 | + return directory_shared_registry.get_stack_name(existing_directory_stack_name, directory_type, region) |
323 | 357 |
|
324 | 358 | return _factory |
325 | 359 |
|
|
0 commit comments