Source code for datamonster_api.lib.datamonster

import datetime
import fastavro
import json
import pandas
import six

from .client import Client
from .company import Company
from .data_group import DataGroup, DataGroupColumn
from .datasource import Datasource
from .errors import DataMonsterError

__all__ = ["DataMonster", "DimensionSet"]


class DataMonster(object):
    """DataMonster object. Main entry point to the library

    :param key_id: (str) a user's public key
    :param secret: (str) a user's secret key
    :param server: (optional, str) default to dm.adaptivemgmt.com
    :param verify: (optional, bool) whether to verify the server's TLS certificate
    """

    # REST endpoint templates; the ``{}`` placeholders take a datasource id.
    company_path = "/rest/v1/company"
    datasource_path = "/rest/v1/datasource"
    dimensions_path = "/rest/v1/datasource/{}/dimensions"
    data_group_path = '/rest/v1/data_group'
    rawdata_path = "/rest/v2/datasource/{}/rawdata"

    # Maps avro schema "structure" keys to the column names exposed to users.
    DATAMONSTER_SCHEMA_FIELDS = {
        "lower_date": "start_date",
        "upper_date": "end_date",
        "value": "value",
    }

    def __init__(self, key_id, secret, server=None, verify=True):
        self.client = Client(key_id, secret, server, verify)
        self.key_id = key_id
        self.secret = secret

    def _get_paginated_results(self, url):
        """Get the paginated results starting with this url

        Lazily yields every item of ``resp["results"]`` for each page, following
        ``resp["pagination"]["nextPageURI"]`` until it is ``None``.
        """
        next_page = url
        while next_page is not None:
            resp = self.client.get(next_page)
            for result in resp["results"]:
                yield result
            next_page = resp["pagination"]["nextPageURI"]

    @staticmethod
    def _build_url(path, params):
        """Return ``path`` with ``params`` appended as a URL-encoded query string.

        :param path: (str) endpoint path
        :param params: (dict) query parameters; when empty, ``path`` is returned as is
        :return: (str)
        """
        if not params:
            return path
        return "".join([path, "?", six.moves.urllib.parse.urlencode(params)])

    @staticmethod
    def _check_param(company=None, datasource=None):
        """Validate argument types; ``None`` arguments are not checked.

        :raises: ``DataMonsterError`` if ``company`` is not a ``Company`` or
            ``datasource`` is not a ``Datasource``
        """
        if company is not None and not isinstance(company, Company):
            raise DataMonsterError("company argument must be a Company object")
        if datasource is not None and not isinstance(datasource, Datasource):
            raise DataMonsterError("datasource argument must be a Datasource object")

    ##############################################
    #           Company methods
    ##############################################

    def get_company_by_ticker(self, ticker):
        """Get a single company by ticker

        :param ticker: Ticker to search for
        :return: Single ``Company`` object if any companies exactly match the ticker
            (case insensitive)
        :raises: ``DataMonsterError`` if no companies match ticker
        """
        ticker = ticker.lower()
        # The server-side query may return partial matches; keep only an exact one.
        for company in self.get_companies(ticker):
            if company.ticker is not None and company.ticker.lower() == ticker:
                return company
        raise DataMonsterError("Could not find company with ticker {}".format(ticker))

    def get_company_by_id(self, company_id):
        """Get a single company by id

        :param company_id: (str or int) unique internal identifier for the desired
            company. Can take str form e.g. '718', or int form, e.g. 707.
            In order to find the id of a frequently used company, find the company
            by ticker and call ``.pk`` on the resulting ``Company`` object
        :return: Single ``Company`` object if any company matches the id
        :raises: ``DataMonsterError`` if no company matches id
        """
        company = self.get_company_details(company_id)
        company["uri"] = self._get_company_path(company_id)
        return self._company_result_to_object(company, has_details=True)

    def get_companies(self, query=None, datasource=None):
        """Get available companies

        :param query: Optional query that will restrict companies by ticker or name
        :param datasource: Optional ``Datasource`` object that restricts companies to
            those covered by the given data source
        :return: Iterator of ``Company`` objects
        """
        params = {}
        if query:
            params["q"] = query
        if datasource:
            self._check_param(datasource=datasource)
            params["datasourceId"] = datasource.id

        url = self._build_url(self.company_path, params)
        companies = self._get_paginated_results(url)
        return six.moves.map(self._company_result_to_object, companies)

    def get_company_details(self, company_id):
        """Get details for the given company

        :param company_id: (str or int) unique internal identifier for company.
            See the method |br|
            `get_company_by_id <api.html#datamonster_api.DataMonster.get_company_by_id>`__
            for more info on ``company_id``.
        :return: (dict) details (metadata) for this company, providing basic information.
        """
        path = self._get_company_path(company_id)
        return self.client.get(path)

    def _get_company_path(self, company_id):
        return "{}/{}".format(self.company_path, company_id)

    def _company_result_to_object(self, company, has_details=False):
        """Build a ``Company`` from a result dict, attaching details if requested."""
        company_inst = Company(
            company["id"], company["ticker"], company["name"], company["uri"], self
        )
        if has_details:
            company_inst.set_details(company)
        return company_inst

    ##############################################
    #           Datasource methods
    ##############################################

    def get_datasources(self, query=None, company=None):
        """Get available datasources

        :param query: (str) Optional query that will restrict data sources by name or
            provider name
        :param company: Optional ``Company`` object that restricts data sources to
            those that cover the given company
        :return: Iterator of ``Datasource`` objects
        """
        params = {}
        if query:
            params["q"] = query
        if company:
            self._check_param(company=company)
            params["companyId"] = company.id

        url = self._build_url(self.datasource_path, params)
        datasources = self._get_paginated_results(url)
        return six.moves.map(self._datasource_result_to_object, datasources)

    def get_datasource_by_name(self, name):
        """Given a name, try to find a data source of that name

        :param name: (str)
        :return: Single ``Datasource`` object with the given name
        :raises: ``DataMonsterError`` if no data source matches the given name
        """
        wanted = name.lower()
        for ds in self.get_datasources(query=name):
            if ds.name.lower() == wanted:
                return ds
        raise DataMonsterError(
            "Did not find a data source matching the name {!r}".format(name)
        )

    def get_datasource_by_id(self, datasource_id):
        """Given a data source UUID (universal unique identifier), return the
        corresponding ``Datasource`` object. To find the UUID for a data source,
        first find it by name, then call ``.id`` on the resulting ``Datasource``
        object.

        :param datasource_id: (str)
        :return: Single ``Datasource`` object with the given id
        :raises: ``DataMonsterError`` if no data source matches the given id
        """
        datasource = self.get_datasource_details(datasource_id)
        datasource["uri"] = self._get_datasource_path(datasource_id)
        return self._datasource_result_to_object(datasource, has_details=True)

    def get_datasource_details(self, datasource_id):
        """Get details (metadata) for the data source corresponding to the given UUID

        :param datasource_id: (str) See the method |br|
            `get_datasource_by_id <api.html#datamonster_api.DataMonster.get_datasource_by_id>`__
            for more info on ``datasource_id``
        :return: (dict) details (metadata) for this data source, providing basic
            information.
        """
        path = self._get_datasource_path(datasource_id)
        return self.client.get(path)

    def _get_datasource_path(self, datasource_id):
        return "{}/{}".format(self.datasource_path, datasource_id)

    def _get_data_group_path(self, data_group_id):
        return '{}/{}'.format(self.data_group_path, data_group_id)

    def _get_dimensions_path(self, uuid):
        return self.dimensions_path.format(uuid)

    def _datasource_result_to_object(self, datasource, has_details=False):
        """Build a ``Datasource`` from a result dict, attaching details if requested."""
        ds_inst = Datasource(
            datasource["id"],
            datasource["name"],
            datasource["category"],
            datasource["uri"],
            self,
        )
        if has_details:
            ds_inst.set_details(datasource)
        return ds_inst

    def get_data(
        self, datasource, company, aggregation=None, start_date=None, end_date=None
    ):
        """Get data for data source

        :param datasource: ``Datasource`` object to get the data for
        :param company: ``Company`` object to filter the data source on
        :param aggregation: Optional ``Aggregation`` object to specify the aggregation
            of the data
        :param start_date: Optional filter for the start date of the data
        :param end_date: Optional filter for the end date of the data

        See `here <quickstart.html#>`__ for example usage.

        :return: pandas.DataFrame
        :raises: ``DataMonsterError`` on wrong argument types, when a date filter is
            given for a data source without the matching date field, or when
            aggregating by the fiscal quarter of a different company
        """
        # todo: support multiple companies
        self._check_param(company=company, datasource=datasource)

        filters = {"section_pk": [int(company.id)]}
        # start_date is compared against the end_date column (and vice versa) below,
        # which is presumably why each filter checks the opposite date field.
        if start_date is not None:
            if not datasource.upperDateField:
                raise DataMonsterError("This data source does not support date queries")
        if end_date is not None:
            if not datasource.lowerDateField:
                raise DataMonsterError("This data source does not support date queries")
        if (
            aggregation is not None
            and aggregation.period == 'fiscalQuarter'
            and aggregation.company != company
        ):
            raise DataMonsterError(
                "Aggregating by the fiscal quarter of a different company not yet supported"
            )

        schema, df = self.get_data_raw(datasource, filters, aggregation)
        if datasource.type == "datasource":
            df = self._datamonster_data_mapper(
                self.DATAMONSTER_SCHEMA_FIELDS, schema, df
            )

        # Trim the dates on the client side. This would be more efficient on the
        # server, but we don't support greater than or less than right now
        if start_date is not None and 'end_date' in df:
            df = df[df.end_date >= pandas.Timestamp(start_date)]
        if end_date is not None and 'start_date' in df:
            df = df[df.start_date <= pandas.Timestamp(end_date)]

        if "end_date" in df:
            # Rebind instead of sorting in place: ``df`` may be a boolean-mask slice,
            # and an in-place sort on a slice triggers SettingWithCopyWarning.
            df = df.sort_values(by="end_date")
        return df

    def get_data_raw(self, datasource, filters=None, aggregation=None):
        """Get raw data for all companies available in the data source.

        :param datasource: ``Datasource`` object to get the data for
        :param aggregation: ``Aggregation`` object to specify requested aggregation
        :param filters: dictionary of requested filters

        :return: (schema, pandas.DataFrame)

        See `here <examples.html#get-data-raw>`__ for example usage.
        """
        post_data = {
            'forecast': False,
            'valueAggregation': None,
            'timeAggregation': None,
        }
        if filters is not None:
            post_data['filters'] = filters
        if aggregation is not None:
            post_data['timeAggregation'] = aggregation.to_time_aggregation_dictionary(
                datasource.aggregationType
            )

        headers = {"Accept": "avro/binary", 'Content-Type': 'application/json'}
        url = self.rawdata_path.format(datasource.id)
        resp = self.client.post(url, post_data, headers, stream=True)
        return self._avro_to_df(resp.content, datasource.fields)

    def get_raw_data(self, *args, **kwargs):
        """This function is deprecated. Please use the get_data_raw function instead"""
        raise DataMonsterError("This function has been deprecated. Please use get_data_raw")

    def _avro_to_df(self, avro_buffer, data_types):
        """Read an avro structure into a dataframe and minimally parse it

        :param avro_buffer: (bytes) avro-encoded payload
        :param data_types: iterable of dicts with ``name`` and ``data_type`` keys;
            columns whose ``data_type`` is ``"date"`` are converted to datetimes
        :return: (schema, pandas.DataFrame), where schema is the ``structure`` entry
            of the avro writer schema
        :raises: ``DataMonsterError`` if the writer schema has no ``structure``
        """

        def parse_row(row):
            return {
                col["name"]: pandas.to_datetime(row[col["name"]])
                if col["data_type"] == "date"
                else row[col["name"]]
                for col in data_types
            }

        reader = fastavro.reader(six.BytesIO(avro_buffer))
        metadata = reader.writer_schema.get("structure", ())
        if not metadata:
            raise DataMonsterError(
                "DataMonster does not currently support this request"
            )

        records = [parse_row(r) for r in reader]
        return metadata, pandas.DataFrame.from_records(records)

    @staticmethod
    def _datamonster_data_mapper(mapping_fields, schema, df):
        """mapping function applied to a ``DataMonster`` data source to format the data

        Renames the schema-designated columns, gathers the split columns into a
        per-row ``dimensions`` dict, adds ``time_span`` and shifts ``end_date`` back
        by one day. ``df`` is modified in place and also returned.

        :param mapping_fields (dict): mapping of column names to rename from in the
            schema
        :param schema (dict): avro schema of the data
        :param df (pandas.DataFrame): data to manipulate

        :return: pandas.DataFrame
        :raises: ``DataMonsterError`` if the schema lacks a mapped key or maps a key
            to anything other than exactly one column
        """
        if df.empty:
            return df

        if not set(schema.keys()).issuperset(mapping_fields.keys()):
            raise DataMonsterError(
                "DataMonster does not currently support this request"
            )

        # Normalise to a list so the concatenation below works for any sequence.
        split_columns = list(schema.get("split", []))

        rename_columns = {}
        for key, val in mapping_fields.items():
            if len(schema[key]) != 1:
                raise DataMonsterError(
                    "Expected a single defined column for {!r}. Got {!r}".format(
                        key, schema[key]
                    )
                )
            rename_columns[schema[key][0]] = val
        df.rename(columns=rename_columns, inplace=True)

        # One dict of {split column: value} per row (an empty dict without splits).
        df["dimensions"] = df.apply(
            lambda row: {split: row[split] for split in split_columns},
            axis=1,
        )
        df["time_span"] = df["end_date"] - df["start_date"]
        df["end_date"] -= datetime.timedelta(
            days=1
        )  # Change the format of the end_date

        drop_columns = [col for col in split_columns + ["section_pk"] if col in df]
        df.drop(columns=drop_columns, inplace=True)
        return df

    def get_dimensions_for_datasource(
        self, datasource, filters=None, add_company_info_from_pks=False
    ):
        """Get dimensions ("splits") for the data source from the DataMonster REST
        endpoint ``/datasource/<uuid>/dimensions?filters=...`` where the ``filters``
        string is optional.

        :param datasource: ``Datasource`` object
        :param filters: (dict): a dict of key/value pairs to filter dimensions by
        :param add_company_info_from_pks: (bool): Determines whether return value will
            include tickers for the returned companies. If ``False``, only
            ``section_pk`` s will be returned.

        See `here <examples.html#get-dimensions-for-datasource>`__ for example usage.

        :return: a ``DimensionSet`` object - an iterable through a collection of
            dimension dicts, filtered as requested. See
            `this documentation <api.html#datamonster_api.DimensionSet>`_ for more
            info on ``DimensionSet`` objects.

        :raises: ``DataMonsterError`` if ``filters`` is not a dict or is not
            JSON-serializable. Re-raises ``DataMonsterError`` if
            ``self.client.get()`` raises that.
        """
        self._check_param(datasource=datasource)

        params = {}
        if filters:
            params["filters"] = self.to_json_checked(filters)

        url = self._build_url(self._get_dimensions_path(uuid=datasource.id), params)

        # Let any DataMonsterError from self.client.get() happen -- we don't occlude them
        return DimensionSet(
            url, self, add_company_info_from_pks=add_company_info_from_pks
        )

    @staticmethod
    def to_json_checked(filters):
        """
        Not "private" because `Datasource.get_dimensions()` uses it too

        :param filters: dict
        :return: JSON string encoding `filters`. Normal exit if `filters` is
            JSON-serializable.
        :raises: DataMonsterError if `filters` isn't a dict or can't be JSON-encoded.
        """
        if not isinstance(filters, dict):
            raise DataMonsterError(
                "`filters` must be a dict, got {} instead".format(
                    type(filters).__name__
                )
            )
        try:
            return json.dumps(filters)
        except (TypeError, ValueError) as e:
            # TypeError: unserializable value; ValueError: e.g. circular reference.
            raise DataMonsterError(
                "Problem with filters when getting dimensions: {}".format(e)
            )

    ##############################################
    #           DataGroup methods
    ##############################################

    def get_data_groups(self, query=None):
        """Get available data groups

        :param query: (str) Optional query that will restrict data groups by name or
            data source name
        :return: Iterator of ``DataGroup`` objects.
        """
        params = {}
        if query is not None:
            params['q'] = query

        url = self._build_url(self.data_group_path, params)
        datagroups = self._get_paginated_results(url)
        return six.moves.map(self._data_group_result_to_object, datagroups)

    def get_data_group_details(self, id):
        """Given a data group id, return the details (metadata) of that data group

        :param id: (int)
        :return: the response of the data group endpoint for this id, as returned
            by ``self.client.get``
        """
        path = self._get_data_group_path(id)
        return self.client.get(path)

    def get_data_group_by_id(self, id):
        """Give a data group pk (primary key), return the corresponding ``DataGroup``
        object. To find the pk for a data group, first find it using the iterator
        returned by ``get_data_groups()``, then call ``.id`` on the ``DataGroup``
        object.

        :param id: (int)
        :return: Single ``DataGroup`` object with the given id
        :raises: ``DataMonsterError`` if no data group matches the given id
        """
        dg = self.get_data_group_details(id)
        return self._data_group_result_to_object(dg, has_details=True)

    def _data_group_result_to_object(self, data_group, has_details=False):
        """Build a ``DataGroup`` from a result dict, attaching details if requested."""
        columns = [DataGroupColumn(**column) for column in data_group['columns']]
        dg_inst = DataGroup(
            data_group['_id'], data_group['name'], columns, data_group['status'], self
        )
        if has_details:
            dg_inst.set_details(data_group)
        return dg_inst
