"""Command-line interface of the attack surface approximation module."""
import os
import typing

import click
from rich import print  # pylint: disable=redefined-builtin
from rich.table import Table

from attack_surface_approximation.arguments_fuzzing import (
    ArgumentsFuzzer,
    ArgumentsPair,
)
from attack_surface_approximation.detector import (
    analyze_valid_code,
    extract_dynamic_functions,
    find_all_vulns,
    get_func_names,
    print_analysis,
    print_intro,
)
from attack_surface_approximation.dictionaries_generators import (
    ArgumentsGenerator,
)
from attack_surface_approximation.static_input_streams_detection import (
    InputStreamsDetector,
)
from commons.ghidra import GhidraAnalysis
from commons.input_streams import InputStreams


@click.group()
def cli() -> None:
    """Discovers the attack surface of vulnerable programs."""


@cli.command(help="Generate dictionaries with arguments, based on heuristics.")
@click.option(
    "--elf",
    type=click.Path(exists=True, readable=True),
    help="ELF Executable",
)
@click.option(
    "--heuristic",
    type=click.Choice(
        ArgumentsGenerator.get_available_heuristics(),
        case_sensitive=False,
    ),
    required=True,
    help="Generation heuristic",
)
@click.option(
    "--output",
    type=click.Path(exists=False, writable=True),
    required=True,
    help="Output filename",
)
@click.option(
    "--top",
    type=int,
    required=False,
    default=0,
    help=(
        "Number indicating how much arguments are returned after sorting by"
        " frequency"
    ),
)
def generate(
    heuristic: str,
    output: str,
    top: int,
    elf: typing.Optional[str] = None,
) -> None:
    """Generate an arguments dictionary and dump it to ``output``.

    Args:
        heuristic: Name of the generation heuristic.
        output: Path of the dictionary file to write.
        top: Value forwarded to the generator's ``dump`` as ``top_count``.
        elf: Optional path to the ELF executable, forwarded to the generator.
    """
    generator = ArgumentsGenerator()
    generator.generate(heuristic, elf)
    arguments_count = generator.dump(output, top_count=top)

    print(
        f"Successfully generated dictionary with {arguments_count} arguments"
    )


@cli.command(
    help="Statically detect what input streams are used by an executable."
)
@click.option(
    "--elf",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="ELF Executable",
)
def detect(elf: str) -> None:
    """Detect the input streams of ``elf`` and print them."""
    detector = InputStreamsDetector(elf)
    streams = detector.detect_all()

    print_detected_streams(streams)


def print_detected_streams(streams: InputStreams) -> None:
    """Print the detected streams, or a notice when there are none."""
    if not any(streams):
        print_no_detected_stream()
    else:
        print_multiple_detected_streams(streams)


def print_no_detected_stream() -> None:
    """Print the notice used when no input stream was detected."""
    print("No input mechanism was detected for the given program.")


def print_multiple_detected_streams(streams: dict) -> None:
    """Print a table with the presence of every known input stream."""
    print("Several input mechanisms were detected for the given program:\n")

    table = build_detected_streams_table(streams)
    print(table)


@cli.command(help="Fuzz the arguments of an executable.")
@click.option(
    "--elf",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="ELF Executable",
)
@click.option(
    "--dictionary",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="Arguments dictionary",
)
def fuzz(elf: str, dictionary: str) -> None:
    """Fuzz ``elf`` with the arguments loaded from ``dictionary``."""
    generator = ArgumentsGenerator()
    generator.load(dictionary)
    possible_arguments = generator.get_arguments()

    fuzzer = ArgumentsFuzzer(elf, possible_arguments)
    actual_arguments = fuzzer.get_all_valid_arguments()

    print_arguments(actual_arguments)


def print_arguments(arguments: typing.List[ArgumentsPair]) -> None:
    """Print the detected arguments, or a notice when there are none."""
    if not arguments:
        print_no_detected_argument()
    else:
        print_multiple_detected_arguments(arguments)


def print_no_detected_argument() -> None:
    """Print the notice used when no argument was detected."""
    print("No argument was detected for the given program.")


