Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/emu_dll.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def hook_messagebox(emu, api_name, func, params):
return rv


def hook_mem_write(emu, access, address, size, value, ctx):
def hook_mem_write(emu, access, address, size, value):
"""
Hook that is called whenever memory is written to
Args:
Expand Down
2 changes: 1 addition & 1 deletion examples/upx_unpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def save_unpacked_file(self):
up.write(self.mem_read(mm.base, mm.size))
# TODO: Fixup the import table after dumping

def code_hook(self, emu, addr, size, ctx):
def code_hook(self, emu, addr, size):
if self.end_addr >= addr >= self.start_addr:
print("[*] Section hop signature hit, dumping module")
self.save_unpacked_file()
Expand Down
48 changes: 22 additions & 26 deletions speakeasy/binemu.py
Original file line number Diff line number Diff line change
Expand Up @@ -894,14 +894,14 @@ def add_api_hook(self, cb, module="", api_name="", argc=0, call_conv=None, emu=N
self.hooks.update({common.HOOK_API: obj})
return hook

def add_code_hook(self, cb, begin=1, end=0, ctx={}, emu=None):
def add_code_hook(self, cb, begin=1, end=0, emu=None):
"""
Add a hook that will fire for every CPU instruction
"""
hl = self.hooks.get(common.HOOK_CODE, [])
if not emu:
emu = self
hook = common.CodeHook(self, self.emu_eng, cb, begin, end, ctx)
hook = common.CodeHook(self, self.emu_eng, cb, begin, end)
if not hl:
self.hooks.update(
{
Expand All @@ -918,11 +918,7 @@ def add_code_hook(self, cb, begin=1, end=0, ctx={}, emu=None):

return hook

def _dynamic_code_cb(self, emu, addr, size, ctx={}):
"""
Call all subscribers that want callbacks dynamic code callbacks
"""

def _fire_dyn_code_hooks(self, addr):
profiler = self.get_profiler()
mm = self.get_address_map(addr)
if profiler:
Expand All @@ -932,32 +928,32 @@ def _dynamic_code_cb(self, emu, addr, size, ctx={}):
for h in self.hooks.get(common.HOOK_DYN_CODE, []):
h.cb(mm)

# Delete the code hook that got us here
if ctx and isinstance(ctx, dict):
h = ctx.get("_delete_hook")
if h:
h.disable()

def _set_dyn_code_hook(self, addr, size, ctx={}):
def _set_dyn_code_hook(self, addr, size):
"""
Set the top level dispatch hook for dynamic code execution
"""
max_hook_size = 0x10
if size > max_hook_size:
size = max_hook_size

ch = self.add_code_hook(cb=self._dynamic_code_cb, begin=addr, end=addr + size, ctx=ctx)
ctx.update({"_delete_hook": ch})
hook_ref = [None]

def _dynamic_code_cb(emu, addr, size):
self._fire_dyn_code_hooks(addr)
if hook_ref[0]:
hook_ref[0].disable()

hook_ref[0] = self.add_code_hook(cb=_dynamic_code_cb, begin=addr, end=addr + size)

def add_dyn_code_hook(self, cb, ctx=[], emu=None):
def add_dyn_code_hook(self, cb, emu=None):
"""
Add a hook that will fire when dynamically generated/copied code is executed
"""
if not emu:
emu = self
hl = self.hooks.get(common.HOOK_DYN_CODE, [])

hook = common.DynCodeHook(emu, self.emu_eng, cb, ctx)
hook = common.DynCodeHook(emu, self.emu_eng, cb)
if not hl:
self.hooks.update(
{
Expand Down Expand Up @@ -1043,7 +1039,7 @@ def add_mem_map_hook(self, cb, begin=1, end=0, emu=None):

return hook

def _hook_mem_invalid_dispatch(self, emu, access, address, size, value, ctx):
def _hook_mem_invalid_dispatch(self, emu, access, address, size, value):
"""
This handler will dispatch other invalid memory hooks
"""
Expand All @@ -1052,7 +1048,7 @@ def _hook_mem_invalid_dispatch(self, emu, access, address, size, value, ctx):
rv = True
for mem_access_hook in hl[1:]:
if mem_access_hook.enabled:
rv = mem_access_hook.cb(emu, access, address, size, value, ctx)
rv = mem_access_hook.cb(emu, access, address, size, value)
if rv is False:
break
return rv
Expand All @@ -1079,13 +1075,13 @@ def add_mem_invalid_hook(self, cb, emu=None):

return hook

def add_interrupt_hook(self, cb, ctx=[], emu=None):
def add_interrupt_hook(self, cb, emu=None):
"""
Add a hook that will fire for software interrupts
"""
if not emu:
emu = self
hook = common.InterruptHook(emu, self.emu_eng, cb, ctx=[])
hook = common.InterruptHook(emu, self.emu_eng, cb)
hl = self.hooks.get(common.HOOK_INTERRUPT)
if not hl:
self.hooks.update(
Expand All @@ -1103,13 +1099,13 @@ def add_interrupt_hook(self, cb, ctx=[], emu=None):

return hook

def add_instruction_hook(self, cb, begin=1, end=0, ctx=[], emu=None, insn=None):
def add_instruction_hook(self, cb, begin=1, end=0, emu=None, insn=None):
"""
Add a hook that will fire for IN, SYSCALL, or SYSENTER instructions
"""
if not emu:
emu = self
hook = common.InstructionHook(emu, self.emu_eng, cb, ctx=[], insn=insn)
hook = common.InstructionHook(emu, self.emu_eng, cb, insn=insn)
hl = self.hooks.get(common.HOOK_INSN)
if not hl:
self.hooks.update(
Expand All @@ -1127,11 +1123,11 @@ def add_instruction_hook(self, cb, begin=1, end=0, ctx=[], emu=None, insn=None):

return hook

def add_invalid_instruction_hook(self, cb, ctx=[], emu=None):
def add_invalid_instruction_hook(self, cb, emu=None):
if not emu:
emu = self

hook = common.InvalidInstructionHook(emu, self.emu_eng, cb, ctx=[])
hook = common.InvalidInstructionHook(emu, self.emu_eng, cb)
hl = self.hooks.get(common.HOOK_INSN_INVALID)

if not hl:
Expand Down
42 changes: 20 additions & 22 deletions speakeasy/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,12 @@ class Hook:
Base class for all emulator hooks
"""

