Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
122 changes: 71 additions & 51 deletions dataclaw/anonymizer.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""Anonymize PII in Claude Code log data."""

import functools
import hashlib
import os
import re
Expand All @@ -15,60 +16,70 @@ def _detect_home_dir() -> tuple[str, str]:
return home, username


def anonymize_path(path: str, username: str, username_hash: str, home: str | None = None) -> str:
"""Strip a path to project-relative and hash the username."""
if not path:
return path
@functools.lru_cache(maxsize=32)
def _get_username_pattern(username: str) -> re.Pattern:
escaped = re.escape(username)
# \b does not match word boundaries around underscore, but we need to match them
return re.compile(rf"(?<![a-zA-Z0-9]){escaped}(?![a-zA-Z0-9])", flags=re.IGNORECASE)


if home is None:
home = os.path.expanduser("~")
prefixes = set()
for base in (f"/Users/{username}", f"/home/{username}", home):
for subdir in ("Documents", "Downloads", "Desktop"):
prefixes.add(f"{base}/{subdir}/")
prefixes.add(f"{base}/")
@functools.lru_cache(maxsize=32)
def _get_home_pattern(username: str) -> re.Pattern:
escaped = re.escape(username)
# Match /Users/username , \Users\username , \\Users\\username , -Users-username (hyphen-encoded path like Claude Code and HuggingFace cache)
# Match conventional indicators of home dir: 'Users' and 'home'
# Ignore case for Windows-like path
return re.compile(rf"([/\\-]+(Users|home)[/\\-]+){escaped}(?=[^a-zA-Z0-9]|$)", flags=re.IGNORECASE)

# Try longest prefixes first (subdirectory matches before bare home)
home_patterns = sorted(prefixes, key=len, reverse=True)

for prefix in home_patterns:
if path.startswith(prefix):
rest = path[len(prefix):]
if "/Documents/" in prefix or "/Downloads/" in prefix or "/Desktop/" in prefix:
return rest
return f"{username_hash}/{rest}"
@functools.lru_cache(maxsize=32)
def _get_custom_home_pattern(home: str) -> re.Pattern | None:
if home.startswith(("/Users/", "/home/", "C:\\Users\\")):
return None

path = path.replace(f"/Users/{username}/", f"/{username_hash}/")
path = path.replace(f"/home/{username}/", f"/{username_hash}/")
# If home is not conventional, replace with more specific pattern

return path
# Escape home and replace / or \ with `r"[/\\-]+"`
home_escaped = home.replace("\\", "/")
home_escaped = re.escape(home_escaped)
home_escaped = home_escaped.replace("/", r"[/\\-]+")
# In WSL and MSYS2, C:\ may be represented by /c/
home_escaped = home_escaped.replace(":", ":?")
return re.compile(home_escaped, flags=re.IGNORECASE)


def anonymize_text(text: str, username: str, username_hash: str) -> str:
def anonymize_text(text: str, username: str, username_hash: str, home: str | None = None) -> str:
if not text or not username:
return text

escaped = re.escape(username)
if username.lower() not in text.lower():
return text

# Replace /Users/<username> and /home/<username>
text = re.sub(rf"/Users/{escaped}(?=/|[^a-zA-Z0-9_-]|$)", f"/{username_hash}", text)
text = re.sub(rf"/home/{escaped}(?=/|[^a-zA-Z0-9_-]|$)", f"/{username_hash}", text)
# Replace bare username in contexts (ls output, prose, etc.)
# Only if username is >= 4 chars to avoid false positives
if len(username) >= 4:
return _get_username_pattern(username).sub(username_hash, text)

# Catch hyphen-encoded paths: -Users-peteromalley- or -Users-peteromalley/
text = re.sub(rf"-Users-{escaped}(?=-|/|$)", f"-Users-{username_hash}", text)
text = re.sub(rf"-home-{escaped}(?=-|/|$)", f"-home-{username_hash}", text)
# When username is < 4 chars, replace with more specific patterns

# Catch temp paths like /private/tmp/claude-501/-Users-peteromalley/
text = re.sub(rf"claude-\d+/-Users-{escaped}", f"claude-XXX/-Users-{username_hash}", text)
text = _get_home_pattern(username).sub(rf"\g<1>{username_hash}", text)

# Final pass: replace bare username in remaining contexts (ls output, prose, etc.)
# Only if username is >= 4 chars to avoid false positives
if len(username) >= 4:
text = re.sub(rf"\b{escaped}\b", username_hash, text)
if home:
pat_home = _get_custom_home_pattern(home)
if pat_home:
pat_user = _get_username_pattern(username)
def f(match):
# match.group(0) is a non-escaped string
return pat_user.sub(username_hash, match.group(0))
text = pat_home.sub(f, text)

return text


# Backward compatibility
anonymize_path = anonymize_text


class Anonymizer:
"""Stateful anonymizer that consistently hashes usernames."""

Expand All @@ -77,29 +88,38 @@ def __init__(self, extra_usernames: list[str] | None = None):
self.username_hash = _hash_username(self.username)

# Additional usernames to anonymize (GitHub handles, Discord names, etc.)
self._extra: list[tuple[str, str]] = []
self._extra_dict = {}
for name in (extra_usernames or []):
name = name.strip()
if name and name != self.username:
self._extra.append((name, _hash_username(name)))
if name and name != self.username and len(name) >= 4:
self._extra_dict[name.lower()] = _hash_username(name)

self._extra = list(self._extra_dict.keys())

if self._extra_dict:
escaped_names = [re.escape(k) for k in sorted(self._extra_dict.keys(), key=len, reverse=True)]
self._extra_pattern = re.compile(rf"(?<![a-zA-Z0-9])({'|'.join(escaped_names)})(?![a-zA-Z0-9])", flags=re.IGNORECASE)
else:
self._extra_pattern = None

def path(self, file_path: str) -> str:
result = anonymize_path(file_path, self.username, self.username_hash, self.home)
result = anonymize_text(result, self.username, self.username_hash)
for name, hashed in self._extra:
result = _replace_username(result, name, hashed)
return result
return self.text(file_path)

def text(self, content: str) -> str:
result = anonymize_text(content, self.username, self.username_hash)
for name, hashed in self._extra:
result = _replace_username(result, name, hashed)
result = anonymize_text(content, self.username, self.username_hash, self.home)
if self._extra_pattern:
def f(match):
return self._extra_dict[match.group(1).lower()]
result = self._extra_pattern.sub(f, result)
return result


def _replace_username(text: str, username: str, username_hash: str) -> str:
if not text or not username or len(username) < 3:
if not text or not username or len(username) < 4:
return text
escaped = re.escape(username)
text = re.sub(escaped, username_hash, text, flags=re.IGNORECASE)
return text

if username.lower() not in text.lower():
return text

pat = _get_username_pattern(username)
return pat.sub(username_hash, text)
85 changes: 43 additions & 42 deletions tests/test_anonymizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,56 +39,62 @@ class TestAnonymizePath:
def test_empty_path(self):
assert anonymize_path("", "alice", "user_abc12345") == ""

def test_documents_prefix_stripped(self):
def test_global_replace(self):
# if username is >= 4 chars, username is hashed using global replace
result = anonymize_path(
"/Users/alice/Documents/myproject/src/main.py",
"alice", "user_abc12345", home="/Users/alice",
"/Users/alice/something",
"alice", "user_abc12345"
)
assert result == "myproject/src/main.py"
assert result == "/Users/user_abc12345/something"

def test_downloads_prefix_stripped(self):
def test_bare_home_hashed(self):
result = anonymize_path(
"/Users/alice/Downloads/file.zip",
"alice", "user_abc12345", home="/Users/alice",
"/Users/s/somedir/file.py",
"s", "user_abc12345", home="/Users/s",
)
assert result == "file.zip"
assert result == "/Users/user_abc12345/somedir/file.py"

def test_desktop_prefix_stripped(self):
def test_linux_home_path(self):
result = anonymize_path(
"/Users/alice/Desktop/notes.txt",
"alice", "user_abc12345", home="/Users/alice",
"/home/s/Documents/project/file.py",
"s", "user_abc12345", home="/home/s",
)
assert result == "notes.txt"
assert result == "/home/user_abc12345/Documents/project/file.py"

def test_bare_home_hashed(self):
def test_path_not_under_home(self):
result = anonymize_path(
"/Users/alice/somedir/file.py",
"alice", "user_abc12345", home="/Users/alice",
"/var/log/syslog",
"s", "user_abc12345", home="/Users/s",
)
assert result == "user_abc12345/somedir/file.py"
assert result == "/var/log/syslog"

def test_linux_home_path(self):
def test_windows_users_path(self):
result = anonymize_path(
"/home/alice/Documents/project/file.py",
"alice", "user_abc12345", home="/home/alice",
r"C:\Users\bob\Documents\file.txt",
"bob", "user_abc12345",
)
assert result == "project/file.py"
assert result == r"C:\Users\user_abc12345\Documents\file.txt"

def test_path_not_under_home(self):
def test_windows_users_path_double_backslashes(self):
result = anonymize_path(
"/var/log/syslog",
"alice", "user_abc12345", home="/Users/alice",
r"\\Users\\bob\\Documents\\file.txt",
"bob", "user_abc12345",
)
assert result == "/var/log/syslog"
assert result == r"\\Users\\user_abc12345\\Documents\\file.txt"

def test_windows_custom_home_path(self):
result = anonymize_path(
"C:\\custom_home\\bob\\project\\file.py",
"bob", "user_abc12345", home=r"C:\custom_home\bob",
)
assert result == "C:\\custom_home\\user_abc12345\\project\\file.py"

def test_fallback_users_replacement(self):
# Path with username not matching the prefix set
def test_msys2_custom_home_path(self):
result = anonymize_path(
"/tmp/Users/alice/something",
"alice", "user_abc12345", home="/Users/alice",
"/c/custom_home/bob/project/file.py",
"bob", "user_abc12345", home=r"C:\custom_home\bob",
)
# Falls through prefix matching, hits the fallback .replace
assert "user_abc12345" in result or "/tmp/" in result
assert result == "/c/custom_home/user_abc12345/project/file.py"


# --- anonymize_text ---
Expand All @@ -109,56 +115,51 @@ def test_users_path_replaced(self):
"File at /Users/alice/project/main.py",
"alice", "user_abc12345",
)
assert "/user_abc12345/project/main.py" in result
assert result == "File at /Users/user_abc12345/project/main.py"

def test_home_path_replaced(self):
result = anonymize_text(
"File at /home/alice/project/main.py",
"alice", "user_abc12345",
)
assert "/user_abc12345/project/main.py" in result
assert result == "File at /home/user_abc12345/project/main.py"

def test_hyphen_encoded_path(self):
result = anonymize_text(
"-Users-alice-Documents-myproject",
"alice", "user_abc12345",
)
assert "-Users-user_abc12345" in result
assert result == "-Users-user_abc12345-Documents-myproject"

def test_temp_path(self):
# The hyphen-encoded path regex runs before the temp path regex,
# so the username gets replaced but claude-XXX may not trigger.
# The important thing is the username is anonymized.
result = anonymize_text(
"/private/tmp/claude-501/-Users-alice-Documents-proj/foo",
"alice", "user_abc12345",
)
assert "alice" not in result
assert "user_abc12345" in result
assert result == "/private/tmp/claude-501/-Users-user_abc12345-Documents-proj/foo"

def test_bare_username_replaced(self):
result = anonymize_text(
"Hello alice, welcome back",
"alice", "user_abc12345",
)
assert "alice" not in result
assert "user_abc12345" in result
assert result == "Hello user_abc12345, welcome back"

def test_short_username_not_replaced_bare(self):
# Usernames < 4 chars should NOT be replaced as bare words
result = anonymize_text(
"Hello bob, welcome back",
"bob", "user_abc12345",
)
assert "bob" in result # bare replacement skipped for short username
assert result == "Hello bob, welcome back"

def test_short_username_path_still_replaced(self):
# Even short usernames should be replaced in path contexts
result = anonymize_text(
"File at /Users/bob/project",
"bob", "user_abc12345",
)
assert "/user_abc12345/project" in result
assert result == "File at /Users/user_abc12345/project"


# --- Anonymizer class ---
Expand Down