def print_multiple_detected_arguments(
    arguments: typing.List[ArgumentsPair],
) -> None:
    """Print a table with every detected argument and its roles."""
    print("Several arguments were detected for the given program:\n")

    table = build_arguments_table(arguments)
    print(table)


def build_arguments_table(arguments: typing.List[ArgumentsPair]) -> Table:
    """Build a two-column table: the argument and its valid roles."""
    table = Table()

    table.add_column("Argument")
    table.add_column("Role", justify="center")

    for argument in arguments:
        argument_str = argument.to_str()

        roles_str = [role.name for role in argument.valid_roles]
        roles = ", ".join(roles_str)

        table.add_row(argument_str, roles)

    return table


def build_detected_streams_table(streams: dict) -> Table:
    """Build a table with one row per ``InputStreams`` member."""
    table = Table()

    table.add_column("Stream")
    table.add_column("Present", justify="center")

    for stream in InputStreams:
        is_present = "Yes" if stream in streams else "No"
        table.add_row(stream.name, is_present)

    return table


@cli.command(help="Analyze with all methods.")
@click.option(
    "--elf",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="ELF Executable",
)
@click.option(
    "--dictionary",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="Arguments dictionary",
)
@click.pass_context
def analyze(ctx: click.Context, elf: str, dictionary: str) -> None:
    """Run the ``detect`` command, then the ``fuzz`` command."""
    ctx.invoke(detect, elf=elf)
    print("")
    ctx.invoke(fuzz, elf=elf, dictionary=dictionary)


@cli.command(
    name="ast-check",
    help="Detecting CWE-134 Vulnerabilities using the source code",
)
@click.option(
    "--source",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="C source code to analyze",
)
def ast_check(source: str) -> None:
    """Scan a C source file for CWE-134 and exit with 1 on findings.

    Raises:
        SystemExit: With status 1 when at least one finding is reported.
    """
    vulns = analyze_valid_code(source)
    # print_analysis is called with the findings only: passing the source
    # path as a second positional argument does not match its one-parameter
    # signature.
    print_analysis(vulns)

    if vulns:
        raise SystemExit(1)


@cli.command(
    name="elf-check",
    help="Detecting CWE-134 Vulnerabilities using the elf executable",
)
@click.option(
    "--elf",
    type=click.Path(exists=True, readable=True),
    required=True,
    help="Elf Executable to analyze",
)
def elf_check(elf: str) -> None:
    """Scan the decompiled functions of an ELF for CWE-134.

    Raises:
        SystemExit: With status 1 when at least one finding is reported.
    """
    elf = os.path.abspath(elf)

    print_intro(elf)

    # Must run before get_func_names: it registers the imported symbols that
    # the function-name filter later excludes.
    extract_dynamic_functions(elf)
    func_names = get_func_names(elf)

    analysis = GhidraAnalysis(elf)
    vulns = find_all_vulns(analysis, func_names)

    print_analysis(vulns)

    # Same exit-status contract as ast-check, so both commands can gate a
    # script or CI job.
    if vulns:
        raise SystemExit(1)


def main() -> None:
    """Run the command-line interface."""
    cli(prog_name="attack_surface_approximation")


if __name__ == "__main__":
    main()
class Colors:
    """ANSI escape sequences used to colour terminal output."""

    RED = "\033[1;31m"
    GREEN = "\033[1;32m"
    YELLOW = "\033[1;33m"
    CYAN = "\033[1;36m"
    BOLD = "\033[1m"
    # Resets every attribute set by the sequences above.
    RESET = "\033[0m"


