-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathserver
More file actions
122 lines (107 loc) · 4.73 KB
/
server
File metadata and controls
122 lines (107 loc) · 4.73 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#!/usr/bin/env python3
import logging
from http.server import BaseHTTPRequestHandler, HTTPServer
import subprocess
import json
import os
# Setup log file and create if doesn't exist
log_file_path = '/var/log/api.log'
os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
if not os.path.exists(log_file_path):
with open(log_file_path, 'w') as f:
f.write('') # create empty log file
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(log_file_path),
logging.StreamHandler()
]
)
# Read API tokens from file
with open('/etc/api/key', 'r') as token_file:
valid_tokens = [line.strip() for line in token_file if line.strip()]
class RequestHandler(BaseHTTPRequestHandler):
def log_request_info(self, additional_info=''):
client_ip = self.client_address[0]
user_agent = self.headers.get('User-Agent', 'User-Agent not provided')
logging.info(f'Access from IP: {client_ip}, User-Agent: {user_agent}, Path: {self.path}, {additional_info}')
def do_AUTHHEAD(self):
self.send_response(401)
self.send_header('WWW-Authenticate', 'Bearer realm="Authentication required"')
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"message": "Unauthorized: Missing or invalid Bearer token"}')
self.log_request_info('Unauthorized access attempt')
logging.warning('Unauthorized access attempt')
def authorize(self):
auth_header = self.headers.get('Authorization')
if not auth_header or not auth_header.startswith('Bearer '):
self.do_AUTHHEAD()
return False
provided_token = auth_header[7:].strip() # Remove "Bearer "
if provided_token not in valid_tokens:
self.do_AUTHHEAD()
return False
return True
def execute_script(self, script_path, post_data=None):
if not os.path.isfile(script_path):
self.send_response(404)
self.end_headers()
self.wfile.write(b'Script not found')
self.log_request_info(f'Script not found: {script_path}')
logging.error(f'Script not found: {script_path}')
return
try:
if post_data:
result = subprocess.run([script_path], input=post_data, capture_output=True, text=True, check=True)
else:
result = subprocess.run([script_path], capture_output=True, text=True, check=True)
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(result.stdout.encode())
self.log_request_info(f'Successfully executed script: {script_path}')
logging.info(f'Successfully executed script: {script_path}, Output: {result.stdout}')
except subprocess.CalledProcessError as e:
self.send_response(500)
self.send_header('Content-type', 'application/json')
self.end_headers()
error_message = json.dumps({"error": str(e)})
self.wfile.write(error_message.encode())
self.log_request_info(f'Error executing script: {script_path}, Error: {str(e)}')
logging.error(f'Error executing script: {script_path}, Error: {str(e)}')
# Handle HTTP methods (GET, POST, DELETE, etc.)
def do_GET(self):
if not self.authorize():
return
self.execute_script(f'/usr/local/sbin/api/{self.path.lstrip("/")}')
def do_POST(self):
if not self.authorize():
return
content_length = int(self.headers.get('Content-Length', 0))
post_data = self.rfile.read(content_length).decode('utf-8')
self.execute_script(f'/usr/local/sbin/api/{self.path.lstrip("/")}', post_data)
def do_DELETE(self): self.do_POST()
def do_PUT(self): self.do_POST()
def do_PATCH(self): self.do_POST()
def do_CONNECT(self): self.do_GET()
def do_TRACE(self): self.do_GET()
def do_HEAD(self): self.do_GET()
def do_OPTIONS(self):
if not self.authorize():
return
self.send_response(200)
self.send_header('Allow', 'GET, POST, DELETE, PUT, PATCH, CONNECT, TRACE, HEAD, OPTIONS')
self.send_header('Content-type', 'application/json')
self.end_headers()
self.wfile.write(b'{"message": "OPTIONS request received"}')
self.log_request_info()
def run(server_class=HTTPServer, handler_class=RequestHandler, port=9000):
server_address = ('', port)
httpd = server_class(server_address, handler_class)
logging.info(f'Starting httpd server on port {port}')
httpd.serve_forever()
if __name__ == "__main__":
run()