Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions .github/workflows/verify.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
name: Verify & Code-Coverage

on:
pull_request:
push:
branches:
- master

jobs:
test:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v4

- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.12"

- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install tox

- name: Run tests
run: tox -e check,py3

- name: Upload coverage to Codecov
if: success()
uses: codecov/codecov-action@v4
with:
token: ${{ secrets.CODECOV_TOKEN }}
fail_ci_if_error: true
80 changes: 26 additions & 54 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,74 +12,46 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import
from __future__ import print_function

import sys
from glob import glob
from os.path import basename
from os.path import splitext

from setuptools import find_packages
from setuptools import setup
from setuptools.command.test import test as TestCommand


# Allows one to simply > python3 setup.py test
class PyTest(TestCommand):
user_options = [('pytest-args=', 'a', "Arguments to pass to py.test")]

def initialize_options(self):
TestCommand.initialize_options(self)
self.pytest_args = []
from os.path import basename, splitext

def finalize_options(self):
TestCommand.finalize_options(self)
self.test_args = []
self.test_suite = True
from setuptools import find_packages, setup

def run_tests(self):
# import here, cause outside the eggs aren't loaded
import pytest
errno = pytest.main(self.pytest_args)
sys.exit(errno)


with open('README.md') as f:
with open("README.md", encoding="utf-8") as f:
long_description = f.read()


setup(
cmdclass={'test': PyTest},
name='vinyldns-python',
version='0.9.8',
packages=find_packages('src'),
package_dir={'': 'src'},
py_modules=[splitext(basename(path))[0] for path in glob('src/*.py')],
name="vinyldns-python",
version="0.9.8",
packages=find_packages("src"),
package_dir={"": "src"},
py_modules=[splitext(basename(path))[0] for path in glob("src/*.py")],
include_package_data=True,
zip_safe=False,
url='https://github.com/vinyldns/vinyldns-python',
license='Apache Software License 2.0',
author='vinyldns',
author_email='vinyldns-core@googlegroups.com',
url="https://github.com/vinyldns/vinyldns-python",
license="Apache Software License 2.0",
author="vinyldns",
author_email="vinyldns-core@googlegroups.com",
classifiers=[
'Development Status :: 5 - Production/Stable',
'Intended Audience :: Developers',
'Topic :: Internet :: Name Service (DNS)',
'License :: OSI Approved :: Apache Software License',
"Development Status :: 5 - Production/Stable",
"Intended Audience :: Developers",
"Topic :: Internet :: Name Service (DNS)",
"License :: OSI Approved :: Apache Software License",
"Programming Language :: Python :: 3",
],
keywords=[
'dns', 'python', 'vinyldns',
],
description='Python client library for VinylDNS',
keywords=["dns", "python", "vinyldns"],
description="Python client library for VinylDNS",
long_description=long_description,
long_description_content_type="text/markdown",
install_requires=[
'boto>=2.48.0',
'requests>=2.20.0',
'python-dateutil>=2.7.5',
"boto3>=1.26.0",
"requests>=2.20.0",
"python-dateutil>=2.7.5",
],
tests_require=[
'responses==0.10.4',
'pytest==3.10.1',
],
"responses==0.25.8",
"pytest==9.0.2",
]
)
175 changes: 109 additions & 66 deletions src/vinyldns/boto_request_signer.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,107 +11,150 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""TODO: Add module docstring."""

import logging
from datetime import datetime
from hashlib import sha256
from datetime import datetime, UTC
from typing import Dict, Optional, Union
import urllib.parse as urlparse

import requests.compat as urlparse
from boto.dynamodb2.layer1 import DynamoDBConnection
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials

logger = logging.getLogger(__name__)

__all__ = [u'BotoRequestSigner']
__all__ = ["BotoRequestSigner"]


class BotoRequestSigner(object):
"""TODO: Add class docstring."""
class BotoRequestSigner:
"""
Signs HTTP requests using AWS Signature Version 4 for the VinylDNS service.
"""

def __init__(self, index_url, access_key, secret_access_key):
"""TODO: Add method docstring."""
def __init__(
self,
index_url: str,
access_key: str,
secret_access_key: str,
) -> None:
url = urlparse.urlparse(index_url)
self.boto_connection = DynamoDBConnection(
host=url.hostname,
port=url.port,
aws_access_key_id=access_key,
aws_secret_access_key=secret_access_key,
is_secure=False)
scheme = url.scheme or "https"
host = url.hostname
port = url.port

if host is None:
raise ValueError(f"Invalid index_url (missing host): {index_url}")

self.netloc = f"{host}:{port}" if port else host
self.base_url = f"{scheme}://{self.netloc}"
self.region_name = "us-east-1"
self.service_name = "VinylDNS"
self.credentials = Credentials(access_key, secret_access_key)

@staticmethod
def __canonical_date(headers):
def __canonical_date(headers: Dict[str, str]) -> str:
"""
Derive canonical date (ISO 8601 string).
Resolve an ISO8601-like date from headers, falling back to current UTC.

