1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python3
import subprocess
import json
import logging
from datetime import datetime
class ContainerEscapeDetector:
    """Flag running Docker containers whose configuration eases container escape.

    The detector shells out to the ``docker`` CLI (``docker ps`` and
    ``docker inspect``), evaluates a fixed set of rules against each
    container's inspect document, logs every finding and forwards it to
    :meth:`send_alert`.
    """

    # Host paths at which the Docker daemon socket is commonly reachable.
    # /var/run is frequently a symlink to /run, so both spellings are checked.
    _DOCKER_SOCKET_PATHS = ('/var/run/docker.sock', '/run/docker.sock')

    # Added capabilities (compared without the ``CAP_`` prefix) that are
    # reported as dangerous.
    _DANGEROUS_CAPS = frozenset({'SYS_ADMIN', 'ALL'})

    def __init__(self):
        self.logger = logging.getLogger(__name__)

    def check_running_containers(self):
        """List running containers and analyze each one.

        A failing ``docker ps`` invocation is logged and ends the scan;
        nothing is returned.
        """
        try:
            # The Go-template form prints one JSON object per line and is
            # also understood by Docker CLIs that lack the ``json`` shorthand.
            result = subprocess.run(
                ['docker', 'ps', '--format', '{{json .}}'],
                capture_output=True, text=True, check=True
            )
        except subprocess.CalledProcessError as e:
            self.logger.error("Failed to list containers: %s", e)
            return

        for container_id in self._parse_container_ids(result.stdout):
            self.analyze_container_security(container_id)

    @staticmethod
    def _parse_container_ids(ps_output):
        """Return the ``ID`` of every JSON line in *ps_output*.

        Blank lines are skipped, so empty output (no running containers)
        yields an empty list instead of a JSON decode error.
        """
        return [
            json.loads(line)['ID']
            for line in ps_output.splitlines()
            if line.strip()
        ]

    def analyze_container_security(self, container_id):
        """Inspect one container and report any risky settings found.

        Args:
            container_id: ID (or name) passed to ``docker inspect``.

        A failing ``docker inspect`` invocation is logged and the container
        is skipped.
        """
        # Only the subprocess call is guarded, so an error raised while
        # alerting is never mislabelled as an inspect failure.
        try:
            result = subprocess.run(
                ['docker', 'inspect', container_id],
                capture_output=True, text=True, check=True
            )
        except subprocess.CalledProcessError as e:
            self.logger.error(
                "Failed to inspect container %s: %s", container_id, e
            )
            return

        # ``docker inspect`` prints a JSON array; one ID was requested.
        config = json.loads(result.stdout)[0]
        security_issues = self._collect_security_issues(config)
        if security_issues:
            self.report_security_issues(container_id, security_issues)

    @classmethod
    def _collect_security_issues(cls, config):
        """Return the list of issue strings for one inspect document.

        Each entry has the form ``"<SEVERITY>: <description>"``. Fields that
        are missing or ``null`` in *config* are treated as empty.
        """
        host_config = config.get('HostConfig') or {}
        issues = []

        if host_config.get('Privileged', False):
            issues.append("CRITICAL: Container running in privileged mode")

        for mount in config.get('Mounts') or []:
            source = mount.get('Source') or ''
            if source == '/':
                issues.append("CRITICAL: Root filesystem mounted")
            elif source == '/proc' or source.startswith('/proc/'):
                # Match on a path boundary so e.g. /process-data is not flagged.
                issues.append("HIGH: Proc filesystem mounted")
            elif source in cls._DOCKER_SOCKET_PATHS:
                issues.append("CRITICAL: Docker socket mounted")

        # CapAdd may be null in the inspect output, hence the ``or []``.
        added_caps = {
            cls._normalize_capability(cap)
            for cap in host_config.get('CapAdd') or []
        }
        if added_caps & cls._DANGEROUS_CAPS:
            issues.append("HIGH: Dangerous capabilities granted")

        if host_config.get('NetworkMode') == 'host':
            issues.append("MEDIUM: Host network mode enabled")

        return issues

    @staticmethod
    def _normalize_capability(name):
        """Upper-case *name* and drop a leading ``CAP_`` so spellings compare equal."""
        upper = str(name).upper()
        return upper[4:] if upper.startswith('CAP_') else upper

    def report_security_issues(self, container_id, issues):
        """Log each issue and send one alert per issue via :meth:`send_alert`.

        Args:
            container_id: ID of the affected container; only its first 12
                characters appear in the log line, the alert carries it whole.
            issues: Issue strings; the text before the first ``:`` is used
                as the alert severity.
        """
        # Imported here because the module only imports ``datetime`` itself.
        from datetime import timezone

        for issue in issues:
            self.logger.warning("Container %s: %s", container_id[:12], issue)
            alert_data = {
                # Timezone-aware UTC timestamp (datetime.utcnow() is
                # deprecated and returns a naive value).
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'container_id': container_id,
                'issue': issue,
                'severity': issue.split(':')[0]
            }
            self.send_alert(alert_data)

    def send_alert(self, alert_data):
        """Send alert to monitoring system.

        Placeholder that does nothing; override or fill in for the
        monitoring stack in use (e.g. Elasticsearch, Splunk, Datadog).
        """
        pass
# Script entry point: build a detector and run a single scan of the
# containers that are currently running.
if __name__ == "__main__":
    ContainerEscapeDetector().check_running_containers()
|