Efficiently categorizes 271k+ test runs by AWSServiceType.
Source code in wintermute/utils/coverage.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def _build_asset_index(operation: Operation) -> Dict[str, object]:
    """Map every usable asset identifier of *operation* to its asset object.

    Devices are keyed by hostname; cloud services by both ARN and name;
    IAM users by username; IAM roles by role name. Entries are written in
    that order, so when two assets share an identifier the later one wins.
    """
    index: Dict[str, object] = {}

    def register(key: object, asset: object) -> None:
        # A missing identifier (None / "") must never become a key: a test
        # run whose object_id is also missing would otherwise resolve to an
        # unrelated asset.
        if key is None or key == "":
            return
        index[key] = asset

    for dev in operation.devices:
        register(dev.hostname, dev)

    for acc in operation.cloud_accounts:
        # Accounts may lack these collections entirely, or carry them as
        # None; both cases are treated as "nothing to index".
        for svc in getattr(acc, "services", None) or ():
            register(svc.arn, svc)
            # Bindings may reference a service by name instead of ARN.
            register(svc.name, svc)
        for user in getattr(acc, "iamusers", None) or ():
            register(user.username, user)
        for role in getattr(acc, "iamroles", None) or ():
            register(role.role_name, role)

    return index


def analyze_coverage(operation: Operation) -> Dict[str, int]:
    """Count the operation's test runs per target category.

    A lookup index of the operation's assets is built once so that each
    test run is resolved with a single dictionary lookup instead of a scan
    over all accounts.

    Args:
        operation: Object exposing ``devices``, ``cloud_accounts`` and
            ``test_runs``.

    Returns:
        A mapping from category label to number of test runs. Labels are:

        * ``"GLOBAL_MISC"`` - the run has no bindings.
        * ``"Unbound (<prefix>)"`` - the first binding's ``object_id`` is
          not a known asset; ``<prefix>`` is the second ``-``-separated
          field of the run's test case code, or ``UNKNOWN``.
        * the upper-cased service type - the target has a ``service_type``
          attribute.
        * ``"IAM"`` - the target is an ``IAMUser`` or ``IAMRole``.
        * ``"OSINT/GLOBAL"`` - the target is a ``Device``.
        * ``"OTHER"`` - anything else.
    """
    log.info("[*] Indexing Operation Assets for fast lookup...")
    asset_index = _build_asset_index(operation)
    log.info(f"[*] Indexing complete. Mapped {len(asset_index)} assets.")
    log.info(f"[*] Categorizing {len(operation.test_runs)} test runs...")

    stats: Dict[str, int] = defaultdict(int)
    for run in operation.test_runs:
        if not run.bound:
            stats["GLOBAL_MISC"] += 1
            continue

        # Only the first binding is considered; it is treated as the target.
        obj = asset_index.get(run.bound[0].object_id)

        # Compare against None explicitly: an indexed asset that happens to
        # be falsy must still be classified, not reported as unbound.
        if obj is None:
            # Fall back to the test case code, e.g. "AWS-S3-..." -> "S3".
            code = run.test_case_code or ""
            prefix = code.split("-")[1] if "-" in code else "UNKNOWN"
            stats[f"Unbound ({prefix})"] += 1
            continue

        if hasattr(obj, "service_type"):
            service_type = obj.service_type
            # Enum members expose .value; plain values are used directly.
            raw_label = (
                service_type.value
                if hasattr(service_type, "value")
                else service_type
            )
            # str() keeps non-string enum values from breaking .upper().
            stats[str(raw_label).upper()] += 1
        elif isinstance(obj, (IAMUser, IAMRole)):
            # Users and roles are grouped under a single IAM bucket.
            stats["IAM"] += 1
        elif isinstance(obj, Device):
            stats["OSINT/GLOBAL"] += 1
        else:
            stats["OTHER"] += 1

    return stats
|