def __init__(self, se_obj, emu_eng, cb, ctx=[], native_hook=False):
def __init__(self, se_obj, emu_eng, cb, native_hook=False):
"""
Arguments:
se_obj: speakeasy emulator object
emu_eng: emulation engine object
cb: Python callback function
ctx: Arbitrary context that be passed between hook callbacks
native_hook: When set to True, a new, raw callback will be registered with
with the underlying emulation engine that is called directly by the DLL.
Otherwise, this hook will be dispatched via a wrapper hook
Expand All @@ -74,7 +73,6 @@ def __init__(self, se_obj, emu_eng, cb, ctx=[], native_hook=False):
self.native_hook = native_hook
self.emu_eng = emu_eng
self.se_obj = se_obj
self.ctx = ctx

def enable(self):
self.enabled = True
Expand All @@ -84,48 +82,48 @@ def disable(self):
self.enabled = False
self.emu_eng.hook_disable(self.handle)

def _wrap_code_cb(self, emu, addr, size, ctx=[]):
def _wrap_code_cb(self, emu, addr, size, ctx=None):
try:
if self.enabled:
if self.se_obj.exit_event and self.se_obj.exit_event.is_set():
self.se_obj.stop()
return False
return self.cb(self.se_obj, addr, size, self.ctx)
return self.cb(self.se_obj, addr, size)
return True
except KeyboardInterrupt:
self.se_obj.stop()
return False

def _wrap_intr_cb(self, emu, num, ctx=[]):
def _wrap_intr_cb(self, emu, num, ctx=None):
if self.enabled:
return self.cb(self.se_obj, num, self.ctx)
return self.cb(self.se_obj, num)
return True

def _wrap_in_insn_cb(self, emu, port, size, ctx=[]):
def _wrap_in_insn_cb(self, emu, port, size, ctx=None):
if self.enabled:
return self.cb(self.se_obj, port, size)
return True

def _wrap_syscall_insn_cb(self, emu, ctx=[]):
def _wrap_syscall_insn_cb(self, emu, ctx=None):
if self.enabled:
return self.cb(self.se_obj)
return True

def _wrap_memory_access_cb(self, emu, access, addr, size, value, ctx):
def _wrap_memory_access_cb(self, emu, access, addr, size, value, ctx=None):
try:
if self.enabled:
if self.se_obj.exit_event and self.se_obj.exit_event.is_set():
self.se_obj.stop()
return False
return self.cb(self.se_obj, access, addr, size, value, ctx)
return self.cb(self.se_obj, access, addr, size, value)
return True
except KeyboardInterrupt:
self.se_obj.stop()
return False

def _wrap_invalid_insn_cb(self, emu, ctx=[]):
def _wrap_invalid_insn_cb(self, emu, ctx=None):
if self.enabled:
return self.cb(self.se_obj, self.ctx)
return self.cb(self.se_obj)
return True


Expand All @@ -148,7 +146,7 @@ class DynCodeHook(Hook):
Currently, this will only fire once per dynamic code mapping. Could be useful for unpacking.
"""

def __init__(self, se_obj, emu_eng, cb, ctx=[]):
def __init__(self, se_obj, emu_eng, cb):
super().__init__(se_obj, emu_eng, cb)


