Skip to content

aws

AWSAccount

Bases: CloudAccount

This class represents an AWS Account that may contain users and vulnerabilities.

This class represents an AWS Account that may contain users and vulnerabilities, this class was not designed to be called by itself, it should be called from Operation for it's management

Examples:

>>> from wintermute.cloud.aws import AWSAccount, AWSUser, AWSService
>>> from wintermute.findings import Vulnerability, Risk
>>> acct = AWSAccount(
...     name="aws-prod",
...     description="This is the prod account",
...     account_id="123456789012",
...     arn="arn:aws:iam::123456789012:root",
...     default_region="us-east-1",
...     users=[
...         {"username": "alice"},
...         {"username": "bob", "attached_policies": ["Admin"]},
...     ],
...     services=[{"name": "s3", "resources": ["bucket1", "bucket2"]}],
...     vulnerabilities=[
...         Vulnerability(
...             title="Public S3",
...             description="Bucket allows public read",
...             risk=Risk(severity="High"),
...         ),
...     ],
... )
>>> r.account_id
'123456789012'
>>> r.name
'aws-prod'
>>> r.description
'This is the prod account'

Attributes:

Name Type Description
* account_id (str

AWS Account ID

* arn (str

AWS Account ARN

* partition (str

AWS partition (aws, aws-us-gov, aws-cn)

* default_region (str

Default AWS region for the account

* tags (dict

Dictionary of tags associated with the account

* users (list

List of AWSUser objects associated with the account

* services (list

List of AWSService objects associated with the account

Source code in wintermute/cloud/aws.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
class AWSAccount(CloudAccount):
    """This class represents an AWS Account that may contain users and vulnerabilities.

    This class represents an AWS Account that may contain users and vulnerabilities,
    this class was not designed to be called by itself, it should be called from
    Operation for it's management

    Examples:
        >>> from wintermute.cloud.aws import AWSAccount, AWSUser, AWSService
        >>> from wintermute.findings import Vulnerability, Risk
        >>> acct = AWSAccount(
        ...     name="aws-prod",
        ...     description="This is the prod account",
        ...     account_id="123456789012",
        ...     arn="arn:aws:iam::123456789012:root",
        ...     default_region="us-east-1",
        ...     users=[
        ...         {"username": "alice"},
        ...         {"username": "bob", "attached_policies": ["Admin"]},
        ...     ],
        ...     services=[{"name": "s3", "resources": ["bucket1", "bucket2"]}],
        ...     vulnerabilities=[
        ...         Vulnerability(
        ...             title="Public S3",
        ...             description="Bucket allows public read",
        ...             risk=Risk(severity="High"),
        ...         ),
        ...     ],
        ... )
        >>> r.account_id
        '123456789012'
        >>> r.name
        'aws-prod'
        >>> r.description
        'This is the prod account'

    Attributes:
        * account_id (str): AWS Account ID
        * arn (str): AWS Account ARN
        * partition (str): AWS partition (aws, aws-us-gov, aws-cn)
        * default_region (str): Default AWS region for the account
        * tags (dict): Dictionary of tags associated with the account
        * users (list): List of AWSUser objects associated with the account
        * services (list): List of AWSService objects associated with the account
    """

    __schema__ = {
        "users": AWSUser,
        "services": AWSService,
        "iamusers": IAMUser,
        "iamroles": IAMRole,
    }

    def __init__(
        self,
        name: str,
        description: str = "",
        *,
        account_id: str | None = None,
        arn: str | None = None,
        partition: str = "aws",
        default_region: str | None = None,
        tags: Dict[str, str] | None = None,
        users: Optional[List[AWSUser | Dict[str, Any]]] = None,
        iamusers: Optional[List[IAMUser | Dict[str, Any]]] = None,
        iamroles: Optional[List[IAMRole | Dict[str, Any]]] = None,
        services: Optional[List[AWSService | Dict[str, Any]]] = None,
        vulnerabilities: Optional[List[Any]] = None,
        roles: Optional[List[Any]] = None,
        # Note: cloud_type is NOT an argument here, we set it below
    ) -> None:
        # --- DEFENSIVE FIX START ---
        # Detect if 'name' is actually the data dictionary (the cause of your bug)
        if isinstance(name, dict):
            data = name
            # 1. Extract the real scalars
            name = data.get("name", "Unknown-Recovered")
            description = data.get("description", description)
            account_id = data.get("account_id", account_id)
            arn = data.get("arn", arn)
            partition = data.get("partition", partition)
            default_region = data.get("default_region", default_region)

            # 2. Extract lists if they weren't passed in kwargs
            #    (Use existing values if provided, else look in dict)
            tags = tags or data.get("tags")
            users = users or data.get("users")
            iamusers = iamusers or data.get("iamusers")
            iamroles = iamroles or data.get("iamroles")
            services = services or data.get("services")
            vulnerabilities = vulnerabilities or data.get("vulnerabilities")
            roles = roles or data.get("roles")

            log.warning(
                f"Self-Corrected AWS Account initialization for: {name} (ID: {account_id})"
            )
        # --- DEFENSIVE FIX END ---

        super().__init__(
            name=name, description=description, vulnerabilities=vulnerabilities
        )

        self.cloud_type = "AWS"

        self.account_id = account_id
        self.arn = arn
        self.partition = partition
        self.default_region = default_region
        self.tags = dict(tags) if tags else {}
        self.roles = roles if roles else []

        self.users: List[AWSUser] = _load_list(users, AWSUser)
        self.iamusers: List[IAMUser] = _load_list(iamusers, IAMUser)
        self.iamroles: List[IAMRole] = _load_list(iamroles, IAMRole)
        self.services: List[AWSService] = _load_list(services, AWSService)

        log.debug(f"AWS Account initialized: {self.account_id} - {self.name}")

    # Optional: convenience
    @property
    def provider(self) -> str:
        return "aws"

    def _add_component(self, target_list: List[T], cls: Type[T], **kwargs: Any) -> bool:
        """
        Internal helper to construct an object, check for duplicates, and append.
        """
        # Create the object using the provided class and keyword arguments
        obj = cls(**kwargs)
        log.debug(f"Created {cls.__name__} object: {obj}")

        # Check if it already exists (requires __eq__ or dataclass default equality)
        if obj not in target_list:
            target_list.append(obj)
            log.debug(f"Added {cls.__name__} to AWSAccount {self.account_id}: {obj}")
            return True
        log.info(
            f"Did not add {cls.__name__} to AWSAccount {self.account_id}: duplicate found"
        )
        return False

    def addVulnerability(
        self,
        title: str,
        description: str,
        threat: str = "",
        cvss: int = 0,
        mitigation: bool = True,
        fix: bool = True,
        fix_desc: str = "",
        mitigation_desc: str = "",
        risk: dict[str, str] | None = None,
        verified: bool = False,
    ) -> bool:
        v = Vulnerability(
            title=title,
            description=description,
            threat=threat,
            cvss=cvss,
            mitigation=mitigation,
            fix=fix,
            fix_desc=fix_desc,
            mitigation_desc=mitigation_desc,
            verified=verified,
        )
        if risk:
            v.setRisk(**risk)
            log.debug(
                f"Set risk for Vulnerability {v.title} in AWS Account {self.account_id}: {risk}"
            )
        if v not in self.vulnerabilities:
            self.vulnerabilities.append(v)
            log.info(
                f"Vulnerability {v.title} added to AWS Account {self.account_id}: {title}"
            )
            return True
        log.warning(
            f"Vulnerability {v.title} not added to AWS Account {self.account_id}: duplicate found"
        )
        return False

    def addUser(
        self,
        username: str,
        arn: str | None = None,
        attached_policies: List[str] = [],
        custom_properties: Dict[str, Any] = {},
    ) -> bool:
        log.info(f"Adding AWS User {username} to AWS Account {self.account_id}")
        # We assume attached_policies defaults to empty list in the dataclass if None passed
        return self._add_component(
            self.users,
            AWSUser,
            username=username,
            arn=arn,
            attached_policies=attached_policies or [],
            custom_properties=custom_properties or {},
        )

    def addIAMUser(
        self,
        username: str,
        arn: str | None = None,
        administrator: bool = False,
        attached_policies: List[str] = [],
        custom_properties: Dict[str, Any] = {},
    ) -> bool:
        log.info(f"Adding IAM User {username} to AWS Account {self.account_id}")
        return self._add_component(
            self.iamusers,
            IAMUser,
            username=username,
            arn=arn,
            administrator=administrator,
            attached_policies=attached_policies or [],
            custom_properties=custom_properties or {},
        )

    def addIAMRole(
        self,
        role_name: str,
        arn: str | None = None,
        administrator: bool = False,
        attached_policies: List[str] = [],
        custom_properties: Dict[str, Any] = {},
    ) -> bool:
        log.info(f"Adding IAM Role {role_name} to AWS Account {self.account_id}")
        return self._add_component(
            self.iamroles,
            IAMRole,
            role_name=role_name,
            arn=arn,
            administrator=administrator,
            attached_policies=attached_policies or [],
            custom_properties=custom_properties or {},
        )

    def addService(
        self,
        name: str,
        arn: str,
        service_type: AWSServiceType,
        config: Dict[str, Any] = {},
        custom_properties: Dict[str, Any] = {},
    ) -> bool:
        log.info(
            f"Adding {service_type} AWS Service {name} to AWS Account {self.account_id}"
        )
        return self._add_component(
            self.services,
            AWSService,
            name=name,
            arn=arn,
            service_type=service_type,
            config=config or {},
            custom_properties=custom_properties or {},
        )