From 01aa693d8feb446d6cf669c123071fff13d46a07 Mon Sep 17 00:00:00 2001 From: Alex-Costin Puflene Date: Mon, 24 Jun 2024 02:29:52 +0300 Subject: [PATCH 1/2] Fixes timeout decorator issues and radare2 parsing problems --- zeratool_lib/formatDetector.py | 6 +++--- zeratool_lib/formatExploiter.py | 6 +++--- zeratool_lib/overflowDetector.py | 6 +++--- zeratool_lib/overflowExploitSender.py | 12 +++++++++++- zeratool_lib/overflowExploiter.py | 13 +++++++++++-- zeratool_lib/overflowRemoteLeaker.py | 2 +- 6 files changed, 32 insertions(+), 13 deletions(-) diff --git a/zeratool_lib/formatDetector.py b/zeratool_lib/formatDetector.py index 8c0fe2d..22334cc 100644 --- a/zeratool_lib/formatDetector.py +++ b/zeratool_lib/formatDetector.py @@ -56,11 +56,11 @@ def checkFormat(binary_name, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200) + @timeout_decorator.timeout(1200, use_signals=False) def exploreBinary(simgr): - simgr.explore(find=lambda s: "type" in s.globals) + return simgr.explore(find=lambda s: "type" in s.globals) - exploreBinary(simgr) + simgr = exploreBinary(simgr) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] diff --git a/zeratool_lib/formatExploiter.py b/zeratool_lib/formatExploiter.py index cd83772..241c88b 100644 --- a/zeratool_lib/formatExploiter.py +++ b/zeratool_lib/formatExploiter.py @@ -158,11 +158,11 @@ def rediscoverAndExploit(binary_name, properties, stack_position, leak_format): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200) + @timeout_decorator.timeout(1200, use_signals=False) def exploreBinary(simgr): - simgr.explore(find=lambda s: "type" in s.globals) + return simgr.explore(find=lambda s: "type" in s.globals) - exploreBinary(simgr) + simgr = exploreBinary(simgr) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] diff --git a/zeratool_lib/overflowDetector.py b/zeratool_lib/overflowDetector.py index 6d71200..6dd2ced 100644 --- a/zeratool_lib/overflowDetector.py +++ b/zeratool_lib/overflowDetector.py @@ -49,13 +49,13 @@ def checkOverflow(binary_name, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(120) + @timeout_decorator.timeout(120, use_signals=False) def exploreBinary(simgr): - simgr.explore( + return simgr.explore( find=lambda s: "type" in s.globals, step_func=overflow_detect_filter ) - exploreBinary(simgr) + simgr = exploreBinary(simgr) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] diff --git a/zeratool_lib/overflowExploitSender.py b/zeratool_lib/overflowExploitSender.py index 6622f54..8e32943 100644 --- a/zeratool_lib/overflowExploitSender.py +++ b/zeratool_lib/overflowExploitSender.py @@ -1,4 +1,6 @@ import logging +import os +import stat from overflowExploiter import exploitOverflow from pwn import ELF, gdb, process, u32, u64 @@ -14,8 +16,16 @@ def sendExploit( ): send_results = {} + radare2_binary_name = "/radare2_binary" + fin = open(binary_name, "rb") + fout = open(radare2_binary_name, "wb") + fout.write(fin.read()) + fin.close() + fout.close() + os.chmod(radare2_binary_name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + # Create local process - proc = process(binary_name) + proc = process(radare2_binary_name) if debug: gdb.attach( proc, diff --git a/zeratool_lib/overflowExploiter.py b/zeratool_lib/overflowExploiter.py index 0be6868..8478f6a 100644 --- a/zeratool_lib/overflowExploiter.py +++ b/zeratool_lib/overflowExploiter.py @@ -1,5 +1,6 @@ import logging import os +import stat import angr import claripy @@ -54,6 +55,14 @@ def getOneGadget(properties): def exploitOverflow(binary_name, properties, inputType): + radare2_binary_name = "/radare2_binary" + fin = open(binary_name, "rb") + fout = open(radare2_binary_name, "wb") + fout.write(fin.read()) + fin.close() + fout.close() + os.chmod(radare2_binary_name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) + run_environ = properties["pwn_type"].get("results", {}) run_environ["type"] = run_environ.get("type", None) @@ -104,7 +113,7 @@ def exploitOverflow(binary_name, properties, inputType): if inputType == "STDIN": entry_addr = p.loader.main_object.entry if not has_pie: - reg_values = getRegValues(binary_name, entry_addr) + reg_values = getRegValues(radare2_binary_name, entry_addr) state = p.factory.full_init_state( args=argv, add_options=extras, @@ -146,7 +155,7 @@ def exploitOverflow(binary_name, properties, inputType): simgr.explore(find=lambda s: "type" in s.globals, step_func=step_func) try: - @timeout_decorator.timeout(1200) + @timeout_decorator.timeout(1200, use_signals=False) def exploreBinary(simgr): simgr.explore(find=lambda s: "type" in s.globals, step_func=step_func) diff --git a/zeratool_lib/overflowRemoteLeaker.py b/zeratool_lib/overflowRemoteLeaker.py index 7641d02..8b11319 100644 --- a/zeratool_lib/overflowRemoteLeaker.py +++ b/zeratool_lib/overflowRemoteLeaker.py @@ -69,7 +69,7 @@ def leak_remote_functions(binary_name, properties, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200) + @timeout_decorator.timeout(1200, use_signals=False) def exploreBinary(simgr): simgr.explore( find=lambda s: "libc" in s.globals, step_func=leak_remote_libc_functions From fe9074c3aba48f3d56d685b6f0c99b85c547eb15 Mon Sep 17 00:00:00 2001 From: Alex-Costin Puflene Date: Sat, 16 May 2026 04:14:11 +0300 Subject: [PATCH 2/2] Fixes angr reference issues + format string problems --- zeratool_lib/__init__.py | 37 ++++++++++ zeratool_lib/formatDetector.py | 19 +++-- zeratool_lib/formatExploiter.py | 82 +++++++++++++++++----- zeratool_lib/formatLeak.py | 7 ++ zeratool_lib/overflowDetector.py | 6 +- zeratool_lib/overflowExploitSender.py | 12 +--- zeratool_lib/overflowExploiter.py | 13 +--- zeratool_lib/overflowRemoteLeaker.py | 2 +- zeratool_lib/printf_model.py | 99 +++++++++++++++++++++++++-- zeratool_lib/puts_model.py | 4 +- zeratool_lib/simgr_helper.py | 8 ++- 11 files changed, 233 insertions(+), 56 deletions(-) diff --git a/zeratool_lib/__init__.py b/zeratool_lib/__init__.py index e69de29..9107cfb 100644 --- a/zeratool_lib/__init__.py +++ b/zeratool_lib/__init__.py @@ -0,0 +1,37 @@ +# Pickle-compatible re-exports for the host side of the gRPC round-trip. +# +# The docker container puts /zeratool_lib/zeratool_lib on PYTHONPATH, which +# makes zeratool_lib.py import as a *top-level module* named `zeratool_lib`. +# Classes defined there get __module__ == "zeratool_lib", so the pickled +# exploit returned over gRPC references e.g. `zeratool_lib.ZeratoolExploit`. +# +# On the host, `zeratool_lib` is installed as a *package* (this directory). +# We can't `from zeratool_lib.zeratool_lib import ...` here because that +# submodule does `import formatDetector` etc. (unqualified, docker-only), +# which would explode at import time on the host. So we redefine the small +# pickle-relevant classes here instead. Pickle resolves classes by +# (module, qualname) lookup and rebuilds dataclass/enum instances by field, +# so structural equivalence is enough. +from dataclasses import dataclass +from enum import Enum + + +class ZeratoolInputStreams(Enum): + """Sync names with commons.input_streams.InputStreams.""" + + STDIN = "STDIN" + ARGUMENTS = "ARG" + + +@dataclass +class ZeratoolExploit: + class Outcomes(Enum): + SHELL = "SHELL" + CALL_TO_WIN = "CALL_TO_WIN" + LEAK = "LEAK" + + payload: bytes + outcome: Outcomes + + +__all__ = ["ZeratoolExploit", "ZeratoolInputStreams"] diff --git a/zeratool_lib/formatDetector.py b/zeratool_lib/formatDetector.py index 22334cc..cf7e99e 100644 --- a/zeratool_lib/formatDetector.py +++ b/zeratool_lib/formatDetector.py @@ -56,11 +56,21 @@ def checkFormat(binary_name, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200, use_signals=False) + # @timeout_decorator.timeout(1200) def exploreBinary(simgr): - return simgr.explore(find=lambda s: "type" in s.globals) + simgr.explore(find=lambda s: "type" in s.globals) - simgr = exploreBinary(simgr) + exploreBinary(simgr) + log.info( + "[stashes] active=%d found=%d errored=%d deadended=%d unconstrained=%d", + len(simgr.active), + len(simgr.found), + len(simgr.errored), + len(simgr.deadended), + len(simgr.unconstrained), + ) + for es in simgr.errored[:5]: + log.info("[errored] addr=%s err=%s", hex(es.state.addr), es.error) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] @@ -70,7 +80,8 @@ def exploreBinary(simgr): except (KeyboardInterrupt, timeout_decorator.TimeoutError) as e: print("[~] Format check timed out") - if "input" in end_state.globals.keys(): + # if "input" in end_state.globals.keys(): + if end_state is not None and "input" in end_state.globals.keys(): run_environ["input"] = end_state.globals["input"] print("[+] Triggerable with input : {}".format(end_state.globals["input"])) diff --git a/zeratool_lib/formatExploiter.py b/zeratool_lib/formatExploiter.py index 241c88b..ef31575 100644 --- a/zeratool_lib/formatExploiter.py +++ b/zeratool_lib/formatExploiter.py @@ -158,11 +158,11 @@ def rediscoverAndExploit(binary_name, properties, stack_position, leak_format): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200, use_signals=False) + # @timeout_decorator.timeout(1200) def exploreBinary(simgr): - return simgr.explore(find=lambda s: "type" in s.globals) + simgr.explore(find=lambda s: "type" in s.globals) - simgr = exploreBinary(simgr) + exploreBinary(simgr) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] @@ -173,15 +173,18 @@ def exploreBinary(simgr): log.info("[~] Format check timed out") if inputType == "STDIN" and end_state is not None: - stdin_str = str(end_state.posix.dumps(0)) - log.info("[+] Triggerable with STDIN : {}".format(stdin_str)) + # `posix.dumps(0)` already returns bytes. The previous str(...) wrap + # produced a Python repr literal ("b'...'") which broke downstream + # consumers expecting bytes (ZeratoolExploit.payload, hexdump, etc.). + stdin_bytes = end_state.posix.dumps(0) + log.info("[+] Triggerable with STDIN : {!r}".format(stdin_bytes)) - return stdin_str, end_state.globals["fmt_outcome"] + return stdin_bytes, end_state.globals["fmt_outcome"] elif inputType == "ARG" and end_state is not None: - arg_str = str(end_state.solver.eval(arg, cast_to=str)) - run_environ["input"] = arg_str + arg_bytes = end_state.solver.eval(arg, cast_to=bytes) + run_environ["input"] = arg_bytes - return arg_str, end_state.globals["fmt_outcome"] + return arg_bytes, end_state.globals["fmt_outcome"] def get_num_constraints(chop_byte, state): @@ -190,7 +193,7 @@ def get_num_constraints(chop_byte, state): # Do any constraints mention this BV? for constraint in constraints: if any( - chop_byte.structurally_match(x) for x in constraint.recursive_children_asts + chop_byte.structurally_match(x) for x in constraint.children_asts() ): i += 1 # log.info("{} : {} : {}".format(chop_byte,i,state.solver.eval(chop_byte,cast_to=bytes))) @@ -264,10 +267,19 @@ def checkExploitable(self, fmt): var_loc = solv(printf_arg) + if printf_model.format_string_ptr_is_read_only(state.project, var_loc): + log.debug( + "Skipping format exploit hook for read-only format pointer %s", + hex(var_loc), + ) + return False + # Parts of this argument could be symbolic, so we need # to check every byte var_data = state.memory.load(var_loc, max_read_len) var_len = get_max_strlen(state, var_data) + if var_len == 0: + var_len = min(max_read_len, 256) fmt_len = self._sim_strlen(fmt) # if len(state.solver.eval_upto(fmt_len,2)) > 1: @@ -320,8 +332,19 @@ def checkExploitable(self, fmt): state_copy = None results_n = None + # Snapshot the pre-loop state once. Each GOT iteration must start + # from this; otherwise constrainBytes() keeps stacking constraints + # on `state` (which aliases self.state) so the input bytes get + # pinned to iter-0's address (e.g. __gmon_start__) and every later + # solver.eval(user_input) returns that stale payload regardless of + # which GOT entry we now intend to overwrite. + original_state = state.copy() + for got_name, got_addr in list(properties["protections"]["got"].items()): # for got_name,got_addr in [(x,y) for (x,y) in properties['protections']['got'].items() if x in " exit"]: #debug for hard_format + # Reset state to the pre-loop snapshot so this iteration's + # constrainBytes only sees its own constraints. + state = original_state.copy() backup_state = state.copy() log.info("[+] Overwiting {} at {}".format(got_name, hex(got_addr))) @@ -377,9 +400,26 @@ def checkExploitable(self, fmt): results = sendExploit(binary_name, properties, user_input, leak_format) exploit_results = {} if results["success"] == True: - exploit_results["success"] = results["success"] - exploit_results["input"] = user_input + # Promote the working payload onto self.state and bail. + # Without this, the simple-success path used to fall + # through into the (state_copy-gated) verification block, + # which is skipped on the very first successful iteration + # (state_copy is still None) and we'd never return True. + var_value = self.state.memory.load(var_loc, var_value_length) + self.constrainBytes( + self.state, + var_value, + var_loc, + position, + var_value_length, + strVal=format_payload, + ) + log.info("[+] Vulnerable path found {}".format(repr(user_input))) + self.state.globals["type"] = "Format" + self.state.globals["position"] = position + self.state.globals["length"] = greatest_count self.state.globals["fmt_outcome"] = results["fmt_outcome"] + return True else: # Maybe angr still messed up the pointer log.info("[-] Payload launch failed. Fixing angr stack pointer") @@ -387,7 +427,7 @@ def checkExploitable(self, fmt): first_input = state.posix.dumps(0) - end_eip = state.se.eval(state.regs.pc) + end_eip = state.solver.eval(state.regs.pc) last_bb = [ x @@ -543,8 +583,10 @@ def constrainBytes(self, state, symVar, loc, position, length, strVal="%x_"): for i in range(length): strValIndex = i % len(strVal) curr_byte = self.state.memory.load(loc + i, 1).get_byte(0) - constraint = state.se.And(strVal[strValIndex] == curr_byte) - if state.se.satisfiable(extra_constraints=[constraint]): + # `state.solver` doesn't expose `.And`; wrapping a single boolean + # AST in And(...) is a no-op so we just use the comparison directly. + constraint = strVal[strValIndex] == curr_byte + if state.solver.satisfiable(extra_constraints=[constraint]): state.add_constraints(constraint) else: log.info( @@ -553,7 +595,10 @@ def constrainBytes(self, state, symVar, loc, position, length, strVal="%x_"): ) ) - def run(self, _, fmt): + def run(self, fmt): + # See note in printf_model.printFormat.run: angr's printf takes a + # single fmt arg; the upstream `(self, _, fmt)` signature is for + # __printf_chk and was forwarding stack garbage to super().run. if not self.checkExploitable(fmt): return super(type(self), self).run(fmt) @@ -646,7 +691,10 @@ def sendExploit(binary_name, properties, input_string, leak_format): log.info(results) send_results["success"] = False - if not hadIssue and re.match(leak_format, results): + # Same str/bytes guard as formatLeak.checkLeak: no leak target -> no match. + if isinstance(leak_format, str): + leak_format = leak_format.encode() if leak_format else b"" + if not hadIssue and leak_format and re.match(leak_format, results): send_results["success"] = True send_results["payload"] = input_string send_results["fmt_outcome"] = "leak" diff --git a/zeratool_lib/formatLeak.py b/zeratool_lib/formatLeak.py index 0b40658..a6b7f8e 100644 --- a/zeratool_lib/formatLeak.py +++ b/zeratool_lib/formatLeak.py @@ -6,6 +6,13 @@ def checkLeak(binary_name, properties, leak_format) -> bytes: + # No leak target requested (e.g. plain shellcode exploit) -> nothing to do. + # Avoids `re.match("", bytes)` TypeError when the default `""` falls through. + if not leak_format: + return None + if isinstance(leak_format, str): + leak_format = leak_format.encode() + full_string = b"" run_count = 50 diff --git a/zeratool_lib/overflowDetector.py b/zeratool_lib/overflowDetector.py index 6dd2ced..9c60bd8 100644 --- a/zeratool_lib/overflowDetector.py +++ b/zeratool_lib/overflowDetector.py @@ -49,13 +49,13 @@ def checkOverflow(binary_name, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(120, use_signals=False) + # @timeout_decorator.timeout(120) def exploreBinary(simgr): - return simgr.explore( + simgr.explore( find=lambda s: "type" in s.globals, step_func=overflow_detect_filter ) - simgr = exploreBinary(simgr) + exploreBinary(simgr) if "found" in simgr.stashes and len(simgr.found): end_state = simgr.found[0] run_environ["type"] = end_state.globals["type"] diff --git a/zeratool_lib/overflowExploitSender.py b/zeratool_lib/overflowExploitSender.py index 8e32943..6622f54 100644 --- a/zeratool_lib/overflowExploitSender.py +++ b/zeratool_lib/overflowExploitSender.py @@ -1,6 +1,4 @@ import logging -import os -import stat from overflowExploiter import exploitOverflow from pwn import ELF, gdb, process, u32, u64 @@ -16,16 +14,8 @@ def sendExploit( ): send_results = {} - radare2_binary_name = "/radare2_binary" - fin = open(binary_name, "rb") - fout = open(radare2_binary_name, "wb") - fout.write(fin.read()) - fin.close() - fout.close() - os.chmod(radare2_binary_name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - # Create local process - proc = process(radare2_binary_name) + proc = process(binary_name) if debug: gdb.attach( proc, diff --git a/zeratool_lib/overflowExploiter.py b/zeratool_lib/overflowExploiter.py index 8478f6a..dda0e67 100644 --- a/zeratool_lib/overflowExploiter.py +++ b/zeratool_lib/overflowExploiter.py @@ -1,6 +1,5 @@ import logging import os -import stat import angr import claripy @@ -55,14 +54,6 @@ def getOneGadget(properties): def exploitOverflow(binary_name, properties, inputType): - radare2_binary_name = "/radare2_binary" - fin = open(binary_name, "rb") - fout = open(radare2_binary_name, "wb") - fout.write(fin.read()) - fin.close() - fout.close() - os.chmod(radare2_binary_name, stat.S_IRWXU | stat.S_IRWXG | stat.S_IRWXO) - run_environ = properties["pwn_type"].get("results", {}) run_environ["type"] = run_environ.get("type", None) @@ -113,7 +104,7 @@ def exploitOverflow(binary_name, properties, inputType): if inputType == "STDIN": entry_addr = p.loader.main_object.entry if not has_pie: - reg_values = getRegValues(radare2_binary_name, entry_addr) + reg_values = getRegValues(binary_name, entry_addr) state = p.factory.full_init_state( args=argv, add_options=extras, @@ -155,7 +146,7 @@ def exploitOverflow(binary_name, properties, inputType): simgr.explore(find=lambda s: "type" in s.globals, step_func=step_func) try: - @timeout_decorator.timeout(1200, use_signals=False) + # @timeout_decorator.timeout(1200) def exploreBinary(simgr): simgr.explore(find=lambda s: "type" in s.globals, step_func=step_func) diff --git a/zeratool_lib/overflowRemoteLeaker.py b/zeratool_lib/overflowRemoteLeaker.py index 8b11319..eab111e 100644 --- a/zeratool_lib/overflowRemoteLeaker.py +++ b/zeratool_lib/overflowRemoteLeaker.py @@ -69,7 +69,7 @@ def leak_remote_functions(binary_name, properties, inputType): # Lame way to do a timeout try: - @timeout_decorator.timeout(1200, use_signals=False) + # @timeout_decorator.timeout(1200) def exploreBinary(simgr): simgr.explore( find=lambda s: "libc" in s.globals, step_func=leak_remote_libc_functions diff --git a/zeratool_lib/printf_model.py b/zeratool_lib/printf_model.py index 49bf234..c6f0030 100644 --- a/zeratool_lib/printf_model.py +++ b/zeratool_lib/printf_model.py @@ -9,6 +9,31 @@ log = logging.getLogger(__name__) +# ELF sh_flags: SHF_WRITE — literals live in sections without this bit. +_SHF_WRITE = 0x1 + + +def format_string_ptr_is_read_only(project, ptr): + """ + True if ptr is in a non-writable ELF section (e.g. .rodata). + + Calls like printf("Hello ") use such pointers; they are never the + attacker-controlled buffer from read()/argv, so the format-string check + should skip them and let execution reach printf(user_buf). + """ + sec = project.loader.find_section_containing(ptr) + if sec is None: + return False + flags = getattr(sec, "flags", None) + if flags is not None: + try: + return (int(flags) & _SHF_WRITE) == 0 + except (TypeError, ValueError): + pass + name = getattr(sec, "name", "") or "" + return name.startswith(".rodata") or name in (".rdata",) + + # Better symbolic strlen def get_max_strlen(state, value): i = 0 @@ -63,15 +88,46 @@ def checkExploitable(self, fmt): # state.add_constraints(fmt_len == max_read_len) if len(self.arguments) <= i: + log.info("[entry] printFormat: no argument at index %d", i) return False printf_arg = self.arguments[i] var_loc = solv(printf_arg) + try: + head_bv = state.memory.load(var_loc, 8) + head = solv(head_bv, cast_to=bytes) + head_sym = any( + state.memory.load(var_loc + b, 1).symbolic for b in range(8) + ) + stdin_head = state.posix.dumps(0)[:16] + log.info( + "[entry] printFormat hit: ptr=%s head=%r head_has_symbolic=%s stdin[:16]=%r", + hex(var_loc), + head, + head_sym, + stdin_head, + ) + except Exception as e: + log.info("[entry] printFormat hit: ptr=%s diag_error=%s", hex(var_loc), e) + + if format_string_ptr_is_read_only(state.project, var_loc): + log.info( + "[entry] skipping read-only format pointer %s", hex(var_loc) + ) + return False + # Parts of this argument could be symbolic, so we need # to check every byte var_data = state.memory.load(var_loc, max_read_len) var_len = get_max_strlen(state, var_data) + log.info("[entry] get_max_strlen returned %d for ptr=%s", var_len, hex(var_loc)) + # Zero-init stack buffers (e.g. char buf[256] = {0}) make byte 0 a + # concrete NUL, so get_max_strlen returns 0 before read() merges in + # stdin — scan a bounded window for symbolic bytes anyway. + if var_len == 0: + var_len = min(max_read_len, 256) + log.info("[entry] forcing var_len=%d for diagnostic scan", var_len) fmt_len = self._sim_strlen(fmt) # if len(state.solver.eval_upto(fmt_len,2)) > 1: @@ -111,6 +167,30 @@ def checkExploitable(self, fmt): position = i - 1 - count # previous position minus greatest count count = 0 + + try: + log.info(f"\n--- DEBUG: Inspecting Buffer at {hex(var_loc)} ---") + + # 1. Dump what the program has read from STDIN so far + stdin_dump = state.posix.dumps(0) + log.info(f"[DEBUG] Full STDIN content: {stdin_dump}") + + # 2. Dump the specific string argument passed to printf + # We load 32 bytes just to see what follows, even if var_len is 0 + raw_memory = state.memory.load(var_loc, 32) + concrete_string = state.solver.eval(raw_memory, cast_to=bytes) + log.info(f"[DEBUG] String passed to printf: {concrete_string}") + + # 3. Check the symbolic constraints on the first byte + first_byte = state.memory.load(var_loc, 1) + can_be_null = state.solver.satisfiable(extra_constraints=[first_byte == 0]) + must_be_null = not state.solver.satisfiable(extra_constraints=[first_byte != 0]) + log.info(f"[DEBUG] First byte can be NULL? {can_be_null}") + log.info(f"[DEBUG] First byte MUST be NULL? {must_be_null}") + log.info("---------------------------------------------------\n") + except Exception as e: + log.info(f"[DEBUG] Error dumping state: {e}") + log.info( "[+] Found symbolic buffer at position {} of length {}".format( position, greatest_count @@ -121,8 +201,15 @@ def checkExploitable(self, fmt): str_val = b"%lx_" if bits == 64: str_val = b"%llx_" + # Only constrain the symbolic run we just measured. Using `var_len` + # (max strlen) ran past stdin's filled length into concrete zero + # bytes (e.g. tail of `name[256] = {0}`), making the byte-by-byte + # check fail and dropping the path to the real printf with a + # symbolic fmt -> "Symbolic (format) string, game over". + constrain_loc = var_loc + position + constrain_len = greatest_count if self.can_constrain_bytes( - state, var_data, var_loc, position, var_len, strVal=str_val + state, var_data, constrain_loc, position, constrain_len, strVal=str_val ): log.info("[+] Can constrain bytes") log.info("[+] Constraining input to leak") @@ -130,9 +217,9 @@ def checkExploitable(self, fmt): self.constrainBytes( state, var_data, - var_loc, + constrain_loc, position, - var_len, + constrain_len, strVal=str_val, ) # Verify solution @@ -199,7 +286,11 @@ def constrainBytes(self, state, symVar, loc, position, length, strVal=b"%x_"): "[~] Byte {} not constrained to {}".format(i, strVal[strValIndex]) ) - def run(self, _, fmt): + def run(self, fmt): + # NOTE: angr's printf.run is `run(self, fmt)`. The original Zeratool + # used `(self, _, fmt)` which is the signature for __printf_chk and + # makes the SimProcedure pull a second CC arg (stack garbage). That + # garbage was then forwarded to super().run, often crashing the path. if not self.checkExploitable(fmt): return super(type(self), self).run(fmt) diff --git a/zeratool_lib/puts_model.py b/zeratool_lib/puts_model.py index 96c6269..c461338 100644 --- a/zeratool_lib/puts_model.py +++ b/zeratool_lib/puts_model.py @@ -1,6 +1,7 @@ import logging import angr +import claripy from pwn import * log = logging.getLogger(__name__) @@ -41,6 +42,5 @@ def run(self, string): strlen = angr.SIM_PROCEDURES["libc"]["strlen"] length = self.inline_call(strlen, string).ret_expr out = stdout.write(string, length) - stdout.write_data(self.state.solver.BVV(b"\n")) - return out + 1 + stdout.write_data(claripy.BVV(0x0A, 8)) return out + 1 diff --git a/zeratool_lib/simgr_helper.py b/zeratool_lib/simgr_helper.py index 3f9f7d1..3d46deb 100644 --- a/zeratool_lib/simgr_helper.py +++ b/zeratool_lib/simgr_helper.py @@ -29,7 +29,7 @@ def constrainToAddress(state, sym_val, addr, endian="little"): for i in range(bits / 8): curr_byte = sym_val.get_byte(i) constraint = claripy.And(curr_byte == padded_addr[i]) - if state.se.satisfiable(extra_constraints=[constraint]): + if state.solver.satisfiable(extra_constraints=[constraint]): constraints.append(constraint) return constraints @@ -429,7 +429,9 @@ def point_to_shellcode_filter(simgr): # Setup shellcode memory = state.memory.load(address, len(shellcode)) - shellcode_bvv = state.solver.BVV(shellcode) + shellcode_bvv = claripy.BVV( + int.from_bytes(shellcode, "little"), len(shellcode) * 8 + ) if "leaked_type" in state.globals: log.info("We have a leak, let's try and use that") @@ -1116,7 +1118,7 @@ def get_num_constraints(chop_byte, state): # Do any constraints mention this BV? for constraint in constraints: if any( - chop_byte.structurally_match(x) for x in constraint.recursive_children_asts + chop_byte.structurally_match(x) for x in constraint.children_asts() ): i += 1 # log.info("{} : {} : {}".format(chop_byte,i,state.solver.eval(chop_byte,cast_to=bytes)))