-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathuri.py
More file actions
164 lines (132 loc) · 5.42 KB
/
uri.py
File metadata and controls
164 lines (132 loc) · 5.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
"""URI validator for ajson:// scheme."""
import re
from dataclasses import dataclass, field
from typing import List
from urllib.parse import urlparse
@dataclass
class URIValidationResult:
    """Outcome of validating a single URI.

    Only ``is_valid`` is required; every other field has an empty default,
    and the list/dict defaults are created fresh for each instance.
    """

    # True when validation produced no errors.
    is_valid: bool
    # Problems that make the URI invalid.
    errors: List[str] = field(default_factory=list)
    # Non-fatal observations about the URI.
    warnings: List[str] = field(default_factory=list)
    # The URI string that was validated.
    uri: str = ""
    # Components extracted from the URI, keyed by component name.
    parsed: dict = field(default_factory=dict)
class URIValidator:
    """Validator for ajson:// URIs, following RFC 3986 component rules.

    ``validate`` checks the scheme, authority (host plus optional port),
    path and fragment, collecting problems as errors and softer findings as
    warnings. ``to_https`` maps a valid URI to its HTTPS well-known URL.
    """

    # Scheme prefix; matched case-sensitively against the raw URI string.
    SCHEME_PATTERN = re.compile(r"^ajson://")
    # Domain pattern (without port/userinfo for basic validation)
    DOMAIN_PATTERN = re.compile(
        r"^([a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?\.)*"
        r"[a-zA-Z0-9]([a-zA-Z0-9\-]{0,61}[a-zA-Z0-9])?$|^localhost$"
    )
    # Zero or more "/segment" groups; percent-escapes are not accepted.
    PATH_PATTERN = re.compile(r"^(/[a-zA-Z0-9._~!$&'()*+,;=:@\-]*)*$")
    FRAGMENT_PATTERN = re.compile(r"^[a-zA-Z0-9._~!$&'()*+,;=:@/?-]*$")

    def validate(self, uri: str) -> URIValidationResult:
        """
        Validate an ajson:// URI.

        An empty URI, a wrong scheme, or a URI that ``urlparse`` rejects
        ends validation immediately with a single error. Otherwise all
        authority, path and fragment problems are collected together.

        Args:
            uri: The URI to validate

        Returns:
            URIValidationResult with validation status. ``parsed`` holds the
            ``scheme``, ``authority``, ``path``, ``query`` and ``fragment``
            components once the URI has been parsed.
        """
        errors: List[str] = []
        warnings: List[str] = []
        parsed_data: dict = {}

        if not uri:
            errors.append("URI cannot be empty")
            return URIValidationResult(is_valid=False, errors=errors, uri=uri)

        # Check scheme
        if not self.SCHEME_PATTERN.match(uri):
            errors.append(
                f"Invalid URI scheme. Expected 'ajson://', got: {uri.split('://')[0] if '://' in uri else 'none'}"
            )
            return URIValidationResult(is_valid=False, errors=errors, uri=uri)

        # Parse URI; urlparse signals malformed input with ValueError.
        try:
            parsed = urlparse(uri)
        except ValueError as e:
            errors.append(f"Failed to parse URI: {e}")
            return URIValidationResult(is_valid=False, errors=errors, uri=uri, parsed=parsed_data)

        parsed_data = {
            "scheme": parsed.scheme,
            "authority": parsed.netloc,
            "path": parsed.path,
            "query": parsed.query,
            "fragment": parsed.fragment,
        }

        # Validate authority (netloc)
        authority = parsed.netloc
        if not authority:
            errors.append("URI must include an authority (domain/host) component")
        else:
            # Check for userinfo (not recommended)
            if "@" in authority:
                warnings.append("URI contains userinfo (@), which is not recommended for security")
            # Everything after the last "@" is host[:port].
            domain_with_port = authority.split("@")[-1]

            # Check for port
            domain = domain_with_port
            if ":" in domain_with_port:
                host, port_str = domain_with_port.rsplit(":", 1)
                # A port is ASCII digits only. Bare int() would also accept
                # "+80", "8_0", surrounding whitespace and non-ASCII digits.
                digits_only = port_str.isascii() and port_str.isdigit()
                try:
                    port = int(port_str) if digits_only else None
                except ValueError:
                    # int() can refuse extremely long digit strings.
                    port = None
                if port is None:
                    errors.append(f"Invalid port in authority: {port_str}")
                    # domain stays as the full host:port string, so the
                    # domain check below reports the whole authority.
                else:
                    domain = host
                    if port < 1 or port > 65535:
                        errors.append(f"Invalid port number: {port}")

            # Validate domain format; fullmatch so a trailing newline cannot
            # slip past the pattern's "$" anchor.
            if not self.DOMAIN_PATTERN.fullmatch(domain):
                errors.append(f"Invalid authority (domain): '{domain}'. Must be a valid domain or 'localhost'")

        # Validate path
        path = parsed.path
        if not path:
            warnings.append("URI has no path component")
        elif not path.startswith("/"):
            errors.append(f"Path must start with '/': {path}")
        elif not self.PATH_PATTERN.fullmatch(path):
            errors.append(f"Invalid characters in path: {path}")

        # Validate query (optional); its presence is only reported.
        if parsed.query:
            warnings.append(f"Query parameters present: {parsed.query}")

        # Validate fragment (optional)
        if parsed.fragment:
            if not self.FRAGMENT_PATTERN.fullmatch(parsed.fragment):
                errors.append(f"Invalid characters in fragment: {parsed.fragment}")

        return URIValidationResult(
            is_valid=not errors,
            errors=errors,
            warnings=warnings,
            uri=uri,
            parsed=parsed_data,
        )

    def to_https(self, uri: str) -> str:
        """
        Transform ajson:// URI to HTTPS well-known URI.

        The authority is copied as-is, the path is placed under
        ``/.well-known/`` with ``.agents.json`` appended when missing, and
        the fragment is kept. The query string is not carried over.

        Args:
            uri: ajson:// URI

        Returns:
            Corresponding HTTPS URL

        Raises:
            ValueError: If the URI fails validation, or its path contains a
                ``..`` segment that would resolve outside ``/.well-known/``.

        Example:
            ajson://example.com/agents/router
            -> https://example.com/.well-known/agents/router.agents.json
        """
        result = self.validate(uri)
        if not result.is_valid:
            raise ValueError(f"Invalid ajson:// URI: {', '.join(result.errors)}")

        parsed = result.parsed
        authority = parsed["authority"]
        path = parsed["path"].lstrip("/")

        # ".." is legal URI syntax, but once normalised it would move the
        # target out of the /.well-known/ prefix.
        if ".." in path.split("/"):
            raise ValueError("Invalid ajson:// URI: path must not contain '..' segments")

        # Add .agents.json extension if not present
        if not path.endswith(".agents.json"):
            path = f"{path}.agents.json"

        # Build HTTPS URL - path already includes everything we need
        https_url = f"https://{authority}/.well-known/{path}"
        if parsed["fragment"]:
            https_url += f"#{parsed['fragment']}"
        return https_url