Expand All @@ -157,8 +155,8 @@ class CodeHook(Hook):
This hook callback will fire for every CPU instruction
"""

def __init__(self, se_obj, emu_eng, cb, begin=1, end=0, ctx=[], native_hook=True):
super().__init__(se_obj, emu_eng, cb, ctx=ctx, native_hook=native_hook)
def __init__(self, se_obj, emu_eng, cb, begin=1, end=0, native_hook=True):
super().__init__(se_obj, emu_eng, cb, native_hook=native_hook)
self.begin = begin
self.end = end

Expand Down Expand Up @@ -242,8 +240,8 @@ class InterruptHook(Hook):
This hook will fire each time a a software interrupt is triggered
"""

def __init__(self, se_obj, emu_eng, cb, ctx=[], native_hook=True):
super().__init__(se_obj, emu_eng, cb, ctx=ctx, native_hook=native_hook)
def __init__(self, se_obj, emu_eng, cb, native_hook=True):
super().__init__(se_obj, emu_eng, cb, native_hook=native_hook)

def add(self):
if not self.added and self.native_hook:
Expand All @@ -258,8 +256,8 @@ class InstructionHook(Hook):
Only the instructions: IN, OUT, SYSCALL, and SYSENTER are supported by unicorn.
"""

def __init__(self, se_obj, emu_eng, cb, ctx=[], native_hook=True, insn=None):
super().__init__(se_obj, emu_eng, cb, ctx=ctx, native_hook=native_hook)
def __init__(self, se_obj, emu_eng, cb, native_hook=True, insn=None):
super().__init__(se_obj, emu_eng, cb, native_hook=native_hook)
self.insn = insn

def add(self):
Expand All @@ -275,8 +273,8 @@ class InvalidInstructionHook(Hook):
to be executed
"""

def __init__(self, se_obj, emu_eng, cb, ctx=[], native_hook=True):
super().__init__(se_obj, emu_eng, cb, ctx=ctx, native_hook=native_hook)
def __init__(self, se_obj, emu_eng, cb, native_hook=True):
super().__init__(se_obj, emu_eng, cb, native_hook=native_hook)

def add(self):
if not self.added and self.native_hook:
Expand Down
4 changes: 2 additions & 2 deletions speakeasy/engines/unicorn_eng.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def start(self, addr, timeout=0, count=0):
timeout = self._sec_to_usec(timeout)
return self.emu.emu_start(addr, 0xFFFFFFFF, timeout=timeout, count=count) # type: ignore[union-attr]

def hook_add(self, addr=None, cb=None, htype=None, ctx=None, begin=1, end=0, arg1=0):
def hook_add(self, addr=None, cb=None, htype=None, begin=1, end=0, arg1=0):
"""
Add a callback function for a specific event type or address
"""
Expand All @@ -238,7 +238,7 @@ def hook_add(self, addr=None, cb=None, htype=None, ctx=None, begin=1, end=0, arg
elif hook_type == uc.UC_HOOK_MEM_INVALID:
cb = ct.cast(UC_HOOK_MEM_INVALID_CB(cb), UC_HOOK_MEM_INVALID_CB)
else:
return self.emu.hook_add(htype=hook_type, callback=cb, user_data=ctx, begin=begin, end=end) # type: ignore[union-attr]
return self.emu.hook_add(htype=hook_type, callback=cb, begin=begin, end=end) # type: ignore[union-attr]
ptr = ct.cast(cb, ct.c_void_p)
# uc_hook_add requires an additional paramter for the hook type UC_HOOK_INSN
if hook_type == uc.UC_HOOK_INSN:
Expand Down
3 changes: 1 addition & 2 deletions speakeasy/memmgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,12 @@ def __init__(self, *args, **kwargs):

def _hook_mem_map_dispatch(self, mm):
hl = self.hooks.get(common.HOOK_MEM_MAP, [])
ctx: dict[str, object] = {}
for mem_map_hook in hl:
if mem_map_hook.enabled:
# the mapped memory region's base address falls within the hook's bounds
if mem_map_hook.begin <= mm.base:
if not mem_map_hook.end or mem_map_hook.end > mm.base:
mem_map_hook.cb(self, mm.base, mm.size, mm.tag, mm.prot, mm.flags, ctx)
mem_map_hook.cb(self, mm.base, mm.size, mm.tag, mm.prot, mm.flags)

def mem_map(self, size, base=None, perms=common.PERM_MEM_RWX, tag=None, flags=0, shared=False, process=None):
"""
Expand Down
2 changes: 1 addition & 1 deletion speakeasy/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ def record_dropped_files_event(self, run, files):
entry = {"path": f.path, "size": len(data), "sha256": _hash, "data_ref": data_ref}
run.dropped_files.append(entry)

def record_api_event(self, run, pos: TracePosition, name, ret, argv, ctx=[]):
def record_api_event(self, run, pos: TracePosition, name, ret, argv):
"""
Log a call to an OS API. This includes arguments, return address, and return value
"""
Expand Down
Loading
Loading