Skip to content

coverage

analyze_coverage(operation)

Efficiently categorizes 271k+ test runs by AWSServiceType.

Source code in wintermute/utils/coverage.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def analyze_coverage(operation: Operation) -> Dict[str, int]:
    """
    Efficiently categorizes 271k+ test runs by AWSServiceType.
    """
    log.info("[*] Indexing Operation Assets for fast lookup...")

    # 1. Build a Fast Lookup Index {object_id: object}
    # This prevents us from searching the whole account list for every test run.
    asset_index = {}

    # Index Devices (e.g., Attacker Machine)
    for dev in operation.devices:
        # Use hostname or ip as ID depending on your binding logic
        asset_index[dev.hostname] = dev

    # Index Cloud Assets
    for acc in operation.cloud_accounts:
        # Index Services (EC2, S3, Lambda, etc.)
        if hasattr(acc, "services"):
            for svc in acc.services:
                asset_index[svc.arn] = svc
                # Also index by name if ARNs aren't used in binding
                asset_index[svc.name] = svc

        # Index IAM Users
        if hasattr(acc, "iamusers"):
            for user in acc.iamusers:
                asset_index[user.username] = user

        # Index IAM Roles
        if hasattr(acc, "iamroles"):
            for role in acc.iamroles:
                asset_index[role.role_name] = role

    log.info(f"[*] Indexing complete. Mapped {len(asset_index)} assets.")
    log.info(f"[*] Categorizing {len(operation.test_runs)} test runs...")

    # 2. Iterate and Count
    stats: Dict[str, int] = defaultdict(int)

    for run in operation.test_runs:
        # A run might have multiple bindings, we usually care about the first one (the target)
        if not run.bound:
            stats["GLOBAL_MISC"] += 1
            continue

        target_ref = run.bound[0]  # BoundObjectRef
        obj_id = target_ref.object_id

        # Fast Lookup
        obj = asset_index.get(obj_id)

        if not obj:
            # Fallback: If ID lookup fails, try to guess from Test Case Code prefix
            # e.g., "AWS-S3-..." -> "S3"
            prefix = (
                run.test_case_code.split("-")[1]
                if "-" in run.test_case_code
                else "UNKNOWN"
            )
            stats[f"Unbound ({prefix})"] += 1
            continue

        # 3. Determine Type based on Object Class
        if hasattr(obj, "service_type"):
            # It's an AWSService (EC2, S3, RDS, etc.)
            # If service_type is an Enum, get its value, else use string
            sType = (
                obj.service_type.value
                if hasattr(obj.service_type, "value")
                else str(obj.service_type)
            )
            stats[sType.upper()] += 1

        elif isinstance(obj, (IAMUser, IAMRole)):
            # Group Users and Roles under generic IAM
            stats["IAM"] += 1

        elif isinstance(obj, Device):
            stats["OSINT/GLOBAL"] += 1

        else:
            stats["OTHER"] += 1

    return stats