diff --git a/.env.example b/.env.example index c338eb3..1714abd 100644 --- a/.env.example +++ b/.env.example @@ -3,4 +3,5 @@ SERPAPI_KEY= OPENAIAPI_KEY= DATAFORSEO_USER= DATAFORSEO_PWD= -REDIS_USE_CACHE= \ No newline at end of file +REDIS_USE_CACHE= +REDIS_URL= \ No newline at end of file diff --git a/fraudcrawler/__init__.py b/fraudcrawler/__init__.py index 50c5483..cf62d82 100644 --- a/fraudcrawler/__init__.py +++ b/fraudcrawler/__init__.py @@ -1,7 +1,11 @@ -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.scraping.search import Searcher, SearchEngineName, WebsiteSearch from fraudcrawler.scraping.enrich import Enricher -from fraudcrawler.scraping.url import URLCollector +from fraudcrawler.scraping.url import ( + URLCollector, + LocalURLCollector, + DistributedURLCollector, +) from fraudcrawler.scraping.zyte import ZyteAPI from fraudcrawler.scraping.saved_search_models import ( WebsiteSource, @@ -35,11 +39,14 @@ __all__ = [ "RedisCacher", + "RedisConfig", "Searcher", "SearchEngineName", "WebsiteSearch", "Enricher", "URLCollector", + "LocalURLCollector", + "DistributedURLCollector", "ZyteAPI", "WebsiteSource", "WebsiteSourceFilterConfig", diff --git a/fraudcrawler/base/base.py b/fraudcrawler/base/base.py index 975aaf4..cb8f443 100644 --- a/fraudcrawler/base/base.py +++ b/fraudcrawler/base/base.py @@ -23,6 +23,9 @@ DEFAULT_HTTPX_TIMEOUT, DEFAULT_HTTPX_LIMITS, DEFAULT_HTTPX_REDIRECTS, + REDIS_USE_CACHE, + REDIS_DEFAULT_HOSTNAME, + REDIS_DEFAULT_PORT, ) logger = logging.getLogger(__name__) @@ -46,10 +49,12 @@ class Setup(BaseSettings): dataforseo_pwd: str zyteapi_key: str openaiapi_key: str - pypy_token: str # Redis cache - redis_use_cache: bool + redis_use_cache: bool = REDIS_USE_CACHE + redis_hostname: str = REDIS_DEFAULT_HOSTNAME + redis_port: int = REDIS_DEFAULT_PORT + redis_password: str | None = None class Config: env_file = ".env" diff --git a/fraudcrawler/base/orchestrator.py 
b/fraudcrawler/base/orchestrator.py index 24ae002..e76fcd7 100644 --- a/fraudcrawler/base/orchestrator.py +++ b/fraudcrawler/base/orchestrator.py @@ -588,7 +588,7 @@ async def run( # Handle previously collected URLs if pcurls := previously_collected_urls: - self._url_collector.add_previously_collected_urls(urls=pcurls) + await self._url_collector.add_previously_collected_urls(urls=pcurls) # Setup the async framework n_saved_sources = len(website_source_sources or []) diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py index 66ee363..36fa17c 100644 --- a/fraudcrawler/cache/cacher.py +++ b/fraudcrawler/cache/cacher.py @@ -1,104 +1,114 @@ from abc import ABC, abstractmethod +import hashlib import json import logging from pydantic import BaseModel -from typing import Any, cast, Dict, Sequence -from urllib.parse import urlparse +from typing import Any, cast, Dict, Self, Sequence import uuid from aiocache import Cache from aiocache.backends.redis import RedisCache from aiocache.serializers import PickleSerializer -from fraudcrawler.settings import ( - REDIS_USE_CACHE, - REDIS_URL, - REDIS_TTL, - REDIS_NAMESPACE, -) +from fraudcrawler.base.base import Setup +from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) +_SETUP = Setup() # type: ignore + + +class RedisConfig(BaseModel): + hostname: str + port: int + password: str | None + db: int + namespace: str + ttl: int + + @classmethod + def from_setup(cls, db: int, namespace: str, ttl: int) -> Self: + if _SETUP.redis_hostname is None: + raise ValueError("REDIS_HOSTNAME env variable is missing") + elif _SETUP.redis_port is None: + raise ValueError("REDIS_PORT env variable is missing") + + return cls( + hostname=_SETUP.redis_hostname, + port=_SETUP.redis_port, + password=_SETUP.redis_password, + db=db, + namespace=namespace, + ttl=ttl, + ) class RedisCacher(ABC): - """Abstract base class that adds Redis caching. 
- - :class:`RedisCacher` is used as a parent class for a subclass with an - `apply()` method that should be wrapped inside a caching mechanism. + """Abstract base class that adds Redis caching to a subclass. - Any subclass of RedisCacher must implement `apply()` with their core logic. - The function `capply()` is a wrapper taking care of caching. + RedisCacher wraps a subclass's apply() method with a transparent cache + layer. Subclasses implement apply() with their core logic and call + capply() as the public entry point, which handles cache lookup, result + storage, and bypassing Redis when use_cache is False. - Cache keys are built deterministically from the class name and serialized - arguments (including Pydantic models), so identical calls produce - identical results. + Cache keys are built deterministically from the class name and the + serialized call arguments (including Pydantic models), so identical + calls always map to the same cache entry. """ - - _default_host = "localhost" - _default_port = 6379 - _default_db = 0 + _key_encoding = "ascii" def __init__( self, use_cache: bool = REDIS_USE_CACHE, - url: str = REDIS_URL, - ttl: int = REDIS_TTL, - namespace: str = REDIS_NAMESPACE, + config: RedisConfig | None = None, ) -> None: """Initialize the cacher, optionally connecting to Redis. Args: use_cache: Whether to use Redis cache. - url: Redis connection URL (redis:// or rediss://). - ttl: Time-to-live in seconds for cached entries. - namespace: Key namespace to isolate entries in shared Redis instances. + config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - # Input parameters - self._use_cache = use_cache - self._ttl = ttl - self._namespace = namespace + if use_cache and config is None: + raise ValueError("redis_config must be provided when use_cache=True") + else: + self._config = cast(RedisConfig, config) - # Parameters for caching + self._use_cache = use_cache self._cache: RedisCache | None = None + if self._use_cache: - redis_kwargs = self._get_redis_kwargs(url=url) self._cache = cast( RedisCache, Cache( cache_class=Cache.REDIS, # type: ignore[reportArgumentType] serializer=PickleSerializer(), - namespace=self._namespace, - **redis_kwargs, + endpoint=self._config.hostname, + port=self._config.port, + password=self._config.password, + db=self._config.db, + namespace=self._config.namespace, ), ) - def _get_redis_kwargs(self, url: str) -> Dict[str, str | int | None]: - """Get redis parameters as endpoint, port, password and db""" - - # Parse and check url - u = urlparse(url) - if u.scheme not in {"redis", "rediss"}: - raise ValueError("redis_url must start with redis:// or rediss://") - - # Create and return redis kwargs - return { - "endpoint": u.hostname or self._default_host, - "port": u.port or self._default_port, - "password": u.password, - "db": int(up) if (up := u.path.lstrip("/")) else self._default_db, - } + def _stable_key(self, payload: Dict[str, Any]) -> str: + """Serialize a payload dict to a compact, deterministic sha256 hash. - @staticmethod - def _stable_key(payload: Dict[str, Any]) -> str: - return json.dumps(payload, sort_keys=True, default=str, separators=(",", ":")) + Args: + payload: Dict to serialize as a cache key. + """ + json_str = json.dumps(payload, sort_keys=True, default=str, separators=(",", ":")) + key = hashlib.sha256(json_str.encode('utf-8')).hexdigest() + return f"{payload['cls']}_{key}" @staticmethod def _serialize_object(obj: Any) -> Any: - """Recursively serialize args/kwargs for cache keys. + """Recursively convert a value to a JSON-serializable representation. 
+ + Converts pydantic BaseModel instances via model_dump(), recurses into + lists and dicts, and returns all other values unchanged. - Uses model_dump() for pydantic.BaseModels, recurses into list and dict, - leaves the rest unchanged. + Args: + obj: Value to serialize. """ if isinstance(obj, BaseModel): return obj.model_dump() @@ -111,7 +121,12 @@ def _serialize_object(obj: Any) -> Any: return obj def _build_key(self, *args: Any, **kwargs: Any) -> str: - """Builds caching key based on class name, args and kwargs.""" + """Build a deterministic cache key from the class name and call arguments. + + Args: + *args: Positional arguments passed to apply(). + **kwargs: Keyword arguments passed to apply(). + """ args_ = tuple(self._serialize_object(a) for a in args) kwargs_ = {k: self._serialize_object(v) for k, v in kwargs.items()} return self._stable_key( @@ -123,49 +138,74 @@ def _build_key(self, *args: Any, **kwargs: Any) -> str: ) async def _cached_apply(self, *args: Any, **kwargs: Any) -> Any: - """Cached wrapper around self.apply() method.""" + """Execute apply() with a Redis cache lookup. + + Returns the cached result on a hit; otherwise calls apply(), stores + the result, and returns it. + + Args: + *args: Positional arguments forwarded to apply(). + **kwargs: Keyword arguments forwarded to apply(). + + Returns: + Result of apply(), either retrieved from cache or freshly computed. - # Check if self._cache has been defined + Raises: + RuntimeError: If the Redis cache has not been initialized. + """ if self._cache is None: raise RuntimeError("Redis cache not initialized") - # Get caching key from arguments key = self._build_key(*args, **kwargs) - # Check if key exists in the cacher; otherwise compute the response exists = await self._cache.exists(key=key) if exists: logger.debug( - f"Found cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})" + f"Found cached response for {self.__class__.__name__}.apply(...) 
and key={key}" ) result = await self._cache.get(key=key) else: logger.debug( - f"No cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})" + f"No cached response for {self.__class__.__name__}.apply(...) and key={key}" ) result = await self.apply(*args, **kwargs) logger.debug( - f"Set cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})" + f"Set cached response for {self.__class__.__name__}.apply(...) and key={key}" ) - await self._cache.set(key=key, value=result, ttl=self._ttl) + await self._cache.set(key=key, value=result, ttl=self._config.ttl) return result @abstractmethod async def apply(self, *args: Any, **kwargs: Any) -> Any: - """The cached function that each child of :class:`RedisCacher` must implement.""" + """Core logic that each subclass must implement. + + Called by capply() and, when caching is enabled, by _cached_apply(). + Subclasses should not call this method directly; use capply() instead. + + Args: + *args: Positional arguments specific to the subclass. + **kwargs: Keyword arguments specific to the subclass. + + Returns: + Subclass-defined result that will be cached under the call key. + """ pass async def capply(self, *args: Any, **kwargs: Any) -> Any: - """Calls the method `apply()` with Redis caching if enabled. Otherwise it calls `apply()` directly.""" + """Call apply() with Redis caching when enabled, or directly otherwise. - # Cacher wrapped around self.apply() method + Args: + *args: Positional arguments forwarded to apply(). + **kwargs: Keyword arguments forwarded to apply(). + + Returns: + Result of apply(), either retrieved from cache or freshly computed. 
+ """ if self._use_cache: logger.debug(f"Running cached apply() for {self.__class__.__name__}") result = await self._cached_apply(*args, **kwargs) - - # No cacher, simply run self.apply() method else: logger.debug(f"Running not-cached apply() for {self.__class__.__name__}") result = await self.apply(*args, **kwargs) @@ -175,30 +215,50 @@ async def capply(self, *args: Any, **kwargs: Any) -> Any: # --------------------------------- # Utils for managing Redis remotely # --------------------------------- + async def utils_clear_namespace(self) -> None: + """Delete all cache entries in this instance's namespace. + + No-op when caching is disabled. + + Raises: + RuntimeError: If caching is enabled but the cache is not initialized. + """ if self._use_cache: if self._cache is None: raise RuntimeError("Redis cache not initialized") await self._cache.clear() async def utils_invalidate(self, *args: Any, **kwargs: Any) -> None: + """Delete the cache entry for a specific set of call arguments. + + No-op when caching is disabled. + + Args: + *args: Positional arguments identifying the cache entry to remove. + **kwargs: Keyword arguments identifying the cache entry to remove. + + Raises: + RuntimeError: If caching is enabled but the cache is not initialized. + """ if self._use_cache: if self._cache is None: raise RuntimeError("Redis cache not initialized") await self._cache.delete(key=self._build_key(*args, **kwargs)) async def utils_redis_is_available(self) -> bool: - """Works with aiocache backends: does a small SET/GET/DEL roundtrip.""" - # Dummy key-value pair + """Check Redis availability with a SET/GET/DEL health-check roundtrip. + + Raises: + RuntimeError: If the Redis cache has not been initialized. 
+ """ key = f"__healthcheck__:{self.__class__.__name__}:{uuid.uuid4().hex}" value = "1337" test_ttl = 5 - # Check if cache is defined at all if self._cache is None: raise RuntimeError("Redis cache not initialized") - # Try to set dummy key-value pair try: logger.debug("test to set dummy key-value pair in cacher") await self._cache.set(key=key, value=value, ttl=test_ttl) @@ -206,7 +266,6 @@ async def utils_redis_is_available(self) -> bool: logger.error("failed to set dummy key-value pair in cacher", exc_info=True) return False - # Try to read dummy key-value pair and compare it try: logger.debug("read written dummy value") obtained = await self._cache.get(key=key) diff --git a/fraudcrawler/launch_demo_pipeline.py b/fraudcrawler/launch_demo_pipeline.py index fd180a2..08c5b68 100644 --- a/fraudcrawler/launch_demo_pipeline.py +++ b/fraudcrawler/launch_demo_pipeline.py @@ -4,11 +4,13 @@ from fraudcrawler.base.base import Setup from fraudcrawler import ( + RedisConfig, FraudCrawlerClient, HttpxAsyncClient, Searcher, Enricher, - URLCollector, + # LocalURLCollector, + DistributedURLCollector, ZyteAPI, SearchEngineName, Language, @@ -20,11 +22,21 @@ ) from fraudcrawler.scraping.search import WebsiteSearch from fraudcrawler.scraping.utils import build_website_source_profile +from fraudcrawler.settings import ( + REDIS_CACHE_DB, + REDIS_CACHE_NAMESPACE_SEARCHER, + REDIS_CACHE_NAMESPACE_ENRICHER, + REDIS_CACHE_NAMESPACE_ZYTEAPI, + REDIS_CACHE_NAMESPACE_WORKFLOWS, + REDIS_CACHE_TTL, + REDIS_URL_COLLECTOR_DB, + REDIS_URL_COLLECTOR_NAMESPACE, + REDIS_URL_COLLECTOR_TTL, +) LOG_FMT = "%(asctime)s | %(name)s | %(funcName)s | %(levelname)s | %(message)s" LOG_LVL = "INFO" DATE_FMT = "%Y-%m-%d %H:%M:%S" -REDIS_USE_CACHE = False SETUP = Setup() # type: ignore[call-arg] logging.basicConfig(format=LOG_FMT, level=LOG_LVL, datefmt=DATE_FMT) @@ -113,6 +125,11 @@ def _setup_workflows( " - Related Topics/Content: Any text or media that discusses or elaborates on the topic without offering a 
tangible product for sale.\n" "Make your decision based solely on the context and details provided in the search result. Respond only with the number 1 or 0." ) + redis_config = RedisConfig.from_setup( + db=REDIS_CACHE_DB, + namespace=REDIS_CACHE_NAMESPACE_WORKFLOWS, + ttl=REDIS_CACHE_TTL, + ) return [ OpenAIClassification( http_client=http_client, @@ -123,6 +140,7 @@ system_prompt=_AVAILABILITY_SYSTEM_PROMPT, allowed_classes=[0, 1], redis_use_cache=redis_use_cache, + redis_config=redis_config, ), OpenAIClassification( http_client=http_client, @@ -133,6 +151,7 @@ system_prompt=_SERIOUSNESS_SYSTEM_PROMPT, allowed_classes=[0, 1], redis_use_cache=redis_use_cache, + redis_config=redis_config, ), ] @@ -167,26 +186,52 @@ async def run(http_client: HttpxAsyncClient, search_term: str): # ] # Setup clients + redis_config_searcher = RedisConfig.from_setup( + db=REDIS_CACHE_DB, + namespace=REDIS_CACHE_NAMESPACE_SEARCHER, + ttl=REDIS_CACHE_TTL, + ) searcher = Searcher( http_client=http_client, serpapi_key=SETUP.serpapi_key, zyteapi_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, + redis_config=redis_config_searcher, + ) + redis_config_enricher = RedisConfig.from_setup( + db=REDIS_CACHE_DB, + namespace=REDIS_CACHE_NAMESPACE_ENRICHER, + ttl=REDIS_CACHE_TTL, ) enricher = Enricher( http_client=http_client, user=SETUP.dataforseo_user, pwd=SETUP.dataforseo_pwd, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, + redis_config=redis_config_enricher, + ) + # url_collector = LocalURLCollector() + redis_config_url = RedisConfig.from_setup( + db=REDIS_URL_COLLECTOR_DB, + namespace=REDIS_URL_COLLECTOR_NAMESPACE, + ttl=REDIS_URL_COLLECTOR_TTL, + ) + url_collector = DistributedURLCollector( + redis_config=redis_config_url, + ) + redis_config_zyteapi = RedisConfig.from_setup( + db=REDIS_CACHE_DB, + namespace=REDIS_CACHE_NAMESPACE_ZYTEAPI, + ttl=REDIS_CACHE_TTL, ) - 
url_collector = URLCollector() zyteapi = ZyteAPI( http_client=http_client, api_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, + redis_config=redis_config_zyteapi, ) workflows = _setup_workflows( - http_client=http_client, redis_use_cache=REDIS_USE_CACHE + http_client=http_client, redis_use_cache=SETUP.redis_use_cache ) processor = Processor(workflows=workflows) @@ -238,7 +283,7 @@ async def run_website_source_demo(http_client: HttpxAsyncClient, search_term: st website_search = WebsiteSearch( http_client=http_client, zyteapi_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, ) result = await website_search.ingest_source( source=FUST_WEBSITE_SOURCE_PROFILE, diff --git a/fraudcrawler/processing/base.py b/fraudcrawler/processing/base.py index 6034918..c7d5ee1 100644 --- a/fraudcrawler/processing/base.py +++ b/fraudcrawler/processing/base.py @@ -6,7 +6,7 @@ from tenacity import RetryCallState from fraudcrawler.base.base import ProductItem -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) @@ -44,14 +44,16 @@ def __init__( self, name: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Abstract base class for defining a classification workflow. Args: name: Name of the classification workflow. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self.name = name @abstractmethod @@ -153,7 +155,7 @@ async def run(self, product: ProductItem) -> ProductItem: product.usage[wf.name] = { "input_tokens": inp_tok, "output_tokens": out_tok, - "model": wf.model, + "model": getattr(wf, "model"), } else: raise ValueError( diff --git a/fraudcrawler/processing/openai.py b/fraudcrawler/processing/openai.py index 359f0a1..6caa21e 100644 --- a/fraudcrawler/processing/openai.py +++ b/fraudcrawler/processing/openai.py @@ -15,12 +15,14 @@ from fraudcrawler.base.base import ProductItem from fraudcrawler.base.retry import get_async_retry +from fraudcrawler.cache.cacher import RedisConfig from fraudcrawler.processing.base import ( ClassificationResult, UserInputs, Workflow, Context, ) +from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) @@ -38,7 +40,8 @@ def __init__( name: str, api_key: str, model: str, - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """(Abstract) OpenAI Workflow. @@ -48,10 +51,12 @@ def __init__( api_key: The OpenAI API key. model: The OpenAI model to use. redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" super().__init__( name=name, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._http_client = http_client self._client = AsyncOpenAI(http_client=http_client, api_key=api_key) @@ -256,7 +261,7 @@ async def _responses_parse( """ # Prepare variables cntx = deepcopy(context) - cntx["enpdoint"] = "response.parse" + cntx["endpoint"] = "response.parse" detail: Literal["low", "high", "auto"] = "high" input_param = self._get_input_param( image_url=image_url, @@ -372,7 +377,8 @@ def __init__( product_item_fields: List[str], system_prompt: str, allowed_classes: List[int], - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Open AI classification workflow. @@ -385,6 +391,7 @@ def __init__( system_prompt: System prompt for the AI model. allowed_classes: Allowed classes for model output (must be positive). redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ super().__init__( http_client=http_client, @@ -392,6 +399,7 @@ def __init__( api_key=api_key, model=model, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._product_item_fields = product_item_fields self._system_prompt = system_prompt @@ -482,7 +490,8 @@ def __init__( system_prompt: str, allowed_classes: List[int], user_inputs: UserInputs, - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Open AI classification workflow from user input. @@ -496,6 +505,7 @@ def __init__( allowed_classes: Allowed classes for model output. user_inputs: Inputs from the frontend by the user. redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" super().__init__( http_client=http_client, @@ -506,6 +516,7 @@ def __init__( system_prompt=system_prompt, allowed_classes=allowed_classes, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._user_inputs = user_inputs diff --git a/fraudcrawler/scraping/enrich.py b/fraudcrawler/scraping/enrich.py index ce68d8d..d4eaac8 100644 --- a/fraudcrawler/scraping/enrich.py +++ b/fraudcrawler/scraping/enrich.py @@ -10,7 +10,7 @@ from fraudcrawler.settings import ENRICHMENT_DEFAULT_LIMIT, REDIS_USE_CACHE from fraudcrawler.base.base import Location, Language from fraudcrawler.base.retry import get_async_retry -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig logger = logging.getLogger(__name__) @@ -37,6 +37,7 @@ def __init__( user: str, pwd: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the DataForSeoApiClient with the given username and password. @@ -45,8 +46,9 @@ def __init__( user: The username for DataForSEO API. pwd: The password for DataForSEO API. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._user = user diff --git a/fraudcrawler/scraping/search.py b/fraudcrawler/scraping/search.py index b7c8d19..bcd3bb0 100644 --- a/fraudcrawler/scraping/search.py +++ b/fraudcrawler/scraping/search.py @@ -34,7 +34,7 @@ WebsiteSourceMetadata, ) from fraudcrawler.base.retry import get_async_retry -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.scraping.url import filter_tracking_query_entries from fraudcrawler.scraping.zyte import ( SavedSearchRenderedProductListItem, @@ -544,7 +544,11 @@ class Toppreise(SearchEngine): _endpoint = "https://www.toppreise.ch/" def __init__( - self, http_client: httpx.AsyncClient, zyteapi_key: str, redis_use_cache: bool + self, + http_client: httpx.AsyncClient, + zyteapi_key: str, + redis_use_cache: bool, + redis_config: RedisConfig | None = None, ): """Initializes the Toppreise client. @@ -552,12 +556,14 @@ def __init__( http_client: An httpx.AsyncClient to use for the async requests. zyteapi_key: ZyteAPI key for fallback when direct access fails. redis_use_cache: Whether to use cache (passed to internal ZyteAPI). + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ super().__init__(http_client=http_client) self._zyteapi = ZyteAPI( http_client=http_client, api_key=zyteapi_key, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) async def http_client_get_with_fallback(self, url: str) -> bytes: @@ -753,6 +759,29 @@ class WebsiteSearch(SearchEngine): _saved_search_query_param_keys = ["q", "query", "keyword", "search"] _max_image_urls_per_candidate = 5 + def __init__( + self, + http_client: httpx.AsyncClient, + zyteapi_key: str, + redis_use_cache: bool, + redis_config: RedisConfig | None = None, + ): + """Search engine for website-source ingestion. 
+ + Args: + http_client: An httpx.AsyncClient to use for the async requests. + zyteapi_key: ZyteAPI key. + redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). + """ + super().__init__(http_client=http_client) + self._zyteapi = ZyteAPI( + http_client=http_client, + api_key=zyteapi_key, + redis_use_cache=redis_use_cache, + redis_config=redis_config, + ) + @staticmethod def _build_website_source_engine_name(source_name: str) -> str: """Build a stable engine-like name from a website-source name.""" @@ -767,19 +796,6 @@ def _build_website_source_engine_name(source_name: str) -> str: slug = normalized or "website_source" return f"{slug}_search_engine" - def __init__( - self, - http_client: httpx.AsyncClient, - zyteapi_key: str, - redis_use_cache: bool = REDIS_USE_CACHE, - ): - super().__init__(http_client=http_client) - self._zyteapi = ZyteAPI( - http_client=http_client, - api_key=zyteapi_key, - redis_use_cache=redis_use_cache, - ) - @property def _search_engine_name(self) -> str: return SearchEngineName.WEBSITE_SOURCE.value @@ -1147,6 +1163,7 @@ def __init__( serpapi_key: str, zyteapi_key: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the Search class with the given SerpAPI key. @@ -1155,8 +1172,9 @@ def __init__( serpapi_key: The API key for SERP API. zyteapi_key: ZyteAPI key for fallback when direct access fails. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - RedisCacher.__init__(self=self, use_cache=redis_use_cache) + RedisCacher.__init__(self=self, use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._google = SerpAPIGoogle(http_client=http_client, api_key=serpapi_key) @@ -1168,11 +1186,13 @@ def __init__( http_client=http_client, zyteapi_key=zyteapi_key, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._saved_search_engine = WebsiteSearch( http_client=http_client, zyteapi_key=zyteapi_key, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._search_handlers: Dict[ SearchEngineName, diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py index b107c0b..cecd05b 100644 --- a/fraudcrawler/scraping/url.py +++ b/fraudcrawler/scraping/url.py @@ -1,49 +1,64 @@ +from abc import ABC, abstractmethod +import hashlib import logging from typing import List, Set, Tuple from urllib.parse import urlparse, parse_qsl, urlencode, quote, urlunparse, ParseResult +from aiocache.backends.redis import RedisBackend + +from fraudcrawler.base.base import DomainUtils, FilteredAtStage, ProductItem +from fraudcrawler.cache.cacher import RedisConfig from fraudcrawler.settings import KNOWN_TRACKERS -from fraudcrawler.base.base import FilteredAtStage, ProductItem logger = logging.getLogger(__name__) -def should_drop_tracking_query_parameter(param_key: str) -> bool: - """Returns True if a query parameter key is considered a tracker.""" +def _should_drop_tracking_query_parameter(param_key: str) -> bool: + """Return True if a query parameter key matches a known tracking prefix. + + Args: + param_key: Query parameter key to test (case-insensitive). 
+ """ key = str(param_key or "").lower() return any(key.startswith(tracker) for tracker in KNOWN_TRACKERS) def filter_tracking_query_entries( queries: List[Tuple[str, str]], - *, remove_all: bool = False, ) -> List[Tuple[str, str]]: - """Filter tracking query entries from an already parsed query list.""" + """Filter tracking query entries from an already parsed query list. + + Args: + queries: Parsed query parameters as (key, value) pairs. + remove_all: If True, return an empty list regardless of content. + """ if remove_all: return [] return [ - query for query in queries if not should_drop_tracking_query_parameter(query[0]) + (key, val) + for key, val in queries + if not _should_drop_tracking_query_parameter(key) ] -class URLCollector: +class URLCollector(ABC): """A class to collect and de-duplicate URLs.""" - def __init__(self): - self._collected_currently: Set[str] = set() - self._collected_previously: Set[str] = set() - - def add_previously_collected_urls(self, urls: List[str]) -> None: - """Add a set of previously collected URLs to the internal state. + def _filter_tracking_query_entries( + self, + queries: List[Tuple[str, str]], + remove_all: bool = False, + ) -> List[Tuple[str, str]]: + """Filter tracking query entries from an already parsed query list. Args: - urls: A set of URLs that have been collected in previous runs. + queries: Parsed query parameters as (key, value) pairs. + remove_all: If True, return an empty list regardless of content. """ - self._collected_previously.update(urls) + return filter_tracking_query_entries(queries=queries, remove_all=remove_all) - @staticmethod - def _remove_tracking_parameters(url: str) -> str: + def _remove_tracking_parameters(self, url: str) -> str: """Remove tracking parameters from URLs. 
Args: @@ -64,7 +79,7 @@ def _remove_tracking_parameters(url: str) -> str: remove_all = url.startswith( "https://www.ebay" ) # eBay URLs have all query parameters as tracking parameters - filtered_queries = filter_tracking_query_entries( + filtered_queries = self._filter_tracking_query_entries( queries=queries, remove_all=remove_all ) @@ -79,6 +94,40 @@ def _remove_tracking_parameters(url: str) -> str: ) return urlunparse(clean_url) + @abstractmethod + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Add a set of previously collected URLs to the internal state. + + Args: + urls: A set of URLs that have been collected in previous runs. + """ + pass + + @abstractmethod + async def apply(self, product: ProductItem) -> ProductItem: + """Manages the collection and deduplication of ProductItems. + + Args: + product: The product item to process. + """ + pass + + +class LocalURLCollector(URLCollector): + """A class to collect and de-duplicate URLs using local storage.""" + + def __init__(self): + self._collected_currently: Set[str] = set() + self._collected_previously: Set[str] = set() + + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Add a set of previously collected URLs to the internal state. + + Args: + urls: A set of URLs that have been collected in previous runs. + """ + self._collected_previously.update(urls) + async def apply(self, product: ProductItem) -> ProductItem: """Manages the collection and deduplication of ProductItems. @@ -108,3 +157,112 @@ async def apply(self, product: ProductItem) -> ProductItem: self._collected_currently.add(url) return product + + +class DistributedURLCollector(URLCollector, DomainUtils): + """A URL collector that de-duplicates across pipeline runs using Redis. + + Seen URLs are stored under a namespaced Redis key of the form + ``{domain}_{sha256(cleaned_url + id_suffix)}``. 
The domain prefix keeps + keys inspectable in redis-cli; the hash bounds key length regardless of + URL size. An optional ``id_suffix`` scopes deduplication beyond the + Redis namespace (e.g. per-tenant, per-campaign) by changing the hash + input. + """ + + def __init__( + self, + redis_config: RedisConfig, + id_suffix: str = "", + ) -> None: + """Initialize the distributed collector. + + Args: + redis_config: Redis configuration object. + id_suffix: String appended to the URL before hashing to deduplication + """ + self._id_suffix = id_suffix + self._ttl = redis_config.ttl + # Uses RedisBackend + default StringSerializer (RedisCacher uses + # Cache+PickleSerializer): dedup markers are short strings and stay + # inspectable in redis-cli. + self._cache: RedisBackend = RedisBackend( + endpoint=redis_config.hostname, + port=redis_config.port, + db=redis_config.db, + password=redis_config.password, + namespace=redis_config.namespace, + ) + + def _get_redis_key(self, url: str) -> str: + """Return a Redis key of the form ``{domain}_{sha256_hex}``. + + Note: + - The SHA-256 digest is computed over the URL concatenated with + ``id_suffix``. The domain prefix (via ``DomainUtils._get_domain``) + keeps keys inspectable in redis-cli. + - As here we are using RedisBackend, we need to manually put a ":" + in front of the key to split it in folders + + Args: + url: Tracking-parameter-free URL to hash. + """ + payload = f"{url}{self._id_suffix}".encode("utf-8") + digest = hashlib.sha256(payload).hexdigest() + domain = self._get_domain(url) + return f":{domain}_{digest}" + + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Seed Redis with already-seen URLs. + + Each URL is cleaned of tracking parameters, hashed and stored in + Redis with TTL so subsequent ``apply()`` calls filter it out. + + Args: + urls: URLs collected in previous runs. 
for url in map(self._remove_tracking_parameters, urls):
logging.getLogger(__name__) @@ -45,7 +45,7 @@ class ZyteAPI(RedisCacher, DomainUtils): """A client to interact with the Zyte API for fetching product details.""" _endpoint = "https://api.zyte.com/v1/extract" - _config = { + _request_defaults = { "javascript": False, "browserHtml": False, "screenshot": False, @@ -62,6 +62,7 @@ def __init__( http_client: httpx.AsyncClient, api_key: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the ZyteApiClient with the given API key and retry configurations. @@ -69,8 +70,9 @@ def __init__( http_client: An httpx.AsyncClient to use for the async requests. api_key: The API key for Zyte API. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._api_key = api_key @@ -309,7 +311,7 @@ async def apply(self, url: str) -> dict: with attempt: response = await self._http_client.post( url=self._endpoint, - json={"url": url, **self._config}, + json={"url": url, **self._request_defaults}, auth=(self._api_key, ""), # API key as username, empty password ) response.raise_for_status() diff --git a/fraudcrawler/settings.py b/fraudcrawler/settings.py index 0284a85..607ea84 100644 --- a/fraudcrawler/settings.py +++ b/fraudcrawler/settings.py @@ -104,8 +104,20 @@ } DEFAULT_HTTPX_REDIRECTS = True -# Redis cache settings +# Redis settings REDIS_USE_CACHE = False -REDIS_URL = "redis://localhost:6379" -REDIS_TTL = 3600 +REDIS_DEFAULT_HOSTNAME = "localhost" +REDIS_DEFAULT_PORT = 6379 REDIS_NAMESPACE = "fraudcrawler" + +REDIS_CACHE_DB = 0 +REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache" +REDIS_CACHE_NAMESPACE_SEARCHER = f"{REDIS_CACHE_NAMESPACE}:searcher" +REDIS_CACHE_NAMESPACE_ENRICHER = f"{REDIS_CACHE_NAMESPACE}:enricher" +REDIS_CACHE_NAMESPACE_ZYTEAPI 
= f"{REDIS_CACHE_NAMESPACE}:zyteapi" +REDIS_CACHE_NAMESPACE_WORKFLOWS = f"{REDIS_CACHE_NAMESPACE}:workflows" +REDIS_CACHE_TTL = 3600 + +REDIS_URL_COLLECTOR_DB = 1 +REDIS_URL_COLLECTOR_NAMESPACE = f"{REDIS_NAMESPACE}:urls" +REDIS_URL_COLLECTOR_TTL = 3600 diff --git a/pyproject.toml b/pyproject.toml index 4af9291..07a4260 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "fraudcrawler" -version = "0.8.12" +version = "0.8.13" description = "Intelligent Market Monitoring" authors = [ "Domingo Bertus ", diff --git a/tests/unittests/test_scraping.py b/tests/unittests/test_scraping.py index 79d9c87..c8a9759 100644 --- a/tests/unittests/test_scraping.py +++ b/tests/unittests/test_scraping.py @@ -1,15 +1,28 @@ +import os +import uuid from urllib.parse import parse_qsl, urlparse import pytest import pytest_asyncio -from fraudcrawler import Enricher, URLCollector, ZyteAPI +from aiocache import Cache +from aiocache.backends.redis import RedisBackend + +from fraudcrawler import ( + DistributedURLCollector, + Enricher, + LocalURLCollector, + RedisConfig, + ZyteAPI, +) from fraudcrawler.base.base import ( Setup, Host, Location, Language, + FilteredAtStage, HttpxAsyncClient, + ProductItem, WebsiteSourceMetadata, ) from fraudcrawler.scraping.enrich import Keyword @@ -27,7 +40,6 @@ Toppreise, WebsiteSearch, ) -from fraudcrawler.scraping.url import filter_tracking_query_entries from fraudcrawler.settings import ROOT_DIR @@ -89,7 +101,7 @@ async def enricher(): @pytest.fixture def url_collector(): - return URLCollector() + return LocalURLCollector() @pytest.fixture @@ -439,6 +451,240 @@ def test_remove_tracking_parameters_edge_cases(url_collector): assert cleaned == expected, f"Failed to clean edge case URL: {url}" +def _make_product(url: str) -> ProductItem: + return ProductItem( + search_term="test", + search_term_type="seed", + url=url, + url_resolved=url, + search_engine_name="test", + 
domain="test.example", + ) + + +@pytest_asyncio.fixture +async def shared_memory_cache(): + """Provides a per-test aiocache in-memory cache and cleans it up afterwards.""" + namespace = f"test-dedup-{uuid.uuid4().hex}" + cache = Cache(cache_class=Cache.MEMORY, namespace=namespace) + yield cache + await cache.clear() + + +@pytest_asyncio.fixture +async def live_redis_cache(): + """Per-test live Redis backend; skips the test if Redis is unreachable. + + Entries written during a test are left in Redis for post-mortem + inspection (e.g. RedisInsight); they self-expire via the dedup TTL + (`_DUMMY_REDIS_CONFIG.ttl`). + + Env-var overrides: + REDIS_TEST_HOSTNAME / REDIS_TEST_PORT / REDIS_TEST_DB + connection params (defaults: localhost:6379, db=15). + REDIS_TEST_NAMESPACE + fixed namespace prefix (default: random per-test). Useful to + point all runs at the same prefix in RedisInsight. + """ + if fixed_ns := os.environ.get("REDIS_TEST_NAMESPACE"): + namespace = fixed_ns if fixed_ns.endswith(":") else f"{fixed_ns}:" + else: + namespace = f"test-dedup-{uuid.uuid4().hex}:" + + cache = RedisBackend( + endpoint=os.environ.get("REDIS_TEST_HOSTNAME", "localhost"), + port=int(os.environ.get("REDIS_TEST_PORT", "6379")), + db=int(os.environ.get("REDIS_TEST_DB", "15")), + namespace=namespace, + ) + try: + await cache.set(key="__ping__", value="1", ttl=5) + await cache.delete(key="__ping__") + except Exception as exc: + pytest.skip(f"Redis not reachable for live test: {exc}") + + yield cache + + +_DUMMY_REDIS_CONFIG = RedisConfig( + hostname="localhost", + port=6379, + password=None, + db=0, + namespace="test-dedup", + ttl=60, +) + + +def _make_dist_collector(id_suffix: str = "") -> DistributedURLCollector: + """Build a DistributedURLCollector with a dummy config; the Redis backend + instantiated here is replaced by `_attach_cache` before any I/O.""" + return DistributedURLCollector( + redis_config=_DUMMY_REDIS_CONFIG, id_suffix=id_suffix + ) + + +def _attach_cache(collector: 
DistributedURLCollector, cache) -> DistributedURLCollector: + collector._cache = cache + return collector + + +@pytest.mark.asyncio +async def test_distributed_collector_marks_duplicate_within_instance( + shared_memory_cache, +): + collector = _attach_cache(_make_dist_collector(), shared_memory_cache) + url = "https://www.ricardo.ch/p/123" + + first = await collector.apply(product=_make_product(url)) + assert first.filtered is False + assert first.filtered_at_stage is None + + second = await collector.apply(product=_make_product(url)) + assert second.filtered is True + assert second.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value + + +@pytest.mark.asyncio +async def test_distributed_collector_marks_duplicate_across_instances( + shared_memory_cache, +): + url = "https://www.ricardo.ch/p/456" + + first_collector = _attach_cache(_make_dist_collector(), shared_memory_cache) + await first_collector.apply(product=_make_product(url)) + + second_collector = _attach_cache(_make_dist_collector(), shared_memory_cache) + result = await second_collector.apply(product=_make_product(url)) + + assert result.filtered is True + assert result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value + + +@pytest.mark.asyncio +async def test_distributed_collector_different_id_suffix_does_not_share( + shared_memory_cache, +): + url = "https://www.ricardo.ch/p/789" + + collector_a = _attach_cache( + _make_dist_collector(id_suffix="tenant-a"), shared_memory_cache + ) + await collector_a.apply(product=_make_product(url)) + + collector_b = _attach_cache( + _make_dist_collector(id_suffix="tenant-b"), shared_memory_cache + ) + result = await collector_b.apply(product=_make_product(url)) + + assert result.filtered is False + assert result.filtered_at_stage is None + + +@pytest.mark.asyncio +async def test_distributed_collector_add_previously_collected_urls( + shared_memory_cache, +): + seed_url = "https://www.ricardo.ch/p/seeded" + seeded = 
_attach_cache(_make_dist_collector(), shared_memory_cache) + await seeded.add_previously_collected_urls(urls=[seed_url]) + + fresh = _attach_cache(_make_dist_collector(), shared_memory_cache) + # Tracking params should be stripped before hashing -> still filtered. + product = _make_product(f"{seed_url}?utm_source=foo&srsltid=bar") + result = await fresh.apply(product=product) + + assert result.filtered is True + assert result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value + assert result.url == seed_url # cleaned URL was written back + + +@pytest.mark.asyncio +async def test_distributed_collector_stores_cleaned_url_marker_in_redis( + shared_memory_cache, +): + """First sighting of a tracker-laden URL must persist the CLEANED-URL + marker into Redis (and never the dirty form).""" + base_url = "https://www.ricardo.ch/p/with-trackers" + dirty_url = f"{base_url}?utm_source=foo&srsltid=bar&fbclid=xyz" + + collector = _attach_cache(_make_dist_collector(), shared_memory_cache) + result = await collector.apply(product=_make_product(dirty_url)) + + # First sighting -> not filtered, product.url rewritten to cleaned form + assert result.filtered is False + assert result.filtered_at_stage is None + assert result.url == base_url + + # Cleaned URL's key carries the CURRENT-run marker + clean_key = collector._get_redis_key(base_url) + stored = await shared_memory_cache.get(key=clean_key) + assert stored == FilteredAtStage.URL_COLLECTION_CURRENT.value + + # Dirty URL's key was never written (we never store the tracker form) + dirty_key = collector._get_redis_key(dirty_url) + assert await shared_memory_cache.get(key=dirty_key) is None + + +@pytest.mark.asyncio +async def test_distributed_collector_full_state_machine_on_live_redis( + live_redis_cache, +): + """End-to-end dedup against a real Redis instance. + + Skipped when no Redis is reachable. Override host/port/db via + REDIS_TEST_HOSTNAME / REDIS_TEST_PORT / REDIS_TEST_DB. 
+ + Covers the three legs of the state machine: + 1. seeded URL -> filtered as PREVIOUS + 2. fresh tracker-laden URL -> not filtered, cleaned form persisted + 3. same URL again -> filtered as CURRENT + """ + seeded_url = "https://www.ricardo.ch/p/seed" + base_url = "https://www.ricardo.ch/p/fresh" + dirty_url = f"{base_url}?utm_source=foo&srsltid=bar" + + collector = _attach_cache(_make_dist_collector(), live_redis_cache) + + # 1. PREVIOUS-run marker via explicit seeding + await collector.add_previously_collected_urls(urls=[seeded_url]) + seeded_result = await collector.apply(product=_make_product(seeded_url)) + assert seeded_result.filtered is True + assert ( + seeded_result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value + ) + + # 2. First sighting of a dirty URL -> cleaned form persisted as CURRENT + fresh_result = await collector.apply(product=_make_product(dirty_url)) + assert fresh_result.filtered is False + assert fresh_result.url == base_url + + clean_key = collector._get_redis_key(base_url) + stored = await live_redis_cache.get(key=clean_key) + assert stored == FilteredAtStage.URL_COLLECTION_CURRENT.value + + dirty_key = collector._get_redis_key(dirty_url) + assert await live_redis_cache.get(key=dirty_key) is None + + # 3. 
Second sighting -> filtered as CURRENT + second = await collector.apply(product=_make_product(dirty_url)) + assert second.filtered is True + assert second.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value + + +def test_distributed_collector_hash_is_deterministic(): + collector = _make_dist_collector(id_suffix="abc") + h1 = collector._get_redis_key("https://example.com/p/1") + h2 = collector._get_redis_key("https://example.com/p/1") + h3 = collector._get_redis_key("https://example.com/p/2") + + assert h1 == h2 + assert h1 != h3 + domain, _, digest = h1.partition("_") + assert domain == "example.com" + assert len(digest) == 64 # sha256 hex digest length + + def test_remove_tracking_parameters_known_trackers(url_collector): """Test that all known tracking parameters are removed.""" known_trackers = [ @@ -458,14 +704,14 @@ def test_remove_tracking_parameters_known_trackers(url_collector): ) -def test_filter_tracking_query_entries_matches_url_cleaning_behavior(): +def test_filter_tracking_query_entries_matches_url_cleaning_behavior(url_collector): url = "https://www.ricardo.ch/product/?utm_source=test¶m1=value1&srsltid=abc" queries = parse_qsl(urlparse(url).query, keep_blank_values=True) - filtered = filter_tracking_query_entries(queries=queries) + filtered = url_collector._filter_tracking_query_entries(queries=queries) assert filtered == [("param1", "value1")] - filtered_remove_all = filter_tracking_query_entries( + filtered_remove_all = url_collector._filter_tracking_query_entries( queries=queries, remove_all=True ) assert filtered_remove_all == []