Either from headers (if possible) or synthesize it if no usable header exists.
Checks 'X-Amz-Date' (ISO8601 basic) and 'Date' (HTTP-date),
and returns an ISO8601 basic formatted string.
"""
iso_format = u'%Y%m%dT%H%M%SZ'
http_format = u'%a, %d %b %Y %H:%M:%S GMT'
iso_format = "%Y%m%dT%H%M%SZ"
http_format = "%a, %d %b %Y %H:%M:%S GMT"

def try_parse(date_string, format):
def try_parse(
date_string: Optional[str],
fmt: str,
) -> Optional[datetime]:
if date_string is None:
return None
try:
return datetime.strptime(date_string, format)
return datetime.strptime(date_string, fmt)
except ValueError:
return None

amz_date = try_parse(headers.get(u'X-Amz-Date'), iso_format)
http_date = try_parse(headers.get(u'Date'), http_format)
fallback_date = datetime.utcnow()
amz_date = try_parse(headers.get("X-Amz-Date"), iso_format)
http_date = try_parse(headers.get("Date"), http_format)
fallback_date = datetime.now(UTC)

date = next(d for d in [amz_date, http_date, fallback_date] if d is not None)
date = next(
d for d in (amz_date, http_date, fallback_date) if d is not None
)
return date.strftime(iso_format)

def build_auth_header(self, method, path, headers, body, params=None):
"""Construct an Authorization header, using boto."""
request = self.boto_connection.build_base_http_request(
method=method,
path=path,
auth_path=path,
headers=headers,
data=body,
params=params or {})
def build_auth_header(
self,
method: str,
path: str,
headers: Optional[Dict[str, str]],
body: Optional[Union[str, bytes]],
params: Optional[Dict[str, Union[str, bytes]]] = None,
) -> str:
"""
Build the AWS SigV4 Authorization header for the given request parameters.
"""
hdrs: Dict[str, str] = dict(headers or {})
hdrs.setdefault("Host", self.netloc)
# Remove Date header if present
hdrs.pop("Date", None)

auth_handler = self.boto_connection._auth_handler
# Normalize body to bytes
if body is None:
data = b""
elif isinstance(body, str):
data = body.encode("utf-8")
else:
data = body

timestamp = BotoRequestSigner.__canonical_date(headers)
request.timestamp = timestamp[0:8]
query = generate_canonical_query_string(params or {})

request.region_name = u'us-east-1'
request.service_name = u'VinylDNS'
if not path.startswith("/"):
path = "/" + path

credential_scope = u'/'.join([request.timestamp, request.region_name, request.service_name, u'aws4_request'])
url = f"{self.base_url}{path}"
if query:
url = f"{url}?{query}"

canonical_request = auth_handler.canonical_request(request)
split_request = canonical_request.split('\n')
aws_request = AWSRequest(
method=method,
url=url,
data=data,
headers=hdrs,
)

if params != {} and split_request[2] == '':
split_request[2] = generate_canonical_query_string(params)
canonical_request = '\n'.join(split_request)
SigV4Auth(
self.credentials,
self.service_name,
self.region_name,
).add_auth(aws_request)

hashed_request = sha256(canonical_request.encode(u'utf-8')).hexdigest()
return aws_request.headers["Authorization"]

string_to_sign = u'\n'.join([u'AWS4-HMAC-SHA256', timestamp, credential_scope, hashed_request])
signature = auth_handler.signature(request, string_to_sign)
headers_to_sign = auth_handler.headers_to_sign(request)

auth_header = u','.join([
u'AWS4-HMAC-SHA256 Credential=%s' % auth_handler.scope(request),
u'SignedHeaders=%s' % auth_handler.signed_headers(headers_to_sign),
u'Signature=%s' % signature])
def generate_canonical_query_string(
params: Dict[str, Union[str, bytes]],
) -> str:
"""
Generate a canonical (sorted + percent-encoded) query string suitable for SigV4.
"""
if not params:
return ""

return auth_header
def _to_str(value: Union[str, bytes]) -> str:
if isinstance(value, bytes):
return value.decode("utf-8")
return str(value)

encoded_pairs = []

def generate_canonical_query_string(params):
"""
Using in place of canonical_query_string from boto/auth.py to support POST requests with query parameters
"""
post_params = []
for param in sorted(params):
value = params[param].encode('utf-8')
import urllib
post_params.append('%s=%s' % (urllib.parse.quote(param, safe='-_.~'),
urllib.parse.quote(value, safe='-_.~')))
return '&'.join(post_params)
value = _to_str(params[param])
encoded_pairs.append(
"%s=%s"
% (
urlparse.quote(param, safe="-_.~"),
urlparse.quote(value, safe="-_.~"),
)
)

return "&".join(encoded_pairs)
16 changes: 5 additions & 11 deletions src/vinyldns/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,13 @@
import json
import logging
import os
from builtins import str

import requests
from urllib.parse import parse_qs
from datetime import datetime, UTC
from urllib.parse import parse_qs, urljoin, urlparse, urlsplit

from requests.adapters import HTTPAdapter
# Python 2/3 compatibility
from requests.compat import urljoin
from requests.compat import urlparse
from requests.compat import urlsplit
from requests.packages.urllib3.util.retry import Retry
from urllib3.util.retry import Retry

# TODO: Didn't like this boto request signer, fix when moving back
from vinyldns.boto_request_signer import BotoRequestSigner

from vinyldns.batch_change import BatchChange, ListBatchChangeSummaries, to_review_json
Expand Down Expand Up @@ -208,8 +203,7 @@ def __build_headers(new_headers, suppressed_keys):
def canonical_header_name(field_name):
return u'-'.join(word.capitalize() for word in field_name.split(u'-'))

import datetime
now = datetime.datetime.utcnow()
now = datetime.now(UTC)
headers = {u'Content-Type': u'application/x-amz-json-1.0',
u'Date': now.strftime(u'%a, %d %b %Y %H:%M:%S GMT'),
u'X-Amz-Date': now.strftime(u'%Y%m%dT%H%M%SZ')}
Expand Down
Loading