class StaticAnalysisConfig:
    """Constants of the CWE-134 (format string) static analysis."""

    # Functions whose calls are inspected for a non-constant format argument.
    FORMAT_STRING_FUNCTIONS = ['printf', 'fprintf', 'sprintf', 'snprintf', 'dprintf']

    # Function names that are never treated as user code.
    # This list is also populated dynamically with `nm -D` at run time.
    LIBC_EXACT_NAMES = [
        'deregister_tm_clones', 'register_tm_clones', 'frame_dummy'
    ]

    # (Ghidra type name, C type) pairs.
    GHIDRA_TYPE_REPLACEMENTS = [
        ("undefined8", "long long"),
        ("undefined4", "int"),
        ("undefined2", "short"),
        ("undefined1", "char"),
        ("undefined", "char"),
        ("ulonglong", "unsigned long long"),
        ("longlong", "long long"),
        ("ulong", "unsigned long"),
        ("uint", "unsigned int"),
        ("ushort", "unsigned short"),
        ("qword", "unsigned long long"),
        ("dword", "unsigned int"),
        ("word", "unsigned short"),
        ("byte", "unsigned char"),
        ("bool", "int"),
        ("code", "void"),
    ]

    # Typedefs and prototypes of the format functions, as C text. The string
    # starts with an empty line (the newline right after the opening quotes).
    GHIDRA_PREAMBLE = """
typedef int size_t;
typedef long ssize_t;
typedef unsigned int mode_t;
typedef int pid_t;
typedef int __gid_t;
typedef int __uid_t;
typedef int __pid_t;
typedef int FILE;
typedef void va_list;
typedef long time_t;
typedef int wint_t;
typedef struct { int dummy; } sockaddr;
int printf(char *fmt, ...);
int fprintf(int stream, char *fmt, ...);
int sprintf(char *str, char *fmt, ...);
int snprintf(char *str, int size, char *fmt, ...);
int dprintf(int fd, char *fmt, ...);
"""
    # Number of lines in the preamble, the leading empty line included.
    GHIDRA_PREAMBLE_LEN = len(GHIDRA_PREAMBLE.splitlines())


# Pre-existing configuration, shown by the diff as context only; any further
# members lie outside this view.
class Configuration:
    class Fuzzer:
        GENERATE_RANDOM_BASELINE_ARGUMENTS = False
"""Static detection of CWE-134 (externally controlled format string).

One pycparser visitor serves two front ends: C source files are parsed
directly, while Ghidra-decompiled functions are first sanitised and prefixed
with a small preamble of declarations.
"""
import re
import subprocess
import sys
from typing import Any, Dict, List, Optional

from pycparser import c_ast, c_parser

from attack_surface_approximation.configuration import (
    Colors,
    StaticAnalysisConfig,
)

# Zero-based position of the format-string argument of each inspected function.
_FORMAT_ARG_INDEX = {
    "printf": 0,
    "fprintf": 1,
    "sprintf": 1,
    "dprintf": 1,
    "snprintf": 2,
}

# Inner width and title of the banner printed by print_intro.
_BANNER_WIDTH = 46
_BANNER_TITLE = "OpenCRS - CWE-134 Detector"


class FormatStringVisitor(c_ast.NodeVisitor):
    """Collect calls whose format-string argument is not a constant.

    Attributes:
        vulnerabilities: One dict per finding, with the keys ``function``,
            ``containing_function``, ``arg_type``, ``line`` and ``column``.
        currFunc: Name of the function definition being visited, or
            ``"Global"`` outside any definition.
    """

    def __init__(self):
        super().__init__()
        self.directives = 0
        self.vulnerabilities = []
        self.currFunc = "Global"

    def visit_FuncDef(self, node: c_ast.FuncDef):
        """Track the enclosing function while visiting its body."""
        enclosing = self.currFunc
        self.currFunc = node.decl.name
        try:
            self.generic_visit(node)
        finally:
            # Restore even if a nested visit raises, so the visitor is not
            # left pointing at the wrong function.
            self.currFunc = enclosing

    def visit_FuncCall(self, node: c_ast.FuncCall):
        """Record the call when its format argument is not a constant."""
        func_name = get_func_name(node)
        if func_name in StaticAnalysisConfig.FORMAT_STRING_FUNCTIONS:
            format_arg = get_args(node, func_name)
            if format_arg is not None and not isinstance(
                format_arg, c_ast.Constant
            ):
                coord = node.coord
                self.vulnerabilities.append(
                    {
                        "function": func_name,
                        "containing_function": self.currFunc,
                        "arg_type": type(format_arg).__name__,
                        "line": coord.line if coord else "?",
                        "column": coord.column if coord else "?",
                    }
                )
        # Keep descending: a call may be nested in this call's arguments.
        self.generic_visit(node)


