Skip to content

Commit ffe3f57

Browse files
authored
Add setup script for KMIP Fortanix (#412)
* Add setup script for KMIP Fortanix
1 parent da0ddc2 commit ffe3f57

File tree

1 file changed

+277
-0
lines changed

1 file changed

+277
-0
lines changed

fortanix_kmip_setup.py

Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
import requests
2+
import os
3+
import subprocess
4+
import argparse
5+
import sys
6+
import re
7+
from urllib.parse import urlparse
8+
9+
# Disable SSL warnings
10+
requests.packages.urllib3.disable_warnings()
11+
12+
# Configuration constants
13+
RSA_KEY_SIZE = 2048
14+
CERT_VALIDITY_DAYS = 365
15+
16+
def parse_args():
17+
"""Parse command line arguments"""
18+
parser = argparse.ArgumentParser(description='Fortanix KMIP Application Setup')
19+
parser.add_argument('--email', required=True, help='Email for Fortanix login')
20+
parser.add_argument('--password', required=True, help='Password for Fortanix login')
21+
parser.add_argument('--dsm-url', default='https://eu.smartkey.io', help='Fortanix DSM URL')
22+
parser.add_argument('--api-url', default='https://api.eu.smartkey.io', help='Fortanix API URL')
23+
parser.add_argument('--app-name', default='TestingMySQL', help='Application name')
24+
parser.add_argument('--group-name', default='TestingMySQL', help='Group name')
25+
parser.add_argument('--cert-dir', default='./cert_dir', help='Certificate directory')
26+
parser.add_argument('-v', '--verbose', action='store_true', help='Verbose output')
27+
return parser.parse_args()
28+
29+
30+
def api_request(method, endpoint, token=None, payload=None, cert=None, base_url=None, verbose=False):
31+
"""Make API request with error handling"""
32+
url = f"{base_url}{endpoint}" if endpoint.startswith('/') else f"{base_url}/{endpoint}"
33+
headers = {"Content-Type": "application/json"}
34+
35+
if token:
36+
headers["Authorization"] = f"Bearer {token}"
37+
38+
if verbose:
39+
print(f"{method} {url}")
40+
41+
try:
42+
response = requests.request(method, url, headers=headers, json=payload, cert=cert, verify=False)
43+
response.raise_for_status()
44+
return response.json() if response.content else {}
45+
except requests.exceptions.RequestException as e:
46+
error_msg = f"API request failed: {e}"
47+
if hasattr(e, 'response') and e.response is not None:
48+
error_msg += f" (Status: {e.response.status_code}, Body: {e.response.text})"
49+
raise Exception(error_msg)
50+
51+
52+
def authenticate(args):
53+
"""Authenticate and select account"""
54+
if args.verbose:
55+
print("\n1. Authenticating...")
56+
57+
# Login
58+
auth = api_request('POST', '/sys/v1/session/auth',
59+
payload={"method": "password", "email": args.email, "password": args.password},
60+
base_url=args.dsm_url, verbose=args.verbose)
61+
if not auth:
62+
raise Exception("Authentication failed: Empty response from server")
63+
token = auth.get('access_token')
64+
if not token:
65+
raise Exception("Authentication failed: No access token in response")
66+
print(f"Authenticated")
67+
68+
# Get and select account
69+
accounts = api_request('GET', '/sys/v1/accounts', token=token,
70+
base_url=args.dsm_url, verbose=args.verbose)
71+
if not accounts:
72+
raise Exception("No accounts found for this user")
73+
account_id = accounts[0]['acct_id']
74+
75+
api_request('POST', '/sys/v1/session/select_account',
76+
token=token, payload={"acct_id": account_id},
77+
base_url=args.dsm_url, verbose=args.verbose)
78+
print(f"Account: {account_id}")
79+
80+
return token, account_id
81+
82+
83+
def get_or_create_group(token, args):
84+
"""Get group ID by name, create if not exists"""
85+
groups = api_request('GET', '/sys/v1/groups', token=token,
86+
base_url=args.dsm_url, verbose=args.verbose)
87+
group = next((g for g in groups if g.get('name') == args.group_name), None)
88+
89+
if group:
90+
print(f"Group found: {group['group_id']}")
91+
return group['group_id']
92+
93+
# Create group if not found
94+
if args.verbose:
95+
print(f"Group '{args.group_name}' not found, creating...")
96+
97+
new_group = api_request('POST', '/sys/v1/groups', token=token,
98+
payload={"name": args.group_name, "description": f"Auto-created group for {args.app_name}"},
99+
base_url=args.dsm_url, verbose=args.verbose)
100+
101+
print(f"Group created: {new_group['group_id']}")
102+
return new_group['group_id']
103+
104+
105+
def setup_app(token, group_id, args):
106+
"""Create or recreate application"""
107+
if args.verbose:
108+
print("\n2. Setting up application...")
109+
110+
# Delete existing app if present
111+
apps = api_request('GET', '/sys/v1/apps', token=token,
112+
base_url=args.dsm_url, verbose=args.verbose)
113+
existing = next((a for a in apps if a.get('name') == args.app_name), None)
114+
115+
if existing:
116+
api_request('DELETE', f"/sys/v1/apps/{existing['app_id']}", token=token,
117+
base_url=args.dsm_url, verbose=args.verbose)
118+
print(f"Deleted existing app")
119+
120+
# Create new app
121+
app = api_request('POST', '/sys/v1/apps', token=token,
122+
payload={
123+
"name": args.app_name,
124+
"auth_type": "Secret",
125+
"description": "Application for Percona MySQL encryption",
126+
"add_groups": [group_id],
127+
"default_group": group_id
128+
},
129+
base_url=args.dsm_url, verbose=args.verbose)
130+
131+
print(f"Created app: {app['app_id']}")
132+
return app['app_id']
133+
134+
135+
def generate_certs(app_id, args):
136+
"""Generate client certificates"""
137+
if args.verbose:
138+
print("\n3. Generating certificates...")
139+
140+
# Setup directory
141+
os.makedirs(args.cert_dir, exist_ok=True)
142+
143+
# Clean existing certificate files only
144+
key_path = os.path.join(args.cert_dir, "private.key")
145+
cert_path = os.path.join(args.cert_dir, "certificate.crt")
146+
for cert_file in [key_path, cert_path]:
147+
if os.path.isfile(cert_file):
148+
os.remove(cert_file)
149+
150+
# Generate certificate
151+
cmd = [
152+
"openssl", "req", "-newkey", f"rsa:{RSA_KEY_SIZE}", "-nodes",
153+
"-keyout", key_path, "-x509", "-days", str(CERT_VALIDITY_DAYS),
154+
"-out", cert_path, "-subj", f"/CN={app_id}"
155+
]
156+
157+
try:
158+
subprocess.run(cmd, check=True, capture_output=True, text=True)
159+
except subprocess.CalledProcessError as e:
160+
raise Exception(f"Certificate generation failed: {e.stderr}")
161+
except FileNotFoundError:
162+
raise Exception("OpenSSL not found. Please install OpenSSL.")
163+
164+
print(f"Certificates generated")
165+
return cert_path, key_path
166+
167+
168+
def enable_cert_auth(token, app_id, cert_path, args):
169+
"""Enable certificate authentication"""
170+
if args.verbose:
171+
print("\n4. Enabling certificate auth...")
172+
173+
with open(cert_path, 'r') as f:
174+
cert_content = f.read()
175+
176+
# Clean certificate (remove headers and whitespace)
177+
cert_body = re.sub(r'-----.*?-----|\s', '', cert_content)
178+
179+
api_request('PATCH', f'/sys/v1/apps/{app_id}', token=token,
180+
payload={"auth_type": "Certificate", "credential": {"certificate": cert_body}},
181+
base_url=args.dsm_url, verbose=args.verbose)
182+
183+
print("Certificate auth enabled")
184+
185+
186+
def test_cert_auth(cert_path, key_path, args):
187+
"""Test certificate authentication"""
188+
if args.verbose:
189+
print("\n5. Testing certificate auth...")
190+
191+
auth = api_request('POST', '/sys/v1/session/auth',
192+
cert=(cert_path, key_path),
193+
base_url=args.api_url, verbose=args.verbose)
194+
195+
token = auth.get('access_token')
196+
if not token:
197+
raise Exception("No access token received")
198+
199+
print(f"Certificate auth successful!")
200+
return token
201+
202+
203+
def download_server_cert(args):
204+
"""Download server certificate"""
205+
if args.verbose:
206+
print("\n6. Downloading server certificate...")
207+
208+
# Extract hostname from DSM URL
209+
hostname = urlparse(args.dsm_url).netloc
210+
211+
cmd = ["openssl", "s_client", "-connect", f"{hostname}:443", "-servername", hostname]
212+
213+
try:
214+
result = subprocess.run(
215+
cmd, capture_output=True, text=True,
216+
input="", timeout=10
217+
)
218+
219+
# Extract certificate
220+
match = re.search(r'-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----',
221+
result.stdout, re.DOTALL)
222+
223+
if match:
224+
server_cert_path = os.path.join(args.cert_dir, "server.crt")
225+
with open(server_cert_path, 'w') as f:
226+
f.write(match.group(0))
227+
print(f"Server certificate saved")
228+
else:
229+
print("Could not extract server certificate")
230+
231+
except subprocess.TimeoutExpired:
232+
raise Exception("Server certificate download timed out")
233+
except Exception as e:
234+
raise Exception(f"Server certificate download failed: {e}")
235+
236+
237+
def main():
238+
"""Main execution"""
239+
args = parse_args()
240+
241+
print("Fortanix KMIP Setup")
242+
243+
try:
244+
# Step 1: Authenticate
245+
token, account_id = authenticate(args)
246+
247+
# Get group
248+
group_id = get_or_create_group(token, args)
249+
250+
# Step 2: Setup application
251+
app_id = setup_app(token, group_id, args)
252+
253+
# Step 3: Generate certificates
254+
cert_path, key_path = generate_certs(app_id, args)
255+
256+
# Step 4: Enable certificate auth
257+
enable_cert_auth(token, app_id, cert_path, args)
258+
259+
# Step 5: Test certificate auth
260+
test_cert_auth(cert_path, key_path, args)
261+
262+
# Step 6: Download server cert
263+
download_server_cert(args)
264+
265+
print("\nSetup completed successfully!")
266+
print(f"Certificates: {args.cert_dir}")
267+
268+
except Exception as e:
269+
print(f"\nError: {e}")
270+
if args.verbose:
271+
import traceback
272+
traceback.print_exc()
273+
sys.exit(1)
274+
275+
276+
if __name__ == "__main__":
277+
main()

0 commit comments

Comments
 (0)