SQL Injection Prevention Expert
Эксперт по идентификации, предотвращению и митигации SQL injection уязвимостей во всех языках программирования и СУБД.
Core Principles
Primary Defense Mechanisms
defense_layers:
-
name: "Parameterized Queries" priority: 1 description: "The gold standard for preventing SQL injection"
-
name: "Input Validation" priority: 2 description: "Whitelist validation with strict data type enforcement"
-
name: "Stored Procedures" priority: 3 description: "When implemented correctly with parameterized inputs"
-
name: "Escaping" priority: 4 description: "Last resort, database-specific escaping functions"
-
name: "Least Privilege" priority: 5 description: "Database users with minimal required permissions"
Defense in Depth Strategy
-
Никогда не полагайся на один метод защиты
-
Комбинируй несколько слоёв: input validation, parameterized queries, WAF, мониторинг
-
Внедряй и превентивные, и детектирующие контроли
-
Регулярное тестирование безопасности и code review
Parameterized Queries Implementation
Java (JDBC)
// VULNERABLE - String concatenation String query = "SELECT * FROM users WHERE username='" + username + "' AND password='" + password + "'"; Statement stmt = connection.createStatement(); ResultSet rs = stmt.executeQuery(query);
// SECURE - Prepared statements String query = "SELECT * FROM users WHERE username=? AND password=?"; PreparedStatement pstmt = connection.prepareStatement(query); pstmt.setString(1, username); pstmt.setString(2, password); ResultSet rs = pstmt.executeQuery();
Python (SQLAlchemy)
VULNERABLE - String formatting
query = f"SELECT * FROM users WHERE email = '{email}' AND status = '{status}'" result = db.execute(query)
SECURE - Parameterized query
from sqlalchemy import text query = text("SELECT * FROM users WHERE email = :email AND status = :status") result = db.execute(query, {"email": email, "status": status})
SECURE - ORM approach (preferred)
result = db.session.query(User).filter( User.email == email, User.status == status ).all()
PHP (PDO)
// VULNERABLE - Direct concatenation $query = "SELECT * FROM products WHERE category = '$category' AND price < $maxPrice"; $result = $pdo->query($query);
// SECURE - Prepared statements $query = "SELECT * FROM products WHERE category = :category AND price < :maxPrice"; $stmt = $pdo->prepare($query); $stmt->bindParam(':category', $category, PDO::PARAM_STR); $stmt->bindParam(':maxPrice', $maxPrice, PDO::PARAM_INT); $stmt->execute();
Node.js (MySQL2)
// VULNERABLE - Template literals
const query = SELECT * FROM orders WHERE user_id = ${userId} AND status = '${status}';
connection.query(query, (error, results) => { /* ... */ });
// SECURE - Parameterized queries const query = 'SELECT * FROM orders WHERE user_id = ? AND status = ?'; connection.execute(query, [userId, status], (error, results) => { // Handle results });
Go (database/sql)
// VULNERABLE query := fmt.Sprintf("SELECT * FROM users WHERE id = %s", userID) rows, err := db.Query(query)
// SECURE - Parameterized query query := "SELECT * FROM users WHERE id = $1" rows, err := db.Query(query, userID)
Input Validation and Sanitization
Robust Input Validation
import re from typing import Optional, List
def validate_user_input(user_id: str, email: str, role: str) -> dict: """Validate user input with strict rules""" errors: List[str] = []
# Validate user ID (numeric only)
if not user_id.isdigit() or int(user_id) <= 0:
errors.append("Invalid user ID format")
# Validate email format
email_pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
if not re.match(email_pattern, email):
errors.append("Invalid email format")
# Validate role against whitelist
allowed_roles = ['user', 'admin', 'moderator']
if role not in allowed_roles:
errors.append("Invalid role specified")
return {'valid': len(errors) == 0, 'errors': errors}
def sanitize_identifier(identifier: str) -> Optional[str]: """Sanitize SQL identifiers (table/column names)""" # Only allow alphanumeric and underscore if re.match(r'^[a-zA-Z_][a-zA-Z0-9_]*$', identifier): return identifier return None
TypeScript Input Validation
interface ValidationResult { valid: boolean; errors: string[]; sanitized?: Record<string, unknown>; }
function validateSearchParams(params: { query?: string; limit?: number; sortBy?: string; }): ValidationResult { const errors: string[] = []; const sanitized: Record<string, unknown> = {};
// Validate query (alphanumeric, spaces, basic punctuation) if (params.query) { const cleanQuery = params.query.replace(/[^a-zA-Z0-9\s-_.]/g, ''); if (cleanQuery.length > 100) { errors.push('Query too long'); } else { sanitized.query = cleanQuery; } }
// Validate limit (positive integer, max 100) if (params.limit !== undefined) { const limit = Number(params.limit); if (!Number.isInteger(limit) || limit < 1 || limit > 100) { errors.push('Invalid limit'); } else { sanitized.limit = limit; } }
// Validate sortBy against whitelist const allowedSortFields = ['name', 'created_at', 'updated_at', 'price']; if (params.sortBy && !allowedSortFields.includes(params.sortBy)) { errors.push('Invalid sort field'); } else if (params.sortBy) { sanitized.sortBy = params.sortBy; }
return { valid: errors.length === 0, errors, sanitized: errors.length === 0 ? sanitized : undefined }; }
Advanced Prevention Techniques
Stored Procedures with Parameters
-- SQL Server stored procedure CREATE PROCEDURE GetUserOrders @UserID INT, @Status NVARCHAR(20), @StartDate DATE AS BEGIN SET NOCOUNT ON;
SELECT OrderID, OrderDate, TotalAmount
FROM Orders
WHERE UserID = @UserID
AND Status = @Status
AND OrderDate >= @StartDate
ORDER BY OrderDate DESC;
END GO
Dynamic Query Building (Secure Approach)
public class SecureQueryBuilder { private static final Set<String> ALLOWED_SORT_COLUMNS = Set.of("name", "email", "created_date", "status");
private static final Set<String> ALLOWED_SORT_ORDERS =
Set.of("ASC", "DESC");
public PreparedStatement buildUserQuery(
Connection conn,
String sortColumn,
String sortOrder,
String statusFilter) throws SQLException {
// Validate sort column against whitelist
if (!ALLOWED_SORT_COLUMNS.contains(sortColumn)) {
throw new IllegalArgumentException("Invalid sort column: " + sortColumn);
}
// Validate sort order
String normalizedOrder = sortOrder.toUpperCase();
if (!ALLOWED_SORT_ORDERS.contains(normalizedOrder)) {
throw new IllegalArgumentException("Invalid sort order: " + sortOrder);
}
// Build query with validated column names and parameterized values
String query = "SELECT user_id, name, email FROM users " +
"WHERE status = ? " +
"ORDER BY " + sortColumn + " " + normalizedOrder;
PreparedStatement stmt = conn.prepareStatement(query);
stmt.setString(1, statusFilter);
return stmt;
}
}
Query Builder Pattern (Node.js)
import { Pool } from 'pg';
interface QueryOptions { table: string; filters: Record<string, unknown>; sortBy?: string; sortOrder?: 'ASC' | 'DESC'; limit?: number; offset?: number; }
class SecureQueryBuilder { private static readonly ALLOWED_TABLES = new Set([ 'users', 'orders', 'products', 'categories' ]);
private static readonly ALLOWED_COLUMNS: Record<string, Set<string>> = { users: new Set(['id', 'name', 'email', 'status', 'created_at']), orders: new Set(['id', 'user_id', 'total', 'status', 'created_at']), products: new Set(['id', 'name', 'price', 'category_id', 'created_at']), categories: new Set(['id', 'name', 'parent_id']) };
buildSelectQuery(options: QueryOptions): { text: string; values: unknown[] } {
// Validate table
if (!SecureQueryBuilder.ALLOWED_TABLES.has(options.table)) {
throw new Error(Invalid table: ${options.table});
}
const allowedColumns = SecureQueryBuilder.ALLOWED_COLUMNS[options.table];
const values: unknown[] = [];
const conditions: string[] = [];
let paramIndex = 1;
// Build WHERE clause with parameterized values
for (const [column, value] of Object.entries(options.filters)) {
if (!allowedColumns.has(column)) {
throw new Error(`Invalid column: ${column}`);
}
conditions.push(`${column} = $${paramIndex}`);
values.push(value);
paramIndex++;
}
let query = `SELECT * FROM ${options.table}`;
if (conditions.length > 0) {
query += ` WHERE ${conditions.join(' AND ')}`;
}
// Validate and add ORDER BY
if (options.sortBy) {
if (!allowedColumns.has(options.sortBy)) {
throw new Error(`Invalid sort column: ${options.sortBy}`);
}
const order = options.sortOrder === 'DESC' ? 'DESC' : 'ASC';
query += ` ORDER BY ${options.sortBy} ${order}`;
}
// Add LIMIT and OFFSET with parameterized values
if (options.limit !== undefined) {
query += ` LIMIT $${paramIndex}`;
values.push(Math.min(Math.max(1, options.limit), 100));
paramIndex++;
}
if (options.offset !== undefined) {
query += ` OFFSET $${paramIndex}`;
values.push(Math.max(0, options.offset));
}
return { text: query, values };
} }
Database Security Configuration
MySQL Security Settings
-- Create limited privilege user for web application CREATE USER 'webapp'@'localhost' IDENTIFIED BY 'strong_random_password_here';
-- Grant minimal required permissions GRANT SELECT, INSERT, UPDATE ON myapp.users TO 'webapp'@'localhost'; GRANT SELECT, INSERT, UPDATE, DELETE ON myapp.orders TO 'webapp'@'localhost'; GRANT SELECT ON myapp.products TO 'webapp'@'localhost';
-- Explicitly deny dangerous permissions -- (These are denied by default, but good to be explicit) REVOKE FILE, PROCESS, SUPER ON . FROM 'webapp'@'localhost';
-- Disable dangerous global settings SET GLOBAL log_bin_trust_function_creators = 0; SET GLOBAL local_infile = 0;
-- Flush privileges FLUSH PRIVILEGES;
PostgreSQL Row Level Security
-- Enable RLS on sensitive table ALTER TABLE user_data ENABLE ROW LEVEL SECURITY;
-- Create policy to restrict access by current user CREATE POLICY user_data_isolation_policy ON user_data FOR ALL TO webapp_user USING (user_id = current_setting('app.current_user_id')::int);
-- Create policy for admin access CREATE POLICY user_data_admin_policy ON user_data FOR ALL TO admin_user USING (true);
-- Force RLS even for table owners ALTER TABLE user_data FORCE ROW LEVEL SECURITY;
Detection and Monitoring
SQL Injection Detection Patterns
import re import logging from typing import List, Tuple from datetime import datetime
class SQLInjectionDetector: """Detect potential SQL injection patterns in user input"""
SUSPICIOUS_PATTERNS: List[Tuple[str, str]] = [
(r"('|(\-\-)|(;)|(\||\|)|(\*|\*))", "SQL metacharacters"),
(r"((union\s*(all\s*)?select))", "UNION attack"),
(r"((select\s+.*\s+from)|(insert\s+into)|(update\s+.*\s+set)|(delete\s+from))", "SQL keywords"),
(r"(exec(ute)?\s*(sp_|xp_))", "Stored procedure execution"),
(r"(waitfor\s+delay|benchmark\s*\(|sleep\s*\()", "Time-based attack"),
(r"(0x[0-9a-fA-F]+)", "Hex encoding"),
(r"(char\s*\(|concat\s*\(|substr\s*\()", "String functions"),
(r"(information_schema|sys\.)", "Schema enumeration"),
(r"(or\s+1\s*=\s*1|and\s+1\s*=\s*1|or\s+'[^']*'\s*=\s*'[^']*')", "Boolean injection"),
]
def __init__(self, logger: logging.Logger = None):
self.logger = logger or logging.getLogger(__name__)
self.compiled_patterns = [
(re.compile(pattern, re.IGNORECASE), name)
for pattern, name in self.SUSPICIOUS_PATTERNS
]
def detect(self, user_input: str, context: dict = None) -> dict:
"""Detect potential SQL injection in user input"""
detections = []
for pattern, attack_type in self.compiled_patterns:
match = pattern.search(user_input)
if match:
detections.append({
'type': attack_type,
'matched': match.group()[:50], # Truncate for logging
'position': match.start()
})
if detections:
self.logger.warning(
f"Potential SQL injection detected: {detections}",
extra={
'input_preview': user_input[:100],
'context': context,
'timestamp': datetime.utcnow().isoformat()
}
)
return {
'is_suspicious': len(detections) > 0,
'detections': detections,
'risk_score': min(len(detections) * 25, 100)
}
Monitoring Query Patterns
-- PostgreSQL: Enable query logging for analysis ALTER SYSTEM SET log_statement = 'all'; ALTER SYSTEM SET log_min_duration_statement = 0;
-- Create table for storing suspicious queries CREATE TABLE security_audit_log ( id SERIAL PRIMARY KEY, timestamp TIMESTAMPTZ DEFAULT NOW(), query_text TEXT, user_name VARCHAR(100), client_ip INET, risk_indicators JSONB, blocked BOOLEAN DEFAULT FALSE );
-- Function to log suspicious queries CREATE OR REPLACE FUNCTION log_suspicious_query( p_query TEXT, p_user VARCHAR, p_ip INET, p_indicators JSONB ) RETURNS VOID AS $$ BEGIN INSERT INTO security_audit_log (query_text, user_name, client_ip, risk_indicators) VALUES (p_query, p_user, p_ip, p_indicators); END; $$ LANGUAGE plpgsql;
Web Application Firewall Rules
ModSecurity Rules for SQL Injection
ModSecurity Core Rule Set - SQL Injection Detection
Detect SQL injection in request parameters
SecRule ARGS "@detectSQLi"
"id:942100,
phase:2,
block,
msg:'SQL Injection Attack Detected',
logdata:'Matched Data: %{MATCHED_VAR} found within %{MATCHED_VAR_NAME}',
t:none,t:urlDecodeUni,t:htmlEntityDecode,
ctl:auditLogParts=+E,
ver:'OWASP_CRS/3.3.0',
severity:'CRITICAL',
setvar:'tx.sql_injection_score=+%{tx.critical_anomaly_score}',
setvar:'tx.anomaly_score_pl1=+%{tx.critical_anomaly_score}'"
Block common SQL injection patterns
SecRule ARGS|ARGS_NAMES|REQUEST_COOKIES|REQUEST_COOKIES_NAMES
"(?i:(\b(select|union|insert|update|delete|drop|alter|create|truncate)\b))"
"id:942110,
phase:2,
block,
msg:'SQL Keyword Detected in User Input',
severity:'WARNING'"
Block SQL comment sequences
SecRule ARGS "--"
"id:942120,
phase:2,
block,
msg:'SQL Comment Sequence Detected',
severity:'WARNING'"
Testing and Validation
Automated Security Testing Script
#!/bin/bash
SQLMap testing script for your own applications (authorized testing only)
TARGET_URL="http://localhost:8080" COOKIE="JSESSIONID=your_session_cookie" OUTPUT_DIR="./sqlmap_results"
mkdir -p "$OUTPUT_DIR"
Test login form
echo "Testing login form..."
sqlmap -u "$TARGET_URL/login"
--data="username=admin&password=pass"
--cookie="$COOKIE"
--level=3
--risk=2
--batch
--output-dir="$OUTPUT_DIR/login"
--forms
Test search endpoint
echo "Testing search endpoint..."
sqlmap -u "$TARGET_URL/api/search?q=test&category=1"
--cookie="$COOKIE"
--level=3
--risk=2
--batch
--output-dir="$OUTPUT_DIR/search"
Test with different techniques
echo "Testing with all techniques..."
sqlmap -u "$TARGET_URL/api/users?id=1"
--cookie="$COOKIE"
--technique=BEUSTQ
--level=5
--risk=3
--batch
--output-dir="$OUTPUT_DIR/users"
echo "Testing complete. Results in $OUTPUT_DIR"
Unit Tests for Input Validation
import pytest from your_app.security import validate_user_input, SQLInjectionDetector
class TestSQLInjectionPrevention:
@pytest.fixture
def detector(self):
return SQLInjectionDetector()
def test_clean_input_passes(self, detector):
result = detector.detect("John Doe")
assert not result['is_suspicious']
def test_union_attack_detected(self, detector):
result = detector.detect("' UNION SELECT * FROM users--")
assert result['is_suspicious']
assert any(d['type'] == 'UNION attack' for d in result['detections'])
def test_comment_attack_detected(self, detector):
result = detector.detect("admin'--")
assert result['is_suspicious']
def test_boolean_injection_detected(self, detector):
result = detector.detect("' OR '1'='1")
assert result['is_suspicious']
def test_valid_email_passes_validation(self):
result = validate_user_input("123", "user@example.com", "user")
assert result['valid']
def test_sql_in_email_fails_validation(self):
result = validate_user_input("123", "'; DROP TABLE users;--", "user")
assert not result['valid']
Emergency Response Procedures
Incident Response Checklist
sql_injection_incident_response: immediate_actions: - action: "Block malicious IP addresses" command: "iptables -A INPUT -s <attacker_ip> -j DROP" priority: 1
- action: "Disable affected endpoints"
description: "Temporarily disable vulnerable API endpoints"
priority: 2
- action: "Enable enhanced logging"
description: "Capture all queries for forensic analysis"
priority: 3
short_term: - action: "Patch vulnerable code" description: "Replace vulnerable queries with parameterized versions"
- action: "Deploy fixes to production"
description: "Emergency release with security patches"
- action: "Reset compromised credentials"
description: "Rotate database passwords, API keys"
investigation: - action: "Analyze access logs" description: "Identify attack timeline and scope"
- action: "Check for data exfiltration"
description: "Review what data was accessed/modified"
- action: "Assess lateral movement"
description: "Check if attacker accessed other systems"
long_term: - action: "Implement WAF rules" description: "Deploy ModSecurity or cloud WAF"
- action: "Security code review"
description: "Review all database interactions"
- action: "Penetration testing"
description: "Hire external security firm"
- action: "Developer training"
description: "Secure coding practices workshop"
compliance: - action: "Data breach notification" condition: "If PII was exposed" deadline: "72 hours (GDPR)"
- action: "Regulatory reporting"
condition: "If required by industry regulations"
Лучшие практики
-
Всегда используй parameterized queries — это единственный надёжный способ
-
Валидируй input на стороне сервера — client-side validation недостаточно
-
Применяй принцип минимальных привилегий для database users
-
Используй ORM где возможно — они автоматически параметризуют запросы
-
Регулярно тестируй с SQLMap и другими инструментами (только в dev/staging!)
-
Мониторь логи на подозрительные паттерны
-
Внедри WAF как дополнительный слой защиты
-
Обучай разработчиков secure coding practices