def is_user_function(func_name: str) -> bool:
    """Return whether ``func_name`` looks like a function worth scanning.

    Names starting with ``_`` (except the ``_Z`` prefix), names containing
    ``@`` or ``.``, and names listed in
    ``StaticAnalysisConfig.LIBC_EXACT_NAMES`` are rejected.
    """
    if func_name.startswith("_") and not func_name.startswith("_Z"):
        return False
    if "@" in func_name or "." in func_name:
        return False
    return func_name not in StaticAnalysisConfig.LIBC_EXACT_NAMES


def extract_dynamic_functions(elf: str) -> None:
    """Add the undefined dynamic symbols of ``elf`` to ``LIBC_EXACT_NAMES``.

    Runs ``nm -D`` and registers every symbol of type ``U`` (version suffix
    stripped), so ``is_user_function`` rejects it afterwards. A failure of
    ``nm`` is reported on stdout and otherwise ignored.

    Args:
        elf: Path to the ELF executable.
    """
    try:
        nm_result = subprocess.run(
            ["nm", "-D", elf], capture_output=True, text=True, check=True
        )
    except subprocess.CalledProcessError:
        print(f"{Colors.RED}[~]{Colors.RESET} nm -D nu a gasit simboluri dinamice (probabil executabil static).")
        return
    except FileNotFoundError:
        print(f"{Colors.RED}[!]{Colors.RESET} Eroare: Utilitarul 'nm' nu este instalat pe sistemul tau Linux.")
        return

    imported = set()
    for line in nm_result.stdout.splitlines():
        fields = line.split()
        # Undefined symbols carry no address: "U name[@version]". The length
        # check also skips blank or malformed lines.
        if len(fields) >= 2 and fields[0] == "U":
            imported.add(fields[1].split("@")[0])

    # Only append unseen names so repeated calls do not grow the shared list
    # with duplicates; sorted() keeps the order deterministic.
    known = set(StaticAnalysisConfig.LIBC_EXACT_NAMES)
    StaticAnalysisConfig.LIBC_EXACT_NAMES.extend(sorted(imported - known))


def get_func_names(elf_path: str) -> List[str]:
    """Return the names of the user functions defined in an ELF file.

    Every symbol table is walked; a name is kept once, in first-seen order,
    when it is a defined ``STT_FUNC`` symbol accepted by ``is_user_function``.

    Args:
        elf_path: Path to the ELF executable.
    """
    # Imported lazily so that source-only analysis does not need pyelftools.
    from elftools.elf.elffile import ELFFile
    from elftools.elf.sections import SymbolTableSection

    functions = []
    seen = set()
    with open(elf_path, "rb") as file:
        elf = ELFFile(file)
        for section in elf.iter_sections():
            if not isinstance(section, SymbolTableSection):
                continue
            for symbol in section.iter_symbols():
                name = symbol.name
                if not name or name in seen:
                    continue
                if symbol["st_info"]["type"] != "STT_FUNC":
                    continue
                # Undefined symbols are imports: they have no body to scan.
                if symbol["st_shndx"] == "SHN_UNDEF":
                    continue
                if is_user_function(name):
                    seen.add(name)
                    functions.append(name)
    return functions


def sanitize_ghidra_code(ghidra_code: str) -> str:
    """Rewrite Ghidra pseudo-C into text that pycparser is asked to parse.

    Lines containing ``/* WARNING`` are dropped, ``__attribute__((...))`` and
    ``__extension__`` are removed, and the Ghidra type names listed in
    ``StaticAnalysisConfig.GHIDRA_TYPE_REPLACEMENTS`` are replaced whole-word.
    """
    kept = [
        line for line in ghidra_code.splitlines() if "/* WARNING" not in line
    ]
    code = "".join(line + "\n" for line in kept)

    code = re.sub(r"__attribute__\s*\(\(.*?\)\)", "", code)
    code = re.sub(r"\b__extension__\b", "", code)

    for ghidra_type, c_type in StaticAnalysisConfig.GHIDRA_TYPE_REPLACEMENTS:
        code = re.sub(r"\b" + re.escape(ghidra_type) + r"\b", c_type, code)

    return code