class DimensionSet(object):
    """
    An iterable through a collection of dimensions dictionaries. Each dimension
    dictionary has 4 keys: ``max_date``, ``min_date``, ``row_count``, and
    ``split_combination``. The first two have values that are dates as strings in
    ISO format; ``split_combination`` points to a dict containing data from all
    other columns; ``row_count`` points to an int specifying how many rows match
    the dates and all splits in ``split_combination``
    """

    def __init__(self, url, dm, add_company_info_from_pks):
        """
        Fetches the first page from ``url`` immediately, via ``dm.client.get``.

        :param url: (string) URL for REST endpoint
        :param dm: DataMonster object
        :param add_company_info_from_pks: (bool) If ``True``, create ticker items
            from ``section_pk`` items.
        """
        self._url_orig = url

        resp0 = dm.client.get(url)
        self._min_date = resp0["minDate"]
        self._max_date = resp0["maxDate"]
        self._row_count = resp0["rowCount"]
        self._dimension_count = resp0["dimensionCount"]

        self._resp = resp0
        self._dm = dm
        self._add_company_info_from_pks = bool(add_company_info_from_pks)

        # Populated during iteration, maps pk => Company.
        # Contents are not "settled" until iteration is complete.
        self._pk2company = {}

    def __str__(self):
        """Return a one-line summary: class name, counts, date range."""
        has_extra_info_str = (
            "; extra company info" if self.has_extra_company_info else ""
        )
        return "{}: {} dimensions, {} rows, from {} to {}{}".format(
            self.__class__.__name__,
            len(self),
            self._row_count,
            self._min_date,
            self._max_date,
            has_extra_info_str,
        )

    def __len__(self):
        """
        (int) number of *dimension dicts* in the collection
        """
        return self._dimension_count

    def __iter__(self):
        """Generator that iterates through the dimension dicts in the collection.

        Populates self.pk2company during iteration: `section_pk`s already in this
        dict will use the tickers (/names) of `Company`s already looked up and
        saved; newly-encountered `section_pk`s will have their corresponding
        `Company`s saved here.

        The collection can be iterated only once: after a complete pass, further
        iteration yields nothing.
        """
        while True:
            resp = self._resp  # shorthand
            if not resp:
                return

            results_this_page = resp["results"]
            next_page_uri = resp["pagination"]["nextPageURI"]

            if not results_this_page:
                break

            for dimension in results_this_page:
                # do `_camel2snake` *before* possible pk->ticker conversion,
                # as `_create_ticker_items_from_section_pks` assumes snake_case
                # ('split_combination')
                dimension = DimensionSet._camel2snake(dimension)
                if self._add_company_info_from_pks:
                    self._create_ticker_items_from_section_pks(dimension)
                yield dimension

            if next_page_uri is None:
                break
            self._resp = self._dm.client.get(next_page_uri)

        # So that attempts to reuse the iterator get nothing.
        # Without this, the last page could be re-yielded
        self._resp = None

    @property
    def pk2company(self):
        """Empty if ``has_extra_company_info`` is ``False``.

        If ``has_extra_company_info``, this dict maps company pk's (int id's) to
        ``Company`` objects. If ``pk`` is a key in the dict, then
        ``self.pk2company[pk].pk == pk``. The pk's in ``pk2company`` are those in
        the ``section_pk`` items of dimension dicts in this collection.
        (``section_pk`` items are in the ``split_combination`` subdict of a
        dimension dict.)

        During an iteration, ``pk2company`` contains all pk's from ``section_pk``
        values in dimension dicts *that have been yielded so far*. Thus,
        ``pk2company`` is initially empty, and isn't fully populated until the
        iteration completes.

        Note that making a *list* of a ``DimensionSet`` performs a complete
        iteration.

        :return: (dict)
        """
        return self._pk2company

    @property
    def min_date(self):
        """
        :return: (str) min of the ``min_date`` of the dimension dicts
        """
        return self._min_date

    @property
    def max_date(self):
        """
        :return: (str) max of the ``max_date`` of the dimension dicts
        """
        return self._max_date

    @property
    def row_count(self):
        """
        :return: (int) number of rows matching the filters for this ``DimensionSet``
        """
        return self._row_count

    @property
    def has_extra_company_info(self):
        """
        :return: (bool) The value passed as ``add_company_info_from_pks`` to the
            constructor, coerced to *bool*.
        """
        return self._add_company_info_from_pks

    @staticmethod
    def _camel2snake(dimension_dict):
        """Return a new dict with the four known keys changed from camelCase to
        snake_case; `dimension_dict` itself is left unchanged.

        Keys other than the four known ones are copied through under their
        original name.
        """
        camel2snake = {
            "splitCombination": "split_combination",
            "maxDate": "max_date",
            "minDate": "min_date",
            "rowCount": "row_count",
        }
        return {camel2snake.get(k, k): v for k, v in dimension_dict.items()}

    def _create_ticker_items_from_section_pks(self, dimension):
        """
        :param dimension: a dimension dict, with a key 'split_combination'.
        :return: the same `dimension` dict, after mutating it

        Mutates the dict `dimension`: if 'section_pk' is in
        `dimension['split_combination']`, its value::

            dimension['split_combination']['section_pk']

        is a 'section_pk' or a list of them (we accommodate `None`, too).

        We add a `'ticker'` item to dimension['split_combination'] whose value is
        the ticker or tickers for the pk's in the value of 'section_pk' -- more
        precisely, the value corresponding to any `pk` is `self._pk_to_ticker(pk)`:
        the company's ticker if it has one, the company's name otherwise.
        """
        combo = dimension["split_combination"]
        if "section_pk" in combo:
            value = combo.get("section_pk")  # type: int or list[int]
            if value is not None:
                if isinstance(value, int):
                    combo["ticker"] = self._pk_to_ticker(value)
                else:
                    combo["ticker"] = [self._pk_to_ticker(pk) for pk in value]
        return dimension

    def _pk_to_ticker(self, pk):
        """
        :param pk: int -- a section_pk
        :return: str -- `self._dm.get_company_by_id(pk).ticker` if that is not
            `None` or empty, name of the company with key `pk` otherwise

        Note that `self._pk2company` basically holds memos for this method:
        for each `pk`, `self._dm.get_company_by_id(pk)` is only called once.
        """
        if pk not in self._pk2company:
            self._pk2company[pk] = self._dm.get_company_by_id(pk)
        company = self._pk2company[pk]
        return company.ticker or company.name