Source code for datamonster_api.lib.aggregation

from .company import Company
from .errors import DataMonsterError


[docs]class Aggregation(object): """A representation of an aggregation type within DataMonster""" aggregation_period_map = { "fiscalQuarter": "fiscal quarterly", "quarter": "calendar quarterly", "month": "monthly", "week": "weekly", } def __init__(self, period, company): """Initialize an Aggregation :param str period: The period of time over which to aggregate. Valid choices are: "week", "month", "quarter", "fiscalQuarter", "year" :param company: The ``Company`` object for which fiscal quarters are calculated. Only relevant for the `fiscalQuarter` aggregation period. """ self.period = period self.company = company self.aggregation_sanity_check() def to_time_aggregation_dictionary(self, aggregation_type='sum'): """Used to extract the time aggregation dictionary for API usage from the Aggregation""" agg_dict = { 'cadence': self.aggregation_period_map[self.period], 'aggregationType': aggregation_type, 'includePTD': False } if self.period == 'fiscalQuarter': agg_dict['sectionPk'] = self.company.id return agg_dict def aggregation_sanity_check(self, company=None): if self.period is not None and self.period not in self.aggregation_period_map: raise DataMonsterError( "Bad Aggregation Period: {}. Valid choices are: {}".format( self.period, self.aggregation_period_map.keys() ) ) if self.company is not None and not isinstance(self.company, Company): raise DataMonsterError( "Aggregation company must be Company. Found : {}".format( self.company.__class__ ) ) if self.period == "fiscalQuarter": if self.company is None: raise DataMonsterError( "Company must be specified for a fiscalQuarter aggregation" ) if company and self.company.id != company.id: raise DataMonsterError( "Aggregating by the fiscal quarter of a different " "company not yet supported" )