#!/usr/bin/env python
# -*- coding: utf-8 -*-
# File: azureenergylabelerlib.py
#
# Copyright 2022 Sayantan Khanra
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
#
"""
Main code for azureenergylabelerlib.
.. _Google Python Style Guide:
https://google.github.io/styleguide/pyguide.html
"""
import logging
from cachetools import cached, TTLCache
from azure.core.exceptions import ClientAuthenticationError
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import SubscriptionClient
from .azureenergylabelerlibexceptions import InvalidCredentials
from .configuration import (TENANT_THRESHOLDS,
RESOURCE_GROUP_THRESHOLDS,
SUBSCRIPTION_THRESHOLDS,
DEFAULT_DEFENDER_FOR_CLOUD_FRAMEWORKS,
FINDING_FILTERING_STATES)
from .entities import DefenderForCloud, Tenant, FindingParserLabeler
from .schemas import (resource_group_thresholds_schema,
subscription_thresholds_schema,
tenant_thresholds_schema)
__author__ = '''Sayantan Khanra <skhanra@schubergphilis.com>'''
__docformat__ = '''google'''
__date__ = '''22-04-2022'''
__copyright__ = '''Copyright 2022, Sayantan Khanra'''
__credits__ = ["Sayantan Khanra"]
__license__ = '''MIT'''
__maintainer__ = '''Sayantan Khanra'''
__email__ = '''<skhanra@schubergphilis.com>'''
__status__ = '''Development''' # "Prototype", "Development", "Production".
# This is the main prefix used for logging
LOGGER_BASENAME = '''azureenergylabelerlib'''
LOGGER = logging.getLogger(LOGGER_BASENAME)
LOGGER.addHandler(logging.NullHandler())
[docs]class AzureEnergyLabeler: # pylint: disable=too-many-arguments
"""Labeling subscriptions based on findings and label configurations.
Parameters
----------
tenant_id : str
Azure Tenant ID to collect energy label, for example: `18d9dec0-d762-11ec-9cb5-00155da09878`.
frameworks : set[str]
Frameworks taken into account when generating the energy label. Defaults to :data:`~azureenergylabelerlib.configuration.DEFAULT_DEFENDER_FOR_CLOUD_FRAMEWORKS`
tenant_thresholds : list[dict[str, Any]]
Defines percentage thresholds mapping to energy labels for the tenant. Defaults to :data:`~azureenergylabelerlib.configuration.TENANT_THRESHOLDS`
resource_group_thresholds : list[dict[str, Any]]
Defines percentage thresholds mapping to energy labels for resource groups. Defaults to :data:`~azureenergylabelerlib.configuration.RESOURCE_GROUP_THRESHOLDS`
subscription_thresholds : list[dict[str, Any]]
Defines percentage thresholds mapping to energy labels for resource groups. Defaults to :data:`~azureenergylabelerlib.configuration.SUBSCRIPTION_THRESHOLDS`
credentials : Any
One of :py:class:`~azure.identity` Credential object containing the credentials used to access the Azure API.
If not supplied, the library will create a :py:class:`~azure.identity.DefaultAzureCredential`
and attempt to authenticate in the following order:
1. A service principal configured by environment variables. See :class:`~azure.identity.EnvironmentCredential`
for more details.
2. An Azure managed identity. See :class:`~azure.identity.ManagedIdentityCredential` for more details.
3. On Windows only: a user who has signed in with a Microsoft application, such as Visual Studio. If multiple
identities are in the cache, then the value of the environment variable ``AZURE_USERNAME`` is used to select
which identity to use. See :class:`~azure.identity.SharedTokenCacheCredential` for more details.
4. The user currently signed in to Visual Studio Code.
5. The identity currently logged in to the Azure CLI.
6. The identity currently logged in to Azure PowerShell.
allowed_subscription_ids : Any
Inclusion list of subscripitions to be evaluated
denied_subscription_ids : Any
Exclude list of subscriptions to be evaluated
"""
# pylint: disable=dangerous-default-value
def __init__(self,
tenant_id,
frameworks=DEFAULT_DEFENDER_FOR_CLOUD_FRAMEWORKS,
tenant_thresholds=TENANT_THRESHOLDS,
resource_group_thresholds=RESOURCE_GROUP_THRESHOLDS,
subscription_thresholds=SUBSCRIPTION_THRESHOLDS,
credentials=None,
allowed_subscription_ids=None,
denied_subscription_ids=None,
):
self._logger = logging.getLogger(f'{LOGGER_BASENAME}.{self.__class__.__name__}')
self._tenant_id = tenant_id
self.resource_group_thresholds = resource_group_thresholds_schema.validate(resource_group_thresholds)
self.tenant_thresholds = tenant_thresholds_schema.validate(tenant_thresholds)
self.subscription_thresholds = subscription_thresholds_schema.validate(subscription_thresholds)
self.tenant_credentials = self._fetch_credentials(credentials)
self.allowed_subscription_ids = allowed_subscription_ids
self.denied_subscription_ids = denied_subscription_ids
self._tenant = Tenant(credential=self.tenant_credentials,
tenant_id=self._tenant_id,
thresholds=self.tenant_thresholds,
subscription_thresholds=self.subscription_thresholds,
resource_group_thresholds=self.resource_group_thresholds,
allowed_subscription_ids=self.allowed_subscription_ids,
denied_subscription_ids=self.denied_subscription_ids)
self._defender_for_cloud = self._initialize_defender_for_cloud(credential=self.tenant_credentials)
self._frameworks = DefenderForCloud.validate_frameworks(frameworks)
self._tenant_energy_label = None
self._labeled_subscriptions_energy_label = None
self._tenant_labeled_subscriptions = None
def _fetch_credentials(self, credentials=None):
credentials = credentials if credentials else DefaultAzureCredential()
try:
subscription_client = SubscriptionClient(credentials)
subscriptions = [subscription.display_name for subscription in subscription_client.subscriptions.list()]
self._logger.info(f'Credentials valid for: {subscriptions}')
except ClientAuthenticationError as error:
raise InvalidCredentials(error) from None
return credentials
def _initialize_defender_for_cloud(self, credential):
"""Initialize defender for cloud."""
subscription_list = [subscription.subscription_id for subscription in
self._tenant.subscriptions]
return DefenderForCloud(credential, subscription_list)
@property
@cached(cache=TTLCache(maxsize=150000, ttl=120))
def defender_for_cloud_findings(self):
"""Defender for cloud findings."""
return self._defender_for_cloud.get_findings(frameworks=self._frameworks)
@property
def filtered_defender_for_cloud_findings(self):
"""Filtered defender for cloud findings."""
not_skipped_findings = FindingParserLabeler.get_not_skipped_findings(self.defender_for_cloud_findings)
return FindingParserLabeler.exclude_findings_by_state(not_skipped_findings, FINDING_FILTERING_STATES)
@property
def matching_frameworks(self):
"""The frameworks provided to match the findings of."""
return self._frameworks
@property
def defender_for_cloud(self):
"""Defender for cloud."""
return self._defender_for_cloud
@property
def tenant(self):
"""Tenant."""
return self._tenant
@property
def tenant_energy_label(self):
"""Energy label of the Azure Tenant."""
if self._tenant_energy_label is None:
self._logger.debug(f'Tenant subscriptions labeled are {len(self._tenant.subscriptions_to_be_labeled)}')
self._tenant_energy_label = self._tenant.get_energy_label(self.defender_for_cloud_findings)
return self._tenant_energy_label
@property
def labeled_subscriptions_energy_label(self):
"""Energy label of the labeled subscriptions."""
if self._labeled_subscriptions_energy_label is None:
self._labeled_subscriptions_energy_label = self._tenant.get_energy_label_of_targeted_subscriptions(
self.defender_for_cloud_findings)
return self._labeled_subscriptions_energy_label
@property
def tenant_labeled_subscriptions(self):
"""The tenant labeled subscription objects."""
if self._tenant_labeled_subscriptions is None:
self._tenant_labeled_subscriptions = self._tenant.get_labeled_targeted_subscriptions(
self.defender_for_cloud_findings)
return self._tenant_labeled_subscriptions