diff --git a/.github/workflows/nightly.yml b/.github/workflows/nightly.yml new file mode 100644 index 0000000..a7c0212 --- /dev/null +++ b/.github/workflows/nightly.yml @@ -0,0 +1,43 @@ +name: Nightly Build + +on: + workflow_dispatch: + inputs: + branch: + description: Branch to build from + required: true + default: main + type: string + +permissions: + contents: read + packages: write + +jobs: + docker: + runs-on: ubuntu-latest + + steps: + - name: Checkout code + uses: actions/checkout@v4 + with: + ref: ${{ inputs.branch }} + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Login to GitHub Container Registry + uses: docker/login-action@v3 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Build and push nightly image + uses: docker/build-push-action@v6 + with: + context: . + push: true + tags: ghcr.io/${{ github.repository }}:nightly + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..aa29b91 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,148 @@ +# Contributing to uPKI CLI + +Thank you for your interest in contributing to uPKI CLI. This document provides guidelines and best practices for contributing to this project. + +## Table of Contents + +- [Code of Conduct](#code-of-conduct) +- [Getting Started](#getting-started) +- [Development Environment](#development-environment) +- [Architecture](#architecture) +- [Coding Standards](#coding-standards) +- [Testing](#testing) +- [Submitting Changes](#submitting-changes) +- [Reporting Issues](#reporting-issues) + +## Code of Conduct + +By participating in this project, you agree to maintain a respectful and inclusive environment. We are committed to providing a welcoming and safe experience for everyone. + +- Be respectful and inclusive in your communications +- Accept constructive criticism positively +- Focus on what is best for the community +- Show empathy towards other community members + +## Getting Started + +1. **Fork the repository** — Click the "Fork" button on GitHub to create your own copy +2. **Clone your fork** — `git clone https://github.com/YOUR_USERNAME/upki-cli.git` +3. **Add upstream remote** — `git remote add upstream https://github.com/circle-rd/upki-cli.git` +4. **Create a branch** — `git checkout -b feature/your-feature-name` + +## Development Environment + +### Prerequisites + +- Python 3.11 or higher +- Git +- A running uPKI RA instance (for integration testing) + +### Setup + +```bash +# Clone the repository +git clone https://github.com/circle-rd/upki-cli.git +cd upki-cli + +# Create a virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt +``` + +## Architecture + +uPKI CLI is structured as a thin command-line wrapper around an ACME v2 client: + +``` +client/ + acme_client.py — ACME v2 client (RFC 8555): account key, JWS signing, + certificate enrollment, renewal, revocation, CRL fetch. + Uses httpx + cryptography; no subprocess openssl. + bot.py — High-level coordinator: CA cert management, browser + integration (certutil/pk12util), delegation to AcmeClient. + collection.py — Local node registry (cli.nodes.json). + upkiLogger.py — Logging helper. +``` + +### Key design decisions + +- **No josepy dependency** — JWS flattened JSON format is implemented directly using `cryptography`. +- **No subprocess openssl** — CSR and PKCS#12 generation use `cryptography` builders. +- **EC P-256 account key** — Stored at `/acme_account.key`, loaded on subsequent runs. +- **RFC 7638 thumbprint** — Computed over `{"crv","kty","x","y"}` in lexicographic order. +- **Flattened JSON JWS** — `{"protected","payload","signature"}` as required by RFC 8555. +- **P1363 EC signatures** — `r || s` byte format used in JWS (not DER). + +## Coding Standards + +### Style + +- Follow [PEP 8](https://peps.python.org/pep-0008/) with a line length of 100. +- Use f-strings for string formatting (no `.format()` or `%`). +- Use type annotations for all public function signatures. +- Use `from __future__ import annotations` for Python 3.11+ forward-reference support. + +### Dead code policy + +- **No deprecated code** — Remove it entirely. +- **No backward-compatibility shims** — Break cleanly and document in release notes. +- **No commented-out code** in commits. + +### Error handling + +- Use `raise ... from err` when re-raising exceptions to preserve the chain. +- Only validate at system boundaries (user input, HTTP responses, file I/O). +- Do not add defensive guards for conditions the caller guarantees. + +### Security + +- Never log private key material. +- Store key files with mode `0o400` (owner read-only). +- Store account config files with mode `0o600`. +- CA certificate is write-protected at `0o444` after installation. + +## Testing + +Currently the CLI does not have unit tests (ACME integration is tested in upki-ra). When adding tests: + +- Unit test pure logic (e.g. `_b64url`, `_thumbprint`) without network access. +- Mock `httpx.Client` for ACME endpoint tests. +- Do not write tests that shell out to `openssl`. + +## Submitting Changes + +### Pull Request Process + +1. **Write clear commit messages** following [Conventional Commits](https://www.conventionalcommits.org/): + - `feat:` — new feature + - `fix:` — bug fix + - `refactor:` — code change that neither fixes a bug nor adds a feature + - `docs:` — documentation only + - `chore:` — maintenance (deps, CI, etc.) + +2. **Keep PRs focused** — one logical change per PR. + +3. **Update documentation** if you change behavior visible to users. + +4. **Pass CI** — ensure `pip install -r requirements.txt` completes and there are no import errors. + +### Branch Naming + +- `feature/short-description` +- `fix/short-description` +- `refactor/short-description` + +## Reporting Issues + +When filing a bug report, please include: + +- Python version (`python --version`) +- OS name and version +- Steps to reproduce +- Expected vs actual behaviour +- Relevant log output (redact any private key or password material) + +For security vulnerabilities, please **do not open a public issue**. Contact the maintainers directly. diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..0db39f5 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,13 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +ENV PYTHONUNBUFFERED=1 +ENV PYTHONDONTWRITEBYTECODE=1 + +ENTRYPOINT ["python", "client.py"] diff --git a/LICENSE b/LICENSE index db2ef7a..62ba4e0 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 MIT ProHacktive - team@prohacktive.io +Copyright (c) 2024 CIRCLE Cyber - contact@circle-cyber.com Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/MIGRATION.md b/MIGRATION.md new file mode 100644 index 0000000..0a317ef --- /dev/null +++ b/MIGRATION.md @@ -0,0 +1,245 @@ +# upki-cli - Plan de Migration + +## Vue d'ensemble du projet + +- **Projet**: upki-cli (Client CLI pour uPKI) +- **Langage**: Python 3 +- **Technologies actuelles**: Click (CLI), ZMQ (communication avec CA/RA), requests (API REST optionnelle) +- **Objectif**: Améliorer l'expérience utilisateur, ajouter TypeScript SDK + +--- + +## 1. Refactoring CLI + +### 1.1 Architecture actuelle + +L CLI actuel utilise Click pour gérer les commandes. La refactorisation vise à améliorer la structure. + +### 1.2 Structure cible + +``` +upki-cli/ +├── upkicli/ +│ ├── __init__.py +│ ├── cli/ +│ │ ├── __init__.py +│ │ ├── main.py # Entry point Click +│ │ ├── commands/ +│ │ │ ├── __init__.py +│ │ │ ├── cert.py # Commandes certificats +│ │ │ ├── profile.py # Commandes profils +│ │ │ └── admin.py # Commandes admin +│ │ └── options.py # Options partagées +│ ├── client/ +│ │ ├── __init__.py +│ │ ├── base.py # Client de base +│ │ ├── zmq_client.py # Client ZMQ +│ │ └── rest_client.py # Client REST (optionnel) +│ ├── validators/ +│ │ └── input.py # Validation des entrées +│ └── formatters/ +│ ├── __init__.py +│ └── output.py # Formatage JSON/table +├── requirements.txt +└── setup.py +``` + +--- + +## 2. Expérience utilisateur + +### 2.1 Assistant interactif + +**Objectif**: Faciliter l'utilisation pour les néophytes. + +```python +# Exemple: Commande interactive pour créer un certificat +import click +from upkicli.commands import interactive + +@cert.command() +@interactive +def create_interactive(): + """Créer un certificat avec assistant interactif.""" + fqdn = click.prompt("FQDN du service", type=str) + profile = click.prompt( + "Profil à utiliser", + type=click.Choice(['docker-internal', 'web-server', 'internal-service']), + default='docker-internal' + ) + # Suite du processus... +``` + +### 2.2 Profils suggérés + +| Commande | Description | +| ------------------------- | ---------------------------------------- | +| `upki cert suggest fqdn` | Suggère le profil optimal | +| `upki cert create --auto` | Création automatique avec profil suggéré | + +### 2.3 Sortie améliorée + +| Format | Utilisation | +| ------- | -------------------------------- | +| `json` | Sortie structurée pour scripting | +| `table` | Sortie lisible pour humains | +| `yaml` | Configuration Docker/Traefik | + +```bash +# Exemples d'utilisation +upki cert list --format table +upki cert get mycert --format json +upki cert export traefik --format yaml +``` + +--- + +## 3. Communication + +### 3.1 Support double protocole + +Le CLI doit supporter ZMQ et REST (pour les futures intégrations). + +```python +from abc import ABC, abstractmethod +from typing import Optional + +class ClientBase(ABC): + @abstractmethod + def request_certificate(self, fqdn: str, profile: str) -> dict: + pass + + @abstractmethod + def get_certificate(self, cert_id: str) -> dict: + pass + +class ZMQClient(ClientBase): + """Client ZMQ pour communication directe avec CA.""" + def __init__(self, host: str, port: int): + self.host = host + self.port = port + + def request_certificate(self, fqdn: str, profile: str) -> dict: + # Implémentation ZMQ + pass + +class RESTClient(ClientBase): + """Client REST pour communication avec RA via API.""" + def __init__(self, base_url: str, api_key: Optional[str] = None): + self.base_url = base_url + self.api_key = api_key + + def request_certificate(self, fqdn: str, profile: str) -> dict: + # Implémentation REST + pass +``` + +### 3.2 Configuration centralisée + +```yaml +# ~/.upki/config.yml +ca: + host: 127.0.0.1 + port: 5000 + protocol: zmq # zmq | rest + +ra: + url: http://127.0.0.1:8000 + api_key: null + +defaults: + profile: docker-internal + format: table +``` + +--- + +## 4. TypeScript SDK - Future + +### 4.1 Objectif + +Créer un SDK TypeScript pour intégration native dans: + +- Applications Node.js +- Scripts de déploiement +- Agents IA + +### 4.2 Structure TypeScript + +``` +upki-sdk/ +├── package.json +├── tsconfig.json +├── src/ +│ ├── index.ts # Export principal +│ ├── client.ts # Client HTTP +│ ├── types.ts # Types partagés +│ └── errors.ts # Erreurs personnalisées +├── dist/ # Compilé +└── README.md +``` + +### 4.3 API TypeScript + +```typescript +import { UPKIClient, CertificateRequest } from "@circle/upki-sdk"; + +const client = new UPKIClient({ + baseUrl: "http://ra.upki.local:8000", + apiKey: process.env.upki_api_key, +}); + +async function requestCert() { + const request: CertificateRequest = { + fqdn: "nginx.docker.internal", + profile: "docker-internal", + }; + + const cert = await client.certificates.create(request); + console.log(cert); +} +``` + +--- + +## 5. Roadmap + +### 5.1 Court terme (v2.1.x) + +- [ ] Refactoring structure CLI +- [ ] Assistant interactif +- [ ] Sortie multi-format (JSON, table, YAML) + +### 5.2 Moyen terme (v2.2.x) + +- [ ] Support REST en plus de ZMQ +- [ ] Configuration centralisée YAML +- [ ] Validation des entrées avancée + +### 5.3 Long terme (v3.0) + +- [ ] SDK TypeScript +- [ ] Intégration MCP pour agents IA +- [ ] Plugins système (completion, hooks) + +--- + +## 6. Notes de migration + +### Compatibilité + +- **Commandes**: Conserver les mêmes noms de commandes pour compatibilité +- **Arguments**: backward compatible avec les arguments existants +- **API**: Support simultané ZMQ et REST + +### Dépendances à ajouter (Python) + +- `pyyaml` pour configuration +- `tabulate` pour sortie table +- `requests` pour REST (optionnel) + +### Dépendances TypeScript (future) + +- `typescript` >= 5.0 +- `@types/node` >= 18 +- `axios` pour HTTP diff --git a/README.md b/README.md index c575bb9..6c16148 100644 --- a/README.md +++ b/README.md @@ -1,45 +1,46 @@ -![ProHacktive](https://prohacktive.io/assets/v2/img/logo-prohacktive-purple.png "uPKI from ProHacktive.io") - # µPKI-CLI -***NOT READY FOR PRODUCTION USE*** -This project has only been tested on few distributions with Python3.6. -Due to python usage it *SHOULD* works on many other configurations, but it has NOT been tested. -Known working OS: -> - Debian 9 Strech (CA/RA/CLI) -> - Debian 10 Buster (CA/RA/CLI) -> - Ubuntu 18.04 (CA/RA/CLI) -> - MacOS Catalina 10.15 (CLI - without update services) -> - MacOS Mojave 10.14 (CLI - without update services) + +**NOT READY FOR PRODUCTION USE** + +This project has only been tested on Python 3.11 to 3.13. +Due to Python usage it _SHOULD_ work on many other configurations, but it has NOT been tested. ## 1. About -µPki [maɪkroʊ ˈpiː-ˈkeɪ-ˈaɪ] is a small PKI in python that should let you make basic tasks without effort. + +µPKI [maɪkroʊ ˈpiː-ˈkeɪ-ˈaɪ] is a small PKI in Python that should let you make basic tasks without effort. It works in combination with: -> - [µPKI-CA](https://github.com/proh4cktive/upki) -> - [µPKI-RA](https://github.com/proh4cktive/upki-ra) -> - [µPKI-WEB](https://github.com/proh4cktive/upki-web) -µPki-CLI is the client app that interact with the [µPKI-RA](https://github.com/proh4cktive/upki-ra) Registration Authority. +- [µPKI-CA](https://github.com/circle-rd/upki) - Certification Authority +- [µPKI-RA](https://github.com/circle-rd/upki-ra) - Registration Authority + +µPKI-CLI is the client app that interacts with the [µPKI-RA](https://github.com/circle-rd/upki-ra) Registration Authority. ### 1.1 Dependencies -The following modules are required + +The following modules are required: + - Requests -Some systems libs & tools are also required, make sure you have them pre-installed +Some systems libs & tools are also required, make sure you have them pre-installed: + ```bash sudo apt update sudo apt -y install build-essential python3-dev python3-pip git ``` ## 2. Install -The Installation process require three different phases: -1. clone the current repository +The Installation process requires three different phases: + +1. Clone the current repository: + ```bash -git clone https://github.com/proh4cktive/upki-cli +git clone https://github.com/circle-rd/upki-cli cd ./upki-cli ``` -2. Install the dependencies and upki-client service timer in order to re-generate local certificates if needed. Registration Authority URL is required at this step +2. Install the dependencies and upki-client service timer in order to re-generate local certificates if needed. Registration Authority URL is required at this step: + ```bash ./install.sh --url https://certificates.domain.com ``` @@ -47,62 +48,100 @@ cd ./upki-cli 3. Setup certificates required (cf. Usage below) ## 3. Usage -µPki-CLI is the µPki client and should be installed on server/customer host that will receive the final certificate. µPki-CLI is responsible for private key and certificate request generation. + +µPKI-CLI is the µPKI client and should be installed on server/customer host that will receive the final certificate. µPKI-CLI is responsible for private key and certificate request generation. ### 3.1 Add a certificate -*Note: On basic configuration you can add a certificate localy only if it add been registered on RA by an admin. To setup your Registration Authority (RA) please check [µPKI-RA](https://github.com/proh4cktive/upki-ra).* -Call the client script with 'add' action +_Note: On basic configuration you can add a certificate locally only if it has been registered on RA by an admin. To setup your Registration Authority (RA) please check [µPKI-RA](https://github.com/circle-rd/upki-ra)._ + +Call the client script with 'add' action: + ```bash ./client.py --url https://certificates.domain.com add ``` -For browser integration call the client script with 'add' action and browser flags +For browser integration call the client script with 'add' action and browser flags: + ```bash ./client.py --url https://certificates.domain.com add --firefox --chrome ``` ### 3.2 List all certificates -You can list all certificates registered locally (this does not reflect what is configured on the RA server). + +You can list all certificates registered locally (this does not reflect what is configured on the RA server): + ```bash ./client.py --url https://certificates.domain.com list ``` ### 3.3 Delete a certificate -You can un-register a locally defined certificate (note: this will not affect RA configuration). + +You can un-register a locally defined certificate (note: this will not affect RA configuration): + ```bash ./client.py --url https://certificates.domain.com delete ``` ### 3.4 Renew all certificates -You can force a certificate renewal for all certificate, which is basicaly what the upki-client services timer is doing. + +You can force a certificate renewal for all certificates, which is basically what the upki-client services timer is doing: + ```bash ./client.py --url https://certificates.domain.com renew ``` -### 3.5 Renew Certificates Revokation List +### 3.5 Renew Certificates Revocation List + Re-download CRL, useful when client is a server and web server needs to have an updated list. -An example systemd timer for Nginx is given in *upki-cli-crl.service* and *upki-cli-crl.timer* +An example systemd timer for Nginx is given in _upki-cli-crl.service_ and _upki-cli-crl.timer_: + ```bash ./client.py --url https://certificates.domain.com crl ``` ### 3.6 Help -For more advanced usage please check the app help global + +For more advanced usage please check the app help global: + ```bash ./client.py --help ``` -You can also have specific help for each actions +You can also have specific help for each action: + ```bash ./client.py --url https://certificates.domain.com add --help ``` -## 4. TODO -Until being ready for production some tasks remains: -> - Setup Unit Tests -> - Refactoring of Bot class -> - Migrate storage to TinyDB or sqlite -> - Store URL in config file -> - Associate each node with specific URL in order to allow support for multiple RA -> - Add uninstall.sh script +## Project Structure + +``` +upki-cli/ +├── README.md +├── LICENSE +├── __metadata.py +├── requirements.txt +├── setup.py +├── install.sh +├── upki-cli.sh +├── client.py +├── upki-cli-crl.service +├── upki-cli-crl.timer +└── client/ + ├── __init__.py + ├── collection.py + ├── node.py + ├── bot.py + └── upkiLogger.py +``` + +## License + +MIT License - See LICENSE file for details. + +## Links + +- Website: https://circle-cyber.com +- GitHub: https://github.com/circle-rd +- Documentation: https://circle-rd.github.io/upki-cli diff --git a/__metadata.py b/__metadata.py index 16acb72..f1a70f2 100644 --- a/__metadata.py +++ b/__metadata.py @@ -1,4 +1,4 @@ -__author__ = "Ben Mz" -__authoremail__ = "bmz@prohacktive.io" -__version__ = "0.5.2" -__url__ = "https://github.com/proh4cktive/upki-cli" \ No newline at end of file +__author__ = "CIRCLE Cyber" +__authoremail__ = "contact@circle-cyber.com" +__version__ = "2.0.0" +__url__ = "https://github.com/circle-rd/upki-cli" diff --git a/client.py b/client.py index 187f892..c2b6f08 100755 --- a/client.py +++ b/client.py @@ -1,56 +1,103 @@ #!/usr/bin/env python3 # -*- coding:utf-8 -*- +""" +UPKI CLI Client. + +This is the main entry point for the UPKI Command Line Interface client. +""" import os import sys import re import argparse import logging +from typing import Any import client + def main(argv): - BASE_DIR = os.path.join(os.path.expanduser("~"), '.upki') - LOG_FILE = ".cli.log" - LOG_LEVEL = logging.INFO - VERBOSE = True - OUTPUT = 'cli' - PEM = False - NO_PASS = False + BASE_DIR = os.path.join(os.path.expanduser("~"), ".upki") + LOG_FILE = ".cli.log" + LOG_LEVEL = logging.INFO + VERBOSE = True + OUTPUT = "cli" + PEM = False + NO_PASS = False parser = argparse.ArgumentParser(description="µPki-CLI is the µPKI client.") parser.add_argument("-q", "--quiet", help="Output less infos", action="store_true") parser.add_argument("-d", "--debug", help="Enable debug mode", action="store_true") - parser.add_argument("-j", "--json", help="Output result in json", action="store_true") + parser.add_argument( + "-j", "--json", help="Output result in json", action="store_true" + ) parser.add_argument("-u", "--url", help="Define the RA url", required=True) - parser.add_argument("-p", "--path", help="Set the directory path where private keys, csr and certificates are stored. (Default: {p})".format(p=BASE_DIR)) - + parser.add_argument( + "-p", + "--path", + help="Set the directory path where private keys, csr and certificates are stored. (Default: {p})".format( + p=BASE_DIR + ), + ) + # Allow subparsers - subparsers = parser.add_subparsers(title='commands') - - parser_add = subparsers.add_parser('add', help="Add a node to certify.") - parser_add.set_defaults(which='add') - parser_add.add_argument("-n", "--name", help="Define the requested CN for node", default=None) - parser_add.add_argument("-c", "--chrome", help="Add node certificate to Chrome", action="store_true", default=False) - parser_add.add_argument("-f", "--firefox", help="Add node certificate to Firefox", action="store_true", default=False) - parser_add.add_argument("-p", "--profile", help="Set the profile name for node", default=None) - parser_add.add_argument("--p12", help="Generate a p12 certificate file (default: .pem only)", action="store_true", default=False) - parser_add.add_argument("--passwd", help="Protect p12 file with pass (default: False)", action="store", default=None) - - parser_renew = subparsers.add_parser('renew', help="Renew nodes registered.") - parser_renew.set_defaults(which='renew') - - parser_crl = subparsers.add_parser('crl', help="Regenerate CRL.") - parser_crl.set_defaults(which='crl') - - parser_list = subparsers.add_parser('list', help="List nodes registered.") - parser_list.set_defaults(which='list') - - parser_delete = subparsers.add_parser('delete', help="Delete node from local db (does not impact server).") - parser_delete.set_defaults(which='delete') - parser_delete.add_argument("-n", "--name", help="Define the requested CN for node", default=None) - parser_delete.add_argument("-p", "--profile", help="Set the profile name for node", default=None) - + subparsers = parser.add_subparsers(title="commands") + + parser_add = subparsers.add_parser("add", help="Add a node to certify.") + parser_add.set_defaults(which="add") + parser_add.add_argument( + "-n", "--name", help="Define the requested CN for node", default=None + ) + parser_add.add_argument( + "-c", + "--chrome", + help="Add node certificate to Chrome", + action="store_true", + default=False, + ) + parser_add.add_argument( + "-f", + "--firefox", + help="Add node certificate to Firefox", + action="store_true", + default=False, + ) + parser_add.add_argument( + "-p", "--profile", help="Set the profile name for node", default=None + ) + parser_add.add_argument( + "--p12", + help="Generate a p12 certificate file (default: .pem only)", + action="store_true", + default=False, + ) + parser_add.add_argument( + "--passwd", + help="Protect p12 file with pass (default: False)", + action="store", + default=None, + ) + + parser_renew = subparsers.add_parser("renew", help="Renew nodes registered.") + parser_renew.set_defaults(which="renew") + + parser_crl = subparsers.add_parser("crl", help="Regenerate CRL.") + parser_crl.set_defaults(which="crl") + + parser_list = subparsers.add_parser("list", help="List nodes registered.") + parser_list.set_defaults(which="list") + + parser_delete = subparsers.add_parser( + "delete", help="Delete node from local db (does not impact server)." + ) + parser_delete.set_defaults(which="delete") + parser_delete.add_argument( + "-n", "--name", help="Define the requested CN for node", default=None + ) + parser_delete.add_argument( + "-p", "--profile", help="Set the profile name for node", default=None + ) + args = parser.parse_args() try: @@ -69,8 +116,8 @@ def main(argv): LOG_LEVEL = logging.DEBUG if args.json: - OUTPUT = 'json' - + OUTPUT = "json" + # Ensure directory exists if not os.path.isdir(BASE_DIR): try: @@ -82,19 +129,23 @@ def main(argv): try: # Generate logger object - logger = client.PHKLogger(LOG_FILE, LOG_LEVEL, proc_name="upki CLI", verbose=VERBOSE) + logger = client.UPKILogger( + LOG_FILE, LOG_LEVEL, proc_name="upki CLI", verbose=VERBOSE + ) except Exception as err: - raise Exception('Unable to setup logger: {e}'.format(e=err)) + raise Exception("Unable to setup logger: {e}".format(e=err)) # Meta information dirname = os.path.dirname(__file__) # Retrieve all metadata from project - with open(os.path.join(dirname, '__metadata.py'), 'rt') as meta_file: - metadata = dict(re.findall(r"^__([a-z]+)__ = ['\"]([^'\"]*)['\"]", meta_file.read(), re.M)) - + with open(os.path.join(dirname, "__metadata.py"), "rt") as meta_file: + metadata = dict( + re.findall(r"^__([a-z]+)__ = ['\"]([^'\"]*)['\"]", meta_file.read(), re.M) + ) + logger.info("\t\t..:: µPKI Client ::..", color="WHITE", light=True) - logger.info("version: {v}".format(v=metadata['version']), color="WHITE") + logger.info("version: {v}".format(v=metadata["version"]), color="WHITE") try: bot = client.Bot(logger, args.url, BASE_DIR, verbose=VERBOSE) @@ -102,14 +153,21 @@ def main(argv): logger.error(err) return False - if args.which == 'add': + if args.which == "add": # Url is mandatory for node addition if not args.url: - raise Exception('You MUST set an url for node certificate retrieval') + raise Exception("You MUST set an url for node certificate retrieval") try: # Register node in local config - bot.add_node(args.name, args.profile, p12=args.p12, passwd=args.passwd, firefox=args.firefox, chrome=args.chrome) + bot.add_node( + args.name, + args.profile, + p12=args.p12, + passwd=args.passwd, + firefox=args.firefox, + chrome=args.chrome, + ) # Does not block code when node or certificate exist except RuntimeError as err: logger.error(err) @@ -118,31 +176,31 @@ def main(argv): except Exception as err: logger.error(err) sys.exit(1) - elif args.which == 'renew': + elif args.which == "renew": try: # Renew all nodes in config bot.renew() except Exception as err: logger.error(err) sys.exit(1) - elif args.which == 'crl': + elif args.which == "crl": # Url is mandatory for CRL retrieval if not args.url: - raise Exception('You MUST set an url for CRL retrieval') + raise Exception("You MUST set an url for CRL retrieval") try: # Regenerate CRL file bot.crl() except Exception as err: logger.error(err) sys.exit(1) - elif args.which == 'list': + elif args.which == "list": try: # List all nodes in config bot.list() except Exception as err: logger.error(err) sys.exit(1) - elif args.which == 'delete': + elif args.which == "delete": try: # Delete node in config bot.delete(args.name, args.profile) @@ -150,9 +208,9 @@ def main(argv): logger.error(err) sys.exit(1) - -if __name__ == '__main__': + +if __name__ == "__main__": try: main(sys.argv) except KeyboardInterrupt: - sys.stdout.write('\nBye.\n') \ No newline at end of file + sys.stdout.write("\nBye.\n") diff --git a/client/__init__.py b/client/__init__.py index e124369..0d0ff64 100644 --- a/client/__init__.py +++ b/client/__init__.py @@ -1,9 +1,5 @@ -from .phkLogger import PHKLogger +from .upkiLogger import UPKILogger from .collection import Collection from .bot import Bot -__all__ = ( - 'PHKLogger', - 'Collection', - 'Bot' -) \ No newline at end of file +__all__ = ("UPKILogger", "Collection", "Bot") diff --git a/client/acme_client.py b/client/acme_client.py new file mode 100644 index 0000000..285e645 --- /dev/null +++ b/client/acme_client.py @@ -0,0 +1,648 @@ +""" +uPKI CLI - ACME v2 Client (RFC 8555). + +Pure-Python ACME client using: +- cryptography — key generation, CSR, P12, JWS signing +- httpx — HTTP/HTTPS requests + +No josepy, no subprocess openssl. + +JWS details: +- Account key: EC P-256 +- Serialization: flattened JSON {"protected","payload","signature"} +- EC signature format: IEEE P1363 (r || s), NOT DER — per RFC 8555 / JWS +- RFC 7638 key thumbprint: SHA-256 over {"crv","kty","x","y"} in lex order +""" + +from __future__ import annotations + +import hashlib +import json +import os +import time +from base64 import b64decode, b64encode +from typing import Any + +import httpx +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec +from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature +from cryptography.hazmat.primitives.serialization import pkcs12 as _pkcs12 +from cryptography.x509 import ( + CertificateSigningRequestBuilder, + DNSName, + Name, + NameAttribute, + NameOID, + SubjectAlternativeName, + load_pem_x509_certificate, +) + + +# ============================================================================ +# Base64URL helpers (RFC 4648 §5) +# ============================================================================ + + +def _b64url(data: bytes) -> str: + """Encode bytes to base64url without padding.""" + return b64encode(data).decode().rstrip("=").replace("+", "-").replace("/", "_") + + +def _b64url_decode(data: str) -> bytes: + """Decode base64url string (adds missing padding).""" + pad = 4 - (len(data) % 4) + if pad != 4: + data += "=" * pad + return b64decode(data.replace("-", "+").replace("_", "/")) + + +# ============================================================================ +# AcmeClient +# ============================================================================ + + +class AcmeClient: + """ACME v2 client for uPKI CLI. + + Manages an account key per data directory. On first run the key and + account are created automatically via the RA's ACME endpoints. + + Args: + ra_url: Base URL of the RA (e.g. "https://ra.example.com"). + data_dir: Local directory where keys and certificates are stored. + ca_cert_path: Path to the CA certificate used to verify the RA TLS + connection. None disables server-certificate verification (only + acceptable on private networks where the CA is self-signed and + the cert hasn't been fetched yet). + """ + + def __init__( + self, + ra_url: str, + data_dir: str, + ca_cert_path: str | None = None, + ) -> None: + self._ra_url = ra_url.rstrip("/") + self._data_dir = data_dir + self._ca_cert_path = ca_cert_path + + self._key_path = os.path.join(data_dir, "acme_account.key") + self._account_path = os.path.join(data_dir, "acme_account.json") + + self._private_key: ec.EllipticCurvePrivateKey | None = None + self._account_id: str | None = None + self._directory: dict[str, Any] | None = None + + os.makedirs(data_dir, exist_ok=True) + + # ------------------------------------------------------------------------- + # Account key helpers + # ------------------------------------------------------------------------- + + def _load_or_create_key(self) -> ec.EllipticCurvePrivateKey: + """Load the account key from disk, generating a new one if absent. + + Returns: + EC P-256 private key. + """ + if self._private_key is not None: + return self._private_key + + if os.path.isfile(self._key_path): + with open(self._key_path, "rb") as fh: + loaded = serialization.load_pem_private_key(fh.read(), password=None) + if not isinstance(loaded, ec.EllipticCurvePrivateKey): + raise TypeError( + f"Expected EC private key in {self._key_path!r}, " + f"got {type(loaded).__name__}" + ) + self._private_key = loaded + else: + self._private_key = ec.generate_private_key(ec.SECP256R1()) + pem = self._private_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + with open(self._key_path, "wb") as fh: + fh.write(pem) + os.chmod(self._key_path, 0o600) + + return self._private_key + + def _get_jwk(self) -> dict[str, Any]: + """Return the public key as a JWK dict (EC P-256). + + Returns: + JWK dictionary with kty, crv, x, y. + """ + key = self._load_or_create_key() + pub = key.public_key().public_numbers() + return { + "kty": "EC", + "crv": "P-256", + "x": _b64url(pub.x.to_bytes(32, "big")), + "y": _b64url(pub.y.to_bytes(32, "big")), + } + + def _thumbprint(self) -> str: + """Compute RFC 7638 JWK Thumbprint for the account key. + + Only required members in lexicographic order, compact JSON encoding. + + Returns: + Base64url-encoded SHA-256 of the canonical JWK. + """ + jwk = self._get_jwk() + members = {"crv": jwk["crv"], "kty": jwk["kty"], "x": jwk["x"], "y": jwk["y"]} + digest = hashlib.sha256( + json.dumps(members, sort_keys=True, separators=(",", ":")).encode() + ).digest() + return _b64url(digest) + + # ------------------------------------------------------------------------- + # JWS signing + # ------------------------------------------------------------------------- + + def _sign_jws( + self, + url: str, + payload: dict[str, Any] | None, + *, + use_jwk: bool = False, + nonce: str | None = None, + ) -> bytes: + """Build and sign a flattened-JSON JWS body. + + Args: + url: The target URL (goes into the "url" protected header field). + payload: The payload dict. None produces an empty-string payload + (used for POST-as-GET requests per RFC 8555 §6.3). + use_jwk: If True, embed the public key in the protected header + instead of a kid. Required for new-account. + nonce: Nonce to embed. If None, a fresh one is fetched from the RA. + + Returns: + UTF-8 encoded JSON body ready to POST. + """ + if nonce is None: + nonce = self._get_nonce() + + protected: dict[str, Any] = { + "alg": "ES256", + "nonce": nonce, + "url": url, + } + if use_jwk: + protected["jwk"] = self._get_jwk() + else: + protected["kid"] = f"{self._ra_url}/acme/account/{self._account_id}" + + protected_b64 = _b64url(json.dumps(protected, separators=(",", ":")).encode()) + + if payload is None: + payload_b64 = "" + else: + payload_b64 = _b64url(json.dumps(payload, separators=(",", ":")).encode()) + + sign_input = f"{protected_b64}.{payload_b64}".encode() + + key = self._load_or_create_key() + sig_der = key.sign(sign_input, ec.ECDSA(hashes.SHA256())) + r, s = decode_dss_signature(sig_der) + sig_p1363 = r.to_bytes(32, "big") + s.to_bytes(32, "big") + + body = { + "protected": protected_b64, + "payload": payload_b64, + "signature": _b64url(sig_p1363), + } + return json.dumps(body, separators=(",", ":")).encode() + + # ------------------------------------------------------------------------- + # HTTP helpers + # ------------------------------------------------------------------------- + + def _http_client(self) -> httpx.Client: + """Return an httpx.Client with appropriate TLS settings. + + Returns: + Configured httpx.Client. + """ + verify: bool | str = self._ca_cert_path if self._ca_cert_path else False + return httpx.Client(verify=verify, timeout=30.0) + + def _get_directory(self) -> dict[str, Any]: + """Fetch and cache the ACME directory. + + Returns: + Directory dict with endpoint URLs. + """ + if self._directory: + return self._directory + with self._http_client() as client: + resp = client.get(f"{self._ra_url}/acme/directory") + resp.raise_for_status() + self._directory = resp.json() + if not isinstance(self._directory, dict): + raise RuntimeError("Invalid directory response") + return self._directory + + def _get_nonce(self) -> str: + """Fetch a fresh anti-replay nonce from the RA. + + Returns: + Nonce string. + + Raises: + RuntimeError: If the RA does not return a Replay-Nonce header. + """ + directory = self._get_directory() + url = directory.get("newNonce", f"{self._ra_url}/acme/new-nonce") + with self._http_client() as client: + resp = client.get(url) + nonce = resp.headers.get("Replay-Nonce") or resp.headers.get("replay-nonce") + if not nonce: + raise RuntimeError("RA returned no Replay-Nonce header") + return nonce + + def _post(self, url: str, body: bytes) -> httpx.Response: + """POST a JWS body to the RA. + + Args: + url: Target URL. + body: JSON-encoded JWS body. + + Returns: + httpx.Response. + """ + with self._http_client() as client: + resp = client.post( + url, + content=body, + headers={"Content-Type": "application/jose+json"}, + ) + return resp + + # ------------------------------------------------------------------------- + # Account lifecycle + # ------------------------------------------------------------------------- + + def bootstrap_account(self) -> str: + """Ensure an ACME account exists; create one if not. + + Stores the account ID in ``acme_account.json`` for reuse. + + Returns: + Account ID string (RFC 7638 key thumbprint). + """ + if os.path.isfile(self._account_path): + with open(self._account_path) as fh: + state = json.load(fh) + self._account_id = state.get("id") + self._load_or_create_key() + if self._account_id: + return self._account_id + + # Create account + directory = self._get_directory() + url = directory.get("newAccount", f"{self._ra_url}/acme/new-account") + payload = {"termsOfServiceAgreed": True} + body = self._sign_jws(url, payload, use_jwk=True) + resp = self._post(url, body) + if resp.status_code not in (200, 201): + raise RuntimeError( + f"Account creation failed: {resp.status_code} {resp.text}" + ) + + self._account_id = self._thumbprint() + with open(self._account_path, "w") as fh: + json.dump({"id": self._account_id}, fh) + os.chmod(self._account_path, 0o600) + return self._account_id + + # ------------------------------------------------------------------------- + # Certificate enrollment / renewal + # ------------------------------------------------------------------------- + + def enroll( + self, + cn: str, + profile: str = "server", + sans: list[str] | None = None, + p12: bool = False, + passwd: str | None = None, + ) -> dict[str, str]: + """Enroll a new certificate via ACME. + + Steps: new-order → (skip challenges if pre-auth) → finalize → download. + + Args: + cn: Common Name for the certificate. + profile: Certificate profile (passed to the RA CA). + sans: Subject Alternative Names (DNS). cn is always included. + p12: If True, also write a PKCS#12 bundle. + passwd: Password for the P12 file (None = no password). + + Returns: + Dict with keys ``key``, ``cert``, ``pem`` (and ``p12`` if requested). + """ + self.bootstrap_account() + all_sans = list({cn} | set(sans or [])) + identifiers = [{"type": "dns", "value": s} for s in all_sans] + + # Generate node key + CSR + node_key = ec.generate_private_key(ec.SECP256R1()) + key_file = self._node_path(cn, profile, "key") + key_pem = node_key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + with open(key_file, "wb") as fh: + fh.write(key_pem) + os.chmod(key_file, 0o400) + + csr = ( + CertificateSigningRequestBuilder() + .subject_name(Name([NameAttribute(NameOID.COMMON_NAME, cn)])) + .add_extension( + SubjectAlternativeName([DNSName(s) for s in all_sans]), + critical=False, + ) + .sign(node_key, hashes.SHA256()) + ) + # RFC 8555 §7.4: the csr field MUST be DER-encoded, base64url. + csr_der = csr.public_bytes(serialization.Encoding.DER) + csr_b64 = _b64url(csr_der) + csr_pem = csr.public_bytes(serialization.Encoding.PEM) + + # New-order + directory = self._get_directory() + order_url = directory.get("newOrder", f"{self._ra_url}/acme/new-order") + body = self._sign_jws( + order_url, {"identifiers": identifiers, "profile": profile} + ) + resp = self._post(order_url, body) + if resp.status_code not in (200, 201): + raise RuntimeError(f"new-order failed: {resp.status_code} {resp.text}") + order = resp.json() + + # Poll until ready (pre-authorized clients skip challenges) + finalize_url = order.get("finalize") + if not finalize_url: + raise RuntimeError("RA returned no finalize URL") + + if order.get("status") not in ("ready", "valid"): + order = self._wait_for_order_ready(order, directory) + + # Finalize + body = self._sign_jws(finalize_url, {"csr": csr_b64}) + resp = self._post(finalize_url, body) + if resp.status_code not in (200, 201): + try: + detail = resp.json().get("detail", resp.text) + except Exception: + detail = resp.text + raise RuntimeError(f"finalize failed ({resp.status_code}): {detail}") + result = resp.json() + + cert_url = result.get("certificate") + if not cert_url: + # Poll order until valid + order_id = finalize_url.rstrip("/finalize").split("/")[-1] + full_url = f"{self._ra_url}/acme/order/{order_id}" + cert_url = self._wait_for_cert_url(full_url) + + # Download certificate + with self._http_client() as client: + cert_resp = client.get(cert_url) + cert_resp.raise_for_status() + cert_pem: str = cert_resp.json().get("certificate", "") + if not cert_pem: + raise RuntimeError("RA returned empty certificate") + + # Write files + crt_file = self._node_path(cn, profile, "crt") + pem_file = self._node_path(cn, profile, "pem") + + with open(crt_file, "w") as fh: + fh.write(cert_pem) + os.chmod(crt_file, 0o444) + + with open(pem_file, "w") as fh: + fh.write(cert_pem) + fh.write(key_pem.decode()) + os.chmod(pem_file, 0o400) + + paths = {"key": key_file, "cert": crt_file, "pem": pem_file} + + if p12: + p12_file = self._write_p12(cn, profile, node_key, cert_pem, passwd) + paths["p12"] = p12_file + + return paths + + def renew( + self, + cn: str, + profile: str = "server", + sans: list[str] | None = None, + p12: bool = False, + passwd: str | None = None, + ) -> dict[str, str]: + """Renew an existing certificate (identical to enroll; overwrites files). + + Args: + cn: Common Name. + profile: Certificate profile. + sans: Subject Alternative Names. + p12: Regenerate P12 bundle. + passwd: P12 password. + + Returns: + Dict with file paths. + """ + # Unlock existing files first + for ext in ("key", "crt", "pem", "p12"): + path = self._node_path(cn, profile, ext) + if os.path.isfile(path): + os.chmod(path, 0o600) + + return self.enroll(cn, profile=profile, sans=sans, p12=p12, passwd=passwd) + + def revoke(self, cert_pem: str, reason: int = 0) -> None: + """Revoke a certificate. + + Args: + cert_pem: PEM-encoded certificate to revoke. + reason: RFC 5280 revocation reason code (0 = unspecified). + """ + self.bootstrap_account() + directory = self._get_directory() + revoke_url = directory.get("revokeCert", f"{self._ra_url}/acme/revoke-cert") + cert_b64 = _b64url(cert_pem.encode()) + body = self._sign_jws(revoke_url, {"certificate": cert_b64, "reason": reason}) + resp = self._post(revoke_url, body) + if resp.status_code not in (200, 204): + raise RuntimeError(f"Revocation failed: {resp.status_code} {resp.text}") + + # ------------------------------------------------------------------------- + # CA cert and CRL + # ------------------------------------------------------------------------- + + def get_ca_certificate(self) -> str: + """Fetch the CA certificate from the RA. + + Returns: + PEM-encoded CA certificate string. + """ + with self._http_client() as client: + resp = client.get(f"{self._ra_url}/api/v1/ca") + resp.raise_for_status() + return resp.json().get("data", {}).get("certificate", "") + + def get_crl(self) -> str: + """Fetch the current CRL from the RA. + + Returns: + PEM-encoded CRL string. + """ + with self._http_client() as client: + resp = client.get(f"{self._ra_url}/api/v1/crl") + resp.raise_for_status() + return resp.json().get("data", {}).get("crl", "") + + # ------------------------------------------------------------------------- + # Helpers + # ------------------------------------------------------------------------- + + def _node_path(self, cn: str, profile: str, ext: str) -> str: + """Return the canonical path for a node file. + + Args: + cn: Common Name. + profile: Certificate profile. + ext: File extension (key / crt / pem / p12). + + Returns: + Absolute file path. + """ + return os.path.join(self._data_dir, f"{profile}.{cn}.{ext}") + + def _write_p12( + self, + cn: str, + profile: str, + private_key: ec.EllipticCurvePrivateKey, + cert_pem: str, + passwd: str | None, + ) -> str: + """Write a PKCS#12 bundle to disk. + + Args: + cn: Common Name. + profile: Certificate profile. + private_key: Node private key. + cert_pem: PEM-encoded certificate. + passwd: Optional P12 password. + + Returns: + Path to the written .p12 file. + """ + p12_file = self._node_path(cn, profile, "p12") + cert_obj = load_pem_x509_certificate(cert_pem.encode()) + password = passwd.encode() if passwd else None + p12_bytes = _pkcs12.serialize_key_and_certificates( + name=cn.encode(), + key=private_key, + cert=cert_obj, + cas=None, + encryption_algorithm=( + serialization.BestAvailableEncryption(password) + if password + else serialization.NoEncryption() + ), + ) + with open(p12_file, "wb") as fh: + fh.write(p12_bytes) + os.chmod(p12_file, 0o444) + return p12_file + + def _wait_for_order_ready( + self, + initial_order: dict[str, Any], + directory: dict[str, Any], + timeout: int = 60, + ) -> dict[str, Any]: + """Poll order until status is 'ready' or raise on failure. + + For orders without pre-authorization the client must trigger challenges + and wait. This implementation supports mTLS pre-authorized flows only — + for non-pre-auth flows the caller must handle challenges first. + + Args: + initial_order: Order dict returned by new-order. + directory: ACME directory. + timeout: Maximum seconds to wait. + + Returns: + Updated order dict with status 'ready'. + + Raises: + RuntimeError: If the order times out or becomes 'invalid'. + """ + # Try to find the order poll URL + finalize_url: str = initial_order.get("finalize", "") + if not finalize_url: + raise RuntimeError("No finalize URL in order") + # Derive order poll URL: .../order/{id}/finalize → .../order/{id} + order_poll_url = finalize_url.rstrip("/").removesuffix("/finalize") + + deadline = time.monotonic() + timeout + order = initial_order + while time.monotonic() < deadline: + status = order.get("status") + if status == "ready": + return order + if status in ("valid", "processing"): + return order + if status == "invalid": + raise RuntimeError(f"Order became invalid: {order.get('error')}") + time.sleep(2) + with self._http_client() as client: + resp = client.get(order_poll_url) + if resp.status_code == 200: + order = resp.json() + + raise RuntimeError( + f"Order not ready after {timeout}s (status={order.get('status')})" + ) + + def _wait_for_cert_url(self, order_url: str, timeout: int = 60) -> str: + """Poll order until a certificate URL is available. + + Args: + order_url: URL to poll for order status. + timeout: Maximum seconds to wait. + + Returns: + Certificate download URL. + + Raises: + RuntimeError: On timeout or invalid order. + """ + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + with self._http_client() as client: + resp = client.get(order_url) + if resp.status_code == 200: + order = resp.json() + if order.get("certificate"): + return order["certificate"] + if order.get("status") == "invalid": + raise RuntimeError(f"Order invalid: {order.get('error')}") + time.sleep(2) + + raise RuntimeError(f"Certificate not available after {timeout}s") diff --git a/client/bot.py b/client/bot.py index 832fe98..db1db2c 100644 --- a/client/bot.py +++ b/client/bot.py @@ -1,643 +1,419 @@ # -*- coding:utf-8 -*- +from __future__ import annotations -import os -import sys -import json +import configparser import hashlib -import requests +import os import platform -import configparser -# Prevent CLI output pollution -requests.packages.urllib3.disable_warnings() import subprocess +import sys -import client +from cryptography import x509 -class Bot(object): - def __init__(self, logger, ra_url, path, verbose=True): - - self._logger = logger - self._verbose = verbose - self._unsecure = False - - self._path = path +import client +from client.acme_client import AcmeClient + + +class Bot: + def __init__(self, logger, ra_url: str, path: str, verbose: bool = True) -> None: + self._logger = logger + self._verbose = verbose + self._path = path + + # Normalize URL + ra_url = ra_url.rstrip("/") + if ra_url.startswith("http://"): + self._output( + 'Using unsecured protocol "http://" is NOT recommended...', + level="warning", + ) + while True: + rep = input("Do you want to continue ? [y/N]") + if rep.lower() == "y": + break + raise Exception("Unsecure protocol refused by user.") + elif not ra_url.startswith("https://"): + ra_url = "https://" + ra_url self._ra_url = ra_url - self.ca_cert = os.path.join(self._path, 'ca.crt') - self.crl_crt = os.path.join(self._path, 'crl.pem') + + self.ca_cert = os.path.join(self._path, "ca.crt") + self.crl_crt = os.path.join(self._path, "crl.pem") try: self.collection = client.Collection(self._path) except Exception as err: - raise Exception('Unable to initialize collection: {e}'.format(e=err)) + raise Exception(f"Unable to initialize collection: {err}") from err try: - # Store every certificates found self.collection.list_nodes() - # Check compliance self.collection.check_compliance(self._ra_url) except Exception as err: - raise Exception('Unable to list certificates: {e}'.format(e=err)) + raise Exception(f"Unable to list certificates: {err}") from err - try: - # Configure connection settings - self.__setup_connection() - except Exception as err: - raise Exception(err) + # AcmeClient: disable cert verification until we have the CA cert + ca_cert_path = self.ca_cert if os.path.isfile(self.ca_cert) else None + self._acme = AcmeClient(self._ra_url, self._path, ca_cert_path=ca_cert_path) try: - # Always check CA certificate self.get_ca_checksum() except Exception as err: - raise Exception('Unable to calculate CA certificate checksum: {e}'.format(e=err)) - - if os.path.isfile(self.ca_cert): - stream = os.popen("openssl x509 -noout -subject -in {ca} -nameopt multiline | sed -n 's/ *commonName *= //p'".format(ca=self.ca_cert)) - self.ca_name = stream.read().rstrip() - - - def __setup_connection(self): - # Remove trailing slash if needed - if self._ra_url[-1] == '/': - self._ra_url = self._ra_url[:-1] - - if self._ra_url.startswith('http://'): - self._output('Using unsecured protocol "http://" is NOT recommended...', level="warning") - while True: - rep = input('Do you want to continue ? [y/N]') - if rep.lower() == 'y': - self._unsecure = True - break - raise Exception('Unsecure protocol refused by user.') + raise Exception(f"Unable to validate CA certificate: {err}") from err - elif not self._ra_url.startswith('https://'): - self._ra_url = 'https://' + self._ra_url - - self.headers = {'User-Agent':'uPKI client agent', 'Content-Type': 'application/json'} + # After CA cert is on disk, re-instantiate with TLS verification + self._acme = AcmeClient(self._ra_url, self._path, ca_cert_path=self.ca_cert) - def __request(self, url, data=None, cert=None, verb='GET', verify=False, text=False): - if verb.upper() not in ['GET','POST']: - raise NotImplementedError('Unsupported action') - - action = getattr(requests, verb.lower()) - json_data = json.dumps(data) if data else None - - try: - r = action(url, data=json_data, headers=self.headers, verify=verify, cert=cert) - except Exception as err: - raise Exception('Unable to make TLS request: {e}'.format(e=err)) - - if r.status_code != 200: - raise Exception("HTTP(S) Request Error: {e}".format(e=r.content)) - - # For CA and CRL certificates - if text: - return r.text - - try: - data = r.json() - except ValueError as err: + # Extract CA common name from the certificate + self.ca_name = "uPKI-CA" + if os.path.isfile(self.ca_cert): try: - error = r.text - raise Exception(error) - except AttributeError: - raise Exception('Unable to parse JSON answer; {e}'.format(e=err)) - - if data.get('status') != 'success': - raise Exception(data.get('message', 'Unknown error')) - - return data - - def __execute(self, cmd, cwd=None): - try: - self._output("Execute command: {c}".format(c=cmd), level='DEBUG') - p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, cwd=self._path, executable='/bin/bash') - p.wait() - except Exception as err: - raise Exception('Unable to execute command: {e}'.format(e=err)) - - - def _output(self, message, level=None): + with open(self.ca_cert, "rb") as fh: + cert = x509.load_pem_x509_certificate(fh.read()) + attrs = cert.subject.get_attributes_for_oid(x509.NameOID.COMMON_NAME) + if attrs: + self.ca_name = attrs[0].value + except Exception: + pass # keep default + + def _output(self, message: str, level: str | None = None) -> None: try: self._logger.write(message, level=level) except Exception as err: - sys.out.write('Unable to log: {e}'.format(e=err)) - - def _get_mozilla_profile(self): - # Switch based on platform - if platform.system() == 'Linux': - f_path = os.path.expanduser('~/.mozilla/firefox') - alt_path = os.path.expanduser('~/snap/firefox/common/.mozilla/firefox') + sys.stdout.write(f"Unable to log: {err}\n") + + def _run_cmd(self, cmd: str) -> None: + """Run a shell command, ignoring non-zero exit codes (browser tools).""" + self._output(f"> {cmd}", level="debug") + subprocess.run( + cmd, + shell=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + executable="/bin/bash", + ) + + def _get_mozilla_profile(self) -> str: + if platform.system() == "Linux": + f_path = os.path.expanduser("~/.mozilla/firefox") + alt_path = os.path.expanduser("~/snap/firefox/common/.mozilla/firefox") if os.path.isdir(f_path): mozilla_profile = f_path elif os.path.isdir(alt_path): mozilla_profile = alt_path else: - raise NotImplementedError('Firefox has not been detected on this system') - elif platform.system() == 'Darwin': - if os.path.isdir(os.path.expanduser('~/Library/Application Support/Firefox/Profiles')): - mozilla_profile = os.path.expanduser('~/Library/Application Support/Firefox/Profiles') + raise NotImplementedError( + "Firefox has not been detected on this system" + ) + elif platform.system() == "Darwin": + fp = os.path.expanduser("~/Library/Application Support/Firefox/Profiles") + if os.path.isdir(fp): + mozilla_profile = fp else: - raise NotImplementedError('Firefox has not been detected on this system') - elif platform.system() == 'Windows': - if os.path.isdir(os.path.join(os.getenv('APPDATA'), r'Mozilla\Firefox')): - mozilla_profile = os.path.join(os.getenv('APPDATA'), r'Mozilla\Firefox') + raise NotImplementedError( + "Firefox has not been detected on this system" + ) + elif platform.system() == "Windows": + wp = os.path.join(os.getenv("APPDATA", ""), r"Mozilla\Firefox") + if os.path.isdir(wp): + mozilla_profile = wp else: - raise NotImplementedError('Firefox has not been detected on this system') - - mozilla_profile_ini = os.path.join(mozilla_profile, r'profiles.ini') - profile = configparser.ConfigParser() - profile.read(mozilla_profile_ini) - data_path = os.path.normpath(os.path.join(mozilla_profile, profile.get('Profile0', 'Path'))) + raise NotImplementedError( + "Firefox has not been detected on this system" + ) + else: + raise NotImplementedError(f"Unsupported platform: {platform.system()}") + ini_path = os.path.join(mozilla_profile, "profiles.ini") + profile = configparser.ConfigParser() + profile.read(ini_path) + data_path = os.path.normpath( + os.path.join(mozilla_profile, profile.get("Profile0", "Path")) + ) return data_path - def _add_to_firefox(self, p12_file, passwd): - self._output('Get Mozilla profile', level='debug') + def _add_to_firefox(self, p12_file: str, passwd: str | None) -> bool: + self._output("Get Mozilla profile", level="debug") data_path = self._get_mozilla_profile() - self._output('Found Firefox profile DB: {f}'.format(f=data_path), level='debug') - + self._output(f"Found Firefox profile DB: {data_path}", level="debug") + try: - self._output('Add {n} in Firefox'.format(n=self.ca_name)) - cmd = "certutil -A -n '{n}' -t 'TC,,' -i {ca} -d sql:{d}".format(n=self.ca_name, ca=self.ca_cert, d=data_path) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add Root CA in Firefox', level='error') + self._output(f"Add {self.ca_name} in Firefox") + self._run_cmd( + f"certutil -A -n '{self.ca_name}' -t 'TC,,' -i {self.ca_cert} -d sql:{data_path}" + ) + except Exception: + self._output("Unable to add Root CA in Firefox", level="error") try: - self._output('Add user certificate in Firefox') - cmd = "pk12util -i {c} -d sql:{d} -W '{p}'".format(c=p12_file, d=data_path, p=passwd) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add user certificate in Firefox', level='error') + self._output("Add user certificate in Firefox") + self._run_cmd( + f"pk12util -i {p12_file} -d sql:{data_path} -W '{passwd or ''}'" + ) + except Exception: + self._output("Unable to add user certificate in Firefox", level="error") return True - def _add_to_chrome(self, p12_file, pem_file, passwd): - if platform.system() == 'Linux': - if os.path.isdir(os.path.expanduser('~/.pki/nssdb')): - data_path = os.path.expanduser('~/.pki/nssdb') - - try: - self._output('Add {n} in Chrome'.format(n=self.ca_name)) - cmd = "certutil -A -n '{n}' -t 'TC,,' -i {ca} -d sql:{d}".format(n=self.ca_name, ca=self.ca_cert, d=data_path) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add Root CA in Chrome', level='error') + def _add_to_chrome(self, p12_file: str, pem_file: str, passwd: str | None) -> bool: + if platform.system() == "Linux": + db_path = os.path.expanduser("~/.pki/nssdb") + if not os.path.isdir(db_path): + raise FileNotFoundError("Chrome has not been detected on this system") - try: - self._output('Add user certificate in Chrome') - cmd = "pk12util -i {c} -d sql:{d} -W '{p}'".format(c=p12_file, d=data_path, p=passwd) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add user certificate in Chrome', level='error') - else: - raise FileNotFoundError('Chrome has not been detected on this system') - - elif platform.system() == 'Darwin': - # Add to System KeyChain - if os.path.isfile('/Library/Keychains/System.keychain'): - data_path = '/Library/Keychains/System.keychain' - - try: - self._output('[+] Run following command to import ProHacktive Root CA in System KeyChain') - cmd = "sudo security add-trusted-cert -d -r trustRoot -k {d} {ca}".format(d=data_path, ca=self.ca_cert) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add Root CA in System KeyChain', level='error') - # Add to User KeyChain - if os.path.isfile(os.path.expanduser('~/Library/Keychains/login.keychain')): - data_path = os.path.expanduser('~/Library/Keychains/login.keychain') - - try: - self._output('[+] Run following command to import ProHacktive Root CA in User KeyChain') - cmd = "sudo security add-trusted-cert -d -r trustRoot -k {d} {ca}".format(d=data_path, ca=self.ca_cert) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add Root CA in Login KeyChain', level='error') + try: + self._output(f"Add {self.ca_name} in Chrome") + self._run_cmd( + f"certutil -A -n '{self.ca_name}' -t 'TC,,' -i {self.ca_cert} -d sql:{db_path}" + ) + except Exception: + self._output("Unable to add Root CA in Chrome", level="error") + try: + self._output("Add user certificate in Chrome") + self._run_cmd( + f"pk12util -i {p12_file} -d sql:{db_path} -W '{passwd or ''}'" + ) + except Exception: + self._output("Unable to add user certificate in Chrome", level="error") + + elif platform.system() == "Darwin": + sys_kc = "/Library/Keychains/System.keychain" + if os.path.isfile(sys_kc): try: - self._output('Add user certificate in KeyChain') - # # Old version need a password - # cmd = "security import {c} -k {d} -P '{p}'".format(c=p12_file, d=data_path, p=passwd) - # New version is passwordless - cmd = "certtool i {c}".format(c=pem_file) - self._output("> {c}".format(c=cmd), level='debug') - self.__execute(cmd) - except Exception as err: - self._output('Unable to add user certificate in Login KeyChain', level='error') - else: - raise FileNotFoundError('No KeyChain detected on this system') + self._output( + "[+] Run following command to import uPKI Root CA in System KeyChain" + ) + self._run_cmd( + f"sudo security add-trusted-cert -d -r trustRoot -k {sys_kc} {self.ca_cert}" + ) + except Exception: + self._output( + "Unable to add Root CA in System KeyChain", level="error" + ) + + login_kc = os.path.expanduser("~/Library/Keychains/login.keychain") + if not os.path.isfile(login_kc): + raise FileNotFoundError("No KeyChain detected on this system") + + try: + self._output( + "[+] Run following command to import uPKI Root CA in Login KeyChain" + ) + self._run_cmd( + f"sudo security add-trusted-cert -d -r trustRoot -k {login_kc} {self.ca_cert}" + ) + except Exception: + self._output("Unable to add Root CA in Login KeyChain", level="error") + + try: + self._output("Add user certificate in KeyChain") + self._run_cmd(f"certtool i {pem_file}") + except Exception: + self._output( + "Unable to add user certificate in Login KeyChain", level="error" + ) else: - raise NotImplementedError('Sorry this OS is not supported yet.') + raise NotImplementedError("Sorry this OS is not supported yet.") return True - def get_ca_checksum(self): - try: - self._output('Check CA certificate', level="DEBUG") - ca_pem = self.__request(self._ra_url + '/certs/ca.crt', text=True) - except Exception as err: - raise Exception(err) - - # Init hash function - received = hashlib.sha256(ca_pem.encode('utf-8')).hexdigest() - self._output('CA certificate hash received: {s}'.format(s=received), level='debug') + def get_ca_checksum(self) -> bool: + self._output("Check CA certificate", level="debug") + ca_pem = self._acme.get_ca_certificate() + received = hashlib.sha256(ca_pem.encode("utf-8")).hexdigest() + self._output(f"CA certificate hash received: {received}", level="debug") if os.path.isfile(self.ca_cert): - with open(self.ca_cert, 'rt') as f: - raw = f.read() - - found = hashlib.sha256(raw.encode('utf-8')).hexdigest() - + with open(self.ca_cert, "rt") as fh: + raw = fh.read() + found = hashlib.sha256(raw.encode("utf-8")).hexdigest() if found != received: - self._output('OLD CA certificate hash was: {s}'.format(s=found), level='debug') - self._output('NEW CA certificate received!', level="warning") + self._output(f"OLD CA certificate hash was: {found}", level="debug") + self._output("NEW CA certificate received!", level="warning") while True: - rep = input('Would you like to update it ? [y/N]') - if rep.lower() == 'y': + rep = input("Would you like to update it ? [y/N]") + if rep.lower() == "y": break - raise Exception('CA certificate change refused by user.') - # Remove CA protection + raise Exception("CA certificate change refused by user.") try: os.chmod(self.ca_cert, 0o600) except Exception as err: - raise Exception('Unable to remove CA certificate protection') + raise Exception( + "Unable to remove CA certificate protection" + ) from err else: - # If nothing has changed abort - self._output('CA certificate unchanged', level='debug') + self._output("CA certificate unchanged", level="debug") return True else: - self._output('CA certificate first installation', level="warning") + self._output("CA certificate first installation", level="warning") - # Rewrite CA certificate - with open(self.ca_cert,'wt') as f: - f.write(ca_pem) + with open(self.ca_cert, "wt") as fh: + fh.write(ca_pem) - # Protect CA certificate try: os.chmod(self.ca_cert, 0o444) except Exception as err: - raise Exception('Unable to protect CA certificate') + raise Exception("Unable to protect CA certificate") from err return True - def add_node(self, name, profile, sans=[], p12=False, passwd=None, chrome=False, firefox=False): + def add_node( + self, + name: str | None, + profile: str | None, + sans: list[str] | None = None, + p12: bool = False, + passwd: str | None = None, + chrome: bool = False, + firefox: bool = False, + ) -> bool: if name is None: - name = input('Enter your node name (CN): ') + name = input("Enter your node name (CN): ") if profile is None: - profile = input('Enter your profile: ') + profile = input("Enter your profile: ") - # Force p12 output if browser certificate needs to be generated + # Force p12 output if browser certificate is needed p12 = True if (chrome or firefox) else p12 - # Store filenames - key_file = os.path.join(self._path, "{p}.{n}.key".format(p=profile, n=name)) - req_file = os.path.join(self._path, "{p}.{n}.csr".format(p=profile, n=name)) - crt_file = os.path.join(self._path, "{p}.{n}.crt".format(p=profile, n=name)) - try: - self._output('Register node in local collection', level="DEBUG") - self.collection.register(self._ra_url, name, profile, sans, p12=p12, passwd=passwd, chrome=chrome, firefox=firefox) + self._output("Register node in local collection", level="debug") + self.collection.register( + self._ra_url, + name, + profile, + sans or [], + p12=p12, + passwd=passwd, + chrome=chrome, + firefox=firefox, + ) except Exception as err: if "node already exists" in str(err).lower(): raise RuntimeError(err) - raise Exception('Unable to add node: {e}'.format(e=err)) - - # Avoid re-generate key if exists - if os.path.isfile(key_file) and os.path.isfile(req_file): - self._output('Skip key and CSR generation as they already exists', level='WARNING') - else: - try: - self._output('Request openssl command', level="DEBUG") - data = self.__request(self._ra_url + '/magic/' + profile, data={'cn': name, 'sans': sans}, verify=self.ca_cert, verb="POST") - except Exception as err: - raise Exception(err) - - try: - cmd = data['command'] - except KeyError: - raise Exception('Unable to get magic command') - - try: - self.__execute(cmd) - except Exception as err: - raise Exception('Unable to execute magic command: {e}'.format(e=err)) + raise Exception(f"Unable to add node: {err}") from err try: - # Protect key and csr from re-write - os.chmod(key_file, 0o440) - os.chmod(req_file, 0o444) + self._output("Enroll certificate via ACME", level="debug") + result = self._acme.enroll(name, profile, sans, p12, passwd) except Exception as err: - raise Exception('Unable to protect key and certificate request') - - with open(req_file, 'rt') as f: - csr = f.read() + raise Exception(f"Unable to enroll certificate: {err}") from err try: - self._output('Request certificate', level="DEBUG") - data = self.__request(self._ra_url + '/certify', data={'CSR':csr}, verb="POST", verify=self.ca_cert) - except Exception as err: - if "certificate already generated" in str(err).lower(): - raise RuntimeError(err) - raise Exception(err) - - try: - data['certificate'] - except KeyError: - raise Exception('Missing certificate') - - try: - self._output('Update certificate status to signed', level="DEBUG") self.collection.sign(name, profile) except Exception as err: - raise Exception('Unable to update certificate status: {e}'.format(e=err)) - - with open(crt_file, 'wb') as f: - self._output('Writing certificate to {p}'.format(p=crt_file)) - f.write(data['certificate'].encode('utf-8')) - - try: - # Protect certificate from re-write - os.chmod(crt_file, 0o444) - except Exception as err: - raise Exception('Unable to protect certificate') - - self._output('Generate PEM file with key and certificates') - with open(crt_file , 'rt') as f: - crt_content = f.read() - with open(key_file, 'rt') as f: - key_content = f.read() - - pem_file = os.path.join(self._path, "{p}.{n}.pem".format(p=profile, n=name)) - with open(pem_file, 'wt') as f: - f.write(crt_content) - f.write(key_content) - - # Protect pem from re-write - try: - os.chmod(pem_file, 0o444) - except Exception as err: - raise Exception('Unable to protect certificate pem file') + raise Exception(f"Unable to update certificate status: {err}") from err if p12: - # Generate p12 certificate - p12_file = os.path.join(self._path, "{p}.{n}.p12".format(p=profile, n=name)) - - # Protect p12 if required - if passwd: - self._output('Generate P12 file with password') - else: - self._output('Generate P12 file without password', level='warning') - - try: - openssl_cmd = 'openssl pkcs12 -export -out {c} -inkey {k} -in {crt} -certfile {ca} -passout pass:{p}'.format(c=p12_file, k=key_file, crt=crt_file, ca=self.ca_cert, p=passwd) - self.__execute(openssl_cmd) - except Exception as err: - raise Exception('Unable to generate p12 certificate: {e}'.format(e=err)) - - # Protect p12 from re-write - try: - os.chmod(p12_file, 0o444) - except Exception as err: - raise Exception('Unable to protect certificate p12 file') - if firefox: - self._add_to_firefox(p12_file, passwd) - + self._add_to_firefox(result["p12"], passwd) if chrome: - self._add_to_chrome(p12_file, pem_file, passwd) + self._add_to_chrome(result["p12"], result["pem"], passwd) return True - def renew(self): - if self._unsecure: - raise Exception('Can not renew certificates with unsecured protocol') - + def renew(self) -> bool: try: self.collection.list_nodes() except Exception as err: - raise Exception('Unable to list nodes: {e}'.format(e=err)) + raise Exception(f"Unable to list nodes: {err}") from err - if not len(self.collection.nodes): - raise Exception('No node to renew.') + if not self.collection.nodes: + raise Exception("No node to renew.") for node in self.collection.nodes: - try: - # Store filenames - key_file = os.path.join(self._path, "{p}.{n}.key".format(p=node['profile'], n=node['name'])) - req_file = os.path.join(self._path, "{p}.{n}.csr".format(p=node['profile'], n=node['name'])) - crt_file = os.path.join(self._path, "{p}.{n}.crt".format(p=node['profile'], n=node['name'])) - pem_file = os.path.join(self._path, "{p}.{n}.pem".format(p=node['profile'], n=node['name'])) - except KeyError: - raise Exception('Missing mandatory params') + name = node["name"] + profile = node["profile"] + self._output(f"Renew certificate {name} ({profile})") try: - self._output('Renew certificate {n} ({p})'.format(n=node['name'], p=node['profile'])) - ra_url = self._ra_url if node['url'] else self._ra_url - if not ra_url: - raise Exception('RA url is empty.') - data = self.__request(ra_url + '/clients/renew', verify=self.ca_cert, cert=(crt_file, key_file)) + result = self._acme.renew( + name, + profile, + node.get("sans"), + node.get("p12", False), + node.get("passwd"), + ) except Exception as err: - self._output('Unable to renew certificate: {e}'.format(e=err), level="WARNING") + self._output(f"Unable to renew certificate: {err}", level="warning") continue try: - if not data['renew']: - self._output(data['reason'], level="WARNING") - continue - data['certificate'] - except KeyError: - raise Exception('Missing certificate') - - try: - # Unlock protection - os.chmod(crt_file, 0o600) - except Exception as err: - raise Exception('Unable to unlock certificate protection') - - with open(crt_file, 'wb') as f: - self._output('Renew certificate to {p}'.format(p=crt_file), level="DEBUG") - f.write(data['certificate'].encode('utf-8')) - - try: - self._output('Update certificate status to signed', level="DEBUG") - self.collection.sign(node['name'], node['profile']) - except Exception as err: - raise Exception('Unable to update certificate status: {e}'.format(e=err)) - - try: - # Re-enable protection - os.chmod(crt_file, 0o444) - except Exception as err: - raise Exception('Unable to protect certificate') - - try: - # Unlock protection - os.chmod(pem_file, 0o600) - except Exception as err: - raise Exception('Unable to unlock certificate pem file protection') - - self._output('Re-Generate PEM file with key and new certificate') - with open(crt_file , 'rt') as f: - crt_content = f.read() - with open(key_file, 'rt') as f: - key_content = f.read() - - with open(pem_file, 'wt') as f: - f.write(crt_content) - f.write(key_content) - - try: - # Re-enable protection - os.chmod(pem_file, 0o444) + self.collection.sign(name, profile) except Exception as err: - raise Exception('Unable to protect certificate pem file') - - if node['p12']: - p12_file = os.path.join(self._path, "{p}.{n}.p12".format(p=node['profile'], n=node['name'])) - if node['passwd']: - self._output('Re-Generate P12 file with password') - else: - self._output('Re-Generate P12 file without password', level='warning') - - try: - # Unlock protection - os.chmod(p12_file, 0o600) - except Exception as err: - raise Exception('Unable to unlock certificate p12 file protection') - - try: - # Generate p12 certificate - openssl_cmd = openssl_cmd = 'openssl pkcs12 -export -out {c} -inkey {k} -in {crt} -certfile {ca} -passout pass:{p}'.format(c=p12_file, k=key_file, crt=crt_file, ca=self.ca_cert, p=node['passwd']) - self.__execute(openssl_cmd) - except Exception as err: - raise Exception('Unable to re-generate p12 certificate: {e}'.format(e=err)) + raise Exception(f"Unable to update node status: {err}") from err - try: - # Re-enable protection - os.chmod(p12_file, 0o444) - except Exception as err: - raise Exception('Unable to protect certificate p12 file') - - if node['firefox']: - self._add_to_firefox(p12_file, node['passwd']) - - if node['chrome']: - self._add_to_chrome(p12_file, pem_file, node['passwd']) + if node.get("p12"): + p12_file = result.get("p12", "") + pem_file = result.get("pem", "") + if node.get("firefox") and p12_file: + self._add_to_firefox(p12_file, node.get("passwd")) + if node.get("chrome") and p12_file: + self._add_to_chrome(p12_file, pem_file, node.get("passwd")) return True - def crl(self): - try: - self._output('Retrieve CRL', level="DEBUG") - crl_pem = self.__request(self._ra_url + '/certs/crl.pem', text=True) - except Exception as err: - raise Exception(err) - - # Rewrite CRL file - with open(self.crl_crt,'wt') as f: - f.write(crl_pem) - + def crl(self) -> bool: + self._output("Retrieve CRL", level="debug") + crl_pem = self._acme.get_crl() + with open(self.crl_crt, "wt") as fh: + fh.write(crl_pem) return True - def list(self): - + def list(self) -> bool: try: nodes = self.collection.list_nodes() except Exception as err: - raise Exception('Unable to retrieve nodes: {e}'.format(e=err)) + raise Exception(f"Unable to retrieve nodes: {err}") from err - if not len(nodes): - self._output('No node found in config.') + if not nodes: + self._output("No node found in config.") return False - self._output('\t\t..:: Nodes found in config ::..') + self._output("\t\t..:: Nodes found in config ::..") for i, node in enumerate(nodes): - self._output('\t- [{i}] {n}\t({p})'.format(i=i, n=node['name'],p=node['profile'])) + self._output(f"\t- [{i}] {node['name']}\t({node['profile']})") return True - def delete(self, name, profile): + def delete(self, name: str | None, profile: str | None) -> bool: if name is None: - name = input('Enter node name to delete (CN): ') + name = input("Enter node name to delete (CN): ") if profile is None: - profile = input('Enter node profile to delete: ') + profile = input("Enter node profile to delete: ") try: node = self.collection.get_node(name, profile) except Exception as err: - raise Exception('Unable to load node: {e}'.format(e=err)) + raise Exception(f"Unable to load node: {err}") from err if node is None: - raise Exception('Node does not exists.') + raise Exception("Node does not exist.") - try: - # Ensure params exists - name = node['name'] - profile = node['profile'] - p12 = node['p12'] - except KeyError: - raise Exception('Missing mandatory params') - - # Store filenames - key_file = os.path.join(self._path, "{p}.{n}.key".format(p=profile, n=name)) - req_file = os.path.join(self._path, "{p}.{n}.csr".format(p=profile, n=name)) - crt_file = os.path.join(self._path, "{p}.{n}.crt".format(p=profile, n=name)) - pem_file = os.path.join(self._path, "{p}.{n}.pem".format(p=profile, n=name)) - p12_file = os.path.join(self._path, "{p}.{n}.p12".format(p=profile, n=name)) + name = node["name"] + profile = node["profile"] + p12 = node.get("p12", False) try: self.collection.remove(name, profile) except Exception as err: - raise Exception('Unable to add node: {e}'.format(e=err)) + raise Exception(f"Unable to remove node from collection: {err}") from err - if os.path.isfile(key_file): - try: - self.__delete_file(key_file) - except Exception as err: - raise Exception('Unable to delete private key: {e}'.format(e=err)) - if os.path.isfile(req_file): - try: - self.__delete_file(req_file) - except Exception as err: - raise Exception('Unable to delete certificate request: {e}'.format(e=err)) - if os.path.isfile(crt_file): - try: - self.__delete_file(crt_file) - except Exception as err: - raise Exception('Unable to delete certificate: {e}'.format(e=err)) - if os.path.isfile(pem_file): - try: - self.__delete_file(pem_file) - except Exception as err: - raise Exception('Unable to delete pem certificate: {e}'.format(e=err)) - if os.path.isfile(p12_file): - try: - self.__delete_file(p12_file) - except Exception as err: - raise Exception('Unable to delete p12 certificate: {e}'.format(e=err)) + for ext in ("key", "csr", "crt", "pem"): + path = os.path.join(self._path, f"{profile}.{name}.{ext}") + if os.path.isfile(path): + try: + os.chmod(path, 0o600) + os.unlink(path) + except Exception as err: + raise Exception(f"Unable to delete {ext} file: {err}") from err - self._output('Node {n} ({p}) deleted.'.format(n=name, p=profile)) + if p12: + p12_path = os.path.join(self._path, f"{profile}.{name}.p12") + if os.path.isfile(p12_path): + try: + os.chmod(p12_path, 0o600) + os.unlink(p12_path) + except Exception as err: + raise Exception(f"Unable to delete p12 file: {err}") from err + self._output(f"Node {name} ({profile}) deleted.") return True - - def __delete_file(self, filename): - if not os.path.isfile(filename): - return False - - try: - # Remove file lock - os.chmod(filename, 0o600) - except Exception as err: - raise Exception('Unable to protect file: {e}'.format(e=err)) - - try: - os.unlink(filename) - except Exception as err: - raise Exception('Unable to delete file: {e}'.format(e=err)) diff --git a/client/collection.py b/client/collection.py index 1275db7..4d08b57 100644 --- a/client/collection.py +++ b/client/collection.py @@ -1,145 +1,173 @@ # -*- coding:utf-8 -*- +from __future__ import annotations -import os import json +import os +from typing import TypedDict + + +class NodeRecord(TypedDict, total=False): + """A certificate node entry stored in cli.nodes.json.""" + + state: str + url: str + name: str + profile: str + sans: list[str] + p12: bool + passwd: str | None + firefox: bool + chrome: bool -class Collection(object): - def __init__(self, path): - self.nodes = list() - self.path = path - self.conf = os.path.join(self.path, 'cli.nodes.json') - # Initialize if needed +class Collection: + """Manages the local certificate node registry (cli.nodes.json).""" + + def __init__(self, path: str) -> None: + self.nodes: list[NodeRecord] = [] + self.path = path + self.conf = os.path.join(self.path, "cli.nodes.json") + if not os.path.isfile(self.conf): try: self.__update() except Exception as err: - raise Exception('Unable to initialize collection: {e}'.format(e=err)) - - def __update(self, data=[]): - # Update file - with open(self.conf, 'wt') as raw: - raw.write(json.dumps(data, indent=4)) - - def check_compliance(self, url, firefox=False, chrome=False): + raise Exception(f"Unable to initialize collection: {err}") from err + + def __update(self, data: list[NodeRecord] | None = None) -> None: + with open(self.conf, "wt") as raw: + raw.write(json.dumps(data if data is not None else [], indent=4)) + + def check_compliance( + self, + url: str, + firefox: bool = False, + chrome: bool = False, + ) -> bool: + """Ensure every node has the required fields, back-filling defaults. + + Args: + url: RA URL to set on nodes that are missing it. + firefox: Default value for the 'firefox' flag. + chrome: Default value for the 'chrome' flag. + + Returns: + True on success. + """ if not url: - raise Exception('Missing mandatory url') + raise Exception("Missing mandatory url") - for i, n in enumerate(self.nodes): - try: - n['url'] - except KeyError: - # Set url if not set - self.nodes[i]['url'] = url - try: - n['firefox'] - except KeyError: - # Set firefox flag if not set - self.nodes[i]['firefox'] = firefox - try: - n['chrome'] - except KeyError: - # Set firefox flag if not set - self.nodes[i]['chrome'] = chrome + for node in self.nodes: + node.setdefault("url", url) + node.setdefault("firefox", firefox) + node.setdefault("chrome", chrome) self.__update(self.nodes) - return True - def list_nodes(self): - with open(self.conf, 'rt') as raw: - self.nodes = json.loads(raw.read()) + def list_nodes(self) -> list[NodeRecord]: + """Load nodes from disk and return them. + Returns: + List of NodeRecord dicts. + """ + with open(self.conf, "rt") as raw: + self.nodes = json.loads(raw.read()) return self.nodes - def get_node(self, name, profile): + def get_node(self, name: str, profile: str) -> NodeRecord | None: + """Find a node by name and profile. + + Args: + name: Common Name. + profile: Certificate profile. + + Returns: + Matching NodeRecord or None. + """ for n in self.nodes: - if (n['name'] == name) and (n['profile'] == profile): + if n.get("name") == name and n.get("profile") == profile: return n - return None - def load_node_keychain(self, node): - keypath = os.path.join(self.path, '{p}.{n}.key'.format(p=node['profile'], n=node['name'])) - crtpath = os.path.join(self.path, '{p}.{n}.crt'.format(p=node['profile'], n=node['name'])) - - with open(keypath, 'rt') as raw: - key = raw.read() - with open(crtpath, 'rt') as raw: - cert = raw.read() - - return (key, cert) - - def register(self, url, name, profile, sans, p12=False, passwd=None, chrome=False, firefox=False): - node = dict({ - 'state': 'init', - 'url': url, - 'name': name, - 'profile': profile, - 'sans': sans, - 'p12': p12, - 'passwd': passwd, - 'firefox': firefox, - 'chrome': chrome}) + def register( + self, + url: str, + name: str, + profile: str, + sans: list[str], + p12: bool = False, + passwd: str | None = None, + chrome: bool = False, + firefox: bool = False, + ) -> None: + """Register a new node in the collection. + + Args: + url: RA URL. + name: Common Name. + profile: Certificate profile. + sans: Subject Alternative Names. + p12: Whether to generate a P12 bundle. + passwd: P12 password. + chrome: Register in Chrome NSS database. + firefox: Register in Firefox NSS database. + + Raises: + Exception: If a node with the same name and profile already exists. + """ + node: NodeRecord = { + "state": "init", + "url": url, + "name": name, + "profile": profile, + "sans": sans, + "p12": p12, + "passwd": passwd, + "firefox": firefox, + "chrome": chrome, + } for n in self.nodes: - if (n['name'] == node['name']) and (n['profile'] == node['profile']): - raise Exception('This node already exists') + if n.get("name") == node["name"] and n.get("profile") == node["profile"]: + raise Exception("This node already exists") - # Append node to list self.nodes.append(node) - try: self.__update(self.nodes) except Exception as err: - raise Exception('Unable to register node: {e}'.format(e=err)) + raise Exception(f"Unable to register node: {err}") from err - def sign(self, name, profile): + def sign(self, name: str, profile: str) -> None: + """Mark a node as signed. + + Args: + name: Common Name. + profile: Certificate profile. + """ for i, n in enumerate(self.nodes): - if (n['name'] == name) and (n['profile'] == profile): - self.nodes[i]['state'] = 'signed' + if n.get("name") == name and n.get("profile") == profile: + self.nodes[i]["state"] = "signed" break try: self.__update(self.nodes) except Exception as err: - raise Exception('Unable to register node: {e}'.format(e=err)) + raise Exception(f"Unable to update node state: {err}") from err + + def remove(self, name: str, profile: str) -> None: + """Remove a node from the collection. - def remove(self, name, profile): + Args: + name: Common Name. + profile: Certificate profile. + """ for i, n in enumerate(self.nodes): - if (n['name'] == name) and (n['profile'] == profile): + if n.get("name") == name and n.get("profile") == profile: del self.nodes[i] break - - try: - self.__update(self.nodes) - except Exception as err: - raise Exception('Unable to remove node: {e}'.format(e=err)) - def __renew_node(self, node): try: - (key, cert) = self.__load_keychain(node) + self.__update(self.nodes) except Exception as err: - raise Exception('Unable to load node {n} ({p}) keychain: {e}'.format(n=node['name'], p=node['profile'], e=err)) - - if rep.status_code != 200: - raise Exception('HTTP Error on renew request: {}'.format(rep.status_code)) - - try: - data = rep.json() - except ValueError as err: - try: - error = rep.text - raise Exception(error) - except AttributeError: - raise Exception('Unable to parse JSON answer; {e}'.format(e=err)) - - if data.get('status') != 'success': - raise Exception(data.get('message')) - - try: - data['certificate'] - except KeyError: - raise Exception('Missing certificate') - - \ No newline at end of file + raise Exception(f"Unable to remove node: {err}") from err diff --git a/client/node.py b/client/node.py deleted file mode 100644 index 16371d5..0000000 --- a/client/node.py +++ /dev/null @@ -1,12 +0,0 @@ - -# -*- coding:utf-8 -*- - -class Node(object): - def __init__(self, conf): - self.url = conf['url'] - self.state = conf['state'] - self.name = conf['name'] - self.profile = conf['profile'] - self.sans = conf.get('sans', []) - self.p12 = conf.get('p12', False) - self.passwd = conf.get('passwd', None) diff --git a/client/phkLogger.py b/client/phkLogger.py deleted file mode 100644 index 84e8c80..0000000 --- a/client/phkLogger.py +++ /dev/null @@ -1,167 +0,0 @@ -# -*- coding: utf-8 -*- - -import os -import errno -import sys -import logging -import logging.handlers - - -class PHKLogger(object): - """Simple Logging class - Allow to log to file and syslog server if set - """ - def __init__(self, filename, level=logging.WARNING, proc_name=None, verbose=False, backup=3, when="midnight", syshost=None, sysport=514): - if proc_name is None: - proc_name = __name__ - - try: - self.level = int(level) - except ValueError: - self.level = logging.INFO - - self.logger = logging.getLogger(proc_name) - - try: - os.makedirs(os.path.dirname(filename)) - except OSError as err: - if (err.errno != errno.EEXIST) or not os.path.isdir(os.path.dirname(filename)): - raise Exception(err) - pass - - try: - handler = logging.handlers.TimedRotatingFileHandler(filename, when=when, backupCount=backup) - except IOError: - sys.stderr.write('[!] Unable to write to log file: {f}\n'.format(f=filename)) - sys.exit(1) - - formatter = logging.Formatter('%(asctime)s %(levelname)-8s %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - self.logger.setLevel(self.level) - - self.verbose = verbose - - def _is_string(self, string): - try: - return isinstance(string, str) - except NameError: - return isinstance(string, basestring) - - def debug(self, msg, color=None, light=None): - """Shortcut to debug message - """ - self.write(msg, level=logging.DEBUG, color=color, light=light) - - def info(self, msg, color=None, light=None): - """Shortcut to info message - """ - self.write(msg, level=logging.INFO, color=color, light=light) - - def warning(self, msg, color=None, light=None): - """Shortcut to warning message - """ - self.write(msg, level=logging.WARNING, color=color, light=light) - - def error(self, msg, color=None, light=None): - """Shortcut to error message - """ - self.write(msg, level=logging.ERROR, color=color, light=light) - - def critical(self, msg, color=None, light=None): - """Shortcut to critical message - """ - self.write(msg, level=logging.CRITICAL, color=color, light=light) - - def write(self, message, level=None, color=None, light=None): - """Accept log message with level set with string or logging int - """ - - # Clean message - message = str(message).rstrip() - - # Only log if there is a message (not just a new line) - if message == "": - return True - - # Autoset level if necessary - if level is None: - level = self.level - - # Convert string level to logging int - if self._is_string(level): - level = level.upper() - if level == "DEBUG": - level = logging.DEBUG - elif level in ["INFO", "INFOS"]: - level = logging.INFO - elif level == "WARNING": - level = logging.WARNING - elif level == "ERROR": - level = logging.ERROR - elif level == "CRITICAL": - level = logging.CRITICAL - else: - level = self.level - - # Output to with correct level - if level == logging.DEBUG: - def_color = "BLUE" - def_light = True - prefix = '*' - self.logger.debug(message) - elif level == logging.INFO: - def_color = "GREEN" - def_light = False - prefix = '+' - self.logger.info(message) - elif level == logging.WARNING: - def_color = "YELLOW" - def_light = False - prefix = '-' - self.logger.warning(message) - elif level == logging.ERROR: - def_color = "RED" - def_light = False - prefix = '!' - self.logger.error(message) - elif level == logging.CRITICAL: - def_color = "RED" - def_light = True - prefix = '!' - self.logger.critical(message) - else: - raise Exception('Invalid log level') - - if color is None: - color = def_color - if light is None: - light = def_light - - # Output to CLI if verbose flag is set - if self.verbose: - color = color.upper() - # Position color based on level if not forced - c = '\033[1' if light else '\033[0' - if color == 'BLACK': - c += ';30m' - elif color == 'BLUE': - c += ';34m' - elif color == 'GREEN': - c += ';32m' - elif color == 'CYAN': - c += ';36m' - elif color == 'RED': - c += ';31m' - elif color == 'PURPLE': - c += ';35m' - elif color == 'YELLOW': - c += ';33m' - elif color == 'WHITE': - c += ';37m' - else: - # No Color - c += 'm' - - if level >= self.level: - sys.stdout.write("{color}[{p}] {msg}\033[0m\n".format(color=c, p=prefix, msg=message)) \ No newline at end of file diff --git a/client/upkiLogger.py b/client/upkiLogger.py new file mode 100644 index 0000000..9f5facf --- /dev/null +++ b/client/upkiLogger.py @@ -0,0 +1,213 @@ +# -*- coding: utf-8 -*- + +""" +UPKI Logger Module. + +This module provides logging functionality for the UPKI CLI client. +Supports file logging with rotation and optional syslog output. +""" + +import os +import errno +import sys +import logging +import logging.handlers +from typing import Optional, Union + + +class UPKILogger: + """ + Logging class for UPKI CLI client. + + Allows logging to file and optional syslog server. + Supports colored console output in verbose mode. + """ + + def __init__( + self, + filename: str, + level: Union[int, str] = logging.WARNING, + proc_name: Optional[str] = None, + verbose: bool = False, + backup: int = 3, + when: str = "midnight", + syshost: Optional[str] = None, + sysport: int = 514, + ) -> None: + """ + Initialize the logger. + + Args: + filename: Path to the log file. + level: Logging level (int or string). + proc_name: Process name for the logger. + verbose: Enable colored console output. + backup: Number of backup log files to keep. + when: When to rotate the log file. + syshost: Syslog server host (not currently used). + sysport: Syslog server port. + """ + if proc_name is None: + proc_name = __name__ + + try: + self.level = int(level) + except ValueError: + self.level = logging.INFO + + self.logger = logging.getLogger(proc_name) + + try: + os.makedirs(os.path.dirname(filename)) + except OSError as err: + if (err.errno != errno.EEXIST) or not os.path.isdir( + os.path.dirname(filename) + ): + raise Exception(err) from err + + try: + handler = logging.handlers.TimedRotatingFileHandler( + filename, when=when, backupCount=backup + ) + except IOError: + sys.stderr.write(f"[!] Unable to write to log file: {filename}\n") + sys.exit(1) + + formatter = logging.Formatter("%(asctime)s %(levelname)-8s %(message)s") + handler.setFormatter(formatter) + self.logger.addHandler(handler) + self.logger.setLevel(self.level) + + self.verbose = verbose + + def debug( + self, msg: str, color: Optional[str] = None, light: Optional[bool] = None + ) -> None: + """Shortcut to debug message.""" + self.write(msg, level=logging.DEBUG, color=color, light=light) + + def info( + self, msg: str, color: Optional[str] = None, light: Optional[bool] = None + ) -> None: + """Shortcut to info message.""" + self.write(msg, level=logging.INFO, color=color, light=light) + + def warning( + self, msg: str, color: Optional[str] = None, light: Optional[bool] = None + ) -> None: + """Shortcut to warning message.""" + self.write(msg, level=logging.WARNING, color=color, light=light) + + def error( + self, msg: str, color: Optional[str] = None, light: Optional[bool] = None + ) -> None: + """Shortcut to error message.""" + self.write(msg, level=logging.ERROR, color=color, light=light) + + def critical( + self, msg: str, color: Optional[str] = None, light: Optional[bool] = None + ) -> None: + """Shortcut to critical message.""" + self.write(msg, level=logging.CRITICAL, color=color, light=light) + + def write( + self, + message: str, + level: Optional[Union[int, str]] = None, + color: Optional[str] = None, + light: Optional[bool] = None, + ) -> bool: + """ + Accept log message with level set with string or logging int. + + Args: + message: The message to log. + level: Logging level (int or string). + color: Color for console output. + light: Use light color variant. + + Returns: + True if message was logged, False if empty message. + """ + message = str(message).rstrip() + + if message == "": + return True + + if level is None: + level = self.level + + if isinstance(level, str): + level = level.upper() + if level == "DEBUG": + level = logging.DEBUG + elif level in ["INFO", "INFOS"]: + level = logging.INFO + elif level == "WARNING": + level = logging.WARNING + elif level == "ERROR": + level = logging.ERROR + elif level == "CRITICAL": + level = logging.CRITICAL + else: + level = self.level + + if level == logging.DEBUG: + def_color = "BLUE" + def_light = True + prefix = "*" + self.logger.debug(message) + elif level == logging.INFO: + def_color = "GREEN" + def_light = False + prefix = "+" + self.logger.info(message) + elif level == logging.WARNING: + def_color = "YELLOW" + def_light = False + prefix = "-" + self.logger.warning(message) + elif level == logging.ERROR: + def_color = "RED" + def_light = False + prefix = "!" + self.logger.error(message) + elif level == logging.CRITICAL: + def_color = "RED" + def_light = True + prefix = "!" + self.logger.critical(message) + else: + raise Exception("Invalid log level") + + if color is None: + color = def_color + if light is None: + light = def_light + + if self.verbose: + color = color.upper() + c = "\033[1" if light else "\033[0" + if color == "BLACK": + c += ";30m" + elif color == "BLUE": + c += ";34m" + elif color == "GREEN": + c += ";32m" + elif color == "CYAN": + c += ";36m" + elif color == "RED": + c += ";31m" + elif color == "PURPLE": + c += ";35m" + elif color == "YELLOW": + c += ";33m" + elif color == "WHITE": + c += ";37m" + else: + c += "m" + + if level >= self.level: + sys.stdout.write(f"{c}[{prefix}] {message}\033[0m\n") + + return True diff --git a/requirements.txt b/requirements.txt index fa8b793..8d1a94c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,4 @@ -certifi==2019.9.11 -chardet==3.0.4 -idna==2.8 -requests==2.22.0 -urllib3==1.25.7 -setuptools==40.8.0 \ No newline at end of file +cryptography>=42.0.0 +dnspython>=2.6.0 +httpx>=0.27.0 +setuptools>=68.0.0 diff --git a/setup.py b/setup.py index d3117a6..f0295fc 100644 --- a/setup.py +++ b/setup.py @@ -1,36 +1,45 @@ #!/usr/bin/env python3 # -*- coding:utf-8 -*- +""" +Setup script for UPKI CLI Client. +""" -import re, os +import re +import os from setuptools import setup, find_packages # Meta information dirname = os.path.dirname(__file__) # Retrieve all metadata from project -with open(os.path.join(dirname, '__metadata.py'), 'rt') as meta_file: - metadata = dict(re.findall(r"^__([a-z]+)__ = ['\"]([^'\"]*)['\"]", meta_file.read(), re.M)) +with open(os.path.join(dirname, "__metadata.py"), "rt") as meta_file: + metadata = dict( + re.findall(r"^__([a-z]+)__ = ['\"]([^'\"]*)['\"]", meta_file.read(), re.M) + ) # Get required packages from requirements.txt # Make it compatible with setuptools and pip -with open(os.path.join(dirname, 'requirements.txt')) as f: +with open(os.path.join(dirname, "requirements.txt")) as f: requirements = f.read().splitlines() setup( - name='uPKI_CLI', - description='µPKI client', - long_description=open('README.md').read(), - author=metadata['author'], - author_email=metadata['authoremail'], - version=metadata['version'], + name="upki-cli", + description="µPKI CLI client", + long_description=open("README.md").read(), + author=metadata["author"], + author_email=metadata["authoremail"], + version=metadata["version"], classifiers=[ - 'Development Status :: 3 - Alpha', - 'License :: OSI Approved :: MIT License', - 'Programming Language :: Python :: 3.6', - 'Intended Audience :: System Administrators' - ], - url='https://github.com/proh4cktive/upki-cli', + "Development Status :: 3 - Alpha", + "License :: OSI Approved :: MIT License", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", + "Intended Audience :: System Administrators", + ], + url="https://github.com/circle-rd/upki-cli", packages=find_packages(), - license='MIT', - install_requires=requirements -) \ No newline at end of file + license="MIT", + install_requires=requirements, + python_requires=">=3.11", +) diff --git a/upki-cli.sh b/upki-cli.sh index d6bf632..0464b3b 100755 --- a/upki-cli.sh +++ b/upki-cli.sh @@ -7,7 +7,7 @@ fi echo -e "\t\t..:: uPKI Client ::.." # Set default VARS -RA_URL='certificates.prohacktive.io' # Should be remove in public release !!! ;) +RA_URL='certificates.example.com' # Set your RA URL here CERT_PATH="${HOME}/.upki" if [[ ! -d ${CERT_PATH} ]]; then