def analyze_ghidra_func(code_func: str) -> List[Dict[str, Any]]:
    """Scan one decompiled function and return its findings.

    The reported ``line`` of each finding is relative to the sanitised
    function text (first line is 1), not to the text with the preamble.

    Args:
        code_func: Decompiled source of a single function.

    Returns:
        The findings, or an empty list when the code cannot be parsed.
    """
    sanitized = sanitize_ghidra_code(code_func)
    prefix = StaticAnalysisConfig.GHIDRA_PREAMBLE + "\n"
    full_code = prefix + sanitized

    parser = c_parser.CParser()
    try:
        ast = parser.parse(full_code, filename="")
    except Exception as error:  # pylint: disable=broad-except
        # Best effort on purpose: decompiler output is often not valid C and
        # one bad function must not abort the whole scan.
        print(f"{Colors.YELLOW}[~]{Colors.RESET} Skipping unparseable function. Error: {error}")
        return []

    visitor = FormatStringVisitor()
    visitor.visit(ast)

    # Every newline of the prefix pushes the function text one line down.
    line_offset = prefix.count("\n")
    for finding in visitor.vulnerabilities:
        if isinstance(finding["line"], int):
            finding["line"] -= line_offset

    count = len(visitor.vulnerabilities)
    if count == 0:
        print(f"{Colors.GREEN}[✔]{Colors.RESET} Done — {count} findings\n")
    else:
        print(f"{Colors.YELLOW}[!]{Colors.RESET} Done — {count} finding(s)\n")
    return visitor.vulnerabilities


def find_all_vulns(analysis, func_names) -> List[Dict[str, Any]]:
    """Decompile and scan every function in ``func_names``.

    Args:
        analysis: Object exposing ``decompile_function(name)``.
        func_names: Names of the functions to scan.

    Returns:
        The findings of all functions, in scan order.
    """
    vulns = []
    print(f"{Colors.CYAN}[!]{Colors.RESET} Total Functions to be scanned: {Colors.BOLD}{len(func_names)}{Colors.RESET}\n")
    for func_name in func_names:
        code_func = analysis.decompile_function(func_name)
        # Skip functions with no decompiled text (None, empty or blank).
        if not code_func or not code_func.strip():
            continue
        print(f"{Colors.CYAN}[*]{Colors.RESET} Scanning function: {Colors.BOLD}{func_name}{Colors.RESET}")
        vulns.extend(analyze_ghidra_func(code_func))
    return vulns


def get_args(node: c_ast.FuncCall, funcName: str):
    """Return the format-string argument node of a call, if present.

    Args:
        node: The call to inspect.
        funcName: Name of the called function.

    Returns:
        The argument at the format position of ``funcName``, or ``None`` when
        the call has no arguments, too few arguments, or ``funcName`` is not
        one of the supported functions.
    """
    if node.args is None:
        return None
    index = _FORMAT_ARG_INDEX.get(funcName)
    if index is None:
        return None
    exprs = node.args.exprs
    if len(exprs) <= index:
        return None
    return exprs[index]


def get_func_name(node: c_ast.FuncCall) -> str:
    """Return the callee name, or ``""`` when it is not a plain identifier."""
    callee = node.name
    return callee.name if isinstance(callee, c_ast.ID) else ""


def read_c_file(filepath: str) -> str:
    """Read a C file, blanking preprocessor and ``//`` comment lines.

    Blanked lines are kept as empty lines so that the line numbers reported
    for the remaining code match the original file. A blanked line ending
    with a backslash continues on the next line, which is blanked too.

    Args:
        filepath: Path to the C source file.
    """
    kept = []
    continued = False
    with open(filepath, "r") as source_file:
        for line in source_file:
            stripped = line.strip()
            if continued or stripped.startswith(("#", "//")):
                continued = stripped.endswith("\\")
                kept.append("\n")
            else:
                kept.append(line)
    return "".join(kept)


