diff options
-rw-r--r-- | ssti-app.py | 48 | ||||
-rw-r--r-- | ssti-discovery.py | 194 | ||||
-rw-r--r-- | templates/index.html | 42 |
3 files changed, 284 insertions, 0 deletions
diff --git a/ssti-app.py b/ssti-app.py new file mode 100644 index 0000000..336e868 --- /dev/null +++ b/ssti-app.py @@ -0,0 +1,48 @@ +from flask import Flask, request, jsonify, render_template +from jinja2 import Environment, TemplateError +import argparse +import importlib + +app = Flask(__name__) + +parser = argparse.ArgumentParser(description='SSTI Payload Tester') +parser.add_argument('--module', type=str, default='', + help='Comma-separated list of modules to import (e.g., os,lipsum)') +args = parser.parse_args() + +modules = {} +if args.module: + for module_name in args.module.split(','): + try: + modules[module_name] = importlib.import_module(module_name.strip()) + except ImportError as e: + print(f"Warning: Failed to import module '{module_name}': {e}") + +@app.route('/') +def index(): + return render_template('index.html') + +@app.route('/execute', methods=['POST']) +def execute_payload(): + payload = request.json.get('payload', '') + if not payload: + return jsonify({'error': 'No payload provided'}), 400 + + result = {'output': '', 'error': None} + + try: + env = Environment() + env.globals.update(modules) + template = env.from_string(payload) + result['output'] = template.render() + except TemplateError as e: + result['error'] = str(e) + result['output'] = str(e) + except Exception as e: + result['error'] = f"Unexpected error: {str(e)}" + result['output'] = str(e) + + return jsonify(result) + +if __name__ == '__main__': + app.run(debug=False, host='0.0.0.0', port=5000) diff --git a/ssti-discovery.py b/ssti-discovery.py new file mode 100644 index 0000000..bbf0c49 --- /dev/null +++ b/ssti-discovery.py @@ -0,0 +1,194 @@ +import argparse +import importlib +import sys +import json +from jinja2 import TemplateError +from jinja2.sandbox import SandboxedEnvironment +from types import ModuleType, FunctionType, MethodType +from contextlib import contextmanager + +def safe_import(module_name): + try: + module = importlib.import_module(module_name) + return module, None + except ImportError as e: + return None, f"Failed to import module '{module_name}': {str(e)}" + +def enumerate_object(obj, depth=0, max_depth=3, path=None, visited=None): + if path is None: + path = [] + if visited is None: + visited = set() + obj_id = id(obj) + if obj_id in visited: + return None, "Cycle detected" + visited.add(obj_id) + try: + result = { + 'path': '.'.join(path), + 'attributes': dir(obj)[:50], + 'globals': list(getattr(obj, '__globals__', {}).keys())[:50] if hasattr(obj, '__globals__') else None, + 'subclasses': [cls.__name__ for cls in obj.__subclasses__()][:50] if hasattr(obj, '__subclasses__') else None + } + if depth < max_depth: + result['nested'] = {} + skip_attrs = {'__class__', '__base__', '__bases__', '__mro__'} + for attr in dir(obj)[:10]: + if attr in skip_attrs: + continue + try: + value = getattr(obj, attr) + if isinstance(value, (ModuleType, type)) or hasattr(value, '__dict__') or hasattr(value, '__globals__'): + nested_result, error = enumerate_object(value, depth + 1, max_depth, path + [attr], visited) + result['nested'][attr] = nested_result if nested_result else 'Inaccessible' + except: + result['nested'][attr] = 'Inaccessible' + return result, None + except Exception as e: + return None, f"Error enumerating object: {str(e)}" + +def discover_rce_vectors(context, framework="jinja2"): + dangerous_modules = [ + 'os', 'subprocess', 'sys', 'shutil', 'platform', 'ctypes', 'pickle', 'code', 'builtins', + 'threading', 'multiprocessing', 'socket', 'webbrowser', 'tempfile', 'asyncio', 'signal', 'popen2' + ] + dangerous_functions = [ + 'popen', 'system', 'exec', 'eval', 'execfile', 'compile', 'open', 'execvp', 'execl', 'execle', + 'execv', 'execlp', 'call', 'run', 'communicate', 'load', 'getattr', 'setattr', '__import__', + 'check_call', 'check_output', 'getoutput', 'getstatusoutput' + ] + dangerous_classes = [ + 'Popen', '_wrap_close', 'InteractiveConsole', 'CodeType', 'Process', 'Thread', + 'TCPServer', 'UDPServer', 'AsyncIOEventLoop' + ] + execution_keywords = ['exec', 'run', 'call', 'eval', 'system', 'load', 'spawn', 'fork'] + + rce_vectors = [] + + def check_path(obj, obj_name, path=None): + if path is None: + path = [obj_name] + try: + enumeration, _ = enumerate_object(obj, path=path) + if not enumeration: + return + if enumeration.get('globals'): + for mod in enumeration['globals']: + if mod in dangerous_modules: + rce_vectors.append({ + 'path': '.'.join(path + ['__globals__', mod]), + 'type': 'potentially dangerous module, investigate manually', + 'details': f"access to '{mod}' module" + }) + for attr in enumeration['attributes']: + if attr in dangerous_functions or any(keyword in attr.lower() for keyword in execution_keywords): + try: + value = getattr(obj, attr) + if isinstance(value, (FunctionType, MethodType)): + rce_vectors.append({ + 'path': '.'.join(path + [attr]), + 'type': 'potentially dangerous function, investigate manually', + 'details': f"access to '{attr}' function" + }) + except: + pass + if enumeration.get('subclasses'): + for cls in enumeration['subclasses']: + if cls in dangerous_classes or any(keyword in cls.lower() for keyword in execution_keywords): + rce_vectors.append({ + 'path': '.'.join(path + ['__subclasses__', cls]), + 'type': 'potentially dangerous class, investigate manually', + 'details': f"access to {cls}" + }) + for attr, nested in enumeration.get('nested', {}).items(): + if nested != 'Inaccessible': + try: + value = getattr(obj, attr) + check_path(value, attr, path + [attr]) + except: + pass + except: + pass + + for key, value in context.items(): + if value is not None: + check_path(value, key) + + return rce_vectors + +@contextmanager +def flask_context(): + from flask import Flask, request + app = Flask(__name__) + with app.test_request_context(): + yield { + 'request': request, + 'config': app.config, + 'self': None + } + +@contextmanager +def django_context(): + from django.http import HttpRequest + from django.test import RequestFactory + from django.conf import settings + if not settings.configured: + settings.configure(DEFAULT_CHARSET='utf-8') + factory = RequestFactory() + request = factory.get('/') + yield { + 'request': request, + 'self': None + } + +def main(): + parser = argparse.ArgumentParser(description="SSTI RCE Vector Discovery Tool") + parser.add_argument('--module', required=True, help="Module to import (e.g., os, numpy, myutils)") + parser.add_argument('--framework', choices=['jinja2', 'django'], default='jinja2', + help="Template framework to simulate (jinja2 or django)") + parser.add_argument('--output', help="Output file for results (default: console)") + args = parser.parse_args() + + module, import_error = safe_import(args.module) + if import_error: + print(f"Error: {import_error}") + sys.exit(1) + + context = {'utils': module} + rce_vectors = [] + if args.framework == 'jinja2': + try: + with flask_context() as flask_ctx: + context.update(flask_ctx) + env = SandboxedEnvironment() + env.globals.update(context) + rce_vectors = discover_rce_vectors(env.globals, framework="jinja2") + except ImportError: + print("Error: Flask not installed. Run 'pip install flask'") + sys.exit(1) + else: + try: + from django.template import Template, Context + with django_context() as django_ctx: + context.update(django_ctx) + rce_vectors = discover_rce_vectors(context, framework="django") + except ImportError: + print("Error: Django not installed. Run 'pip install django'") + sys.exit(1) + + results = { + 'module': args.module, + 'framework': args.framework, + 'rce_vectors': rce_vectors + } + + output = json.dumps(results, indent=2) + if args.output: + with open(args.output, 'w') as f: + f.write(output) + print(f"Results written to {args.output}") + else: + print(output) + +if __name__ == '__main__': + main() diff --git a/templates/index.html b/templates/index.html new file mode 100644 index 0000000..da4e523 --- /dev/null +++ b/templates/index.html @@ -0,0 +1,42 @@ +<!DOCTYPE html> +<html lang="en"> +<head> + <meta charset="UTF-8"> + <meta name="viewport" content="width=device-width, initial-scale=1.0"> + <title>SSTI Payload Tester</title> + <script src="https://cdn.tailwindcss.com"></script> +</head> +<body class="bg-gray-100 font-sans"> + <div class="container mx-auto p-6 max-w-4xl"> + <h1 class="text-3xl font-bold text-gray-800 mb-6 text-center">SSTI Payload Tester</h1> + <div class="bg-white shadow-lg rounded-lg p-6"> + <div class="mb-4"> + <label for="payload" class="block text-sm font-medium text-gray-700">SSTI Payload</label> + <textarea id="payload" rows="4" class="mt-1 block w-full border-gray-300 rounded-md shadow-sm focus:ring-indigo-500 focus:border-indigo-500 sm:text-sm" placeholder="{{ 7 * 7 }}"></textarea> + </div> + <button onclick="executePayload()" class="w-full bg-indigo-600 text-white py-2 px-4 rounded-md hover:bg-indigo-700 focus:outline-none focus:ring-2 focus:ring-indigo-500">Execute</button> + <div id="results" class="mt-6 hidden"> + <h2 class="text-lg font-semibold text-gray-800 mb-2">Results</h2> + <div class="mb-4"> + <h3 class="text-sm font-medium text-gray-700">Output</h3> + <pre id="output" class="bg-gray-50 p-4 rounded-md text-sm text-gray-800 whitespace-pre-wrap break-words"></pre> + </div> + </div> + </div> + </div> + <script> + async function executePayload() { + const payload = document.getElementById('payload').value; + const response = await fetch('/execute', { + method: 'POST', + headers: { 'Content-Type': 'application/json' }, + body: JSON.stringify({ payload }) + }); + const data = await response.json(); + const resultsDiv = document.getElementById('results'); + resultsDiv.classList.remove('hidden'); + document.getElementById('output').textContent = data.output || data.error || 'No output'; + } + </script> +</body> +</html> |