import argparse import importlib import sys import json from jinja2 import TemplateError from jinja2.sandbox import SandboxedEnvironment from types import ModuleType, FunctionType, MethodType from contextlib import contextmanager def safe_import(module_name): try: module = importlib.import_module(module_name) return module, None except ImportError as e: return None, f"Failed to import module '{module_name}': {str(e)}" def enumerate_object(obj, depth=0, max_depth=3, path=None, visited=None): if path is None: path = [] if visited is None: visited = set() obj_id = id(obj) if obj_id in visited: return None, "Cycle detected" visited.add(obj_id) try: result = { 'path': '.'.join(path), 'attributes': dir(obj)[:50], 'globals': list(getattr(obj, '__globals__', {}).keys())[:50] if hasattr(obj, '__globals__') else None, 'subclasses': [cls.__name__ for cls in obj.__subclasses__()][:50] if hasattr(obj, '__subclasses__') else None } if depth < max_depth: result['nested'] = {} skip_attrs = {'__class__', '__base__', '__bases__', '__mro__'} for attr in dir(obj)[:10]: if attr in skip_attrs: continue try: value = getattr(obj, attr) if isinstance(value, (ModuleType, type)) or hasattr(value, '__dict__') or hasattr(value, '__globals__'): nested_result, error = enumerate_object(value, depth + 1, max_depth, path + [attr], visited) result['nested'][attr] = nested_result if nested_result else 'Inaccessible' except: result['nested'][attr] = 'Inaccessible' return result, None except Exception as e: return None, f"Error enumerating object: {str(e)}" def discover_rce_vectors(context, framework="jinja2"): dangerous_modules = [ 'os', 'subprocess', 'sys', 'shutil', 'platform', 'ctypes', 'pickle', 'code', 'builtins', 'threading', 'multiprocessing', 'socket', 'webbrowser', 'tempfile', 'asyncio', 'signal', 'popen2' ] dangerous_functions = [ 'popen', 'system', 'exec', 'eval', 'execfile', 'compile', 'open', 'execvp', 'execl', 'execle', 'execv', 'execlp', 'call', 'run', 'communicate', 'load', 'getattr', 'setattr', '__import__', 'check_call', 'check_output', 'getoutput', 'getstatusoutput' ] dangerous_classes = [ 'Popen', '_wrap_close', 'InteractiveConsole', 'CodeType', 'Process', 'Thread', 'TCPServer', 'UDPServer', 'AsyncIOEventLoop' ] execution_keywords = ['exec', 'run', 'call', 'eval', 'system', 'load', 'spawn', 'fork'] rce_vectors = [] def check_path(obj, obj_name, path=None): if path is None: path = [obj_name] try: enumeration, _ = enumerate_object(obj, path=path) if not enumeration: return if enumeration.get('globals'): for mod in enumeration['globals']: if mod in dangerous_modules: rce_vectors.append({ 'path': '.'.join(path + ['__globals__', mod]), 'type': 'potentially dangerous module, investigate manually', 'details': f"access to '{mod}' module" }) for attr in enumeration['attributes']: if attr in dangerous_functions or any(keyword in attr.lower() for keyword in execution_keywords): try: value = getattr(obj, attr) if isinstance(value, (FunctionType, MethodType)): rce_vectors.append({ 'path': '.'.join(path + [attr]), 'type': 'potentially dangerous function, investigate manually', 'details': f"access to '{attr}' function" }) except: pass if enumeration.get('subclasses'): for cls in enumeration['subclasses']: if cls in dangerous_classes or any(keyword in cls.lower() for keyword in execution_keywords): rce_vectors.append({ 'path': '.'.join(path + ['__subclasses__', cls]), 'type': 'potentially dangerous class, investigate manually', 'details': f"access to {cls}" }) for attr, nested in enumeration.get('nested', {}).items(): if nested != 'Inaccessible': try: value = getattr(obj, attr) check_path(value, attr, path + [attr]) except: pass except: pass for key, value in context.items(): if value is not None: check_path(value, key) return rce_vectors @contextmanager def flask_context(): from flask import Flask, request app = Flask(__name__) with app.test_request_context(): yield { 'request': request, 'config': app.config, 'self': None } @contextmanager def django_context(): from django.http import HttpRequest from django.test import RequestFactory from django.conf import settings if not settings.configured: settings.configure(DEFAULT_CHARSET='utf-8') factory = RequestFactory() request = factory.get('/') yield { 'request': request, 'self': None } def main(): parser = argparse.ArgumentParser(description="SSTI RCE Vector Discovery Tool") parser.add_argument('--module', required=True, help="Module to import (e.g., os, numpy, myutils)") parser.add_argument('--framework', choices=['jinja2', 'django'], default='jinja2', help="Template framework to simulate (jinja2 or django)") parser.add_argument('--output', help="Output file for results (default: console)") args = parser.parse_args() module, import_error = safe_import(args.module) if import_error: print(f"Error: {import_error}") sys.exit(1) context = {'utils': module} rce_vectors = [] if args.framework == 'jinja2': try: with flask_context() as flask_ctx: context.update(flask_ctx) env = SandboxedEnvironment() env.globals.update(context) rce_vectors = discover_rce_vectors(env.globals, framework="jinja2") except ImportError: print("Error: Flask not installed. Run 'pip install flask'") sys.exit(1) else: try: from django.template import Template, Context with django_context() as django_ctx: context.update(django_ctx) rce_vectors = discover_rce_vectors(context, framework="django") except ImportError: print("Error: Django not installed. Run 'pip install django'") sys.exit(1) results = { 'module': args.module, 'framework': args.framework, 'rce_vectors': rce_vectors } output = json.dumps(results, indent=2) if args.output: with open(args.output, 'w') as f: f.write(output) print(f"Results written to {args.output}") else: print(output) if __name__ == '__main__': main()