def analyze_valid_code(filepath: str) -> List[Dict[str, Any]]:
    """Parse a C source file and return its CWE-134 findings.

    Parse errors raised by pycparser are not caught here.

    Args:
        filepath: Path to the C source file.
    """
    valid_code = read_c_file(filepath)

    parser = c_parser.CParser()
    ast = parser.parse(valid_code, filepath)

    visitor = FormatStringVisitor()
    visitor.visit(ast)
    return visitor.vulnerabilities


def print_analysis(
    vulns: List[Dict[str, Any]], source: Optional[str] = None
) -> None:
    """Print a report of the findings.

    Line numbers are printed exactly as stored in the findings.

    Args:
        vulns: Findings as produced by ``FormatStringVisitor``.
        source: Optional path of the analysed file; when given it is appended
            to the location of every finding.
    """
    if not vulns:
        print(f"{Colors.BOLD}{Colors.GREEN} [✔] No CWE-134 format-string vulnerabilities detected.{Colors.RESET}\n")
        return

    print(f"{Colors.BOLD}{Colors.RED}[!] {len(vulns)} CWE-134 vulnerabilities detected:{Colors.RESET}")
    print("\n")

    for i, vuln in enumerate(vulns, start=1):
        containing_func = vuln.get("containing_function", "")
        func = vuln.get("function", "")
        arg_type = vuln.get("arg_type", "")
        line = vuln.get("line", "")
        column = vuln.get("column", "")
        origin = f" ({source})" if source else ""
        print(
            f"{Colors.BOLD}{Colors.YELLOW} [{i}]{Colors.RESET} "
            f"{Colors.BOLD}{func}(){Colors.RESET} "
            f"in function {Colors.BOLD}{Colors.CYAN}'{containing_func}'{Colors.RESET} "
            f"at line {Colors.BOLD}{line}{Colors.RESET}, "
            f"col {Colors.BOLD}{column}{Colors.RESET}{origin}"
        )
        print(f" Format arg is a {Colors.RED}{arg_type}{Colors.RESET} (not a string) => {Colors.BOLD} CWE-134 {Colors.RESET}")
        print("\n")

    print(f"{Colors.BOLD}{Colors.RED} [X]{Colors.RESET} How to Fix: Always use a constant format string.")
    print(f" Ex: {Colors.BOLD}{Colors.GREEN}printf(\"%s\", user_input);{Colors.RESET}")
    print(f" Instead of: {Colors.BOLD}{Colors.RED}printf(user_input);{Colors.RESET}\n")


def print_intro(filepath: str) -> None:
    """Print the banner followed by the analysed target."""
    frame = f"{Colors.BOLD}{Colors.CYAN}"
    rule = "═" * _BANNER_WIDTH
    # Centre the title so the right border lines up with the frame.
    title = _BANNER_TITLE.center(_BANNER_WIDTH)

    print("\n")
    print(f"{frame} ╔{rule}╗{Colors.RESET}")
    print(f"{frame} ║{title}║{Colors.RESET}")
    print(f"{frame} ╚{rule}╝{Colors.RESET}")

    print("\n\n")

    print(f"{Colors.BOLD}{Colors.CYAN}[*]{Colors.RESET} Target: {Colors.BOLD}{filepath}{Colors.RESET}")


def main() -> None:
    """Scan the C source file given as the first command-line argument."""
    if len(sys.argv) < 2:
        print("Usage: python3 detector.py <source.c>")
        return

    print_intro(sys.argv[1])

    vulns = analyze_valid_code(sys.argv[1])
    print_analysis(vulns)


if __name__ == "__main__":
    main()
// Test fixture for the CWE-134 detector: the three calls below pass argv[1]
// as the format string ON PURPOSE. Do not "fix" them.
// Keep every comment on its own line starting with "//": the detector's
// source reader only strips lines that begin with "#" or "//".
#include <stdio.h>

int main(int argc, char *argv[]) {
    char dest_buffer[256];

    if (argc < 2) {
        printf("Usage: %s \n", argv[0]);
        return 1;
    }

    printf(argv[1]);
    sprintf(dest_buffer, argv[1]);
    snprintf(dest_buffer, 100, argv[1]);
    // Test Comment
    return 0;
}