From 2108f440f4f1c30e1be8efcd2216bceffd379bbd Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Wed, 17 Sep 2025 08:59:42 +0200 Subject: [PATCH 1/2] Fixed #14 --- .github/workflows/auto_prefix_issues.yml | 86 +++++ .github/workflows/python-pip-build.yml | 4 +- .github/workflows/release-tests.yml | 70 ++++ Makefile | 65 +++- sectools/__init__.py | 1 - sectools/data/regex.py | 18 +- sectools/fileformats/Markdown.py | 8 +- sectools/fileformats/__init__.py | 4 +- sectools/network/domains.py | 10 + sectools/network/ip.py | 113 +++++- sectools/windows/crypto.py | 21 +- sectools/windows/ldap.py | 472 ----------------------- sectools/windows/ldap/ldap.py | 408 ++++++++++++++++++++ sectools/windows/ldap/wrappers.py | 184 +++++++++ 14 files changed, 952 insertions(+), 512 deletions(-) create mode 100644 .github/workflows/auto_prefix_issues.yml create mode 100644 .github/workflows/release-tests.yml delete mode 100644 sectools/windows/ldap.py create mode 100644 sectools/windows/ldap/ldap.py create mode 100644 sectools/windows/ldap/wrappers.py diff --git a/.github/workflows/auto_prefix_issues.yml b/.github/workflows/auto_prefix_issues.yml new file mode 100644 index 0000000..e5900f0 --- /dev/null +++ b/.github/workflows/auto_prefix_issues.yml @@ -0,0 +1,86 @@ +name: Auto-prefix & Label Issues + +on: + issues: + types: [opened, edited] + +jobs: + prefix_and_label: + runs-on: ubuntu-latest + steps: + - name: Ensure labels exist, then prefix titles & add labels + uses: actions/github-script@v6 + with: + script: | + const owner = context.repo.owner; + const repo = context.repo.repo; + + // 1. Ensure required labels exist + const required = [ + { name: 'bug', color: 'd73a4a', description: 'Something isn\'t working' }, + { name: 'enhancement', color: 'a2eeef', description: 'New feature or request' } + ]; + + // Fetch current labels in the repo + const { data: existingLabels } = await github.rest.issues.listLabelsForRepo({ + owner, repo, per_page: 100 + }); + const existingNames = new Set(existingLabels.map(l => l.name)); + + // Create any missing labels + for (const lbl of required) { + if (!existingNames.has(lbl.name)) { + await github.rest.issues.createLabel({ + owner, + repo, + name: lbl.name, + color: lbl.color, + description: lbl.description + }); + console.log(`Created label "${lbl.name}"`); + } + } + + // 2. Fetch all open issues + const issues = await github.paginate( + github.rest.issues.listForRepo, + { owner, repo, state: 'open', per_page: 100 } + ); + + // 3. Keyword sets + const enhancementWords = ["add", "added", "improve", "improved"]; + const bugWords = ["bug", "error", "problem", "crash", "failed", "fix", "fixed"]; + + // 4. Process each issue + for (const issue of issues) { + const origTitle = issue.title; + const lower = origTitle.toLowerCase(); + + // skip if already prefixed + if (/^\[(bug|enhancement)\]/i.test(origTitle)) continue; + + let prefix, labelToAdd; + if (enhancementWords.some(w => lower.includes(w))) { + prefix = "[enhancement]"; + labelToAdd = "enhancement"; + } else if (bugWords.some(w => lower.includes(w))) { + prefix = "[bug]"; + labelToAdd = "bug"; + } + + if (prefix) { + // update title + await github.rest.issues.update({ + owner, repo, issue_number: issue.number, + title: `${prefix} ${origTitle}` + }); + console.log(`Prefixed title of #${issue.number}`); + + // add label + await github.rest.issues.addLabels({ + owner, repo, issue_number: issue.number, + labels: [labelToAdd] + }); + console.log(`Added label "${labelToAdd}" to #${issue.number}`); + } + } diff --git a/.github/workflows/python-pip-build.yml b/.github/workflows/python-pip-build.yml index c2ddeec..b037759 100644 --- a/.github/workflows/python-pip-build.yml +++ b/.github/workflows/python-pip-build.yml @@ -18,9 +18,9 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - name: Set up Python 3.11 - uses: actions/setup-python@v3 + uses: actions/setup-python@v4 with: python-version: "3.11" - name: Install dependencies diff --git a/.github/workflows/release-tests.yml b/.github/workflows/release-tests.yml new file mode 100644 index 0000000..1b783e9 --- /dev/null +++ b/.github/workflows/release-tests.yml @@ -0,0 +1,70 @@ +name: Release Tests + +on: + release: + types: [ published, created ] + +jobs: + comprehensive-test: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: ["3.11", "3.12"] + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install -e . + + - name: Run all tests + run: | + cd bhopengraph/tests + python run_tests.py + + - name: Run tests with verbose output + run: | + cd bhopengraph/tests + python -m unittest discover -v + + - name: Run tests with coverage + run: | + pip install coverage + coverage run --source=bhopengraph bhopengraph/tests/run_tests.py + coverage report + + build-package: + runs-on: ubuntu-latest + needs: comprehensive-test + + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Python 3.12 + uses: actions/setup-python@v4 + with: + python-version: "3.12" + + - name: Install build dependencies + run: | + python -m pip install --upgrade pip + pip install build wheel + + - name: Build package + run: | + python -m build + + - name: Upload build artifacts + uses: actions/upload-artifact@v4 + with: + name: bhopengraph-package + path: dist/ diff --git a/Makefile b/Makefile index 237dc10..adcc91c 100644 --- a/Makefile +++ b/Makefile @@ -1,16 +1,71 @@ -.PHONY : all clean build upload +.PHONY : all clean build upload test test-verbose test-coverage lint lint-fix fix + +PROJECTNAME := "sectools" all: install clean clean: @rm -rf `find ./ -type d -name "*__pycache__"` - @rm -rf ./build/ ./dist/ ./sectools.egg-info/ + @rm -rf ./build/ ./dist/ ./$(PROJECTNAME).egg-info/ + @rm -rf ./$(PROJECTNAME)/tests/datasets/dataset_n*.json + +generate-docs: + @python3 -m pip install pdoc --break-system-packages + @echo "[$(shell date)] Generating docs ..." + @PDOC_ALLOW_EXEC=1 python3 -m pdoc -d markdown -o ./documentation/ ./$(PROJECTNAME)/ + @echo "[$(shell date)] Done!" + +uninstall: + pip uninstall $(PROJECTNAME) --yes --break-system-packages install: build - python3 setup.py install + python3 -m pip install . --break-system-packages build: - python3 setup.py sdist bdist_wheel + python3 -m pip uninstall $(PROJECTNAME) --yes --break-system-packages + python3 -m pip install build --break-system-packages + python3 -m build --wheel upload: build - twine upload dist/* + python3 -m pip install twine --break-system-packages + python3 -m twine upload dist/* + +test: + @echo "[$(shell date)] Running tests ..." + @cd $(PROJECTNAME)/tests && python3 run_tests.py + @echo "[$(shell date)] Tests completed!" + +test-verbose: + @echo "[$(shell date)] Running tests with verbose output ..." + @cd $(PROJECTNAME)/tests && python3 -m unittest discover -v + @echo "[$(shell date)] Tests completed!" + +test-coverage: + @echo "[$(shell date)] Installing coverage and running tests with coverage ..." + @python3 -m pip install coverage --break-system-packages + @coverage run --source=$(PROJECTNAME) $(PROJECTNAME)/tests/run_tests.py + @coverage report + @coverage html + @echo "[$(shell date)] Coverage report generated in htmlcov/" + +lint: + @echo "[$(shell date)] Installing linting tools ..." + @python3 -m pip install flake8 black isort --break-system-packages + @echo "[$(shell date)] Running flake8 linting ..." + @python3 -m flake8 $(PROJECTNAME)/ --max-line-length=88 --extend-ignore=E501 + @echo "[$(shell date)] Running black code formatting check ..." + @python3 -m black --check --diff $(PROJECTNAME)/ + @echo "[$(shell date)] Running isort import sorting check ..." + @python3 -m isort --check-only --diff $(PROJECTNAME)/ + @echo "[$(shell date)] Linting completed!" + +lint-fix: + @echo "[$(shell date)] Installing linting tools ..." + @python3 -m pip install flake8 black isort --break-system-packages + @echo "[$(shell date)] Running black to fix formatting issues ..." + @python3 -m black $(PROJECTNAME)/ + @echo "[$(shell date)] Running isort to fix import sorting ..." + @python3 -m isort $(PROJECTNAME)/ + @echo "[$(shell date)] Running flake8 to check remaining issues ..." + @python3 -m flake8 $(PROJECTNAME)/ --max-line-length=88 --extend-ignore=E501 + @echo "[$(shell date)] Code formatting fixes completed!" diff --git a/sectools/__init__.py b/sectools/__init__.py index c68b768..f2d00d3 100644 --- a/sectools/__init__.py +++ b/sectools/__init__.py @@ -3,4 +3,3 @@ # File name : __init__.py # Author : Podalirius (@podalirius_) # Date created : 28 Jul 2022 - diff --git a/sectools/data/regex.py b/sectools/data/regex.py index a6bdd4a..04accd7 100644 --- a/sectools/data/regex.py +++ b/sectools/data/regex.py @@ -5,26 +5,28 @@ # Date created : 25 Sep 2021 # source: https://stackoverflow.com/questions/201323/how-can-i-validate-an-email-address-using-a-regular-expression -regex_email = r'''(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])''' +regex_email = r"""(?:[a-z0-9!#$%&'*+/=?^_`{|}~-]+(?:\.[a-z0-9!#$%&'*+/=?^_`{|}~-]+)*|"(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21\x23-\x5b\x5d-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])*")@(?:(?:[a-z0-9](?:[a-z0-9-]*[a-z0-9])?\.)+[a-z0-9](?:[a-z0-9-]*[a-z0-9])?|\[(?:(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9]))\.){3}(?:(2(5[0-5]|[0-4][0-9])|1[0-9][0-9]|[1-9]?[0-9])|[a-z0-9-]*[a-z0-9]:(?:[\x01-\x08\x0b\x0c\x0e-\x1f\x21-\x5a\x53-\x7f]|\\[\x01-\x09\x0b\x0c\x0e-\x7f])+)\])""" regex_email_b = bytes(regex_email, "UTF-8") -regex_domain = r'(([a-zA-Z0-9\-_]+\.)+([a-zA-Z0-9\-_]+\.[a-zA-Z0-9\-_]+))' +regex_domain = r"(([a-zA-Z0-9\-_]+\.)+([a-zA-Z0-9\-_]+\.[a-zA-Z0-9\-_]+))" regex_domain_b = bytes(regex_domain, "UTF-8") -regex_url = r'((http|https|ftp)://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-]+([:][0-9]+)?(/[a-zA-Z0-9_\-\.]+)+([/])?)' +regex_url = r"((http|https|ftp)://[a-zA-Z0-9\-]+\.[a-zA-Z0-9\-]+([:][0-9]+)?(/[a-zA-Z0-9_\-\.]+)+([/])?)" regex_url_b = bytes(regex_url, "UTF-8") -regex_ipv4 = r'(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])' +regex_ipv4 = r"(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])" regex_ipv4_b = bytes(regex_ipv4, "UTF-8") -regex_ipv4_cidr = r'((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]))/([123][0-9]|[1-9])' +regex_ipv4_cidr = r"((25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9])\.(25[0-5]|2[0-4][0-9]|1[0-9][0-9]|[1-9]?[0-9]))/([123][0-9]|[1-9])" regex_ipv4_cidr_b = bytes(regex_ipv4_cidr, "UTF-8") -regex_ipv6 = r'((([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}:[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){5}:([0-9A-Fa-f]{1,4}:)?[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){4}:([0-9A-Fa-f]{1,4}:){0,2}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){3}:([0-9A-Fa-f]{1,4}:){0,3}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){2}:([0-9A-Fa-f]{1,4}:){0,4}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(([0-9A-Fa-f]{1,4}:){0,5}:((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(::([0-9A-Fa-f]{1,4}:){0,5}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|([0-9A-Fa-f]{1,4}::([0-9A-Fa-f]{1,4}:){0,5}[0-9A-Fa-f]{1,4})|(::([0-9A-Fa-f]{1,4}:){0,6}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){1,7}:))' +regex_ipv6 = r"((([0-9A-Fa-f]{1,4}:){7}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}:[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){5}:([0-9A-Fa-f]{1,4}:)?[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){4}:([0-9A-Fa-f]{1,4}:){0,2}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){3}:([0-9A-Fa-f]{1,4}:){0,3}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){2}:([0-9A-Fa-f]{1,4}:){0,4}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){6}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(([0-9A-Fa-f]{1,4}:){0,5}:((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|(::([0-9A-Fa-f]{1,4}:){0,5}((b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b).){3}(b((25[0-5])|(1d{2})|(2[0-4]d)|(d{1,2}))b))|([0-9A-Fa-f]{1,4}::([0-9A-Fa-f]{1,4}:){0,5}[0-9A-Fa-f]{1,4})|(::([0-9A-Fa-f]{1,4}:){0,6}[0-9A-Fa-f]{1,4})|(([0-9A-Fa-f]{1,4}:){1,7}:))" regex_ipv6_b = bytes(regex_ipv6, "UTF-8") -regex_mac = r'([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})' +regex_mac = r"([0-9A-Fa-f]{2}[:-]){5}([0-9A-Fa-f]{2})" regex_mac_b = bytes(regex_mac, "UTF-8") -regex_phone_france = r'(0[1-9]([0-9]{8}))|(0[1-9][ .\-]([0-9][0-9][ .\-]){3}([0-9][0-9]))' +regex_phone_france = ( + r"(0[1-9]([0-9]{8}))|(0[1-9][ .\-]([0-9][0-9][ .\-]){3}([0-9][0-9]))" +) regex_phone_france_b = bytes(regex_phone_france, "UTF-8") diff --git a/sectools/fileformats/Markdown.py b/sectools/fileformats/Markdown.py index a008386..476068f 100644 --- a/sectools/fileformats/Markdown.py +++ b/sectools/fileformats/Markdown.py @@ -22,7 +22,7 @@ def __init__(self, data): @classmethod def fromFile(cls, path_to_file): if os.path.exists(path_to_file): - f = open(path_to_file, 'r') + f = open(path_to_file, "r") self = cls(data=f.read()) f.close() return self @@ -37,7 +37,9 @@ def fromData(cls, data): def extract_links(self): found = [] if self.data is not None: - for match in re.findall(r'[^!](\[([^\]]*)\]\(([^)]*)\))|(^\[([^\]]*)\]\(([^)]*)\))', self.data): + for match in re.findall( + r"[^!](\[([^\]]*)\]\(([^)]*)\))|(^\[([^\]]*)\]\(([^)]*)\))", self.data + ): if len(match[0]) != 0: md_format, text, link = match[:3] found.append({"markdown": md_format, "text": text, "link": link}) @@ -49,7 +51,7 @@ def extract_links(self): def extract_images(self): found = [] if self.data is not None: - for match in re.findall(r'(!\[([^\]]*)\]\(([^)]*)\))', self.data): + for match in re.findall(r"(!\[([^\]]*)\]\(([^)]*)\))", self.data): md_format, text, link = match found.append({"markdown": md_format, "text": text, "link": link}) return found diff --git a/sectools/fileformats/__init__.py b/sectools/fileformats/__init__.py index def908a..bb22c4d 100644 --- a/sectools/fileformats/__init__.py +++ b/sectools/fileformats/__init__.py @@ -4,4 +4,6 @@ # Author : Podalirius (@podalirius_) # Date created : 6 Nov 2022 -from .Markdown import Markdown \ No newline at end of file +from .Markdown import Markdown + +__all__ = ["Markdown"] diff --git a/sectools/network/domains.py b/sectools/network/domains.py index 434def2..1f96aa7 100644 --- a/sectools/network/domains.py +++ b/sectools/network/domains.py @@ -5,10 +5,20 @@ # Date created : 2 Aug 2022 import re + from sectools.data.regex import regex_domain def is_fqdn(target): + """ + Check if the target is a valid FQDN (Fully Qualified Domain Name). + + Args: + target (str): The string to check + + Returns: + bool: True if target is valid FQDN, False otherwise + """ outcome = False matched = re.match("^" + regex_domain + "$", target.strip()) if matched is not None: diff --git a/sectools/network/ip.py b/sectools/network/ip.py index 387f3df..37f7dc5 100644 --- a/sectools/network/ip.py +++ b/sectools/network/ip.py @@ -5,10 +5,20 @@ # Date created : 28 Jul 2022 import re + from sectools.data.regex import regex_ipv4, regex_ipv4_cidr, regex_ipv6 def is_ipv4_cidr(target) -> bool: + """ + Check if the target is a valid IPv4 CIDR notation address. + + Args: + target (str): The string to check + + Returns: + bool: True if target is valid IPv4 CIDR notation, False otherwise + """ outcome = False matched = re.match("^" + regex_ipv4_cidr + "$", target.strip()) if matched is not None: @@ -17,6 +27,15 @@ def is_ipv4_cidr(target) -> bool: def is_ipv4_addr(target) -> bool: + """ + Check if the target is a valid IPv4 address. + + Args: + target (str): The string to check + + Returns: + bool: True if target is valid IPv4 address, False otherwise + """ outcome = False matched = re.match("^" + regex_ipv4 + "$", target.strip()) if matched is not None: @@ -25,6 +44,15 @@ def is_ipv4_addr(target) -> bool: def is_ipv6_addr(target): + """ + Check if the target is a valid IPv6 address. + + Args: + target (str): The string to check + + Returns: + bool: True if target is valid IPv6 address, False otherwise + """ outcome = False matched = re.match("^" + regex_ipv6 + "$", target.strip()) if matched is not None: @@ -33,6 +61,15 @@ def is_ipv6_addr(target): def expand_cidr(cidr): + """ + Expands a CIDR notation address into a list of all individual IPv4 addresses in that range. + + Args: + cidr (str): CIDR notation address (e.g. "192.168.1.0/24") + + Returns: + list: List of IPv4 addresses as strings, empty list if invalid CIDR + """ if is_ipv4_cidr(cidr): matched = re.match("^" + regex_ipv4_cidr + "$", cidr.strip()) network_ip = matched.groups()[0] @@ -40,7 +77,9 @@ def expand_cidr(cidr): bits_mask = int(matched.groups()[-1]) # Applying bitmask network_ip_int = (network_ip_int >> (32 - bits_mask)) << (32 - bits_mask) - addresses = [ipv4_int_to_str(network_ip_int + k) for k in range(2 ** (32-bits_mask))] + addresses = [ + ipv4_int_to_str(network_ip_int + k) for k in range(2 ** (32 - bits_mask)) + ] return addresses else: print("[!] Invalid CIDR '%s'" % cidr) @@ -48,9 +87,18 @@ def expand_cidr(cidr): def expand_port_range(port_range): + """ + Expands a port range string into a list of individual port numbers. + + Args: + port_range (str): Port range string (e.g. "80", "1-1024", "-1024", "1024-", "-") + + Returns: + list: List of port numbers in the range + """ port_range = port_range.strip() ports = [] - matched = re.match('([0-9]+)?(-)?([0-9]+)?', port_range) + matched = re.match("([0-9]+)?(-)?([0-9]+)?", port_range) if matched is not None: start, sep, stop = matched.groups() if start is not None and (sep is None and stop is None): @@ -62,7 +110,7 @@ def expand_port_range(port_range): # Port range from start to 65535 start = int(start) if 0 <= start <= 65535: - ports = list(range(start, 65535+1)) + ports = list(range(start, 65535 + 1)) elif start is None and (sep is not None and stop is not None): # Port range from 0 to stop stop = int(stop) @@ -80,32 +128,67 @@ def expand_port_range(port_range): return ports - # IP conversion functions def ipv4_str_to_hex_str(ipv4) -> str: - a, b, c, d = map(int, ipv4.split('.')) - hexip = hex(a)[2:].rjust(2, '0') - hexip += hex(b)[2:].rjust(2, '0') - hexip += hex(c)[2:].rjust(2, '0') - hexip += hex(d)[2:].rjust(2, '0') + """ + Convert an IPv4 address string to hexadecimal string representation. + + Args: + ipv4 (str): IPv4 address string (e.g. "192.168.1.1") + + Returns: + str: Hexadecimal string representation of the IPv4 address + """ + a, b, c, d = map(int, ipv4.split(".")) + hexip = hex(a)[2:].rjust(2, "0") + hexip += hex(b)[2:].rjust(2, "0") + hexip += hex(c)[2:].rjust(2, "0") + hexip += hex(d)[2:].rjust(2, "0") return hexip def ipv4_str_to_raw_bytes(ipv4) -> bytes: - a, b, c, d = map(int, ipv4.split('.')) + """ + Convert an IPv4 address string to raw bytes. + + Args: + ipv4 (str): IPv4 address string (e.g. "192.168.1.1") + + Returns: + bytes: Raw bytes representation of the IPv4 address + """ + a, b, c, d = map(int, ipv4.split(".")) return bytes([a, b, c, d]) def ipv4_str_to_int(ipv4) -> bytes: - a, b, c, d = map(int, ipv4.split('.')) + """ + Convert an IPv4 address string to 32-bit integer. + + Args: + ipv4 (str): IPv4 address string (e.g. "192.168.1.1") + + Returns: + int: 32-bit integer representation of the IPv4 address + """ + a, b, c, d = map(int, ipv4.split(".")) return (a << 24) + (b << 16) + (c << 8) + d def ipv4_int_to_str(ipv4) -> str: - a = (ipv4 >> 24) & 0xff - b = (ipv4 >> 16) & 0xff - c = (ipv4 >> 8) & 0xff - d = (ipv4 >> 0) & 0xff + """ + Convert a 32-bit integer to IPv4 address string. + + Args: + ipv4 (int): 32-bit integer representation of IPv4 address + + Returns: + str: IPv4 address string (e.g. "192.168.1.1") + """ + a = (ipv4 >> 24) & 0xFF + b = (ipv4 >> 16) & 0xFF + c = (ipv4 >> 8) & 0xFF + d = (ipv4 >> 0) & 0xFF return "%d.%d.%d.%d" % (a, b, c, d) diff --git a/sectools/windows/crypto.py b/sectools/windows/crypto.py index ab25d4d..b169026 100644 --- a/sectools/windows/crypto.py +++ b/sectools/windows/crypto.py @@ -4,14 +4,16 @@ # Author : Podalirius (@podalirius_) # Date created : 30 Jul 2022 -import re import hashlib +import re def parse_lm_nt_hashes(lm_nt_hashes_string): lm_hash_value, nt_hash_value = "", "" if lm_nt_hashes_string is not None: - matched = re.match("([0-9a-f]{32})?(:)?([0-9a-f]{32})?", lm_nt_hashes_string.strip().lower()) + matched = re.match( + "([0-9a-f]{32})?(:)?([0-9a-f]{32})?", lm_nt_hashes_string.strip().lower() + ) m_lm_hash, _, m_nt_hash = matched.groups() if m_lm_hash is None and m_nt_hash is not None: lm_hash_value = "aad3b435b51404eeaad3b435b51404ee" @@ -26,10 +28,19 @@ def parse_lm_nt_hashes(lm_nt_hashes_string): def nt_hash(data): - if type(data) == str: - data = bytes(data, 'utf-16-le') + """ + Calculate the NT hash of the given data. + + Args: + data (str): The data to hash + + Returns: + str: The NT hash of the data + """ + if isinstance(data, str): + data = bytes(data, "utf-16-le") - ctx = hashlib.new('md4', data) + ctx = hashlib.new("md4", data) nt_hash_value = ctx.hexdigest() return nt_hash_value diff --git a/sectools/windows/ldap.py b/sectools/windows/ldap.py deleted file mode 100644 index 9aaecf3..0000000 --- a/sectools/windows/ldap.py +++ /dev/null @@ -1,472 +0,0 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# File name : ldap.py -# Author : Podalirius (@podalirius_) -# Date created : 30 Jul 2022 - - -from sectools.windows.crypto import parse_lm_nt_hashes -import ldap3 -import ssl - - -def ldap3_kerberos_login(connection, target, user, password, domain='', lmhash='', nthash='', aesKey='', kdcHost=None, TGT=None, TGS=None, useCache=True): - from pyasn1.codec.ber import encoder, decoder - from pyasn1.type.univ import noValue - """ - logins into the target system explicitly using Kerberos. Hashes are used if RC4_HMAC is supported. - :param string user: username - :param string password: password for the user - :param string domain: domain where the account is valid for (required) - :param string lmhash: LMHASH used to authenticate using hashes (password is not used) - :param string nthash: NTHASH used to authenticate using hashes (password is not used) - :param string aesKey: aes256-cts-hmac-sha1-96 or aes128-cts-hmac-sha1-96 used for Kerberos authentication - :param string kdcHost: hostname or IP Address for the KDC. If None, the domain will be used (it needs to resolve tho) - :param struct TGT: If there's a TGT available, send the structure here and it will be used - :param struct TGS: same for TGS. See smb3.py for the format - :param bool useCache: whether or not we should use the ccache for credentials lookup. If TGT or TGS are specified this is False - :return: True, raises an Exception if error. - """ - - # Importing down here so pyasn1 is not required if kerberos is not used. - from impacket.krb5.ccache import CCache - from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REP, seq_set - from impacket.krb5.kerberosv5 import getKerberosTGT, getKerberosTGS - from impacket.krb5 import constants - from impacket.krb5.types import Principal, KerberosTime, Ticket - from impacket.spnego import SPNEGO_NegTokenInit, TypesMech - import datetime - - if TGT is not None or TGS is not None: - useCache = False - - targetName = 'ldap/%s' % target - if useCache: - domain, user, TGT, TGS = CCache.parseFile(domain, user, targetName) - - # First of all, we need to get a TGT for the user - userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value) - if TGT is None: - if TGS is None: - tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT(userName, password, domain, lmhash, nthash, - aesKey, kdcHost) - else: - tgt = TGT['KDC_REP'] - cipher = TGT['cipher'] - sessionKey = TGT['sessionKey'] - - if TGS is None: - serverName = Principal(targetName, type=constants.PrincipalNameType.NT_SRV_INST.value) - tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS( - serverName, - domain, - kdcHost, - tgt, - cipher, - sessionKey - ) - else: - tgs = TGS['KDC_REP'] - cipher = TGS['cipher'] - sessionKey = TGS['sessionKey'] - - # Let's build a NegTokenInit with a Kerberos REQ_AP - - blob = SPNEGO_NegTokenInit() - - # Kerberos - blob['MechTypes'] = [TypesMech['MS KRB5 - Microsoft Kerberos 5']] - - # Let's extract the ticket from the TGS - tgs = decoder.decode(tgs, asn1Spec=TGS_REP())[0] - ticket = Ticket() - ticket.from_asn1(tgs['ticket']) - - # Now let's build the AP_REQ - apReq = AP_REQ() - apReq['pvno'] = 5 - apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value) - - opts = [] - apReq['ap-options'] = constants.encodeFlags(opts) - seq_set(apReq, 'ticket', ticket.to_asn1) - - authenticator = Authenticator() - authenticator['authenticator-vno'] = 5 - authenticator['crealm'] = domain - seq_set(authenticator, 'cname', userName.components_to_asn1) - now = datetime.datetime.utcnow() - - authenticator['cusec'] = now.microsecond - authenticator['ctime'] = KerberosTime.to_asn1(now) - - encodedAuthenticator = encoder.encode(authenticator) - - # Key Usage 11 - # AP-REQ Authenticator (includes application authenticator - # subkey), encrypted with the application session key - # (Section 5.5.1) - encryptedEncodedAuthenticator = cipher.encrypt(sessionKey, 11, encodedAuthenticator, None) - - apReq['authenticator'] = noValue - apReq['authenticator']['etype'] = cipher.enctype - apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator - - blob['MechToken'] = encoder.encode(apReq) - - request = ldap3.operation.bind.bind_operation( - connection.version, - ldap3.SASL, - user, - None, - 'GSS-SPNEGO', - blob.getData() - ) - - # Done with the Kerberos saga, now let's get into LDAP - if connection.closed: # try to open connection if closed - connection.open(read_server_info=False) - - connection.sasl_in_progress = True - response = connection.post_send_single_response(connection.send('bindRequest', request, None)) - connection.sasl_in_progress = False - if response[0]['result'] != 0: - raise Exception(response) - - connection.bound = True - - return True - - -def __init_ldap_connection(target, tls_version, domain, username, password, lmhash, nthash, aesKey=None, kerberos=False, kdcHost=None): - DEBUG = False - - if nthash is None: - nthash = "" - if lmhash is None: - lmhash = "" - - if tls_version is not None: - use_ssl = True - port = 636 - tls = ldap3.Tls( - validate=ssl.CERT_NONE, - version=tls_version, - ciphers='ALL:@SECLEVEL=0' - ) - else: - use_ssl = False - port = 389 - tls = None - ldap_server = ldap3.Server( - host=target, - port=port, - use_ssl=use_ssl, - get_info=ldap3.ALL, - tls=tls - ) - - if kerberos: - if DEBUG: - print("[%s] Using Kerberos authentication" % __name__) - print(" | target", target) - print(" | username", username) - print(" | password", password) - print(" | domain", domain) - print(" | lmhash", lmhash) - print(" | nthash", nthash) - print(" | aesKey", aesKey) - print(" | kdcHost", kdcHost) - - ldap_session = ldap3.Connection(server=ldap_server) - ldap_session.bind() - - ldap3_kerberos_login( - connection=ldap_session, - target=target, - user=username, - password=password, - domain=domain, - lmhash=lmhash, - nthash=nthash, - aesKey=aesKey, - kdcHost=kdcHost - ) - - elif any([len(nthash) != 0, len(lmhash) != 0]): - if DEBUG: - print("[%s] Using Pass the Hash authentication" % __name__) - if (len(lmhash) == 0): - lmhash = "aad3b435b51404eeaad3b435b51404ee" - if (len(nthash) == 0): - nthash = "31d6cfe0d16ae931b73c59d7e0c089c0" - try: - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=lmhash + ":" + nthash, - authentication=ldap3.NTLM, - auto_bind=True - ) - except ldap3.core.exceptions.LDAPBindError as e: - if "strongerAuthRequired" in str(e): # to handle ldap signing on port 389 - if DEBUG: - print("[%s] Trying to handle LDAP signing" % __name__) - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=lmhash + ":" + nthash, - authentication=ldap3.NTLM, - auto_bind=True, - session_security='ENCRYPT' - ) - elif port == 636 and "invalidCredentials" in str(e): # to handle channel binding on port 636 (Exception is not different from truly invalid credentials...) - if DEBUG: - print("[%s] Trying to handle channel binding" % __name__) - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=lmhash + ":" + nthash, - authentication=ldap3.NTLM, - auto_bind=True, - channel_binding=ldap3.TLS_CHANNEL_BINDING - ) - - else: - if DEBUG: - print("[%s] Using user/password authentication" % __name__) - try: - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=password, - authentication=ldap3.NTLM, - auto_bind=True - ) - except ldap3.core.exceptions.LDAPBindError as e: - if "strongerAuthRequired" in str(e): # to handle ldap signing on port 389 - if DEBUG: - print("[%s] Trying to handle LDAP signing" % __name__) - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=password, - authentication=ldap3.NTLM, - auto_bind=True, - session_security='ENCRYPT' - ) - elif port == 636 and "invalidCredentials" in str(e): # to handle channel binding on port 636 (Exception is not different from truly invalid credentials...) - if DEBUG: - print("[%s] Trying to handle channel binding" % __name__) - ldap_session = ldap3.Connection( - server=ldap_server, - user='%s\\%s' % (domain, username), - password=password, - authentication=ldap3.NTLM, - auto_bind=True, - channel_binding=ldap3.TLS_CHANNEL_BINDING - ) - - return ldap_server, ldap_session - - -def init_ldap_session(auth_domain, auth_dc_ip, auth_username, auth_password, auth_lm_hash, auth_nt_hash, auth_key=None, use_kerberos=False, kdcHost=None, use_ldaps=False): - if use_kerberos: - target_dc = kdcHost - else: - target_dc = (auth_dc_ip if auth_dc_ip is not None else auth_domain) - - if use_ldaps is True: - try: - return __init_ldap_connection( - target=target_dc, - tls_version=ssl.PROTOCOL_TLSv1_2, - domain=auth_domain, - username=auth_username, - password=auth_password, - lmhash=auth_lm_hash, - nthash=auth_nt_hash, - aesKey=auth_key, - kdcHost=kdcHost, - kerberos=use_kerberos - ) - except ldap3.core.exceptions.LDAPSocketOpenError: - return __init_ldap_connection( - target=target_dc, - tls_version=ssl.PROTOCOL_TLSv1, - domain=auth_domain, - username=auth_username, - password=auth_password, - lmhash=auth_lm_hash, - nthash=auth_nt_hash, - aesKey=auth_key, - kdcHost=kdcHost, - kerberos=use_kerberos - ) - else: - return __init_ldap_connection( - target=target_dc, - tls_version=None, - domain=auth_domain, - username=auth_username, - password=auth_password, - lmhash=auth_lm_hash, - nthash=auth_nt_hash, - aesKey=auth_key, - kdcHost=kdcHost, - kerberos=use_kerberos - ) - - -def get_computers_from_domain(auth_domain, auth_dc_ip, auth_username, auth_password, auth_hashes, auth_key=None, use_kerberos=False, kdcHost=None, use_ldaps=False, __print=False): - auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) - - ldap_server, ldap_session = init_ldap_session( - auth_domain=auth_domain, - auth_dc_ip=auth_dc_ip, - auth_username=auth_username, - auth_password=auth_password, - auth_lm_hash=auth_lm_hash, - auth_nt_hash=auth_nt_hash, - auth_key=auth_key, - use_kerberos=use_kerberos, - kdcHost=kdcHost, - use_ldaps=use_ldaps - ) - - if __print: - print("[>] Extracting all computers ...") - - computers = [] - searchbase = ldap_server.info.other["defaultNamingContext"] - results = list(ldap_session.extend.standard.paged_search(searchbase, "(objectCategory=computer)", attributes=["dNSHostName"])) - for entry in results: - if entry['type'] != 'searchResEntry': - continue - dNSHostName = entry["attributes"]['dNSHostName'] - if type(dNSHostName) == str: - computers.append(dNSHostName) - if type(dNSHostName) == list: - if len(dNSHostName) != 0: - for entry in dNSHostName: - computers.append(entry) - - if __print: - print("[+] Found %d computers in the domain." % len(computers)) - - return computers - - -def get_servers_from_domain(auth_domain, auth_dc_ip, auth_username, auth_password, auth_hashes, auth_key=None, use_kerberos=False, kdcHost=None, use_ldaps=False, __print=False): - auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) - - ldap_server, ldap_session = init_ldap_session( - auth_domain=auth_domain, - auth_dc_ip=auth_dc_ip, - auth_username=auth_username, - auth_password=auth_password, - auth_lm_hash=auth_lm_hash, - auth_nt_hash=auth_nt_hash, - auth_key=auth_key, - use_kerberos=use_kerberos, - kdcHost=kdcHost, - use_ldaps=use_ldaps - ) - - if __print: - print("[>] Extracting all servers ...") - - servers = [] - searchbase = ldap_server.info.other["defaultNamingContext"] - results = list(ldap_session.extend.standard.paged_search(searchbase, "(&(objectCategory=computer)(operatingSystem=*Server*))",attributes=["dNSHostName"])) - for entry in results: - if entry['type'] != 'searchResEntry': - continue - dNSHostName = entry["attributes"]['dNSHostName'] - if type(dNSHostName) == str: - servers.append(dNSHostName) - if type(dNSHostName) == list: - if len(dNSHostName) != 0: - for entry in dNSHostName: - servers.append(entry) - - if __print: - print("[+] Found %d servers in the domain." % len(servers)) - - return servers - - -def get_subnets(auth_domain, auth_dc_ip, auth_username, auth_password, auth_hashes, auth_key=None, use_kerberos=False, kdcHost=None, use_ldaps=False, __print=False): - auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) - - ldap_server, ldap_session = init_ldap_session( - auth_domain=auth_domain, - auth_dc_ip=auth_dc_ip, - auth_username=auth_username, - auth_password=auth_password, - auth_lm_hash=auth_lm_hash, - auth_nt_hash=auth_nt_hash, - auth_key=auth_key, - use_kerberos=use_kerberos, - kdcHost=kdcHost, - use_ldaps=use_ldaps - ) - - if __print: - print("[>] Extracting all subnets ...") - - subnets = [] - searchbase = ldap_server.info.other["configurationNamingContext"] - results = list(ldap_session.extend.standard.paged_search(searchbase, "(objectClass=site)",attributes=['distinguishedName', 'name', 'description'])) - sites = [] - for entry in results: - if entry['type'] != 'searchResEntry': - continue - sites.append((entry["dn"], entry["attributes"]["name"])) - - subnets = [] - for site_dn, site_name in sites: - results = list(ldap_session.extend.standard.paged_search( - "CN=Sites,"+ldap_server.info.other["configurationNamingContext"][0], - '(siteObject=%s)' % site_dn, - attributes=['distinguishedName', 'name', 'description']) - ) - for entry in results: - if entry['type'] != 'searchResEntry': - continue - subnets.append(entry["attributes"]["name"]) - - if __print: - print("[+] Found %d subnets in the domain." % len(subnets)) - - return subnets - - -def raw_ldap_query(auth_domain, auth_dc_ip, auth_username, auth_password, auth_hashes, query, auth_key=None, attributes=['*'], searchbase=None, use_kerberos=False, kdcHost=None, use_ldaps=False): - auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) - - ldap_server, ldap_session = init_ldap_session( - auth_domain=auth_domain, - auth_dc_ip=auth_dc_ip, - auth_username=auth_username, - auth_password=auth_password, - auth_lm_hash=auth_lm_hash, - auth_nt_hash=auth_nt_hash, - auth_key=auth_key, - use_kerberos=use_kerberos, - kdcHost=kdcHost, - use_ldaps=use_ldaps - ) - - if searchbase is None: - searchbase = ldap_server.info.other["defaultNamingContext"] - - ldapresults = list(ldap_session.extend.standard.paged_search(searchbase, query, attributes=attributes)) - - results = {} - for entry in ldapresults: - if entry['type'] != 'searchResEntry': - continue - results[entry['dn']] = entry["attributes"] - - return results diff --git a/sectools/windows/ldap/ldap.py b/sectools/windows/ldap/ldap.py new file mode 100644 index 0000000..ec3b966 --- /dev/null +++ b/sectools/windows/ldap/ldap.py @@ -0,0 +1,408 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# File name : ldap.py +# Author : Podalirius (@podalirius_) +# Date created : 30 Jul 2022 + + +import ssl + +import ldap3 + +from sectools.windows.crypto import parse_lm_nt_hashes + + +def ldap3_kerberos_login( + connection, + target, + user, + password, + domain="", + lmhash="", + nthash="", + aesKey="", + kdcHost=None, + TGT=None, + TGS=None, + useCache=True, +): + from pyasn1.codec.ber import decoder, encoder + from pyasn1.type.univ import noValue + + """ + logins into the target system explicitly using Kerberos. Hashes are used if RC4_HMAC is supported. + :param string user: username + :param string password: password for the user + :param string domain: domain where the account is valid for (required) + :param string lmhash: LMHASH used to authenticate using hashes (password is not used) + :param string nthash: NTHASH used to authenticate using hashes (password is not used) + :param string aesKey: aes256-cts-hmac-sha1-96 or aes128-cts-hmac-sha1-96 used for Kerberos authentication + :param string kdcHost: hostname or IP Address for the KDC. If None, the domain will be used (it needs to resolve tho) + :param struct TGT: If there's a TGT available, send the structure here and it will be used + :param struct TGS: same for TGS. See smb3.py for the format + :param bool useCache: whether or not we should use the ccache for credentials lookup. If TGT or TGS are specified this is False + :return: True, raises an Exception if error. + """ + + # Importing down here so pyasn1 is not required if kerberos is not used. + import datetime + + from impacket.krb5 import constants + from impacket.krb5.asn1 import AP_REQ, TGS_REP, Authenticator, seq_set + from impacket.krb5.ccache import CCache + from impacket.krb5.kerberosv5 import getKerberosTGS, getKerberosTGT + from impacket.krb5.types import KerberosTime, Principal, Ticket + from impacket.spnego import SPNEGO_NegTokenInit, TypesMech + + if TGT is not None or TGS is not None: + useCache = False + + targetName = "ldap/%s" % target + if useCache: + domain, user, TGT, TGS = CCache.parseFile(domain, user, targetName) + + # First of all, we need to get a TGT for the user + userName = Principal(user, type=constants.PrincipalNameType.NT_PRINCIPAL.value) + if TGT is None: + if TGS is None: + tgt, cipher, oldSessionKey, sessionKey = getKerberosTGT( + userName, password, domain, lmhash, nthash, aesKey, kdcHost + ) + else: + tgt = TGT["KDC_REP"] + cipher = TGT["cipher"] + sessionKey = TGT["sessionKey"] + + if TGS is None: + serverName = Principal( + targetName, type=constants.PrincipalNameType.NT_SRV_INST.value + ) + tgs, cipher, oldSessionKey, sessionKey = getKerberosTGS( + serverName, domain, kdcHost, tgt, cipher, sessionKey + ) + else: + tgs = TGS["KDC_REP"] + cipher = TGS["cipher"] + sessionKey = TGS["sessionKey"] + + # Let's build a NegTokenInit with a Kerberos REQ_AP + + blob = SPNEGO_NegTokenInit() + + # Kerberos + blob["MechTypes"] = [TypesMech["MS KRB5 - Microsoft Kerberos 5"]] + + # Let's extract the ticket from the TGS + tgs = decoder.decode(tgs, asn1Spec=TGS_REP())[0] + ticket = Ticket() + ticket.from_asn1(tgs["ticket"]) + + # Now let's build the AP_REQ + apReq = AP_REQ() + apReq["pvno"] = 5 + apReq["msg-type"] = int(constants.ApplicationTagNumbers.AP_REQ.value) + + opts = [] + apReq["ap-options"] = constants.encodeFlags(opts) + seq_set(apReq, "ticket", ticket.to_asn1) + + authenticator = Authenticator() + authenticator["authenticator-vno"] = 5 + authenticator["crealm"] = domain + seq_set(authenticator, "cname", userName.components_to_asn1) + now = datetime.datetime.utcnow() + + authenticator["cusec"] = now.microsecond + authenticator["ctime"] = KerberosTime.to_asn1(now) + + encodedAuthenticator = encoder.encode(authenticator) + + # Key Usage 11 + # AP-REQ Authenticator (includes application authenticator + # subkey), encrypted with the application session key + # (Section 5.5.1) + encryptedEncodedAuthenticator = cipher.encrypt( + sessionKey, 11, encodedAuthenticator, None + ) + + apReq["authenticator"] = noValue + apReq["authenticator"]["etype"] = cipher.enctype + apReq["authenticator"]["cipher"] = encryptedEncodedAuthenticator + + blob["MechToken"] = encoder.encode(apReq) + + request = ldap3.operation.bind.bind_operation( + connection.version, ldap3.SASL, user, None, "GSS-SPNEGO", blob.getData() + ) + + # Done with the Kerberos saga, now let's get into LDAP + if connection.closed: # try to open connection if closed + connection.open(read_server_info=False) + + connection.sasl_in_progress = True + response = connection.post_send_single_response( + connection.send("bindRequest", request, None) + ) + connection.sasl_in_progress = False + if response[0]["result"] != 0: + raise Exception(response) + + connection.bound = True + + return True + + +def __init_ldap_connection( + target, + tls_version, + domain, + username, + password, + lmhash, + nthash, + aesKey=None, + kerberos=False, + kdcHost=None, +): + DEBUG = False + + if nthash is None: + nthash = "" + if lmhash is None: + lmhash = "" + + if tls_version is not None: + use_ssl = True + port = 636 + tls = ldap3.Tls( + validate=ssl.CERT_NONE, version=tls_version, ciphers="ALL:@SECLEVEL=0" + ) + else: + use_ssl = False + port = 389 + tls = None + ldap_server = ldap3.Server( + host=target, port=port, use_ssl=use_ssl, get_info=ldap3.ALL, tls=tls + ) + + ldap_session = None + + if kerberos: + if DEBUG: + print("[%s] Using Kerberos authentication" % __name__) + print(" | target", target) + print(" | username", username) + print(" | password", password) + print(" | domain", domain) + print(" | lmhash", lmhash) + print(" | nthash", nthash) + print(" | aesKey", aesKey) + print(" | kdcHost", kdcHost) + + ldap_session = ldap3.Connection(server=ldap_server) + ldap_session.bind() + + ldap3_kerberos_login( + connection=ldap_session, + target=target, + user=username, + password=password, + domain=domain, + lmhash=lmhash, + nthash=nthash, + aesKey=aesKey, + kdcHost=kdcHost, + ) + + elif any([len(nthash) != 0, len(lmhash) != 0]): + if DEBUG: + print("[%s] Using Pass the Hash authentication" % __name__) + if len(lmhash) == 0: + lmhash = "aad3b435b51404eeaad3b435b51404ee" + if len(nthash) == 0: + nthash = "31d6cfe0d16ae931b73c59d7e0c089c0" + try: + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=lmhash + ":" + nthash, + authentication=ldap3.NTLM, + auto_bind=True, + ) + except ldap3.core.exceptions.LDAPBindError as e: + if "strongerAuthRequired" in str(e): # to handle ldap signing on port 389 + if DEBUG: + print("[%s] Trying to handle LDAP signing" % __name__) + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=lmhash + ":" + nthash, + authentication=ldap3.NTLM, + auto_bind=True, + session_security="ENCRYPT", + ) + elif port == 636 and "invalidCredentials" in str( + e + ): # to handle channel binding on port 636 (Exception is not different from truly invalid credentials...) + if DEBUG: + print("[%s] Trying to handle channel binding" % __name__) + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=lmhash + ":" + nthash, + authentication=ldap3.NTLM, + auto_bind=True, + channel_binding=ldap3.TLS_CHANNEL_BINDING, + ) + else: + raise + + else: + if DEBUG: + print("[%s] Using user/password authentication" % __name__) + try: + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=password, + authentication=ldap3.NTLM, + auto_bind=True, + ) + except ldap3.core.exceptions.LDAPBindError as e: + if "strongerAuthRequired" in str(e): # to handle ldap signing on port 389 + if DEBUG: + print("[%s] Trying to handle LDAP signing" % __name__) + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=password, + authentication=ldap3.NTLM, + auto_bind=True, + session_security="ENCRYPT", + ) + elif port == 636 and "invalidCredentials" in str( + e + ): # to handle channel binding on port 636 (Exception is not different from truly invalid credentials...) + if DEBUG: + print("[%s] Trying to handle channel binding" % __name__) + ldap_session = ldap3.Connection( + server=ldap_server, + user="%s\\%s" % (domain, username), + password=password, + authentication=ldap3.NTLM, + auto_bind=True, + channel_binding=ldap3.TLS_CHANNEL_BINDING, + ) + else: + raise + + if ldap_session is None: + raise ldap3.core.exceptions.LDAPBindError("Failed to establish LDAP session") + + return ldap_server, ldap_session + + +def init_ldap_session( + auth_domain, + auth_dc_ip, + auth_username, + auth_password, + auth_lm_hash, + auth_nt_hash, + auth_key=None, + use_kerberos=False, + kdcHost=None, + use_ldaps=False, +): + if use_kerberos: + target_dc = kdcHost + else: + target_dc = auth_dc_ip if auth_dc_ip is not None else auth_domain + + if use_ldaps is True: + try: + return __init_ldap_connection( + target=target_dc, + tls_version=ssl.PROTOCOL_TLSv1_2, + domain=auth_domain, + username=auth_username, + password=auth_password, + lmhash=auth_lm_hash, + nthash=auth_nt_hash, + aesKey=auth_key, + kdcHost=kdcHost, + kerberos=use_kerberos, + ) + except ldap3.core.exceptions.LDAPSocketOpenError: + return __init_ldap_connection( + target=target_dc, + tls_version=ssl.PROTOCOL_TLSv1, + domain=auth_domain, + username=auth_username, + password=auth_password, + lmhash=auth_lm_hash, + nthash=auth_nt_hash, + aesKey=auth_key, + kdcHost=kdcHost, + kerberos=use_kerberos, + ) + else: + return __init_ldap_connection( + target=target_dc, + tls_version=None, + domain=auth_domain, + username=auth_username, + password=auth_password, + lmhash=auth_lm_hash, + nthash=auth_nt_hash, + aesKey=auth_key, + kdcHost=kdcHost, + kerberos=use_kerberos, + ) + + +def raw_ldap_query( + auth_domain, + auth_dc_ip, + auth_username, + auth_password, + auth_hashes, + query, + auth_key=None, + attributes=["*"], + searchbase=None, + use_kerberos=False, + kdcHost=None, + use_ldaps=False, +): + auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) + + ldap_server, ldap_session = init_ldap_session( + auth_domain=auth_domain, + auth_dc_ip=auth_dc_ip, + auth_username=auth_username, + auth_password=auth_password, + auth_lm_hash=auth_lm_hash, + auth_nt_hash=auth_nt_hash, + auth_key=auth_key, + use_kerberos=use_kerberos, + kdcHost=kdcHost, + use_ldaps=use_ldaps, + ) + + if searchbase is None: + searchbase = ldap_server.info.other["defaultNamingContext"] + + ldapresults = list( + ldap_session.extend.standard.paged_search( + searchbase, query, attributes=attributes + ) + ) + + results = {} + for entry in ldapresults: + if entry["type"] != "searchResEntry": + continue + results[entry["dn"]] = entry["attributes"] + + return results diff --git a/sectools/windows/ldap/wrappers.py b/sectools/windows/ldap/wrappers.py new file mode 100644 index 0000000..2ebd9b3 --- /dev/null +++ b/sectools/windows/ldap/wrappers.py @@ -0,0 +1,184 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# File name : wrappers.py +# Author : Podalirius (@podalirius_) +# Date created : 2 Aug 2022 + + +from sectools.windows.crypto import parse_lm_nt_hashes +from sectools.windows.ldap import init_ldap_session + + +def get_computers_from_domain( + auth_domain, + auth_dc_ip, + auth_username, + auth_password, + auth_hashes, + auth_key=None, + use_kerberos=False, + kdcHost=None, + use_ldaps=False, + __print=False, +): + auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) + + ldap_server, ldap_session = init_ldap_session( + auth_domain=auth_domain, + auth_dc_ip=auth_dc_ip, + auth_username=auth_username, + auth_password=auth_password, + auth_lm_hash=auth_lm_hash, + auth_nt_hash=auth_nt_hash, + auth_key=auth_key, + use_kerberos=use_kerberos, + kdcHost=kdcHost, + use_ldaps=use_ldaps, + ) + + if __print: + print("[>] Extracting all computers ...") + + computers = [] + searchbase = ldap_server.info.other["defaultNamingContext"] + results = list( + ldap_session.extend.standard.paged_search( + searchbase, "(objectCategory=computer)", attributes=["dNSHostName"] + ) + ) + for entry in results: + if entry["type"] != "searchResEntry": + continue + dNSHostName = entry["attributes"]["dNSHostName"] + if isinstance(dNSHostName, str): + computers.append(dNSHostName) + if isinstance(dNSHostName, list): + if len(dNSHostName) != 0: + for entry in dNSHostName: + computers.append(entry) + + if __print: + print("[+] Found %d computers in the domain." % len(computers)) + + return computers + + +def get_servers_from_domain( + auth_domain, + auth_dc_ip, + auth_username, + auth_password, + auth_hashes, + auth_key=None, + use_kerberos=False, + kdcHost=None, + use_ldaps=False, + __print=False, +): + auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) + + ldap_server, ldap_session = init_ldap_session( + auth_domain=auth_domain, + auth_dc_ip=auth_dc_ip, + auth_username=auth_username, + auth_password=auth_password, + auth_lm_hash=auth_lm_hash, + auth_nt_hash=auth_nt_hash, + auth_key=auth_key, + use_kerberos=use_kerberos, + kdcHost=kdcHost, + use_ldaps=use_ldaps, + ) + + if __print: + print("[>] Extracting all servers ...") + + servers = [] + searchbase = ldap_server.info.other["defaultNamingContext"] + results = list( + ldap_session.extend.standard.paged_search( + searchbase, + "(&(objectCategory=computer)(operatingSystem=*Server*))", + attributes=["dNSHostName"], + ) + ) + for entry in results: + if entry["type"] != "searchResEntry": + continue + dNSHostName = entry["attributes"]["dNSHostName"] + if isinstance(dNSHostName, str): + servers.append(dNSHostName) + if isinstance(dNSHostName, list): + if len(dNSHostName) != 0: + for entry in dNSHostName: + servers.append(entry) + + if __print: + print("[+] Found %d servers in the domain." % len(servers)) + + return servers + + +def get_subnets( + auth_domain, + auth_dc_ip, + auth_username, + auth_password, + auth_hashes, + auth_key=None, + use_kerberos=False, + kdcHost=None, + use_ldaps=False, + __print=False, +): + auth_lm_hash, auth_nt_hash = parse_lm_nt_hashes(auth_hashes) + + ldap_server, ldap_session = init_ldap_session( + auth_domain=auth_domain, + auth_dc_ip=auth_dc_ip, + auth_username=auth_username, + auth_password=auth_password, + auth_lm_hash=auth_lm_hash, + auth_nt_hash=auth_nt_hash, + auth_key=auth_key, + use_kerberos=use_kerberos, + kdcHost=kdcHost, + use_ldaps=use_ldaps, + ) + + if __print: + print("[>] Extracting all subnets ...") + + subnets = [] + searchbase = ldap_server.info.other["configurationNamingContext"] + results = list( + ldap_session.extend.standard.paged_search( + searchbase, + "(objectClass=site)", + attributes=["distinguishedName", "name", "description"], + ) + ) + sites = [] + for entry in results: + if entry["type"] != "searchResEntry": + continue + sites.append((entry["dn"], entry["attributes"]["name"])) + + subnets = [] + for site_dn, site_name in sites: + results = list( + ldap_session.extend.standard.paged_search( + "CN=Sites," + ldap_server.info.other["configurationNamingContext"][0], + "(siteObject=%s)" % site_dn, + attributes=["distinguishedName", "name", "description"], + ) + ) + for entry in results: + if entry["type"] != "searchResEntry": + continue + subnets.append(entry["attributes"]["name"]) + + if __print: + print("[+] Found %d subnets in the domain." % len(subnets)) + + return subnets From 05212129695368606f5db2172682bb12aa06ec39 Mon Sep 17 00:00:00 2001 From: "Remi GASCOU (Podalirius)" <79218792+p0dalirius@users.noreply.github.com> Date: Wed, 17 Sep 2025 09:03:48 +0200 Subject: [PATCH 2/2] Updated workflow --- .github/workflows/release-tests.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/release-tests.yml b/.github/workflows/release-tests.yml index 1b783e9..12d6ee1 100644 --- a/.github/workflows/release-tests.yml +++ b/.github/workflows/release-tests.yml @@ -27,18 +27,18 @@ jobs: - name: Run all tests run: | - cd bhopengraph/tests + cd sectools/tests python run_tests.py - name: Run tests with verbose output run: | - cd bhopengraph/tests + cd sectools/tests python -m unittest discover -v - name: Run tests with coverage run: | pip install coverage - coverage run --source=bhopengraph bhopengraph/tests/run_tests.py + coverage run --source=sectools sectools/tests/run_tests.py coverage report build-package: @@ -66,5 +66,5 @@ jobs: - name: Upload build artifacts uses: actions/upload-artifact@v4 with: - name: bhopengraph-package + name: sectools-package path: dist/