# Source code for datamonster_api.lib.data_group
from .base import BaseClass
from .errors import DataMonsterError
import numpy as np
from io import BytesIO
from .utils import dataframe_to_avro_bytes
# Pattern for ``YYYY-MM-DD`` style date strings (four digits, two digits, two digits).
date_regex = r'\d{4}-\d{2}-\d{2}'
# Upload ceiling in bytes: the flask server only allows 64MB files.
max_file_size = 64 * 1024 ** 2
# In-memory DataFrame size (twice the upload ceiling) beyond which no avro file is even built.
data_frame_cutoff_size = max_file_size * 2
class DataGroup(BaseClass):
    """Representation of a DataGroup in DataMonster

    :param _id: (int) unique internal identifier for the Data Group
    :param name: (str) name of the Data Group
    :param columns: (list of ``DataGroupColumn`` objects) representing columns of uploaded data
    :param status: (str, enum) Status of the DataSources in DataGroup at instantiation time. This
        property is updated by ``get_current_status``. It can take one of the following three values.

        `success` if all Data Sources in the group have successfully loaded
        `processing` if any DataSource in the group is still processing
        `error` if any DataSource in the group is in an error state

        Note: `error` takes precedence over `processing`
    :param dm: ``DataMonster`` object
    """

    def __init__(self, _id, name, columns, status, dm):
        self.id = _id
        self.name = name
        self.columns = columns
        self.status = status
        self.dm = dm

    def __hash__(self):
        # Hash on the identifier only, matching ``__eq__`` below.
        return hash(self.id)

    def __eq__(self, obj):
        # Two DataGroups are equal when they share the same id.
        return isinstance(obj, DataGroup) and self.id == obj.id

    def get_details(self):
        """
        Get details (metadata) for this data group,
        providing basic information as stored in DataMonster

        :return: (dict)
        """
        return self.dm.get_data_group_details(self.id)

    def start_data_refresh(self, data_frame):
        """
        Validate ``data_frame`` against this group's columns, serialize it to avro
        and POST it to this group's refresh endpoint.

        :param data_frame: pandas DataFrame holding the data to upload
        :return: the result of the client's ``post`` call
        :raises DataMonsterError: if the schema check fails, the data is too large,
            or the POST raises any exception
        """
        self._accepts(data_frame)

        # Cheap pre-check: skip serialization entirely for clearly oversized frames.
        if sum(data_frame.memory_usage()) > data_frame_cutoff_size:
            raise DataMonsterError('Data Too Large. Data Groups can be refreshed with data < 64 MB.')

        avro_file = BytesIO(dataframe_to_avro_bytes(data_frame, 'upload_data', 'com.adaptivemgmt.upload'))
        # The serialized payload is what is actually bounded by ``max_file_size``.
        if avro_file.getbuffer().nbytes > max_file_size:
            raise DataMonsterError('Data Too Large. Data Groups can be refreshed with data < 64 MB.')

        files = {'avro_file': avro_file}
        headers = {'Accept': 'avro/binary'}
        try:
            return self.dm.client.post(self._get_refresh_url(), {}, headers=headers, files=files)
        except Exception:
            raise DataMonsterError('Unknown problem refreshing data. Please contact DataMonster Customer Service.')

    def _get_refresh_url(self):
        """Return this group's path with ``/refresh`` appended."""
        return '{}/refresh'.format(self.dm._get_data_group_path(self.id))

    def get_current_status(self):
        """
        Query Data Monster servers for the most up-to-date status of this DataGroup.
        Calling this method will update the `status` field on this instance and return it.

        :return: The status of this DataGroup. Values can be one of the following:

            `success` if all Data Sources in the group have successfully loaded
            `processing` if any DataSource in the group is still processing
            `error` if any DataSource in the group is in an error state

            Note: `error` takes precedence over `processing`
        :raises DataMonsterError: if the request fails, or the response does not carry
            this group's id together with a non-``None`` status
        """
        try:
            res = self.dm.client.get(self._get_status_url())
        except Exception:
            raise DataMonsterError('Unknown problem fetching current status. ' +
                                   'Please contact DataMonster Customer Service.')

        # A response lacking the expected keys (or not subscriptable at all) is
        # reported as a DataMonsterError rather than leaking KeyError/TypeError.
        try:
            id_matches = res['_id'] == self.id
            status = res['status']
        except (KeyError, TypeError):
            id_matches, status = False, None

        if id_matches and status is not None:
            self.status = status
            return self.status

        raise DataMonsterError('Unknown problem fetching current status. ' +
                               'Please contact DataMonster Customer Service.')

    def _get_status_url(self):
        """Return this group's path with ``/status`` appended."""
        return '{}/status'.format(self.dm._get_data_group_path(self.id))

    @staticmethod
    def _get_dgctype_(column):
        """
        Classify a data frame column as ``'date'``, ``'number'`` or ``'string'``.

        :param column: a column taken from a data frame (e.g. ``df[name]``)
        :return: ``'date'`` if the column has a ``str`` accessor and any value matches
            ``date_regex``; otherwise ``'number'`` or ``'string'`` according to the dtype;
            ``None`` when none of these apply
        """
        # Use the column's dtype when it has one; otherwise hand the object itself
        # to numpy, as before.
        dtype = getattr(column, 'dtype', column)
        if hasattr(column, 'str') and column.str.match(date_regex).any():
            return 'date'
        elif np.issubdtype(dtype, np.number):
            return 'number'
        elif np.issubdtype(dtype, np.object_) or np.issubdtype(dtype, np.str_):
            return 'string'
        return None

    @staticmethod
    def _construct_error_message(missing, extras, bad_dates):
        """
        Build the multi-line message describing a schema mismatch.

        :param missing: expected columns that were not found in the data frame
        :param extras: data frame columns that were not expected
        :param bad_dates: date columns whose values are not all ``YYYY-MM-DD``
        :return: (str) the message lines joined with newlines
        """
        msg = ['Invalid DataFrame Schema:']

        if missing:
            msg.append(' DataGroup could not find the following column{}:'.format(
                's' if len(missing) > 1 else ''
            ))
            for miss in missing:
                msg.append(' name: "{}", type: {}'.format(miss.name, miss.type_))

        if extras:
            msg.append(' DataGroup was not expecting the following column{}:'.format(
                's' if len(extras) > 1 else ''
            ))
            for extra in extras:
                msg.append(' name: "{}", type: {}'.format(extra.name, extra.type_))

        if bad_dates:
            msg.append(' The following column{} expected to contain only YYYY-MM-DD dates but did not:'.format(
                's were' if len(bad_dates) > 1 else ' was'
            ))
            for bad_date in bad_dates:
                # NOTE: unlike the entries above, each of these lines carries its own
                # trailing newline, so a blank line follows every bad-date entry.
                msg.append(' name: "{}", type: {}\n'.format(bad_date.name, bad_date.type_))

        return '\n'.join(msg)

    def _validate_schema(self, df):
        """Check if the schema of a provided pandas dataframe matches the expected columns

        :param df: pandas DataFrame to check
        :return: tuple ``(missing, extra, bad_dates)`` of ``DataGroupColumn`` lists
        """
        extra = []

        # Find missing columns
        missing = [col for col in self.columns if not col._exists_in_df(df)]

        # Find extra columns. The counts only disagree when the data frame holds a
        # column that does not satisfy any expected column.
        if len(df.columns) + len(missing) != len(self.columns):
            col_names_totype_s = {c.name: c.type_ for c in self.columns}
            for col in df.columns:
                dgctype_ = self._get_dgctype_(df[col])
                if col not in col_names_totype_s or dgctype_ != col_names_totype_s[col]:
                    extra.append(DataGroupColumn(col, dgctype_))

        # Verify date columns are complete
        date_columns = [col for col in self.columns if col.type_ == 'date' and col not in missing]
        bad_dates = [column for column in date_columns if
                     not hasattr(df[column.name], 'str') or
                     not df[column.name].str.match(date_regex).all()]

        return missing, extra, bad_dates

    def _accepts(self, df):
        """Check if DataGroup could run a refresh with the given data frame

        :param df: pandas DataFrame to check
        :raises DataMonsterError: if expected columns are missing or date columns
            contain non-date values
        """
        missing, extra, bad_dates = self._validate_schema(df)
        # Extra columns alone do not reject the frame; they are only listed in the
        # message when something else is wrong.
        if missing or bad_dates:
            raise DataMonsterError(self._construct_error_message(missing, extra, bad_dates))
