From d454578142969c9a1c4a06133121e70732cb613e Mon Sep 17 00:00:00 2001
From: chriguBERTO
Date: Wed, 15 Apr 2026 22:52:14 +0200
Subject: [PATCH 01/10] minor restructuring of URLCollector

first (not-yet-mature) version of DistributedURLCollector

minor refactoring

REDIS_URL as env variable

Added REDIS_URL to .env.example

some changes I cannot remember but are important (I guess)

bumped version number
---
 .env.example | 3 +-
 fraudcrawler/__init__.py | 8 +-
 fraudcrawler/base/base.py | 2 +
 fraudcrawler/base/orchestrator.py | 2 +-
 fraudcrawler/cache/cacher.py | 172 +++++++++++++++++--------
 fraudcrawler/launch_demo_pipeline.py | 10 +-
 fraudcrawler/scraping/url.py | 179 ++++++++++++++++++++++++---
 fraudcrawler/settings.py | 4 +-
 pyproject.toml | 2 +-
 tests/unittests/test_scraping.py | 127 ++++++++++++++++++-
 10 files changed, 424 insertions(+), 85 deletions(-)

diff --git a/.env.example b/.env.example
index c338eb3..1714abd 100644
--- a/.env.example
+++ b/.env.example
@@ -3,4 +3,5 @@ SERPAPI_KEY=
 OPENAIAPI_KEY=
 DATAFORSEO_USER=
 DATAFORSEO_PWD=
-REDIS_USE_CACHE=
\ No newline at end of file
+REDIS_USE_CACHE=
+REDIS_URL=
\ No newline at end of file
diff --git a/fraudcrawler/__init__.py b/fraudcrawler/__init__.py
index 50c5483..b9feb71 100644
--- a/fraudcrawler/__init__.py
+++ b/fraudcrawler/__init__.py
@@ -1,7 +1,11 @@
 from fraudcrawler.cache.cacher import RedisCacher
 from fraudcrawler.scraping.search import Searcher, SearchEngineName, WebsiteSearch
 from fraudcrawler.scraping.enrich import Enricher
-from fraudcrawler.scraping.url import URLCollector
+from fraudcrawler.scraping.url import (
+    URLCollector,
+    LocalURLCollector,
+    DistributedURLCollector,
+)
 from fraudcrawler.scraping.zyte import ZyteAPI
 from fraudcrawler.scraping.saved_search_models import (
     WebsiteSource,
@@ -40,6 +44,8 @@
     "WebsiteSearch",
     "Enricher",
     "URLCollector",
+    "LocalURLCollector",
+    "DistributedURLCollector",
     "ZyteAPI",
     "WebsiteSource",
     "WebsiteSourceFilterConfig",
diff --git a/fraudcrawler/base/base.py b/fraudcrawler/base/base.py
index 975aaf4..10fd0c8 100644
--- a/fraudcrawler/base/base.py
+++ b/fraudcrawler/base/base.py
@@ -24,6 +24,7 @@
     DEFAULT_HTTPX_LIMITS,
     DEFAULT_HTTPX_REDIRECTS,
 )
+from fraudcrawler.settings import REDIS_DEFAULT_URL
 
 logger = logging.getLogger(__name__)
 
@@ -50,6 +51,7 @@ class Setup(BaseSettings):
 
     # Redis cache
     redis_use_cache: bool
+    redis_url: str = REDIS_DEFAULT_URL
 
     class Config:
         env_file = ".env"
