summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorheqnx <root@heqnx.com>2025-08-03 00:56:32 +0300
committerheqnx <root@heqnx.com>2025-08-03 00:56:32 +0300
commit45484ba79b624a2e81f23678d9eb13d72ab5aa03 (patch)
tree5d571f3615f7a5e3811c013ce1843bbbc9a960a8
parent11dcf0fbbc9bd18e4f2afd558102fa7d8ba66ff0 (diff)
downloadssti-discovery-45484ba79b624a2e81f23678d9eb13d72ab5aa03.tar.gz
ssti-discovery-45484ba79b624a2e81f23678d9eb13d72ab5aa03.zip
initial commit
-rw-r--r--ssti-app.py48
-rw-r--r--ssti-discovery.py194
-rw-r--r--templates/index.html42
3 files changed, 284 insertions, 0 deletions
diff --git a/ssti-app.py b/ssti-app.py
new file mode 100644
index 0000000..336e868
--- /dev/null
+++ b/ssti-app.py
@@ -0,0 +1,48 @@
+from flask import Flask, request, jsonify, render_template
+from jinja2 import Environment, TemplateError
+import argparse
+import importlib
+
+app = Flask(__name__)
+
+parser = argparse.ArgumentParser(description='SSTI Payload Tester')
+parser.add_argument('--module', type=str, default='',
+ help='Comma-separated list of modules to import (e.g., os,lipsum)')
+args = parser.parse_args()
+
+modules = {}
+if args.module:
+ for module_name in args.module.split(','):
+ try:
+ modules[module_name] = importlib.import_module(module_name.strip())
+ except ImportError as e:
+ print(f"Warning: Failed to import module '{module_name}': {e}")
+
+@app.route('/')
+def index():
+ return render_template('index.html')
+
+@app.route('/execute', methods=['POST'])
+def execute_payload():
+ payload = request.json.get('payload', '')
+ if not payload:
+ return jsonify({'error': 'No payload provided'}), 400
+
+ result = {'output': '', 'error': None}
+
+ try:
+ env = Environment()
+ env.globals.update(modules)
+ template = env.from_string(payload)
+ result['output'] = template.render()
+ except TemplateError as e:
+ result['error'] = str(e)
+ result['output'] = str(e)
+ except Exception as e:
+ result['error'] = f"Unexpected error: {str(e)}"
+ result['output'] = str(e)
+
+ return jsonify(result)
+
+if __name__ == '__main__':
+ app.run(debug=False, host='0.0.0.0', port=5000)
diff --git a/ssti-discovery.py b/ssti-discovery.py
new file mode 100644
index 0000000..bbf0c49
--- /dev/null
+++ b/ssti-discovery.py
@@ -0,0 +1,194 @@
+import argparse
+import importlib
+import sys
+import json
+from jinja2 import TemplateError
+from jinja2.sandbox import SandboxedEnvironment
+from types import ModuleType, FunctionType, MethodType
+from contextlib import contextmanager
+
+def safe_import(module_name):
+ try:
+ module = importlib.import_module(module_name)
+ return module, None
+ except ImportError as e:
+ return None, f"Failed to import module '{module_name}': {str(e)}"
+
+def enumerate_object(obj, depth=0, max_depth=3, path=None, visited=None):
+ if path is None:
+ path = []
+ if visited is None:
+ visited = set()
+ obj_id = id(obj)
+ if obj_id in visited:
+ return None, "Cycle detected"
+ visited.add(obj_id)
+ try:
+ result = {
+ 'path': '.'.join(path),
+ 'attributes': dir(obj)[:50],
+ 'globals': list(getattr(obj, '__globals__', {}).keys())[:50] if hasattr(obj, '__globals__') else None,
+ 'subclasses': [cls.__name__ for cls in obj.__subclasses__()][:50] if hasattr(obj, '__subclasses__') else None
+ }
+ if depth < max_depth:
+ result['nested'] = {}
+ skip_attrs = {'__class__', '__base__', '__bases__', '__mro__'}
+ for attr in dir(obj)[:10]:
+ if attr in skip_attrs:
+ continue
+ try:
+ value = getattr(obj, attr)
+ if isinstance(value, (ModuleType, type)) or hasattr(value, '__dict__') or hasattr(value, '__globals__'):
+ nested_result, error = enumerate_object(value, depth + 1, max_depth, path + [attr], visited)
+ result['nested'][attr] = nested_result if nested_result else 'Inaccessible'
+ except:
+ result['nested'][attr] = 'Inaccessible'
+ return result, None
+ except Exception as e:
+ return None, f"Error enumerating object: {str(e)}"
+
+def discover_rce_vectors(context, framework="jinja2"):
+ dangerous_modules = [
+ 'os', 'subprocess', 'sys', 'shutil', 'platform', 'ctypes', 'pickle', 'code', 'builtins',
+ 'threading', 'multiprocessing', 'socket', 'webbrowser', 'tempfile', 'asyncio', 'signal', 'popen2'
+ ]
+ dangerous_functions = [
+ 'popen', 'system', 'exec', 'eval', 'execfile', 'compile', 'open', 'execvp', 'execl', 'execle',
+ 'execv', 'execlp', 'call', 'run', 'communicate', 'load', 'getattr', 'setattr', '__import__',
+ 'check_call', 'check_output', 'getoutput', 'getstatusoutput'
+ ]
+ dangerous_classes = [
+ 'Popen', '_wrap_close', 'InteractiveConsole', 'CodeType', 'Process', 'Thread',
+ 'TCPServer', 'UDPServer', 'AsyncIOEventLoop'
+ ]
+ execution_keywords = ['exec', 'run', 'call', 'eval', 'system', 'load', 'spawn', 'fork']
+
+ rce_vectors = []
+
+ def check_path(obj, obj_name, path=None):
+ if path is None:
+ path = [obj_name]
+ try:
+ enumeration, _ = enumerate_object(obj, path=path)
+ if not enumeration:
+ return
+ if enumeration.get('globals'):
+ for mod in enumeration['globals']:
+ if mod in dangerous_modules:
+ rce_vectors.append({
+ 'path': '.'.join(path + ['__globals__', mod]),
+ 'type': 'potentially dangerous module, investigate manually',
+ 'details': f"access to '{mod}' module"
+ })
+ for attr in enumeration['attributes']:
+ if attr in dangerous_functions or any(keyword in attr.lower() for keyword in execution_keywords):
+ try:
+ value = getattr(obj, attr)
+ if isinstance(value, (FunctionType, MethodType)):
+ rce_vectors.append({
+ 'path': '.'.join(path + [attr]),
+ 'type': 'potentially dangerous function, investigate manually',
+ 'details': f"access to '{attr}' function"
+ })
+ except:
+ pass
+ if enumeration.get('subclasses'):
+ for cls in enumeration['subclasses']:
+ if cls in dangerous_classes or any(keyword in cls.lower() for keyword in execution_keywords):
+ rce_vectors.append({
+ 'path': '.'.join(path + ['__subclasses__', cls]),
+ 'type': 'potentially dangerous class, investigate manually',
+ 'details': f"access to {cls}"
+ })
+ for attr, nested in enumeration.get('nested', {}).items():
+ if nested != 'Inaccessible':
+ try:
+ value = getattr(obj, attr)
+ check_path(value, attr, path + [attr])
+ except:
+ pass
+ except:
+ pass
+
+ for key, value in context.items():
+ if value is not None:
+ check_path(value, key)
+
+ return rce_vectors
+
+@contextmanager
+def flask_context():
+ from flask import Flask, request
+ app = Flask(__name__)
+ with app.test_request_context():
+ yield {
+ 'request': request,
+ 'config': app.config,
+ 'self': None
+ }
+
+@contextmanager
+def django_context():
+ from django.http import HttpRequest
+ from django.test import RequestFactory
+ from django.conf import settings
+ if not settings.configured:
+ settings.configure(DEFAULT_CHARSET='utf-8')
+ factory = RequestFactory()
+ request = factory.get('/')
+ yield {
+ 'request': request,
+ 'self': None
+ }
+
+def main():
+ parser = argparse.ArgumentParser(description="SSTI RCE Vector Discovery Tool")
+ parser.add_argument('--module', required=True, help="Module to import (e.g., os, numpy, myutils)")
+ parser.add_argument('--framework', choices=['jinja2', 'django'], default='jinja2',
+ help="Template framework to simulate (jinja2 or django)")
+ parser.add_argument('--output', help="Output file for results (default: console)")
+ args = parser.parse_args()
+
+ module, import_error = safe_import(args.module)
+ if import_error:
+ print(f"Error: {import_error}")
+ sys.exit(1)
+
+ context = {'utils': module}
+ rce_vectors = []
+ if args.framework == 'jinja2':
+ try:
+ with flask_context() as flask_ctx:
+ context.update(flask_ctx)
+ env = SandboxedEnvironment()
+ env.globals.update(context)
+ rce_vectors = discover_rce_vectors(env.globals, framework="jinja2")
+ except ImportError:
+ print("Error: Flask not installed. Run 'pip install flask'")
+ sys.exit(1)
+ else:
+ try:
+ from django.template import Template, Context
+ with django_context() as django_ctx:
+ context.update(django_ctx)
+ rce_vectors = discover_rce_vectors(context, framework="django")
+ except ImportError:
+ print("Error: Django not installed. Run 'pip install django'")
+ sys.exit(1)
+
+ results = {
+ 'module': args.module,
+ 'framework': args.framework,
+ 'rce_vectors': rce_vectors
+ }
+
+ output = json.dumps(results, indent=2)
+ if args.output:
+ with open(args.output, 'w') as f:
+ f.write(output)
+ print(f"Results written to {args.output}")
+ else:
+ print(output)
+
+if __name__ == '__main__':
+ main()
diff --git a/templates/index.html b/templates/index.html
new file mode 100644
index 0000000..da4e523
--- /dev/null
+++ b/templates/index.html
@@ -0,0 +1,42 @@
+<!DOCTYPE html>
+<html lang="en">
+<head>
+ <meta charset="UTF-8">
+ <meta name="viewport" content="width=device-width, initial-scale=1.0">
+ <title>SSTI Payload Tester</title>
+ <script src="https://cdn.tailwindcss.com"></script>
+</head>
+<body class="bg-gray-100 font-sans">
+ <div class="container mx-auto p-6 max-w-4xl">
+ <h1 class="text-3xl font-bold text-gray-800 mb-6 text-center">SSTI Payload Tester</h1>
+ <div class="bg-white shadow-lg rounded-lg p-6">
+ <div class="mb-4">
+ <label for="payload" class="block text-sm font-medium text-gray-700">SSTI Payload</label>
+ <textarea id="payload" rows="4" class="mt-1 block w-full border-gray-300 rounded-md shadow-sm focus:ring-indigo-500 focus:border-indigo-500 sm:text-sm" placeholder="{{ 7 * 7 }}"></textarea>
+ </div>
+ <button onclick="executePayload()" class="w-full bg-indigo-600 text-white py-2 px-4 rounded-md hover:bg-indigo-700 focus:outline-none focus:ring-2 focus:ring-indigo-500">Execute</button>
+ <div id="results" class="mt-6 hidden">
+ <h2 class="text-lg font-semibold text-gray-800 mb-2">Results</h2>
+ <div class="mb-4">
+ <h3 class="text-sm font-medium text-gray-700">Output</h3>
+ <pre id="output" class="bg-gray-50 p-4 rounded-md text-sm text-gray-800 whitespace-pre-wrap break-words"></pre>
+ </div>
+ </div>
+ </div>
+ </div>
+ <script>
+ async function executePayload() {
+ const payload = document.getElementById('payload').value;
+ const response = await fetch('/execute', {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ payload })
+ });
+ const data = await response.json();
+ const resultsDiv = document.getElementById('results');
+ resultsDiv.classList.remove('hidden');
+ document.getElementById('output').textContent = data.output || data.error || 'No output';
+ }
+ </script>
+</body>
+</html>