class DataGroupColumn(object):
    """Representation of a DataGroupColumn in DataMonster

    :param name: (str) name of the DataGroupColumn
    :param type_: (enum 'string', 'number' or 'date') expected data type of the column
    """

    def __init__(self, name=None, type_=None):
        self.name = name
        self.type_ = type_

    def __repr__(self):
        return "<{}: {}>".format(self.__class__.__name__, self.name)

    def _exists_in_df(self, df):
        """Return true if this DataGroupColumn is represented in ``df``

        :param df: Pandas Data Frame
        :return: True iff a column exists in ``df`` that matches self.name and self.type_
        :raises DataMonsterError: if the column is present but ``self.type_`` is not one of
            'string', 'number' or 'date'
        """
        if self.name not in df.columns:
            return False

        column = df[self.name]
        # Hand numpy the dtype itself rather than asking it to coerce the column.
        dtype = getattr(column, 'dtype', column)

        if self.type_ == 'string':
            # pandas maps strings to objects
            return np.issubdtype(dtype, np.object_) or np.issubdtype(dtype, np.str_)
        elif self.type_ == 'number':
            return np.issubdtype(dtype, np.number)
        elif self.type_ == 'date':
            # Columns without a ``str`` accessor (e.g. numeric ones) cannot hold date
            # strings; report them as not matching instead of raising AttributeError.
            return hasattr(column, 'str') and bool(column.str.match(date_regex).any())
        else:
            raise DataMonsterError('Unrecognized column type of column {}'.format(self.name))