diff --git a/fraudcrawler/base/orchestrator.py b/fraudcrawler/base/orchestrator.py
index 24ae002..e76fcd7 100644
--- a/fraudcrawler/base/orchestrator.py
+++ b/fraudcrawler/base/orchestrator.py
@@ -588,7 +588,7 @@ async def run(
 
         # Handle previously collected URLs
         if pcurls := previously_collected_urls:
-            self._url_collector.add_previously_collected_urls(urls=pcurls)
+            await self._url_collector.add_previously_collected_urls(urls=pcurls)
 
         # Setup the async framework
         n_saved_sources = len(website_source_sources or [])
diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py
index 66ee363..ee4afb2 100644
--- a/fraudcrawler/cache/cacher.py
+++ b/fraudcrawler/cache/cacher.py
@@ -11,40 +11,65 @@
 from aiocache.serializers import PickleSerializer
 
 from fraudcrawler.settings import (
-    REDIS_USE_CACHE,
-    REDIS_URL,
+    REDIS_CACHE_NAMESPACE,
+    REDIS_DEFAULT_URL,
     REDIS_TTL,
-    REDIS_NAMESPACE,
+    REDIS_USE_CACHE,
 )
 
 logger = logging.getLogger(__name__)
 
 
-class RedisCacher(ABC):
-    """Abstract base class that adds Redis caching.
+_DEFAULT_REDIS_HOST = "localhost" +_DEFAULT_REDIS_PORT = 6379 +_DEFAULT_REDIS_DB = 0 - :class:`RedisCacher` is used as a parent class for a subclass with an - `apply()` method that should be wrapped inside a caching mechanism. - Any subclass of RedisCacher must implement `apply()` with their core logic. - The function `capply()` is a wrapper taking care of caching. +def parse_redis_url(url: str) -> Dict[str, str | int | None]: + """Parse a redis:// or rediss:// URL into aiocache connection kwargs. - Cache keys are built deterministically from the class name and serialized - arguments (including Pydantic models), so identical calls produce - identical results. + Args: + url: Redis connection URL (redis:// or rediss://). + + Returns: + Dict with keys ``endpoint`` (str), ``port`` (int), ``password`` + (str | None), and ``db`` (int), suitable for passing as kwargs to + aiocache Redis backends. + + Raises: + ValueError: If ``url`` does not start with ``redis://`` or ``rediss://``. """ + u = urlparse(url) + if u.scheme not in {"redis", "rediss"}: + raise ValueError("redis_url must start with redis:// or rediss://") + return { + "endpoint": u.hostname or _DEFAULT_REDIS_HOST, + "port": u.port or _DEFAULT_REDIS_PORT, + "password": u.password, + "db": int(up) if (up := u.path.lstrip("/")) else _DEFAULT_REDIS_DB, + } + - _default_host = "localhost" - _default_port = 6379 - _default_db = 0 +class RedisCacher(ABC): + """Abstract base class that adds Redis caching to a subclass. + + RedisCacher wraps a subclass's apply() method with a transparent cache + layer. Subclasses implement apply() with their core logic and call + capply() as the public entry point, which handles cache lookup, result + storage, and bypassing Redis when use_cache is False. + + Cache keys are built deterministically from the class name and the + serialized call arguments (including Pydantic models), so identical + calls always map to the same cache entry. + """ def __init__( self, use_cache: bool = REDIS_USE_CACHE, - url: str = REDIS_URL, + url: str = REDIS_DEFAULT_URL, ttl: int = REDIS_TTL, - namespace: str = REDIS_NAMESPACE, + namespace: str = REDIS_CACHE_NAMESPACE, ) -> None: """Initialize the cacher, optionally connecting to Redis. @@ -54,15 +79,13 @@ def __init__( ttl: Time-to-live in seconds for cached entries. namespace: Key namespace to isolate entries in shared Redis instances. """ - # Input parameters self._use_cache = use_cache self._ttl = ttl self._namespace = namespace - # Parameters for caching self._cache: RedisCache | None = None if self._use_cache: - redis_kwargs = self._get_redis_kwargs(url=url) + redis_kwargs = parse_redis_url(url=url) self._cache = cast( RedisCache, Cache( @@ -73,32 +96,24 @@ def __init__( ), ) - def _get_redis_kwargs(self, url: str) -> Dict[str, str | int | None]: - """Get redis parameters as endpoint, port, password and db""" - - # Parse and check url - u = urlparse(url) - if u.scheme not in {"redis", "rediss"}: - raise ValueError("redis_url must start with redis:// or rediss://") - - # Create and return redis kwargs - return { - "endpoint": u.hostname or self._default_host, - "port": u.port or self._default_port, - "password": u.password, - "db": int(up) if (up := u.path.lstrip("/")) else self._default_db, - } - @staticmethod def _stable_key(payload: Dict[str, Any]) -> str: + """Serialize a payload dict to a compact, deterministic JSON string. + + Args: + payload: Dict to serialize as a cache key. 
+ """ return json.dumps(payload, sort_keys=True, default=str, separators=(",", ":")) @staticmethod def _serialize_object(obj: Any) -> Any: - """Recursively serialize args/kwargs for cache keys. + """Recursively convert a value to a JSON-serializable representation. + + Converts pydantic BaseModel instances via model_dump(), recurses into + lists and dicts, and returns all other values unchanged. - Uses model_dump() for pydantic.BaseModels, recurses into list and dict, - leaves the rest unchanged. + Args: + obj: Value to serialize. """ if isinstance(obj, BaseModel): return obj.model_dump() @@ -111,7 +126,12 @@ def _serialize_object(obj: Any) -> Any: return obj def _build_key(self, *args: Any, **kwargs: Any) -> str: - """Builds caching key based on class name, args and kwargs.""" + """Build a deterministic cache key from the class name and call arguments. + + Args: + *args: Positional arguments passed to apply(). + **kwargs: Keyword arguments passed to apply(). + """ args_ = tuple(self._serialize_object(a) for a in args) kwargs_ = {k: self._serialize_object(v) for k, v in kwargs.items()} return self._stable_key( @@ -123,16 +143,26 @@ def _build_key(self, *args: Any, **kwargs: Any) -> str: ) async def _cached_apply(self, *args: Any, **kwargs: Any) -> Any: - """Cached wrapper around self.apply() method.""" + """Execute apply() with a Redis cache lookup. + + Returns the cached result on a hit; otherwise calls apply(), stores + the result, and returns it. + + Args: + *args: Positional arguments forwarded to apply(). + **kwargs: Keyword arguments forwarded to apply(). + + Returns: + Result of apply(), either retrieved from cache or freshly computed. - # Check if self._cache has been defined + Raises: + RuntimeError: If the Redis cache has not been initialized. + """ if self._cache is None: raise RuntimeError("Redis cache not initialized") - # Get caching key from arguments key = self._build_key(*args, **kwargs) - # Check if key exists in the cacher; otherwise compute the response exists = await self._cache.exists(key=key) if exists: logger.debug( @@ -154,18 +184,33 @@ async def _cached_apply(self, *args: Any, **kwargs: Any) -> Any: @abstractmethod async def apply(self, *args: Any, **kwargs: Any) -> Any: - """The cached function that each child of :class:`RedisCacher` must implement.""" + """Core logic that each subclass must implement. + + Called by capply() and, when caching is enabled, by _cached_apply(). + Subclasses should not call this method directly; use capply() instead. + + Args: + *args: Positional arguments specific to the subclass. + **kwargs: Keyword arguments specific to the subclass. + + Returns: + Subclass-defined result that will be cached under the call key. + """ pass async def capply(self, *args: Any, **kwargs: Any) -> Any: - """Calls the method `apply()` with Redis caching if enabled. Otherwise it calls `apply()` directly.""" + """Call apply() with Redis caching when enabled, or directly otherwise. + + Args: + *args: Positional arguments forwarded to apply(). + **kwargs: Keyword arguments forwarded to apply(). - # Cacher wrapped around self.apply() method + Returns: + Result of apply(), either retrieved from cache or freshly computed. 
+ """ if self._use_cache: logger.debug(f"Running cached apply() for {self.__class__.__name__}") result = await self._cached_apply(*args, **kwargs) - - # No cacher, simply run self.apply() method else: logger.debug(f"Running not-cached apply() for {self.__class__.__name__}") result = await self.apply(*args, **kwargs) @@ -175,30 +220,50 @@ async def capply(self, *args: Any, **kwargs: Any) -> Any: # --------------------------------- # Utils for managing Redis remotely # --------------------------------- + async def utils_clear_namespace(self) -> None: + """Delete all cache entries in this instance's namespace. + + No-op when caching is disabled. + + Raises: + RuntimeError: If caching is enabled but the cache is not initialized. + """ if self._use_cache: if self._cache is None: raise RuntimeError("Redis cache not initialized") await self._cache.clear() async def utils_invalidate(self, *args: Any, **kwargs: Any) -> None: + """Delete the cache entry for a specific set of call arguments. + + No-op when caching is disabled. + + Args: + *args: Positional arguments identifying the cache entry to remove. + **kwargs: Keyword arguments identifying the cache entry to remove. + + Raises: + RuntimeError: If caching is enabled but the cache is not initialized. + """ if self._use_cache: if self._cache is None: raise RuntimeError("Redis cache not initialized") await self._cache.delete(key=self._build_key(*args, **kwargs)) async def utils_redis_is_available(self) -> bool: - """Works with aiocache backends: does a small SET/GET/DEL roundtrip.""" - # Dummy key-value pair + """Check Redis availability with a SET/GET/DEL health-check roundtrip. + + Raises: + RuntimeError: If the Redis cache has not been initialized. + """ key = f"__healthcheck__:{self.__class__.__name__}:{uuid.uuid4().hex}" value = "1337" test_ttl = 5 - # Check if cache is defined at all if self._cache is None: raise RuntimeError("Redis cache not initialized") - # Try to set dummy key-value pair try: logger.debug("test to set dummy key-value pair in cacher") await self._cache.set(key=key, value=value, ttl=test_ttl) @@ -206,7 +271,6 @@ async def utils_redis_is_available(self) -> bool: logger.error("failed to set dummy key-value pair in cacher", exc_info=True) return False - # Try to read dummy key-value pair and compare it try: logger.debug("read written dummy value") obtained = await self._cache.get(key=key) diff --git a/fraudcrawler/launch_demo_pipeline.py b/fraudcrawler/launch_demo_pipeline.py index fd180a2..f63f49b 100644 --- a/fraudcrawler/launch_demo_pipeline.py +++ b/fraudcrawler/launch_demo_pipeline.py @@ -8,7 +8,8 @@ HttpxAsyncClient, Searcher, Enricher, - URLCollector, + # LocalURLCollector, + DistributedURLCollector, ZyteAPI, SearchEngineName, Language, @@ -24,7 +25,7 @@ LOG_FMT = "%(asctime)s | %(name)s | %(funcName)s | %(levelname)s | %(message)s" LOG_LVL = "INFO" DATE_FMT = "%Y-%m-%d %H:%M:%S" -REDIS_USE_CACHE = False +REDIS_USE_CACHE = True SETUP = Setup() # type: ignore[call-arg] logging.basicConfig(format=LOG_FMT, level=LOG_LVL, datefmt=DATE_FMT) @@ -179,7 +180,10 @@ async def run(http_client: HttpxAsyncClient, search_term: str): pwd=SETUP.dataforseo_pwd, redis_use_cache=REDIS_USE_CACHE, ) - url_collector = URLCollector() + # url_collector = LocalURLCollector() + url_collector = DistributedURLCollector( + url=SETUP.redis_url, + ) zyteapi = ZyteAPI( http_client=http_client, api_key=SETUP.zyteapi_key, diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py index b107c0b..4fe1145 100644 --- 
a/fraudcrawler/scraping/url.py +++ b/fraudcrawler/scraping/url.py @@ -1,49 +1,71 @@ +from abc import ABC, abstractmethod +import hashlib import logging from typing import List, Set, Tuple from urllib.parse import urlparse, parse_qsl, urlencode, quote, urlunparse, ParseResult -from fraudcrawler.settings import KNOWN_TRACKERS from fraudcrawler.base.base import FilteredAtStage, ProductItem +from aiocache.backends.redis import RedisBackend + +from fraudcrawler.cache.cacher import parse_redis_url +from fraudcrawler.settings import ( + KNOWN_TRACKERS, + REDIS_TTL, + REDIS_URL_COLLECTOR_NAMESPACE, +) logger = logging.getLogger(__name__) -def should_drop_tracking_query_parameter(param_key: str) -> bool: - """Returns True if a query parameter key is considered a tracker.""" +def _should_drop_tracking_query_parameter(param_key: str) -> bool: + """Return True if a query parameter key matches a known tracking prefix. + + Args: + param_key: Query parameter key to test (case-insensitive). + """ key = str(param_key or "").lower() return any(key.startswith(tracker) for tracker in KNOWN_TRACKERS) def filter_tracking_query_entries( queries: List[Tuple[str, str]], - *, remove_all: bool = False, ) -> List[Tuple[str, str]]: - """Filter tracking query entries from an already parsed query list.""" + """Filter tracking query entries from an already parsed query list. + + Args: + queries: Parsed query parameters as (key, value) pairs. + remove_all: If True, return an empty list regardless of content. + """ if remove_all: return [] return [ - query for query in queries if not should_drop_tracking_query_parameter(query[0]) + (key, val) + for key, val in queries + if not _should_drop_tracking_query_parameter(key) ] -class URLCollector: +class URLCollector(ABC): """A class to collect and de-duplicate URLs.""" - def __init__(self): - self._collected_currently: Set[str] = set() - self._collected_previously: Set[str] = set() + _filtered_at_stage_current = "URL collection (current run deduplication)" + _filtered_at_stage_previous = "URL collection (previous run deduplication)" - def add_previously_collected_urls(self, urls: List[str]) -> None: - """Add a set of previously collected URLs to the internal state. + def _filter_tracking_query_entries( + self, + queries: List[Tuple[str, str]], + remove_all: bool = False, + ) -> List[Tuple[str, str]]: + """Filter tracking query entries from an already parsed query list. Args: - urls: A set of URLs that have been collected in previous runs. + queries: Parsed query parameters as (key, value) pairs. + remove_all: If True, return an empty list regardless of content. """ - self._collected_previously.update(urls) + return filter_tracking_query_entries(queries=queries, remove_all=remove_all) - @staticmethod - def _remove_tracking_parameters(url: str) -> str: + def _remove_tracking_parameters(self, url: str) -> str: """Remove tracking parameters from URLs. Args: @@ -64,7 +86,7 @@ def _remove_tracking_parameters(url: str) -> str: remove_all = url.startswith( "https://www.ebay" ) # eBay URLs have all query parameters as tracking parameters - filtered_queries = filter_tracking_query_entries( + filtered_queries = self._filter_tracking_query_entries( queries=queries, remove_all=remove_all ) @@ -79,6 +101,40 @@ def _remove_tracking_parameters(url: str) -> str: ) return urlunparse(clean_url) + @abstractmethod + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Add a set of previously collected URLs to the internal state. 
+ + Args: + urls: A set of URLs that have been collected in previous runs. + """ + pass + + @abstractmethod + async def apply(self, product: ProductItem) -> ProductItem: + """Manages the collection and deduplication of ProductItems. + + Args: + product: The product item to process. + """ + pass + + +class LocalURLCollector(URLCollector): + """A class to collect and de-duplicate URLs using local storage.""" + + def __init__(self): + self._collected_currently: Set[str] = set() + self._collected_previously: Set[str] = set() + + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Add a set of previously collected URLs to the internal state. + + Args: + urls: A set of URLs that have been collected in previous runs. + """ + self._collected_previously.update(urls) + async def apply(self, product: ProductItem) -> ProductItem: """Manages the collection and deduplication of ProductItems. @@ -108,3 +164,92 @@ async def apply(self, product: ProductItem) -> ProductItem: self._collected_currently.add(url) return product + + +class DistributedURLCollector(URLCollector): + """A URL collector that de-duplicates across pipeline runs using Redis. + + Seen URLs are stored under a namespaced Redis key computed as + ``md5(cleaned_url + id_suffix)``. Hashing keeps keys fixed-length and + bounded in memory regardless of URL length. An optional ``id_suffix`` + scopes deduplication beyond the Redis namespace (e.g. per-tenant, + per-campaign) by changing the hash input. + """ + + def __init__( + self, + url: str, + ttl: int = REDIS_TTL, + namespace: str = REDIS_URL_COLLECTOR_NAMESPACE, + id_suffix: str = "", + ) -> None: + """Initialize the distributed collector. + + Args: + url: Redis connection URL (redis:// or rediss://). + ttl: Time-to-live in seconds for each stored URL marker. + namespace: Redis key namespace to isolate entries. + id_suffix: String appended to the URL before hashing to deduplication + """ + self._id_suffix = id_suffix + self._ttl = ttl + self._collected_currently: Set[str] = set() + redis_kwargs = parse_redis_url(url=url) + self._cache: RedisBackend = RedisBackend(namespace=namespace, **redis_kwargs) + + def _hash(self, cleaned_url: str) -> str: + """Return the MD5 hex digest of the `cleaned_url` concatenated with `id_suffix`. + + MD5 is used for deduplication only (not for security). + + Args: + cleaned_url: Tracking-parameter-free URL to hash. + """ + payload = f"{cleaned_url}{self._id_suffix}".encode("utf-8") + return hashlib.md5(payload, usedforsecurity=False).hexdigest() + + async def add_previously_collected_urls(self, urls: List[str]) -> None: + """Seed Redis with already-seen URLs. + + Each URL is cleaned of tracking parameters, hashed and stored in + Redis with TTL so subsequent ``apply()`` calls filter it out. + + Args: + urls: URLs collected in previous runs. + """ + for raw in urls: + cleaned = self._remove_tracking_parameters(raw) + key = self._hash(cleaned) + await self._cache.set(key=key, value="1", ttl=self._ttl) + + async def apply(self, product: ProductItem) -> ProductItem: + """De-duplicate product cross-run using Redis. + + Args: + product: The product item to process. 
+ """ + logger.debug(f'Processing product with url="{product.url}"') + + # Remove tracking parameters from the URL + url = self._remove_tracking_parameters(product.url) + product.url = url + key = self._hash(url) + + # deduplicate on current run (in-memory) + if url in self._collected_currently: + product.filtered = True + product.filtered_at_stage = self._filtered_at_stage_current + logger.debug(f"URL {url} already collected in current run") + + # deduplicate on previous runs (Redis, possibly written by another instance) + elif await self._cache.exists(key=key): + product.filtered = True + product.filtered_at_stage = self._filtered_at_stage_previous + logger.debug(f"URL {url} already collected in previous run (distributed)") + + # Add to both current in-memory set and Redis + else: + self._collected_currently.add(url) + await self._cache.set(key=key, value="1", ttl=self._ttl) + + return product diff --git a/fraudcrawler/settings.py b/fraudcrawler/settings.py index 0284a85..45343aa 100644 --- a/fraudcrawler/settings.py +++ b/fraudcrawler/settings.py @@ -106,6 +106,8 @@ # Redis cache settings REDIS_USE_CACHE = False -REDIS_URL = "redis://localhost:6379" +REDIS_DEFAULT_URL = "redis://localhost:6379" REDIS_TTL = 3600 REDIS_NAMESPACE = "fraudcrawler" +REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache:" +REDIS_URL_COLLECTOR_NAMESPACE = f"{REDIS_NAMESPACE}:urls:" diff --git a/pyproject.toml b/pyproject.toml index 4af9291..07a4260 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "poetry.core.masonry.api" [tool.poetry] name = "fraudcrawler" -version = "0.8.12" +version = "0.8.13" description = "Intelligent Market Monitoring" authors = [ "Domingo Bertus ", diff --git a/tests/unittests/test_scraping.py b/tests/unittests/test_scraping.py index 79d9c87..76ab046 100644 --- a/tests/unittests/test_scraping.py +++ b/tests/unittests/test_scraping.py @@ -3,13 +3,16 @@ import pytest import pytest_asyncio -from fraudcrawler import Enricher, URLCollector, ZyteAPI +from aiocache import Cache + +from fraudcrawler import DistributedURLCollector, Enricher, LocalURLCollector, ZyteAPI from fraudcrawler.base.base import ( Setup, Host, Location, Language, HttpxAsyncClient, + ProductItem, WebsiteSourceMetadata, ) from fraudcrawler.scraping.enrich import Keyword @@ -27,7 +30,6 @@ Toppreise, WebsiteSearch, ) -from fraudcrawler.scraping.url import filter_tracking_query_entries from fraudcrawler.settings import ROOT_DIR @@ -89,7 +91,7 @@ async def enricher(): @pytest.fixture def url_collector(): - return URLCollector() + return LocalURLCollector() @pytest.fixture @@ -439,6 +441,119 @@ def test_remove_tracking_parameters_edge_cases(url_collector): assert cleaned == expected, f"Failed to clean edge case URL: {url}" +def _make_product(url: str) -> ProductItem: + return ProductItem( + search_term="test", + search_term_type="seed", + url=url, + url_resolved=url, + search_engine_name="test", + domain="test.example", + ) + + +@pytest_asyncio.fixture +async def shared_memory_cache(): + """Provides a per-test aiocache in-memory cache and cleans it up afterwards.""" + import uuid + + namespace = f"test-dedup-{uuid.uuid4().hex}" + cache = Cache(cache_class=Cache.MEMORY, namespace=namespace) + yield cache + await cache.clear() + + +def _attach_cache(collector: DistributedURLCollector, cache) -> DistributedURLCollector: + collector._cache = cache + return collector + + +@pytest.mark.asyncio +async def test_distributed_collector_marks_duplicate_within_instance( + shared_memory_cache, +): + collector = 
_attach_cache(DistributedURLCollector(), shared_memory_cache) + url = "https://www.ricardo.ch/p/123" + + first = await collector.apply(product=_make_product(url)) + assert first.filtered is False + assert first.filtered_at_stage is None + + second = await collector.apply(product=_make_product(url)) + assert second.filtered is True + assert second.filtered_at_stage == "URL collection (current run deduplication)" + + +@pytest.mark.asyncio +async def test_distributed_collector_marks_duplicate_across_instances( + shared_memory_cache, +): + url = "https://www.ricardo.ch/p/456" + + first_collector = _attach_cache(DistributedURLCollector(), shared_memory_cache) + await first_collector.apply(product=_make_product(url)) + + second_collector = _attach_cache(DistributedURLCollector(), shared_memory_cache) + result = await second_collector.apply(product=_make_product(url)) + + assert result.filtered is True + assert result.filtered_at_stage == "URL collection (previous run deduplication)" + + +@pytest.mark.asyncio +async def test_distributed_collector_different_id_suffix_does_not_share( + shared_memory_cache, +): + url = "https://www.ricardo.ch/p/789" + + collector_a = _attach_cache( + DistributedURLCollector(id_suffix="tenant-a"), shared_memory_cache + ) + await collector_a.apply(product=_make_product(url)) + + collector_b = _attach_cache( + DistributedURLCollector(id_suffix="tenant-b"), shared_memory_cache + ) + result = await collector_b.apply(product=_make_product(url)) + + assert result.filtered is False + assert result.filtered_at_stage is None + + +@pytest.mark.asyncio +async def test_distributed_collector_add_previously_collected_urls( + shared_memory_cache, +): + seed_url = "https://www.ricardo.ch/p/seeded" + seeded = _attach_cache(DistributedURLCollector(), shared_memory_cache) + await seeded.add_previously_collected_urls(urls=[seed_url]) + + fresh = _attach_cache(DistributedURLCollector(), shared_memory_cache) + # Tracking params should be stripped before hashing -> still filtered. 
+ product = _make_product(f"{seed_url}?utm_source=foo&srsltid=bar") + result = await fresh.apply(product=product) + + assert result.filtered is True + assert result.filtered_at_stage == "URL collection (previous run deduplication)" + assert result.url == seed_url # cleaned URL was written back + + +def test_distributed_collector_hash_is_deterministic(): + collector = DistributedURLCollector(id_suffix="abc") + h1 = collector._hash("https://example.com/p/1") + h2 = collector._hash("https://example.com/p/1") + h3 = collector._hash("https://example.com/p/2") + + assert h1 == h2 + assert h1 != h3 + assert len(h1) == 32 # md5 hex digest length + + +def test_distributed_collector_rejects_bad_redis_url(): + with pytest.raises(ValueError): + DistributedURLCollector(url="http://not-redis") + + def test_remove_tracking_parameters_known_trackers(url_collector): """Test that all known tracking parameters are removed.""" known_trackers = [ @@ -458,14 +573,14 @@ def test_remove_tracking_parameters_known_trackers(url_collector): ) -def test_filter_tracking_query_entries_matches_url_cleaning_behavior(): +def test_filter_tracking_query_entries_matches_url_cleaning_behavior(url_collector): url = "https://www.ricardo.ch/product/?utm_source=test¶m1=value1&srsltid=abc" queries = parse_qsl(urlparse(url).query, keep_blank_values=True) - filtered = filter_tracking_query_entries(queries=queries) + filtered = url_collector._filter_tracking_query_entries(queries=queries) assert filtered == [("param1", "value1")] - filtered_remove_all = filter_tracking_query_entries( + filtered_remove_all = url_collector._filter_tracking_query_entries( queries=queries, remove_all=True ) assert filtered_remove_all == [] From 42988bc59275a7cc0d87a87299d18929e8d3e952 Mon Sep 17 00:00:00 2001 From: chriguBERTO Date: Thu, 30 Apr 2026 15:14:05 +0200 Subject: [PATCH 02/10] major review of code --- fraudcrawler/__init__.py | 3 +- fraudcrawler/base/base.py | 10 +- fraudcrawler/cache/cacher.py | 86 +++++++------ fraudcrawler/launch_demo_pipeline.py | 35 ++++-- fraudcrawler/processing/base.py | 8 +- fraudcrawler/processing/openai.py | 19 ++- fraudcrawler/scraping/enrich.py | 6 +- fraudcrawler/scraping/search.py | 6 +- fraudcrawler/scraping/url.py | 77 ++++++------ fraudcrawler/scraping/zyte.py | 6 +- fraudcrawler/settings.py | 13 +- tests/unittests/test_scraping.py | 173 +++++++++++++++++++++++---- 12 files changed, 308 insertions(+), 134 deletions(-) diff --git a/fraudcrawler/__init__.py b/fraudcrawler/__init__.py index b9feb71..cf62d82 100644 --- a/fraudcrawler/__init__.py +++ b/fraudcrawler/__init__.py @@ -1,4 +1,4 @@ -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.scraping.search import Searcher, SearchEngineName, WebsiteSearch from fraudcrawler.scraping.enrich import Enricher from fraudcrawler.scraping.url import ( @@ -39,6 +39,7 @@ __all__ = [ "RedisCacher", + "RedisConfig", "Searcher", "SearchEngineName", "WebsiteSearch", diff --git a/fraudcrawler/base/base.py b/fraudcrawler/base/base.py index 10fd0c8..694b058 100644 --- a/fraudcrawler/base/base.py +++ b/fraudcrawler/base/base.py @@ -23,8 +23,10 @@ DEFAULT_HTTPX_TIMEOUT, DEFAULT_HTTPX_LIMITS, DEFAULT_HTTPX_REDIRECTS, + REDIS_USE_CACHE, + REDIS_DEFAULT_HOSTNAME, + REDIS_DEFAULT_PORT, ) -from fraudcrawler.settings import REDIS_DEFAULT_URL logger = logging.getLogger(__name__) @@ -50,8 +52,10 @@ class Setup(BaseSettings): pypy_token: str # Redis cache - redis_use_cache: bool - redis_url: str = 
REDIS_DEFAULT_URL + redis_use_cache: bool = REDIS_USE_CACHE + redis_hostname: str = REDIS_DEFAULT_HOSTNAME + redis_port: int = REDIS_DEFAULT_PORT + redis_password: str | None = None class Config: env_file = ".env" diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py index ee4afb2..1ebc313 100644 --- a/fraudcrawler/cache/cacher.py +++ b/fraudcrawler/cache/cacher.py @@ -2,53 +2,46 @@ import json import logging from pydantic import BaseModel -from typing import Any, cast, Dict, Sequence -from urllib.parse import urlparse +from typing import Any, cast, Dict, Self, Sequence import uuid from aiocache import Cache from aiocache.backends.redis import RedisCache from aiocache.serializers import PickleSerializer -from fraudcrawler.settings import ( - REDIS_CACHE_NAMESPACE, - REDIS_DEFAULT_URL, - REDIS_TTL, - REDIS_USE_CACHE, -) +from fraudcrawler.base.base import Setup +from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) +_SETUP = Setup() # type: ignore -_DEFAULT_REDIS_HOST = "localhost" -_DEFAULT_REDIS_PORT = 6379 -_DEFAULT_REDIS_DB = 0 +class RedisConfig(BaseModel): + hostname: str + port: int + password: str | None + db: int + namespace: str + ttl: int + @classmethod + def from_setup(cls, db: int, namespace: str, ttl: int) -> Self: -def parse_redis_url(url: str) -> Dict[str, str | int | None]: - """Parse a redis:// or rediss:// URL into aiocache connection kwargs. + if _SETUP.redis_hostname is None: + raise ValueError("REDIS_HOSTNAME env variable is missing") + elif _SETUP.redis_port is None: + raise ValueError("REDIS_PORT env variable is missing") - Args: - url: Redis connection URL (redis:// or rediss://). - - Returns: - Dict with keys ``endpoint`` (str), ``port`` (int), ``password`` - (str | None), and ``db`` (int), suitable for passing as kwargs to - aiocache Redis backends. - - Raises: - ValueError: If ``url`` does not start with ``redis://`` or ``rediss://``. - """ - u = urlparse(url) - if u.scheme not in {"redis", "rediss"}: - raise ValueError("redis_url must start with redis:// or rediss://") - return { - "endpoint": u.hostname or _DEFAULT_REDIS_HOST, - "port": u.port or _DEFAULT_REDIS_PORT, - "password": u.password, - "db": int(up) if (up := u.path.lstrip("/")) else _DEFAULT_REDIS_DB, - } + kwargs = { + "hostname": _SETUP.redis_hostname, + "port": _SETUP.redis_port, + "password": _SETUP.redis_password, + "db": db, + "namespace": namespace, + "ttl": ttl, + } + return cls(**kwargs) class RedisCacher(ABC): @@ -67,32 +60,33 @@ class RedisCacher(ABC): def __init__( self, use_cache: bool = REDIS_USE_CACHE, - url: str = REDIS_DEFAULT_URL, - ttl: int = REDIS_TTL, - namespace: str = REDIS_CACHE_NAMESPACE, + config: RedisConfig | None = None, ) -> None: """Initialize the cacher, optionally connecting to Redis. Args: use_cache: Whether to use Redis cache. - url: Redis connection URL (redis:// or rediss://). - ttl: Time-to-live in seconds for cached entries. - namespace: Key namespace to isolate entries in shared Redis instances. + config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - self._use_cache = use_cache - self._ttl = ttl - self._namespace = namespace + if use_cache and config is None: + raise ValueError("redis_config must be provided when use_cache=True") + else: + self._config = cast(RedisConfig, config) + self._use_cache = use_cache self._cache: RedisCache | None = None + if self._use_cache: - redis_kwargs = parse_redis_url(url=url) self._cache = cast( RedisCache, Cache( cache_class=Cache.REDIS, # type: ignore[reportArgumentType] serializer=PickleSerializer(), - namespace=self._namespace, - **redis_kwargs, + hostname=self._config.hostname, + port=self._config.port, + password=self._config.password, + db=self._config.db, + namespace=self._config.namespace, ), ) @@ -178,7 +172,7 @@ async def _cached_apply(self, *args: Any, **kwargs: Any) -> Any: logger.debug( f"Set cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})" ) - await self._cache.set(key=key, value=result, ttl=self._ttl) + await self._cache.set(key=key, value=result, ttl=self._config.ttl) return result diff --git a/fraudcrawler/launch_demo_pipeline.py b/fraudcrawler/launch_demo_pipeline.py index f63f49b..68ad49c 100644 --- a/fraudcrawler/launch_demo_pipeline.py +++ b/fraudcrawler/launch_demo_pipeline.py @@ -4,6 +4,7 @@ from fraudcrawler.base.base import Setup from fraudcrawler import ( + RedisConfig, FraudCrawlerClient, HttpxAsyncClient, Searcher, @@ -21,11 +22,18 @@ ) from fraudcrawler.scraping.search import WebsiteSearch from fraudcrawler.scraping.utils import build_website_source_profile +from fraudcrawler.settings import ( + REDIS_CACHE_DB, + REDIS_CACHE_NAMESPACE_OPENAI, + REDIS_CACHE_TTL, + REDIS_URL_COLLECTOR_DB, + REDIS_URL_COLLECTOR_NAMESPACE, + REDIS_URL_COLLECTOR_TTL, +) LOG_FMT = "%(asctime)s | %(name)s | %(funcName)s | %(levelname)s | %(message)s" LOG_LVL = "INFO" DATE_FMT = "%Y-%m-%d %H:%M:%S" -REDIS_USE_CACHE = True SETUP = Setup() # type: ignore[call-arg] logging.basicConfig(format=LOG_FMT, level=LOG_LVL, datefmt=DATE_FMT) @@ -114,6 +122,12 @@ def _setup_workflows( " - Related Topics/Content: Any text or media that discusses or elaborates on the topic without offering a tangible product for sale.\n" "Make your decision based solely on the context and details provided in the search result. Respond only with the number 1 or 0." 
) + namespace = REDIS_CACHE_NAMESPACE_OPENAI + redis_config = RedisConfig.from_setup( + db=REDIS_CACHE_DB, + namespace=namespace, + ttl=REDIS_CACHE_TTL, + ) return [ OpenAIClassification( http_client=http_client, @@ -124,6 +138,7 @@ def _setup_workflows( system_prompt=_AVAILABILITY_SYSTEM_PROMPT, allowed_classes=[0, 1], redis_use_cache=redis_use_cache, + redis_config=redis_config, ), OpenAIClassification( http_client=http_client, @@ -134,6 +149,7 @@ def _setup_workflows( system_prompt=_SERIOUSNESS_SYSTEM_PROMPT, allowed_classes=[0, 1], redis_use_cache=redis_use_cache, + redis_config=redis_config, ), ] @@ -172,25 +188,30 @@ async def run(http_client: HttpxAsyncClient, search_term: str): http_client=http_client, serpapi_key=SETUP.serpapi_key, zyteapi_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, ) enricher = Enricher( http_client=http_client, user=SETUP.dataforseo_user, pwd=SETUP.dataforseo_pwd, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, ) # url_collector = LocalURLCollector() + redis_config = RedisConfig.from_setup( + db=REDIS_URL_COLLECTOR_DB, + namespace=REDIS_URL_COLLECTOR_NAMESPACE, + ttl=REDIS_URL_COLLECTOR_TTL, + ) url_collector = DistributedURLCollector( - url=SETUP.redis_url, + redis_config=redis_config, ) zyteapi = ZyteAPI( http_client=http_client, api_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, ) workflows = _setup_workflows( - http_client=http_client, redis_use_cache=REDIS_USE_CACHE + http_client=http_client, redis_use_cache=SETUP.redis_use_cache ) processor = Processor(workflows=workflows) @@ -242,7 +263,7 @@ async def run_website_source_demo(http_client: HttpxAsyncClient, search_term: st website_search = WebsiteSearch( http_client=http_client, zyteapi_key=SETUP.zyteapi_key, - redis_use_cache=REDIS_USE_CACHE, + redis_use_cache=SETUP.redis_use_cache, ) result = await website_search.ingest_source( source=FUST_WEBSITE_SOURCE_PROFILE, diff --git a/fraudcrawler/processing/base.py b/fraudcrawler/processing/base.py index 6034918..c7d5ee1 100644 --- a/fraudcrawler/processing/base.py +++ b/fraudcrawler/processing/base.py @@ -6,7 +6,7 @@ from tenacity import RetryCallState from fraudcrawler.base.base import ProductItem -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) @@ -44,14 +44,16 @@ def __init__( self, name: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Abstract base class for defining a classification workflow. Args: name: Name of the classification workflow. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self.name = name @abstractmethod @@ -153,7 +155,7 @@ async def run(self, product: ProductItem) -> ProductItem: product.usage[wf.name] = { "input_tokens": inp_tok, "output_tokens": out_tok, - "model": wf.model, + "model": getattr(wf, "model"), } else: raise ValueError( diff --git a/fraudcrawler/processing/openai.py b/fraudcrawler/processing/openai.py index 359f0a1..6caa21e 100644 --- a/fraudcrawler/processing/openai.py +++ b/fraudcrawler/processing/openai.py @@ -15,12 +15,14 @@ from fraudcrawler.base.base import ProductItem from fraudcrawler.base.retry import get_async_retry +from fraudcrawler.cache.cacher import RedisConfig from fraudcrawler.processing.base import ( ClassificationResult, UserInputs, Workflow, Context, ) +from fraudcrawler.settings import REDIS_USE_CACHE logger = logging.getLogger(__name__) @@ -38,7 +40,8 @@ def __init__( name: str, api_key: str, model: str, - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """(Abstract) OpenAI Workflow. @@ -48,10 +51,12 @@ def __init__( api_key: The OpenAI API key. model: The OpenAI model to use. redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ super().__init__( name=name, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._http_client = http_client self._client = AsyncOpenAI(http_client=http_client, api_key=api_key) @@ -256,7 +261,7 @@ async def _responses_parse( """ # Prepare variables cntx = deepcopy(context) - cntx["enpdoint"] = "response.parse" + cntx["endpoint"] = "response.parse" detail: Literal["low", "high", "auto"] = "high" input_param = self._get_input_param( image_url=image_url, @@ -372,7 +377,8 @@ def __init__( product_item_fields: List[str], system_prompt: str, allowed_classes: List[int], - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Open AI classification workflow. @@ -385,6 +391,7 @@ def __init__( system_prompt: System prompt for the AI model. allowed_classes: Allowed classes for model output (must be positive). redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ super().__init__( http_client=http_client, @@ -392,6 +399,7 @@ def __init__( api_key=api_key, model=model, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._product_item_fields = product_item_fields self._system_prompt = system_prompt @@ -482,7 +490,8 @@ def __init__( system_prompt: str, allowed_classes: List[int], user_inputs: UserInputs, - redis_use_cache: bool = False, + redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Open AI classification workflow from user input. @@ -496,6 +505,7 @@ def __init__( allowed_classes: Allowed classes for model output. user_inputs: Inputs from the frontend by the user. redis_use_cache: Whether to use Redis cache. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" super().__init__( http_client=http_client, @@ -506,6 +516,7 @@ def __init__( system_prompt=system_prompt, allowed_classes=allowed_classes, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._user_inputs = user_inputs diff --git a/fraudcrawler/scraping/enrich.py b/fraudcrawler/scraping/enrich.py index ce68d8d..d4eaac8 100644 --- a/fraudcrawler/scraping/enrich.py +++ b/fraudcrawler/scraping/enrich.py @@ -10,7 +10,7 @@ from fraudcrawler.settings import ENRICHMENT_DEFAULT_LIMIT, REDIS_USE_CACHE from fraudcrawler.base.base import Location, Language from fraudcrawler.base.retry import get_async_retry -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig logger = logging.getLogger(__name__) @@ -37,6 +37,7 @@ def __init__( user: str, pwd: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the DataForSeoApiClient with the given username and password. @@ -45,8 +46,9 @@ def __init__( user: The username for DataForSEO API. pwd: The password for DataForSEO API. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). """ - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._user = user diff --git a/fraudcrawler/scraping/search.py b/fraudcrawler/scraping/search.py index b7c8d19..6a69a32 100644 --- a/fraudcrawler/scraping/search.py +++ b/fraudcrawler/scraping/search.py @@ -34,7 +34,7 @@ WebsiteSourceMetadata, ) from fraudcrawler.base.retry import get_async_retry -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig from fraudcrawler.scraping.url import filter_tracking_query_entries from fraudcrawler.scraping.zyte import ( SavedSearchRenderedProductListItem, @@ -1147,6 +1147,7 @@ def __init__( serpapi_key: str, zyteapi_key: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the Search class with the given SerpAPI key. @@ -1155,8 +1156,9 @@ def __init__( serpapi_key: The API key for SERP API. zyteapi_key: ZyteAPI key for fallback when direct access fails. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - RedisCacher.__init__(self=self, use_cache=redis_use_cache) + RedisCacher.__init__(self=self, use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._google = SerpAPIGoogle(http_client=http_client, api_key=serpapi_key) diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py index 4fe1145..71cbde3 100644 --- a/fraudcrawler/scraping/url.py +++ b/fraudcrawler/scraping/url.py @@ -4,15 +4,11 @@ from typing import List, Set, Tuple from urllib.parse import urlparse, parse_qsl, urlencode, quote, urlunparse, ParseResult -from fraudcrawler.base.base import FilteredAtStage, ProductItem from aiocache.backends.redis import RedisBackend -from fraudcrawler.cache.cacher import parse_redis_url -from fraudcrawler.settings import ( - KNOWN_TRACKERS, - REDIS_TTL, - REDIS_URL_COLLECTOR_NAMESPACE, -) +from fraudcrawler.base.base import FilteredAtStage, ProductItem +from fraudcrawler.cache.cacher import RedisConfig +from fraudcrawler.settings import KNOWN_TRACKERS logger = logging.getLogger(__name__) @@ -49,9 +45,6 @@ def filter_tracking_query_entries( class URLCollector(ABC): """A class to collect and de-duplicate URLs.""" - _filtered_at_stage_current = "URL collection (current run deduplication)" - _filtered_at_stage_previous = "URL collection (previous run deduplication)" - def _filter_tracking_query_entries( self, queries: List[Tuple[str, str]], @@ -178,34 +171,37 @@ class DistributedURLCollector(URLCollector): def __init__( self, - url: str, - ttl: int = REDIS_TTL, - namespace: str = REDIS_URL_COLLECTOR_NAMESPACE, + redis_config: RedisConfig, id_suffix: str = "", ) -> None: """Initialize the distributed collector. Args: - url: Redis connection URL (redis:// or rediss://). - ttl: Time-to-live in seconds for each stored URL marker. - namespace: Redis key namespace to isolate entries. + redis_config: Redis configuration object. id_suffix: String appended to the URL before hashing to deduplication """ self._id_suffix = id_suffix - self._ttl = ttl - self._collected_currently: Set[str] = set() - redis_kwargs = parse_redis_url(url=url) - self._cache: RedisBackend = RedisBackend(namespace=namespace, **redis_kwargs) + self._ttl = redis_config.ttl + # Uses RedisBackend + default StringSerializer (RedisCacher uses + # Cache+PickleSerializer): dedup markers are short strings and stay + # inspectable in redis-cli. + self._cache: RedisBackend = RedisBackend( + endpoint=redis_config.hostname, + port=redis_config.port, + db=redis_config.db, + password=redis_config.password, + namespace=redis_config.namespace, + ) - def _hash(self, cleaned_url: str) -> str: + def _get_redis_key(self, url: str) -> str: """Return the MD5 hex digest of the `cleaned_url` concatenated with `id_suffix`. MD5 is used for deduplication only (not for security). Args: - cleaned_url: Tracking-parameter-free URL to hash. + url: Tracking-parameter-free URL to hash. """ - payload = f"{cleaned_url}{self._id_suffix}".encode("utf-8") + payload = f"{url}{self._id_suffix}".encode("utf-8") return hashlib.md5(payload, usedforsecurity=False).hexdigest() async def add_previously_collected_urls(self, urls: List[str]) -> None: @@ -217,10 +213,10 @@ async def add_previously_collected_urls(self, urls: List[str]) -> None: Args: urls: URLs collected in previous runs. 
""" - for raw in urls: - cleaned = self._remove_tracking_parameters(raw) - key = self._hash(cleaned) - await self._cache.set(key=key, value="1", ttl=self._ttl) + for url in urls: + key = self._get_redis_key(url) + value = FilteredAtStage.URL_COLLECTION_PREVIOUS.value + await self._cache.set(key=key, value=value, ttl=self._ttl) async def apply(self, product: ProductItem) -> ProductItem: """De-duplicate product cross-run using Redis. @@ -228,28 +224,33 @@ async def apply(self, product: ProductItem) -> ProductItem: Args: product: The product item to process. """ - logger.debug(f'Processing product with url="{product.url}"') + logger.debug(f'Processing de-duplication of product with url="{product.url}"') # Remove tracking parameters from the URL url = self._remove_tracking_parameters(product.url) product.url = url - key = self._hash(url) - # deduplicate on current run (in-memory) - if url in self._collected_currently: + key = self._get_redis_key(url) + value = await self._cache.get(key=key) + + # already seen in current run + if value == FilteredAtStage.URL_COLLECTION_CURRENT.value: product.filtered = True - product.filtered_at_stage = self._filtered_at_stage_current + product.filtered_at_stage = FilteredAtStage.URL_COLLECTION_CURRENT.value logger.debug(f"URL {url} already collected in current run") - # deduplicate on previous runs (Redis, possibly written by another instance) - elif await self._cache.exists(key=key): + # already seen in a previous run (added via `add_previously_collected_urls`) + elif value == FilteredAtStage.URL_COLLECTION_PREVIOUS.value: product.filtered = True - product.filtered_at_stage = self._filtered_at_stage_previous + product.filtered_at_stage = FilteredAtStage.URL_COLLECTION_PREVIOUS.value logger.debug(f"URL {url} already collected in previous run (distributed)") - # Add to both current in-memory set and Redis + # first sighting -> mark as current + elif value is None: + logger.debug(f"Add url={url} to currently collected urls in redis") + await self._cache.set(key=key, value=FilteredAtStage.URL_COLLECTION_CURRENT.value, ttl=self._ttl) + else: - self._collected_currently.add(url) - await self._cache.set(key=key, value="1", ttl=self._ttl) + raise ValueError(f"Redis returned value={value} for key={key} (url={url})") return product diff --git a/fraudcrawler/scraping/zyte.py b/fraudcrawler/scraping/zyte.py index 839d4ff..9fa2f65 100644 --- a/fraudcrawler/scraping/zyte.py +++ b/fraudcrawler/scraping/zyte.py @@ -14,7 +14,7 @@ ) from fraudcrawler.base.base import DomainUtils, ProductItem from fraudcrawler.base.retry import get_async_retry -from fraudcrawler.cache.cacher import RedisCacher +from fraudcrawler.cache.cacher import RedisCacher, RedisConfig logger = logging.getLogger(__name__) @@ -62,6 +62,7 @@ def __init__( http_client: httpx.AsyncClient, api_key: str, redis_use_cache: bool = REDIS_USE_CACHE, + redis_config: RedisConfig | None = None, ): """Initializes the ZyteApiClient with the given API key and retry configurations. @@ -69,8 +70,9 @@ def __init__( http_client: An httpx.AsyncClient to use for the async requests. api_key: The API key for Zyte API. redis_use_cache: Whether to use caching by a redis instance or not. + redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
""" - super().__init__(use_cache=redis_use_cache) + super().__init__(use_cache=redis_use_cache, config=redis_config) self._http_client = http_client self._api_key = api_key diff --git a/fraudcrawler/settings.py b/fraudcrawler/settings.py index 45343aa..f99538b 100644 --- a/fraudcrawler/settings.py +++ b/fraudcrawler/settings.py @@ -104,10 +104,17 @@ } DEFAULT_HTTPX_REDIRECTS = True -# Redis cache settings +# Redis settings REDIS_USE_CACHE = False -REDIS_DEFAULT_URL = "redis://localhost:6379" -REDIS_TTL = 3600 +REDIS_DEFAULT_HOSTNAME = "localhost" +REDIS_DEFAULT_PORT = 6379 REDIS_NAMESPACE = "fraudcrawler" + +REDIS_CACHE_DB = 0 REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache:" +REDIS_CACHE_NAMESPACE_OPENAI = f"{REDIS_CACHE_NAMESPACE}:openai" +REDIS_CACHE_TTL = 3600 + +REDIS_URL_COLLECTOR_DB = 1 REDIS_URL_COLLECTOR_NAMESPACE = f"{REDIS_NAMESPACE}:urls:" +REDIS_URL_COLLECTOR_TTL = 3600 diff --git a/tests/unittests/test_scraping.py b/tests/unittests/test_scraping.py index 76ab046..be12337 100644 --- a/tests/unittests/test_scraping.py +++ b/tests/unittests/test_scraping.py @@ -1,16 +1,26 @@ +import os +import uuid from urllib.parse import parse_qsl, urlparse import pytest import pytest_asyncio from aiocache import Cache - -from fraudcrawler import DistributedURLCollector, Enricher, LocalURLCollector, ZyteAPI +from aiocache.backends.redis import RedisBackend + +from fraudcrawler import ( + DistributedURLCollector, + Enricher, + LocalURLCollector, + RedisConfig, + ZyteAPI, +) from fraudcrawler.base.base import ( Setup, Host, Location, Language, + FilteredAtStage, HttpxAsyncClient, ProductItem, WebsiteSourceMetadata, @@ -455,14 +465,65 @@ def _make_product(url: str) -> ProductItem: @pytest_asyncio.fixture async def shared_memory_cache(): """Provides a per-test aiocache in-memory cache and cleans it up afterwards.""" - import uuid - namespace = f"test-dedup-{uuid.uuid4().hex}" cache = Cache(cache_class=Cache.MEMORY, namespace=namespace) yield cache await cache.clear() +@pytest_asyncio.fixture +async def live_redis_cache(): + """Per-test live Redis backend; skips the test if Redis is unreachable. + + Entries written during a test are left in Redis for post-mortem + inspection (e.g. RedisInsight); they self-expire via the dedup TTL + (`_DUMMY_REDIS_CONFIG.ttl`). + + Env-var overrides: + REDIS_TEST_HOSTNAME / REDIS_TEST_PORT / REDIS_TEST_DB + connection params (defaults: localhost:6379, db=15). + REDIS_TEST_NAMESPACE + fixed namespace prefix (default: random per-test). Useful to + point all runs at the same prefix in RedisInsight. 
+ """ + if (fixed_ns := os.environ.get("REDIS_TEST_NAMESPACE")): + namespace = fixed_ns if fixed_ns.endswith(":") else f"{fixed_ns}:" + else: + namespace = f"test-dedup-{uuid.uuid4().hex}:" + + cache = RedisBackend( + endpoint=os.environ.get("REDIS_TEST_HOSTNAME", "localhost"), + port=int(os.environ.get("REDIS_TEST_PORT", "6379")), + db=int(os.environ.get("REDIS_TEST_DB", "15")), + namespace=namespace, + ) + try: + await cache.set(key="__ping__", value="1", ttl=5) + await cache.delete(key="__ping__") + except Exception as exc: + pytest.skip(f"Redis not reachable for live test: {exc}") + + yield cache + + +_DUMMY_REDIS_CONFIG = RedisConfig( + hostname="localhost", + port=6379, + password=None, + db=0, + namespace="test-dedup", + ttl=60, +) + + +def _make_dist_collector(id_suffix: str = "") -> DistributedURLCollector: + """Build a DistributedURLCollector with a dummy config; the Redis backend + instantiated here is replaced by `_attach_cache` before any I/O.""" + return DistributedURLCollector( + redis_config=_DUMMY_REDIS_CONFIG, id_suffix=id_suffix + ) + + def _attach_cache(collector: DistributedURLCollector, cache) -> DistributedURLCollector: collector._cache = cache return collector @@ -472,7 +533,7 @@ def _attach_cache(collector: DistributedURLCollector, cache) -> DistributedURLCo async def test_distributed_collector_marks_duplicate_within_instance( shared_memory_cache, ): - collector = _attach_cache(DistributedURLCollector(), shared_memory_cache) + collector = _attach_cache(_make_dist_collector(), shared_memory_cache) url = "https://www.ricardo.ch/p/123" first = await collector.apply(product=_make_product(url)) @@ -481,7 +542,7 @@ async def test_distributed_collector_marks_duplicate_within_instance( second = await collector.apply(product=_make_product(url)) assert second.filtered is True - assert second.filtered_at_stage == "URL collection (current run deduplication)" + assert second.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value @pytest.mark.asyncio @@ -490,14 +551,14 @@ async def test_distributed_collector_marks_duplicate_across_instances( ): url = "https://www.ricardo.ch/p/456" - first_collector = _attach_cache(DistributedURLCollector(), shared_memory_cache) + first_collector = _attach_cache(_make_dist_collector(), shared_memory_cache) await first_collector.apply(product=_make_product(url)) - second_collector = _attach_cache(DistributedURLCollector(), shared_memory_cache) + second_collector = _attach_cache(_make_dist_collector(), shared_memory_cache) result = await second_collector.apply(product=_make_product(url)) assert result.filtered is True - assert result.filtered_at_stage == "URL collection (previous run deduplication)" + assert result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value @pytest.mark.asyncio @@ -507,12 +568,12 @@ async def test_distributed_collector_different_id_suffix_does_not_share( url = "https://www.ricardo.ch/p/789" collector_a = _attach_cache( - DistributedURLCollector(id_suffix="tenant-a"), shared_memory_cache + _make_dist_collector(id_suffix="tenant-a"), shared_memory_cache ) await collector_a.apply(product=_make_product(url)) collector_b = _attach_cache( - DistributedURLCollector(id_suffix="tenant-b"), shared_memory_cache + _make_dist_collector(id_suffix="tenant-b"), shared_memory_cache ) result = await collector_b.apply(product=_make_product(url)) @@ -525,35 +586,101 @@ async def test_distributed_collector_add_previously_collected_urls( shared_memory_cache, ): seed_url = "https://www.ricardo.ch/p/seeded" - seeded = 
_attach_cache(DistributedURLCollector(), shared_memory_cache) + seeded = _attach_cache(_make_dist_collector(), shared_memory_cache) await seeded.add_previously_collected_urls(urls=[seed_url]) - fresh = _attach_cache(DistributedURLCollector(), shared_memory_cache) + fresh = _attach_cache(_make_dist_collector(), shared_memory_cache) # Tracking params should be stripped before hashing -> still filtered. product = _make_product(f"{seed_url}?utm_source=foo&srsltid=bar") result = await fresh.apply(product=product) assert result.filtered is True - assert result.filtered_at_stage == "URL collection (previous run deduplication)" + assert result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value assert result.url == seed_url # cleaned URL was written back +@pytest.mark.asyncio +async def test_distributed_collector_stores_cleaned_url_marker_in_redis( + shared_memory_cache, +): + """First sighting of a tracker-laden URL must persist the CLEANED-URL + marker into Redis (and never the dirty form).""" + base_url = "https://www.ricardo.ch/p/with-trackers" + dirty_url = f"{base_url}?utm_source=foo&srsltid=bar&fbclid=xyz" + + collector = _attach_cache(_make_dist_collector(), shared_memory_cache) + result = await collector.apply(product=_make_product(dirty_url)) + + # First sighting -> not filtered, product.url rewritten to cleaned form + assert result.filtered is False + assert result.filtered_at_stage is None + assert result.url == base_url + + # Cleaned URL's key carries the CURRENT-run marker + clean_key = collector._get_redis_key(base_url) + stored = await shared_memory_cache.get(key=clean_key) + assert stored == FilteredAtStage.URL_COLLECTION_CURRENT.value + + # Dirty URL's key was never written (we never store the tracker form) + dirty_key = collector._get_redis_key(dirty_url) + assert await shared_memory_cache.get(key=dirty_key) is None + + +@pytest.mark.asyncio +async def test_distributed_collector_full_state_machine_on_live_redis( + live_redis_cache, +): + """End-to-end dedup against a real Redis instance. + + Skipped when no Redis is reachable. Override host/port/db via + REDIS_TEST_HOSTNAME / REDIS_TEST_PORT / REDIS_TEST_DB. + + Covers the three legs of the state machine: + 1. seeded URL -> filtered as PREVIOUS + 2. fresh tracker-laden URL -> not filtered, cleaned form persisted + 3. same URL again -> filtered as CURRENT + """ + seeded_url = "https://www.ricardo.ch/p/seed" + base_url = "https://www.ricardo.ch/p/fresh" + dirty_url = f"{base_url}?utm_source=foo&srsltid=bar" + + collector = _attach_cache(_make_dist_collector(), live_redis_cache) + + # 1. PREVIOUS-run marker via explicit seeding + await collector.add_previously_collected_urls(urls=[seeded_url]) + seeded_result = await collector.apply(product=_make_product(seeded_url)) + assert seeded_result.filtered is True + assert seeded_result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value + + # 2. First sighting of a dirty URL -> cleaned form persisted as CURRENT + fresh_result = await collector.apply(product=_make_product(dirty_url)) + assert fresh_result.filtered is False + assert fresh_result.url == base_url + + clean_key = collector._get_redis_key(base_url) + stored = await live_redis_cache.get(key=clean_key) + assert stored == FilteredAtStage.URL_COLLECTION_CURRENT.value + + dirty_key = collector._get_redis_key(dirty_url) + assert await live_redis_cache.get(key=dirty_key) is None + + # 3. 
Second sighting -> filtered as CURRENT
+    second = await collector.apply(product=_make_product(dirty_url))
+    assert second.filtered is True
+    assert second.filtered_at_stage == FilteredAtStage.URL_COLLECTION_CURRENT.value
+
+
 def test_distributed_collector_hash_is_deterministic():
-    collector = DistributedURLCollector(id_suffix="abc")
-    h1 = collector._hash("https://example.com/p/1")
-    h2 = collector._hash("https://example.com/p/1")
-    h3 = collector._hash("https://example.com/p/2")
+    collector = _make_dist_collector(id_suffix="abc")
+    h1 = collector._get_redis_key("https://example.com/p/1")
+    h2 = collector._get_redis_key("https://example.com/p/1")
+    h3 = collector._get_redis_key("https://example.com/p/2")
 
     assert h1 == h2
     assert h1 != h3
     assert len(h1) == 32  # md5 hex digest length
 
 
-def test_distributed_collector_rejects_bad_redis_url():
-    with pytest.raises(ValueError):
-        DistributedURLCollector(url="http://not-redis")
-
-
 def test_remove_tracking_parameters_known_trackers(url_collector):
    """Test that all known tracking parameters are removed."""
     known_trackers = [

From b976ce3013473c55c076bf8096d5b929f37aaa5d Mon Sep 17 00:00:00 2001
From: chriguBERTO
Date: Thu, 30 Apr 2026 16:33:16 +0200
Subject: [PATCH 03/10] consistent namespace naming

---
 fraudcrawler/settings.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/fraudcrawler/settings.py b/fraudcrawler/settings.py
index f99538b..0add322 100644
--- a/fraudcrawler/settings.py
+++ b/fraudcrawler/settings.py
@@ -111,10 +111,10 @@
 REDIS_NAMESPACE = "fraudcrawler"
 
 REDIS_CACHE_DB = 0
-REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache:"
+REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache"
 REDIS_CACHE_NAMESPACE_OPENAI = f"{REDIS_CACHE_NAMESPACE}:openai"
 REDIS_CACHE_TTL = 3600
 
 REDIS_URL_COLLECTOR_DB = 1
-REDIS_URL_COLLECTOR_NAMESPACE = f"{REDIS_NAMESPACE}:urls:"
+REDIS_URL_COLLECTOR_NAMESPACE = f"{REDIS_NAMESPACE}:urls"
 REDIS_URL_COLLECTOR_TTL = 3600

From aecde74d07f416b2c7f254ec4d8ee2a1c26dabf2 Mon Sep 17 00:00:00 2001
From: chriguBERTO
Date: Thu, 30 Apr 2026 23:51:08 +0200
Subject: [PATCH 04/10] linting and formatting

---
 fraudcrawler/cache/cacher.py         | 18 ++++++++----------
 fraudcrawler/launch_demo_pipeline.py |  2 +-
 fraudcrawler/scraping/url.py         |  8 ++++++--
 fraudcrawler/scraping/zyte.py        |  4 ++--
 tests/unittests/test_scraping.py     |  6 ++++--
 5 files changed, 21 insertions(+), 17 deletions(-)

diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py
index 1ebc313..1ee7835 100644
--- a/fraudcrawler/cache/cacher.py
+++ b/fraudcrawler/cache/cacher.py
@@ -27,21 +27,19 @@ class RedisConfig(BaseModel):
 
     @classmethod
     def from_setup(cls, db: int, namespace: str, ttl: int) -> Self:
-
         if _SETUP.redis_hostname is None:
             raise ValueError("REDIS_HOSTNAME env variable is missing")
         elif _SETUP.redis_port is None:
             raise ValueError("REDIS_PORT env variable is missing")
 
-        kwargs = {
-            "hostname": _SETUP.redis_hostname,
-            "port": _SETUP.redis_port,
-            "password": _SETUP.redis_password,
-            "db": db,
-            "namespace": namespace,
-            "ttl": ttl,
-        }
-        return cls(**kwargs)
+        return cls(
+            hostname=_SETUP.redis_hostname,
+            port=_SETUP.redis_port,
+            password=_SETUP.redis_password,
+            db=db,
+            namespace=namespace,
+            ttl=ttl,
+        )
 
 
 class RedisCacher(ABC):
diff --git a/fraudcrawler/launch_demo_pipeline.py b/fraudcrawler/launch_demo_pipeline.py
index 68ad49c..d369fa8 100644
--- a/fraudcrawler/launch_demo_pipeline.py
+++ b/fraudcrawler/launch_demo_pipeline.py
@@ -203,7 +203,7 @@ async def run(http_client: HttpxAsyncClient, search_term: str): 
ttl=REDIS_URL_COLLECTOR_TTL, ) url_collector = DistributedURLCollector( - redis_config=redis_config, + redis_config=redis_config, ) zyteapi = ZyteAPI( http_client=http_client, diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py index 71cbde3..38b647e 100644 --- a/fraudcrawler/scraping/url.py +++ b/fraudcrawler/scraping/url.py @@ -248,8 +248,12 @@ async def apply(self, product: ProductItem) -> ProductItem: # first sighting -> mark as current elif value is None: logger.debug(f"Add url={url} to currently collected urls in redis") - await self._cache.set(key=key, value=FilteredAtStage.URL_COLLECTION_CURRENT.value, ttl=self._ttl) - + await self._cache.set( + key=key, + value=FilteredAtStage.URL_COLLECTION_CURRENT.value, + ttl=self._ttl, + ) + else: raise ValueError(f"Redis returned value={value} for key={key} (url={url})") diff --git a/fraudcrawler/scraping/zyte.py b/fraudcrawler/scraping/zyte.py index 9fa2f65..7f503cd 100644 --- a/fraudcrawler/scraping/zyte.py +++ b/fraudcrawler/scraping/zyte.py @@ -45,7 +45,7 @@ class ZyteAPI(RedisCacher, DomainUtils): """A client to interact with the Zyte API for fetching product details.""" _endpoint = "https://api.zyte.com/v1/extract" - _config = { + _request_defaults = { "javascript": False, "browserHtml": False, "screenshot": False, @@ -311,7 +311,7 @@ async def apply(self, url: str) -> dict: with attempt: response = await self._http_client.post( url=self._endpoint, - json={"url": url, **self._config}, + json={"url": url, **self._request_defaults}, auth=(self._api_key, ""), # API key as username, empty password ) response.raise_for_status() diff --git a/tests/unittests/test_scraping.py b/tests/unittests/test_scraping.py index be12337..5e7cea8 100644 --- a/tests/unittests/test_scraping.py +++ b/tests/unittests/test_scraping.py @@ -486,7 +486,7 @@ async def live_redis_cache(): fixed namespace prefix (default: random per-test). Useful to point all runs at the same prefix in RedisInsight. """ - if (fixed_ns := os.environ.get("REDIS_TEST_NAMESPACE")): + if fixed_ns := os.environ.get("REDIS_TEST_NAMESPACE"): namespace = fixed_ns if fixed_ns.endswith(":") else f"{fixed_ns}:" else: namespace = f"test-dedup-{uuid.uuid4().hex}:" @@ -650,7 +650,9 @@ async def test_distributed_collector_full_state_machine_on_live_redis( await collector.add_previously_collected_urls(urls=[seeded_url]) seeded_result = await collector.apply(product=_make_product(seeded_url)) assert seeded_result.filtered is True - assert seeded_result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value + assert ( + seeded_result.filtered_at_stage == FilteredAtStage.URL_COLLECTION_PREVIOUS.value + ) # 2. 
First sighting of a dirty URL -> cleaned form persisted as CURRENT fresh_result = await collector.apply(product=_make_product(dirty_url)) From 4f4713846c4982a7ed2ec2193ff3fe500b005e95 Mon Sep 17 00:00:00 2001 From: chriguBERTO Date: Fri, 1 May 2026 09:22:08 +0200 Subject: [PATCH 05/10] removed pypi_token from Setup --- fraudcrawler/base/base.py | 1 - 1 file changed, 1 deletion(-) diff --git a/fraudcrawler/base/base.py b/fraudcrawler/base/base.py index 694b058..cb8f443 100644 --- a/fraudcrawler/base/base.py +++ b/fraudcrawler/base/base.py @@ -49,7 +49,6 @@ class Setup(BaseSettings): dataforseo_pwd: str zyteapi_key: str openaiapi_key: str - pypy_token: str # Redis cache redis_use_cache: bool = REDIS_USE_CACHE From a2426565858318d01757e7169468736f0c86c41a Mon Sep 17 00:00:00 2001 From: chriguBERTO Date: Fri, 1 May 2026 13:52:57 +0200 Subject: [PATCH 06/10] added redis_config for search engines --- fraudcrawler/cache/cacher.py | 2 +- fraudcrawler/launch_demo_pipeline.py | 30 +++++++++++++++--- fraudcrawler/scraping/search.py | 46 +++++++++++++++++++--------- fraudcrawler/settings.py | 5 ++- 4 files changed, 62 insertions(+), 21 deletions(-) diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py index 1ee7835..f9f9bec 100644 --- a/fraudcrawler/cache/cacher.py +++ b/fraudcrawler/cache/cacher.py @@ -80,7 +80,7 @@ def __init__( Cache( cache_class=Cache.REDIS, # type: ignore[reportArgumentType] serializer=PickleSerializer(), - hostname=self._config.hostname, + endpoint=self._config.hostname, port=self._config.port, password=self._config.password, db=self._config.db, diff --git a/fraudcrawler/launch_demo_pipeline.py b/fraudcrawler/launch_demo_pipeline.py index d369fa8..08c5b68 100644 --- a/fraudcrawler/launch_demo_pipeline.py +++ b/fraudcrawler/launch_demo_pipeline.py @@ -24,7 +24,10 @@ from fraudcrawler.scraping.utils import build_website_source_profile from fraudcrawler.settings import ( REDIS_CACHE_DB, - REDIS_CACHE_NAMESPACE_OPENAI, + REDIS_CACHE_NAMESPACE_SEARCHER, + REDIS_CACHE_NAMESPACE_ENRICHER, + REDIS_CACHE_NAMESPACE_ZYTEAPI, + REDIS_CACHE_NAMESPACE_WORKFLOWS, REDIS_CACHE_TTL, REDIS_URL_COLLECTOR_DB, REDIS_URL_COLLECTOR_NAMESPACE, @@ -122,10 +125,9 @@ def _setup_workflows( " - Related Topics/Content: Any text or media that discusses or elaborates on the topic without offering a tangible product for sale.\n" "Make your decision based solely on the context and details provided in the search result. Respond only with the number 1 or 0." 
)
-    namespace = REDIS_CACHE_NAMESPACE_OPENAI
     redis_config = RedisConfig.from_setup(
         db=REDIS_CACHE_DB,
-        namespace=namespace,
+        namespace=REDIS_CACHE_NAMESPACE_WORKFLOWS,
         ttl=REDIS_CACHE_TTL,
     )
     return [
@@ -184,31 +186,49 @@ async def run(http_client: HttpxAsyncClient, search_term: str):
     #     ]
 
     # Setup clients
+    redis_config_searcher = RedisConfig.from_setup(
+        db=REDIS_CACHE_DB,
+        namespace=REDIS_CACHE_NAMESPACE_SEARCHER,
+        ttl=REDIS_CACHE_TTL,
+    )
     searcher = Searcher(
         http_client=http_client,
         serpapi_key=SETUP.serpapi_key,
         zyteapi_key=SETUP.zyteapi_key,
         redis_use_cache=SETUP.redis_use_cache,
+        redis_config=redis_config_searcher,
+    )
+    redis_config_enricher = RedisConfig.from_setup(
+        db=REDIS_CACHE_DB,
+        namespace=REDIS_CACHE_NAMESPACE_ENRICHER,
+        ttl=REDIS_CACHE_TTL,
     )
     enricher = Enricher(
         http_client=http_client,
         user=SETUP.dataforseo_user,
         pwd=SETUP.dataforseo_pwd,
         redis_use_cache=SETUP.redis_use_cache,
+        redis_config=redis_config_enricher,
     )
     # url_collector = LocalURLCollector()
-    redis_config = RedisConfig.from_setup(
+    redis_config_url = RedisConfig.from_setup(
         db=REDIS_URL_COLLECTOR_DB,
         namespace=REDIS_URL_COLLECTOR_NAMESPACE,
         ttl=REDIS_URL_COLLECTOR_TTL,
     )
     url_collector = DistributedURLCollector(
-        redis_config=redis_config,
+        redis_config=redis_config_url,
+    )
+    redis_config_zyteapi = RedisConfig.from_setup(
+        db=REDIS_CACHE_DB,
+        namespace=REDIS_CACHE_NAMESPACE_ZYTEAPI,
+        ttl=REDIS_CACHE_TTL,
     )
     zyteapi = ZyteAPI(
         http_client=http_client,
         api_key=SETUP.zyteapi_key,
         redis_use_cache=SETUP.redis_use_cache,
+        redis_config=redis_config_zyteapi,
     )
     workflows = _setup_workflows(
         http_client=http_client, redis_use_cache=SETUP.redis_use_cache
diff --git a/fraudcrawler/scraping/search.py b/fraudcrawler/scraping/search.py
index 6a69a32..bcd3bb0 100644
--- a/fraudcrawler/scraping/search.py
+++ b/fraudcrawler/scraping/search.py
@@ -544,7 +544,11 @@ class Toppreise(SearchEngine):
     _endpoint = "https://www.toppreise.ch/"
 
     def __init__(
-        self, http_client: httpx.AsyncClient, zyteapi_key: str, redis_use_cache: bool
+        self,
+        http_client: httpx.AsyncClient,
+        zyteapi_key: str,
+        redis_use_cache: bool,
+        redis_config: RedisConfig | None = None,
     ):
         """Initializes the Toppreise client.
 
@@ -552,12 +556,14 @@ def __init__(
             http_client: An httpx.AsyncClient to use for the async requests.
             zyteapi_key: ZyteAPI key for fallback when direct access fails.
             redis_use_cache: Whether to use cache (passed to internal ZyteAPI).
+            redis_config: Redis configuration object (mandatory if redis_use_cache=True).
         """
         super().__init__(http_client=http_client)
         self._zyteapi = ZyteAPI(
             http_client=http_client,
             api_key=zyteapi_key,
             redis_use_cache=redis_use_cache,
+            redis_config=redis_config,
         )
 
     async def http_client_get_with_fallback(self, url: str) -> bytes:
@@ -753,6 +759,29 @@ class WebsiteSearch(SearchEngine):
     _saved_search_query_param_keys = ["q", "query", "keyword", "search"]
     _max_image_urls_per_candidate = 5
 
+    def __init__(
+        self,
+        http_client: httpx.AsyncClient,
+        zyteapi_key: str,
+        redis_use_cache: bool,
+        redis_config: RedisConfig | None = None,
+    ):
+        """Search engine for website-source ingestion.
+
+        Args:
+            http_client: An httpx.AsyncClient to use for the async requests.
+            zyteapi_key: ZyteAPI key.
+            redis_use_cache: Whether to use caching by a redis instance or not.
+            redis_config: Redis configuration object (mandatory if redis_use_cache=True). 
+ """ + super().__init__(http_client=http_client) + self._zyteapi = ZyteAPI( + http_client=http_client, + api_key=zyteapi_key, + redis_use_cache=redis_use_cache, + redis_config=redis_config, + ) + @staticmethod def _build_website_source_engine_name(source_name: str) -> str: """Build a stable engine-like name from a website-source name.""" @@ -767,19 +796,6 @@ def _build_website_source_engine_name(source_name: str) -> str: slug = normalized or "website_source" return f"{slug}_search_engine" - def __init__( - self, - http_client: httpx.AsyncClient, - zyteapi_key: str, - redis_use_cache: bool = REDIS_USE_CACHE, - ): - super().__init__(http_client=http_client) - self._zyteapi = ZyteAPI( - http_client=http_client, - api_key=zyteapi_key, - redis_use_cache=redis_use_cache, - ) - @property def _search_engine_name(self) -> str: return SearchEngineName.WEBSITE_SOURCE.value @@ -1170,11 +1186,13 @@ def __init__( http_client=http_client, zyteapi_key=zyteapi_key, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._saved_search_engine = WebsiteSearch( http_client=http_client, zyteapi_key=zyteapi_key, redis_use_cache=redis_use_cache, + redis_config=redis_config, ) self._search_handlers: Dict[ SearchEngineName, diff --git a/fraudcrawler/settings.py b/fraudcrawler/settings.py index 0add322..607ea84 100644 --- a/fraudcrawler/settings.py +++ b/fraudcrawler/settings.py @@ -112,7 +112,10 @@ REDIS_CACHE_DB = 0 REDIS_CACHE_NAMESPACE = f"{REDIS_NAMESPACE}:cache" -REDIS_CACHE_NAMESPACE_OPENAI = f"{REDIS_CACHE_NAMESPACE}:openai" +REDIS_CACHE_NAMESPACE_SEARCHER = f"{REDIS_CACHE_NAMESPACE}:searcher" +REDIS_CACHE_NAMESPACE_ENRICHER = f"{REDIS_CACHE_NAMESPACE}:enricher" +REDIS_CACHE_NAMESPACE_ZYTEAPI = f"{REDIS_CACHE_NAMESPACE}:zyteapi" +REDIS_CACHE_NAMESPACE_WORKFLOWS = f"{REDIS_CACHE_NAMESPACE}:workflows" REDIS_CACHE_TTL = 3600 REDIS_URL_COLLECTOR_DB = 1 From 58b39789914a4c35365bf8b320a269194ac7aa10 Mon Sep 17 00:00:00 2001 From: chriguBERTO Date: Fri, 1 May 2026 14:35:53 +0200 Subject: [PATCH 07/10] better redis keys --- fraudcrawler/cache/cacher.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py index f9f9bec..96092fd 100644 --- a/fraudcrawler/cache/cacher.py +++ b/fraudcrawler/cache/cacher.py @@ -1,4 +1,5 @@ from abc import ABC, abstractmethod +import hashlib import json import logging from pydantic import BaseModel @@ -54,6 +55,7 @@ class RedisCacher(ABC): serialized call arguments (including Pydantic models), so identical calls always map to the same cache entry. """ + _key_encoding = "ascii" def __init__( self, @@ -88,14 +90,15 @@ def __init__( ), ) - @staticmethod - def _stable_key(payload: Dict[str, Any]) -> str: - """Serialize a payload dict to a compact, deterministic JSON string. + def _stable_key(self, payload: Dict[str, Any]) -> str: + """Serialize a payload dict to a compact, deterministic sha256 hash. Args: payload: Dict to serialize as a cache key. 
""" - return json.dumps(payload, sort_keys=True, default=str, separators=(",", ":")) + json_str = json.dumps(payload, sort_keys=True, default=str, separators=(",", ":")) + key = hashlib.sha256(json_str.encode('utf-8')).hexdigest() + return f"{payload['cls']}_{key}" @staticmethod def _serialize_object(obj: Any) -> Any: From 59f4aa083a10ab5d2afe5c0e18b1037bd53a6953 Mon Sep 17 00:00:00 2001 From: chriguBERTO Date: Fri, 1 May 2026 15:57:00 +0200 Subject: [PATCH 08/10] Using DomainUtils for DistributedURLCollector --- fraudcrawler/scraping/url.py | 25 +++++++++++++++---------- tests/unittests/test_scraping.py | 4 +++- 2 files changed, 18 insertions(+), 11 deletions(-) diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py index 38b647e..9ca7ccd 100644 --- a/fraudcrawler/scraping/url.py +++ b/fraudcrawler/scraping/url.py @@ -6,7 +6,7 @@ from aiocache.backends.redis import RedisBackend -from fraudcrawler.base.base import FilteredAtStage, ProductItem +from fraudcrawler.base.base import DomainUtils, FilteredAtStage, ProductItem from fraudcrawler.cache.cacher import RedisConfig from fraudcrawler.settings import KNOWN_TRACKERS @@ -159,14 +159,15 @@ async def apply(self, product: ProductItem) -> ProductItem: return product -class DistributedURLCollector(URLCollector): +class DistributedURLCollector(URLCollector, DomainUtils): """A URL collector that de-duplicates across pipeline runs using Redis. - Seen URLs are stored under a namespaced Redis key computed as - ``md5(cleaned_url + id_suffix)``. Hashing keeps keys fixed-length and - bounded in memory regardless of URL length. An optional ``id_suffix`` - scopes deduplication beyond the Redis namespace (e.g. per-tenant, - per-campaign) by changing the hash input. + Seen URLs are stored under a namespaced Redis key of the form + ``{domain}_{sha256(cleaned_url + id_suffix)}``. The domain prefix keeps + keys inspectable in redis-cli; the hash bounds key length regardless of + URL size. An optional ``id_suffix`` scopes deduplication beyond the + Redis namespace (e.g. per-tenant, per-campaign) by changing the hash + input. """ def __init__( @@ -194,15 +195,19 @@ def __init__( ) def _get_redis_key(self, url: str) -> str: - """Return the MD5 hex digest of the `cleaned_url` concatenated with `id_suffix`. + """Return a Redis key of the form ``{domain}_{sha256_hex}``. - MD5 is used for deduplication only (not for security). + The SHA-256 digest is computed over the URL concatenated with + ``id_suffix``. The domain prefix (via ``DomainUtils._get_domain``) + keeps keys inspectable in redis-cli. Args: url: Tracking-parameter-free URL to hash. """ payload = f"{url}{self._id_suffix}".encode("utf-8") - return hashlib.md5(payload, usedforsecurity=False).hexdigest() + digest = hashlib.sha256(payload).hexdigest() + domain = self._get_domain(url) + return f"{domain}_{digest}" async def add_previously_collected_urls(self, urls: List[str]) -> None: """Seed Redis with already-seen URLs. 
diff --git a/tests/unittests/test_scraping.py b/tests/unittests/test_scraping.py
index 5e7cea8..c8a9759 100644
--- a/tests/unittests/test_scraping.py
+++ b/tests/unittests/test_scraping.py
@@ -680,7 +680,9 @@ def test_distributed_collector_hash_is_deterministic():
 
     assert h1 == h2
     assert h1 != h3
-    assert len(h1) == 32  # md5 hex digest length
+    domain, _, digest = h1.partition("_")
+    assert domain == "example.com"
+    assert len(digest) == 64  # sha256 hex digest length
 
 
 def test_remove_tracking_parameters_known_trackers(url_collector):

From 944bed5f45971ff78e36f82675d891a6d82174f6 Mon Sep 17 00:00:00 2001
From: chriguBERTO
Date: Fri, 1 May 2026 16:20:19 +0200
Subject: [PATCH 09/10] bugfix for RedisBackend keys

---
 fraudcrawler/scraping/url.py | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/fraudcrawler/scraping/url.py b/fraudcrawler/scraping/url.py
index 9ca7ccd..cecd05b 100644
--- a/fraudcrawler/scraping/url.py
+++ b/fraudcrawler/scraping/url.py
@@ -197,9 +197,12 @@ def __init__(
     def _get_redis_key(self, url: str) -> str:
         """Return a Redis key of the form ``{domain}_{sha256_hex}``.
 
-        The SHA-256 digest is computed over the URL concatenated with
-        ``id_suffix``. The domain prefix (via ``DomainUtils._get_domain``)
-        keeps keys inspectable in redis-cli.
+        Note:
+            - The SHA-256 digest is computed over the URL concatenated with
+              ``id_suffix``. The domain prefix (via ``DomainUtils._get_domain``)
+              keeps keys inspectable in redis-cli.
+            - Since we use ``RedisBackend`` directly here, we need to manually
+              prepend ":" to the key so that keys are grouped into folders.
 
         Args:
             url: Tracking-parameter-free URL to hash.
@@ -207,7 +210,7 @@ def _get_redis_key(self, url: str) -> str:
         payload = f"{url}{self._id_suffix}".encode("utf-8")
         digest = hashlib.sha256(payload).hexdigest()
         domain = self._get_domain(url)
-        return f"{domain}_{digest}"
+        return f":{domain}_{digest}"
 
     async def add_previously_collected_urls(self, urls: List[str]) -> None:
         """Seed Redis with already-seen URLs.

From ea046226a185cdfac2257d1aa4db696329d66533 Mon Sep 17 00:00:00 2001
From: chriguBERTO
Date: Fri, 1 May 2026 21:06:48 +0200
Subject: [PATCH 10/10] removed polluting logs of _cached_apply

---
 fraudcrawler/cache/cacher.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/fraudcrawler/cache/cacher.py b/fraudcrawler/cache/cacher.py
index 96092fd..36fa17c 100644
--- a/fraudcrawler/cache/cacher.py
+++ b/fraudcrawler/cache/cacher.py
@@ -161,17 +161,17 @@ async def _cached_apply(self, *args: Any, **kwargs: Any) -> Any:
         exists = await self._cache.exists(key=key)
         if exists:
             logger.debug(
-                f"Found cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})"
+                f"Found cached response for {self.__class__.__name__}.apply(...) and key={key}"
             )
             result = await self._cache.get(key=key)
         else:
             logger.debug(
-                f"No cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})"
+                f"No cached response for {self.__class__.__name__}.apply(...) and key={key}"
             )
             result = await self.apply(*args, **kwargs)
 
             logger.debug(
-                f"Set cached response for {self.__class__.__name__}.apply(args={args}, kwargs={kwargs})"
+                f"Set cached response for {self.__class__.__name__}.apply(...) and key={key}"
             )
             await self._cache.set(key=key, value=result, ttl=self._config.ttl)