diff --git a/dataclaw/anonymizer.py b/dataclaw/anonymizer.py index 045fe01..3f04aa8 100644 --- a/dataclaw/anonymizer.py +++ b/dataclaw/anonymizer.py @@ -1,5 +1,6 @@ """Anonymize PII in Claude Code log data.""" +import functools import hashlib import os import re @@ -15,60 +16,70 @@ def _detect_home_dir() -> tuple[str, str]: return home, username -def anonymize_path(path: str, username: str, username_hash: str, home: str | None = None) -> str: - """Strip a path to project-relative and hash the username.""" - if not path: - return path +@functools.lru_cache(maxsize=32) +def _get_username_pattern(username: str) -> re.Pattern: + escaped = re.escape(username) + # \b does not match word boundaries around underscore, but we need to match them + return re.compile(rf"(? re.Pattern: + escaped = re.escape(username) + # Match /Users/username , \Users\username , \\Users\\username , -Users-username (hyphen-encoded path like Claude Code and HuggingFace cache) + # Match conventional indicators of home dir: 'Users' and 'home' + # Ignore case for Windows-like path + return re.compile(rf"([/\\-]+(Users|home)[/\\-]+){escaped}(?=[^a-zA-Z0-9]|$)", flags=re.IGNORECASE) - # Try longest prefixes first (subdirectory matches before bare home) - home_patterns = sorted(prefixes, key=len, reverse=True) - for prefix in home_patterns: - if path.startswith(prefix): - rest = path[len(prefix):] - if "/Documents/" in prefix or "/Downloads/" in prefix or "/Desktop/" in prefix: - return rest - return f"{username_hash}/{rest}" +@functools.lru_cache(maxsize=32) +def _get_custom_home_pattern(home: str) -> re.Pattern | None: + if home.startswith(("/Users/", "/home/", "C:\\Users\\")): + return None - path = path.replace(f"/Users/{username}/", f"/{username_hash}/") - path = path.replace(f"/home/{username}/", f"/{username_hash}/") + # If home is not conventional, replace with more specific pattern - return path + # Escape home and replace / or \ with `r"[/\\-]+"` + home_escaped = home.replace("\\", "/") + home_escaped = re.escape(home_escaped) + home_escaped = home_escaped.replace("/", r"[/\\-]+") + # In WSL and MSYS2, C:\ may be represented by /c/ + home_escaped = home_escaped.replace(":", ":?") + return re.compile(home_escaped, flags=re.IGNORECASE) -def anonymize_text(text: str, username: str, username_hash: str) -> str: +def anonymize_text(text: str, username: str, username_hash: str, home: str | None = None) -> str: if not text or not username: return text - escaped = re.escape(username) + if username.lower() not in text.lower(): + return text - # Replace /Users/ and /home/ - text = re.sub(rf"/Users/{escaped}(?=/|[^a-zA-Z0-9_-]|$)", f"/{username_hash}", text) - text = re.sub(rf"/home/{escaped}(?=/|[^a-zA-Z0-9_-]|$)", f"/{username_hash}", text) + # Replace bare username in contexts (ls output, prose, etc.) + # Only if username is >= 4 chars to avoid false positives + if len(username) >= 4: + return _get_username_pattern(username).sub(username_hash, text) - # Catch hyphen-encoded paths: -Users-peteromalley- or -Users-peteromalley/ - text = re.sub(rf"-Users-{escaped}(?=-|/|$)", f"-Users-{username_hash}", text) - text = re.sub(rf"-home-{escaped}(?=-|/|$)", f"-home-{username_hash}", text) + # When username is < 4 chars, replace with more specific patterns - # Catch temp paths like /private/tmp/claude-501/-Users-peteromalley/ - text = re.sub(rf"claude-\d+/-Users-{escaped}", f"claude-XXX/-Users-{username_hash}", text) + text = _get_home_pattern(username).sub(rf"\g<1>{username_hash}", text) - # Final pass: replace bare username in remaining contexts (ls output, prose, etc.) - # Only if username is >= 4 chars to avoid false positives - if len(username) >= 4: - text = re.sub(rf"\b{escaped}\b", username_hash, text) + if home: + pat_home = _get_custom_home_pattern(home) + if pat_home: + pat_user = _get_username_pattern(username) + def f(match): + # match.group(0) is a non-escaped string + return pat_user.sub(username_hash, match.group(0)) + text = pat_home.sub(f, text) return text +# Backward compatibility +anonymize_path = anonymize_text + + class Anonymizer: """Stateful anonymizer that consistently hashes usernames.""" @@ -77,29 +88,38 @@ def __init__(self, extra_usernames: list[str] | None = None): self.username_hash = _hash_username(self.username) # Additional usernames to anonymize (GitHub handles, Discord names, etc.) - self._extra: list[tuple[str, str]] = [] + self._extra_dict = {} for name in (extra_usernames or []): name = name.strip() - if name and name != self.username: - self._extra.append((name, _hash_username(name))) + if name and name != self.username and len(name) >= 4: + self._extra_dict[name.lower()] = _hash_username(name) + + self._extra = list(self._extra_dict.keys()) + + if self._extra_dict: + escaped_names = [re.escape(k) for k in sorted(self._extra_dict.keys(), key=len, reverse=True)] + self._extra_pattern = re.compile(rf"(? str: - result = anonymize_path(file_path, self.username, self.username_hash, self.home) - result = anonymize_text(result, self.username, self.username_hash) - for name, hashed in self._extra: - result = _replace_username(result, name, hashed) - return result + return self.text(file_path) def text(self, content: str) -> str: - result = anonymize_text(content, self.username, self.username_hash) - for name, hashed in self._extra: - result = _replace_username(result, name, hashed) + result = anonymize_text(content, self.username, self.username_hash, self.home) + if self._extra_pattern: + def f(match): + return self._extra_dict[match.group(1).lower()] + result = self._extra_pattern.sub(f, result) return result def _replace_username(text: str, username: str, username_hash: str) -> str: - if not text or not username or len(username) < 3: + if not text or not username or len(username) < 4: return text - escaped = re.escape(username) - text = re.sub(escaped, username_hash, text, flags=re.IGNORECASE) - return text + + if username.lower() not in text.lower(): + return text + + pat = _get_username_pattern(username) + return pat.sub(username_hash, text) diff --git a/tests/test_anonymizer.py b/tests/test_anonymizer.py index 0320f7d..81d8111 100644 --- a/tests/test_anonymizer.py +++ b/tests/test_anonymizer.py @@ -39,56 +39,62 @@ class TestAnonymizePath: def test_empty_path(self): assert anonymize_path("", "alice", "user_abc12345") == "" - def test_documents_prefix_stripped(self): + def test_global_replace(self): + # if username is >= 4 chars, username is hashed using global replace result = anonymize_path( - "/Users/alice/Documents/myproject/src/main.py", - "alice", "user_abc12345", home="/Users/alice", + "/Users/alice/something", + "alice", "user_abc12345" ) - assert result == "myproject/src/main.py" + assert result == "/Users/user_abc12345/something" - def test_downloads_prefix_stripped(self): + def test_bare_home_hashed(self): result = anonymize_path( - "/Users/alice/Downloads/file.zip", - "alice", "user_abc12345", home="/Users/alice", + "/Users/s/somedir/file.py", + "s", "user_abc12345", home="/Users/s", ) - assert result == "file.zip" + assert result == "/Users/user_abc12345/somedir/file.py" - def test_desktop_prefix_stripped(self): + def test_linux_home_path(self): result = anonymize_path( - "/Users/alice/Desktop/notes.txt", - "alice", "user_abc12345", home="/Users/alice", + "/home/s/Documents/project/file.py", + "s", "user_abc12345", home="/home/s", ) - assert result == "notes.txt" + assert result == "/home/user_abc12345/Documents/project/file.py" - def test_bare_home_hashed(self): + def test_path_not_under_home(self): result = anonymize_path( - "/Users/alice/somedir/file.py", - "alice", "user_abc12345", home="/Users/alice", + "/var/log/syslog", + "s", "user_abc12345", home="/Users/s", ) - assert result == "user_abc12345/somedir/file.py" + assert result == "/var/log/syslog" - def test_linux_home_path(self): + def test_windows_users_path(self): result = anonymize_path( - "/home/alice/Documents/project/file.py", - "alice", "user_abc12345", home="/home/alice", + r"C:\Users\bob\Documents\file.txt", + "bob", "user_abc12345", ) - assert result == "project/file.py" + assert result == r"C:\Users\user_abc12345\Documents\file.txt" - def test_path_not_under_home(self): + def test_windows_users_path_double_backslashes(self): result = anonymize_path( - "/var/log/syslog", - "alice", "user_abc12345", home="/Users/alice", + r"\\Users\\bob\\Documents\\file.txt", + "bob", "user_abc12345", ) - assert result == "/var/log/syslog" + assert result == r"\\Users\\user_abc12345\\Documents\\file.txt" + + def test_windows_custom_home_path(self): + result = anonymize_path( + "C:\\custom_home\\bob\\project\\file.py", + "bob", "user_abc12345", home=r"C:\custom_home\bob", + ) + assert result == "C:\\custom_home\\user_abc12345\\project\\file.py" - def test_fallback_users_replacement(self): - # Path with username not matching the prefix set + def test_msys2_custom_home_path(self): result = anonymize_path( - "/tmp/Users/alice/something", - "alice", "user_abc12345", home="/Users/alice", + "/c/custom_home/bob/project/file.py", + "bob", "user_abc12345", home=r"C:\custom_home\bob", ) - # Falls through prefix matching, hits the fallback .replace - assert "user_abc12345" in result or "/tmp/" in result + assert result == "/c/custom_home/user_abc12345/project/file.py" # --- anonymize_text --- @@ -109,40 +115,35 @@ def test_users_path_replaced(self): "File at /Users/alice/project/main.py", "alice", "user_abc12345", ) - assert "/user_abc12345/project/main.py" in result + assert result == "File at /Users/user_abc12345/project/main.py" def test_home_path_replaced(self): result = anonymize_text( "File at /home/alice/project/main.py", "alice", "user_abc12345", ) - assert "/user_abc12345/project/main.py" in result + assert result == "File at /home/user_abc12345/project/main.py" def test_hyphen_encoded_path(self): result = anonymize_text( "-Users-alice-Documents-myproject", "alice", "user_abc12345", ) - assert "-Users-user_abc12345" in result + assert result == "-Users-user_abc12345-Documents-myproject" def test_temp_path(self): - # The hyphen-encoded path regex runs before the temp path regex, - # so the username gets replaced but claude-XXX may not trigger. - # The important thing is the username is anonymized. result = anonymize_text( "/private/tmp/claude-501/-Users-alice-Documents-proj/foo", "alice", "user_abc12345", ) - assert "alice" not in result - assert "user_abc12345" in result + assert result == "/private/tmp/claude-501/-Users-user_abc12345-Documents-proj/foo" def test_bare_username_replaced(self): result = anonymize_text( "Hello alice, welcome back", "alice", "user_abc12345", ) - assert "alice" not in result - assert "user_abc12345" in result + assert result == "Hello user_abc12345, welcome back" def test_short_username_not_replaced_bare(self): # Usernames < 4 chars should NOT be replaced as bare words @@ -150,7 +151,7 @@ def test_short_username_not_replaced_bare(self): "Hello bob, welcome back", "bob", "user_abc12345", ) - assert "bob" in result # bare replacement skipped for short username + assert result == "Hello bob, welcome back" def test_short_username_path_still_replaced(self): # Even short usernames should be replaced in path contexts @@ -158,7 +159,7 @@ def test_short_username_path_still_replaced(self): "File at /Users/bob/project", "bob", "user_abc12345", ) - assert "/user_abc12345/project" in result + assert result == "File at /Users/user_abc12345/project" # --- Anonymizer class ---