Source code for datamonster_api.lib.datasource

from .base import BaseClass
from .company import Company
from .errors import DataMonsterError


class Datasource(BaseClass):
    """Representation of a data source in DataMonster

    :param _id: (str) unique internal identifier for the data source
    :param name: (dict) name of the data source, including the vendor for the data
    :param category: (list) associated categories
    :param uri: (str) DataMonster resource identifier associated with the data source
    :param dm: ``DataMonster`` object

    *property* **name**
        **Returns** (str) name of data source, including vendor

    *property* **category**
        **Returns** (str) category associated with the data source,
        e.g., `Web Scrape Data` or `Uploaded Data`
    """

    def __init__(self, _id, name, category, uri, dm):
        self.id = _id
        self.name = name
        self.category = category
        self.uri = uri
        self.dm = dm

    def __hash__(self):
        return hash(self.id)

    def __eq__(self, obj):
        return isinstance(obj, Datasource) and self.id == obj.id
    def get_details(self):
        """Get details (metadata) for this data source, providing basic
        information as stored in DataMonster

        :return: (dict)
        """
        return self.dm.get_datasource_details(self.id)
    @property
    def companies(self):
        """
        :return: (iter) iterable of ``Company`` objects associated with this
            data source, memoized
        """
        if not hasattr(self, "_companies"):
            self._companies = self.dm.get_companies(datasource=self)
        return self._companies
    def get_data(self, company, aggregation=None, start_date=None, end_date=None):
        """Get data for this data source.

        :param company: ``Company`` object to filter the data source on
        :param aggregation: Optional ``Aggregation`` object to specify the
            aggregation of the data
        :param start_date: Optional filter for the start date of the data;
            accepted values include strings in YYYY-MM-DD or MM/DD/YYYY format,
            or a pandas or regular ``datetime`` object
        :param end_date: Optional filter for the end date of the data;
            accepted values include strings in YYYY-MM-DD or MM/DD/YYYY format,
            or a pandas or regular ``datetime`` object

        :return: pandas.DataFrame
        """
        return self.dm.get_data(self, company, aggregation, start_date, end_date)
    def get_dimensions(self, company=None, add_company_info_from_pks=True, **kwargs):
        """Return the dimensions for this data source, restricted to the given
        company or companies and filtered by any kwargs items. Not memoized.

        :param company: a ``Company`` object, a list or tuple of ``Company``
            objects, or ``None``. If not ``None``, the return value will only
            include rows corresponding to the given companies.
        :param add_company_info_from_pks: Determines whether the return value
            will include tickers for the returned companies. If ``False``,
            only ``section_pk`` values will be returned.
        :param kwargs: Additional items to filter by, e.g. ``category='Banana Republic'``

        :return: a ``DimensionSet`` object - an iterable through a collection
            of dimension dicts, filtered as requested.

            See `this documentation <api.html#datamonster_api.DimensionSet>`_
            for more info.

            See `here <examples.html#get-dimensions-for-datasource>`__ for
            example usage of a similar function.

        :raises: can raise ``DataMonsterError`` if ``company`` is not of an
            expected type, or if some kwargs item is not JSON-serializable.
        """
        filters = kwargs
        if company:
            if isinstance(company, Company):
                filters["section_pk"] = company.pk
            elif isinstance(company, (list, tuple)):
                # Loop, rather than `all` and a comprehension, for better error reporting
                pk_list = []
                for cc in company:
                    if not isinstance(cc, Company):
                        raise DataMonsterError(
                            "Every item in `company` argument must be a `Company`; "
                            "{!r} is not".format(cc)
                        )
                    pk_list.append(cc.pk)
                filters["section_pk"] = pk_list
            else:
                raise DataMonsterError(
                    "company argument must be a `Company`, or a list or tuple of `Company`s"
                )

        return self.dm.get_dimensions_for_datasource(
            self,
            filters=filters,
            add_company_info_from_pks=bool(add_company_info_from_pks),
        )
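

# --- Usage sketch (not part of the library source) ---
# A minimal, hedged illustration of how the methods above might be called.
# It assumes the package's public entry points (``DataMonster`` client,
# ``Aggregation``, ``get_company_by_ticker``, ``get_datasources``), which are
# defined elsewhere in datamonster_api; the credentials, ticker, and date
# below are placeholders, not real values.
if __name__ == "__main__":
    from datamonster_api import Aggregation, DataMonster

    dm = DataMonster("<key_id>", "<secret_key>")             # assumed constructor
    company = dm.get_company_by_ticker("AAPL")               # assumed helper
    datasource = next(iter(dm.get_datasources(company=company)))  # assumed helper

    # Basic metadata stored in DataMonster for this data source
    print(datasource.get_details())

    # Memoized iterable of Company objects covered by the data source
    print(list(datasource.companies))

    # Quarterly aggregation aligned to the company's fiscal calendar
    # (Aggregation signature assumed from the package's public docs)
    agg = Aggregation(period="fiscalQuarter", company=company)
    df = datasource.get_data(company, agg, start_date="2019-01-01")
    print(df.head())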
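

# --- get_dimensions sketch (not part of the library source) ---
# A hedged illustration of the filtering behaviour documented above: a single
# ``Company`` (or a list of them) populates the ``section_pk`` filter, and any
# extra keyword arguments are forwarded as additional filters. The
# ``category`` value is a placeholder and ``_print_dimensions`` is a
# hypothetical helper name, not part of the package.
def _print_dimensions(datasource, company):
    # Dimensions restricted to one company, with tickers resolved from pks
    for dim in datasource.get_dimensions(company=company):
        print(dim)

    # Same call restricted by an extra keyword filter; filter values must be
    # JSON-serializable or a DataMonsterError is raised
    for dim in datasource.get_dimensions(
        company=[company], add_company_info_from_pks=False, category="Banana Republic"
    ):
        print(dim)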