diff --git a/.gitignore b/.gitignore index 5fb9841..5e86898 100755 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,7 @@ *.pyc *.egg-info dist +build *-jgrover* *.log .coverage diff --git a/build.sh b/build.sh deleted file mode 100755 index c4e9b98..0000000 --- a/build.sh +++ /dev/null @@ -1,6 +0,0 @@ -#!/bin/bash - -python setup.py sdist --formats=gztar,zip -python setup.py bdist --format=gztar,zip -python setup.py bdist_egg - diff --git a/build/lib/omniture/__init__.py b/build/lib/omniture/__init__.py deleted file mode 100644 index fa5e20f..0000000 --- a/build/lib/omniture/__init__.py +++ /dev/null @@ -1,85 +0,0 @@ -# encoding: utf-8 -from __future__ import absolute_import - -import os -import json -import logging.config -import io - -from .account import Account, Suite -from .elements import Value -from .query import Query, ReportNotSubmittedError -from .reports import InvalidReportError, Report, DataWarehouseReport -from .version import __version__ -from .utils import AddressableList, affix - - -def authenticate(username, secret=None, endpoint=Account.DEFAULT_ENDPOINT, prefix='', suffix=''): - """ Authenticate to the Adobe API using WSSE """ - #setup logging - setup_logging() - # if no secret is specified, we will assume that instead - # we have received a dictionary with credentials (such as - # from os.environ) - if not secret: - source = username - key_to_username = affix(prefix, 'OMNITURE_USERNAME', suffix) - key_to_secret = affix(prefix, 'OMNITURE_SECRET', suffix) - username = source[key_to_username] - secret = source[key_to_secret] - - return Account(username, secret, endpoint) - - -def queue(queries): - if isinstance(queries, dict): - queries = queries.values() - - for query in queries: - query.queue() - - -def sync(queries, heartbeat=None, interval=1): - """ - `omniture.sync` will queue a number of reports and then - block until the results are all ready. - - Queueing reports is idempotent, meaning that you can also - use `omniture.sync` to fetch the results for queries that - have already been queued: - - query = mysuite.report.range('2013-06-06').over_time('pageviews', 'page') - omniture.queue(query) - omniture.sync(query) - - The interval will operate under an exponetial decay until it reaches 5 minutes. At which point it will ping every 5 minutes - """ - - queue(queries) - - if isinstance(queries, list): - return [query.sync(heartbeat, interval) for query in queries] - elif isinstance(queries, dict): - return {key: query.sync(heartbeat, interval) for key, query in queries.items()} - else: - message = "Queries should be a list or a dictionary, received: {}".format( - queries.__class__) - raise ValueError(message) - - -def setup_logging(default_path='logging.json', default_level=logging.INFO, env_key='LOG_CFG'): - """Setup logging configuration. """ - path = default_path - value = os.getenv(env_key, None) - if value: - path = value - if os.path.exists(path): - with io.open(path, 'rt') as f: - config = json.load(f) - f.close() - logging.config.dictConfig(config) - requests_log = logging.getLogger("requests") - requests_log.setLevel(logging.WARNING) - - else: - logging.basicConfig(level=default_level) diff --git a/build/lib/omniture/account.py b/build/lib/omniture/account.py deleted file mode 100644 index 23967cd..0000000 --- a/build/lib/omniture/account.py +++ /dev/null @@ -1,270 +0,0 @@ -from __future__ import absolute_import -from __future__ import print_function - -import requests -import binascii -import json -from datetime import datetime, date -import logging -import uuid -import hashlib -import base64 -import os -from datetime import datetime - -from .elements import Value -from .query import Query -from . import reports -from . import utils - - -class Account(object): - """ A wrapper for the Adobe Analytics API. Allows you to query the reporting API """ - DEFAULT_ENDPOINT = 'https://api.omniture.com/admin/1.4/rest/' - - def __init__(self, username, secret, endpoint=DEFAULT_ENDPOINT, cache=False, cache_key=None): - """Authentication to make requests.""" - self.log = logging.getLogger(__name__) - self.log.info(datetime.now().strftime("%Y-%m-%d %I%p:%M:%S")) - self.username = username - self.secret = secret - self.endpoint = endpoint - #Allow someone to set a custom cache key - self.cache = cache - if cache_key: - self.cache_key = cache_key - else: - self.cache_key = date.today().isoformat() - if self.cache: - data = self.request_cached('Company', 'GetReportSuites')['report_suites'] - else: - data = self.request('Company', 'GetReportSuites')['report_suites'] - suites = [Suite(suite['site_title'], suite['rsid'], self) for suite in data] - self.suites = utils.AddressableList(suites) - - def request_cached(self, api, method, query={}, cache_key=None): - if cache_key: - key = cache_key - else: - key = self.cache_key - - #Generate a shortened hash of the query string so that method don't collide - query_hash = base64.urlsafe_b64encode(hashlib.md5(query).digest()) - - try: - with open(self.file_path+'/data_'+api+'_'+method+'_'+query_hash+'_'+key+'.txt') as fp: - for line in fp: - if line: - data = ast.literal_eval(line) - - except IOError as e: - data = self.request(api, method, query) - - # Capture all other old text files - #TODO decide if the query should be included in the file list to be cleared out when the cache key changes - filelist = [f for f in os.listdir(self.file_path) if f.startswith('data_'+api+'_'+method)] - - # Delete them - for f in filelist: - os.remove(self.file_path+'/'+f) - - # Build the new data - the_file = open(self.file_path+'/data_'+api+'_'+method+'_'+query_hash+'_'+key+'.txt', 'w') - the_file.write(str(data)) - the_file.close() - - - def request(self, api, method, query={}): - """ - Make a request to the Adobe APIs. - - * api -- the class of APIs you would like to call (e.g. Report, - ReportSuite, Company, etc.) - * method -- the method you would like to call inside that class - of api - * query -- a python object representing the parameters you would - like to pass to the API - """ - self.log.info("Request: %s.%s Parameters: %s", api, method, query) - response = requests.post( - self.endpoint, - params={'method': api + '.' + method}, - data=json.dumps(query), - headers=self._build_token() - ) - self.log.debug("Response for %s.%s:%s", api, method, response.text) - json_response = response.json() - - if type(json_response) == dict: - self.log.debug("Error Code %s", json_response.get('error')) - if json_response.get('error') == 'report_not_ready': - raise reports.ReportNotReadyError(json_response) - elif json_response.get('error') != None: - raise reports.InvalidReportError(json_response) - else: - return json_response - else: - return json_response - - def jsonReport(self, reportJSON): - """Generates a Report from the JSON (including selecting the report suite)""" - if type(reportJSON) == str: - reportJSON = json.loads(reportJSON) - suiteID = reportJSON['reportDescription']['reportSuiteID'] - suite = self.suites[suiteID] - return suite.jsonReport(reportJSON) - - - def _serialize_header(self, properties): - header = [] - for key, value in properties.items(): - header.append('{key}="{value}"'.format(key=key, value=value)) - return ', '.join(header) - - def _build_token(self): - nonce = str(uuid.uuid4()) - base64nonce = binascii.b2a_base64(binascii.a2b_qp(nonce)) - created_date = datetime.utcnow().isoformat() + 'Z' - sha = nonce + created_date + self.secret - sha_object = hashlib.sha1(sha.encode()) - password_64 = binascii.b2a_base64(sha_object.digest()) - - properties = { - "Username": self.username, - "PasswordDigest": password_64.decode().strip(), - "Nonce": base64nonce.decode().strip(), - "Created": created_date, - } - header = 'UsernameToken ' + self._serialize_header(properties) - - return {'X-WSSE': header} - - def _repr_html_(self): - """ Format in HTML for iPython Users """ - html = "" - html += "{0}: {1}
".format("Username", self.username) - html += "{0}: {1}
".format("Secret", "***************") - html += "{0}: {1}
".format("Report Suites", len(self.suites)) - html += "{0}: {1}
".format("Endpoint", self.endpoint) - return html - - def __str__(self): - return "Analytics Account -------------\n Username: \ - {0} \n Report Suites: {1} \n Endpoint: {2}" \ - .format(self.username, len(self.suites), self.endpoint) - - -class Suite(Value): - """Lets you query a specific report suite. """ - def request(self, api, method, query={}): - raw_query = {} - raw_query.update(query) - if method == 'GetMetrics' or method == 'GetElements': - raw_query['reportSuiteID'] = self.id - - return self.account.request(api, method, raw_query) - - def __init__(self, title, id, account, cache=False): - self.log = logging.getLogger(__name__) - super(Suite, self).__init__(title, id, account) - self.account = account - - @property - @utils.memoize - def metrics(self): - """ Return the list of valid metricsfor the current report suite""" - if self.account.cache: - data = self.request_cache('Report', 'GetMetrics') - else: - data = self.request('Report', 'GetMetrics') - return Value.list('metrics', data, self, 'name', 'id') - - @property - @utils.memoize - def elements(self): - """ Return the list of valid elementsfor the current report suite """ - if self.account.cache: - data = self.request_cached('Report', 'GetElements') - else: - data = self.request('Report', 'GetElements') - return Value.list('elements', data, self, 'name', 'id') - - @property - @utils.memoize - def segments(self): - """ Return the list of valid segments for the current report suite """ - if self.account.cache: - data = self.request_cached('Segments', 'Get',{"accessLevel":"shared"}) - else: - data = self.request('Segments', 'Get',{"accessLevel":"shared"}) - return Value.list('segments', data, self, 'name', 'id',) - - @property - def report(self): - """ Return a report to be run on this report suite """ - return Query(self) - - def jsonReport(self,reportJSON): - """Creates a report from JSON. Accepts either JSON or a string. Useful for deserializing requests""" - q = Query(self) - #TODO: Add a method to the Account Object to populate the report suite this call will ignore it on purpose - if type(reportJSON) == str: - reportJSON = json.loads(reportJSON) - - reportJSON = reportJSON['reportDescription'] - - if 'dateFrom' in reportJSON and 'dateTo' in reportJSON: - q = q.range(reportJSON['dateFrom'],reportJSON['dateTo']) - elif 'dateFrom' in reportJSON: - q = q.range(reportJSON['dateFrom']) - elif 'date' in reportJSON: - q = q.range(reportJSON['date']) - else: - q = q - - if 'dateGranularity' in reportJSON: - q = q.granularity(reportJSON['dateGranularity']) - - if 'source' in reportJSON: - q = q.set('source',reportJSON['source']) - - if 'metrics' in reportJSON: - for m in reportJSON['metrics']: - q = q.metric(m['id']) - - if 'elements' in reportJSON: - for e in reportJSON['elements']: - id = e['id'] - del e['id'] - q= q.element(id, **e) - - if 'locale' in reportJSON: - q = q.set('locale',reportJSON['locale']) - - if 'sortMethod' in reportJSON: - q = q.set('sortMethod',reportJSON['sortMethod']) - - if 'sortBy' in reportJSON: - q = q.sortBy(reportJSON['sortBy']) - - #WARNING This doesn't carry over segment IDs meaning you can't manipulate the segments in the new object - #TODO Loop through and add segment ID with filter method (need to figure out how to handle combined) - if 'segments' in reportJSON: - q = q.set('segments', reportJSON['segments']) - - if 'anomalyDetection' in reportJSON: - q = q.set('anomalyDetection',reportJSON['anomalyDetection']) - - if 'currentData' in reportJSON: - q = q.set('currentData',reportJSON['currentData']) - - if 'elementDataEncoding' in reportJSON: - q = q.set('elementDataEncoding',reportJSON['elementDataEncoding']) - return q - - def _repr_html_(self): - """ Format in HTML for iPython Users """ - return "{0}{1}".format(self.id, self.title) - - def __str__(self): - return "ID {0:25} | Name: {1} \n".format(self.id, self.title) diff --git a/build/lib/omniture/elements.py b/build/lib/omniture/elements.py deleted file mode 100644 index 78ab08a..0000000 --- a/build/lib/omniture/elements.py +++ /dev/null @@ -1,48 +0,0 @@ -# encoding: utf-8 -from __future__ import absolute_import -from __future__ import print_function - -import copy -import logging - -from .import utils - - -class Value(object): - """ Searchable Dict. Can search on both the key and the value """ - def __init__(self, title, id, parent, extra={}): - self.log = logging.getLogger(__name__) - self.title = str(title) - self.id = id - self.parent = parent - self.properties = {'id': id} - - for k, v in extra.items(): - setattr(self, k, v) - - @classmethod - def list(cls, name, items, parent, title='title', id='id'): - values = [cls(item[title], str(item[id]), parent, item) for item in items] - return utils.AddressableList(values, name) - - def __repr__(self): - print(self) - return "<{title}: {id} in {parent}>".format(**self.__dict__) - - def copy(self): - value = self.__class__(self.title, self.id, self.parent) - value.properties = copy.copy(self.properties) - return value - - def serialize(self): - return self.properties - - def _repr_html_(self): - """ Format in HTML for iPython Users """ - return "{0}{1}".format(self.id, self.title) - - - def __str__(self): - """ allows users to print this out in a user friendly using print - """ - return "ID {0:25} | Name: {1} \n".format(self.id, self.title) diff --git a/build/lib/omniture/query.py b/build/lib/omniture/query.py deleted file mode 100644 index 0393fcd..0000000 --- a/build/lib/omniture/query.py +++ /dev/null @@ -1,408 +0,0 @@ -# encoding: utf-8 -from __future__ import absolute_import -from __future__ import print_function - -import time -from copy import copy, deepcopy -import functools -from dateutil.relativedelta import relativedelta -import json -import logging -import sys - -from .elements import Value -from . import reports -from . import utils - - -def immutable(method): - @functools.wraps(method) - def wrapped_method(self, *vargs, **kwargs): - obj = self.clone() - method(obj, *vargs, **kwargs) - return obj - - return wrapped_method - -class ReportNotSubmittedError(Exception): - """ Exception that is raised when a is requested by hasn't been submitted - to Adobe - """ - def __init__(self,error): - self.log = logging.getLogger(__name__) - self.log.debug("Report Has not been submitted, call async() or run()") - super(ReportNotSubmittedError, self).__init__("Report Not Submitted") - -class Query(object): - """ Lets you build a query to the Reporting API for Adobe Analytics. - - Methods in this object are chainable. For example - >>> report = report.element("page").element("prop1"). - metric("pageviews").granularity("day").run() - Making it easy to create a report. - - To see the raw definition use - >>> print report - """ - - GRANULARITY_LEVELS = ['hour', 'day', 'week', 'month', 'quarter', 'year'] - STATUSES = ["Not Submitted","Not Ready","Done"] - - def __init__(self, suite): - """ Setup the basic structure of the report query. """ - self.log = logging.getLogger(__name__) - self.suite = suite - self.raw = {} - #Put the report suite in so the user can print - #the raw query and have it work as is - self.raw['reportSuiteID'] = str(self.suite.id) - self.id = None - self.method = "Get" - self.status = self.STATUSES[0] - #The report object - self.report = reports.Report - #The fully hydrated report object - self.processed_response = None - self.unprocessed_response = None - - def _normalize_value(self, value, category): - if isinstance(value, Value): - return value - else: - return getattr(self.suite, category)[value] - - def _serialize_value(self, value, category): - return self._normalize_value(value, category).serialize() - - def _serialize_values(self, values, category): - if not isinstance(values, list): - values = [values] - - return [self._serialize_value(value, category) for value in values] - - def _serialize(self, obj): - if isinstance(obj, list): - return [self._serialize(el) for el in obj] - elif isinstance(obj, Value): - return obj.serialize() - else: - return obj - - def clone(self): - """ Return a copy of the current object. """ - query = Query(self.suite) - query.raw = copy(self.raw) - query.report = self.report - query.status = self.status - query.processed_response = self.processed_response - query.unprocessed_response = self.unprocessed_response - return query - - @immutable - def range(self, start, stop=None, months=0, days=0, granularity=None): - """ - Define a date range for the report. - - * start -- The start date of the report. If stop is not present - it is assumed to be the to and from dates. - * stop (optional) -- the end date of the report (inclusive). - * months (optional, named) -- months to run used for relative dates - * days (optional, named)-- days to run used for relative dates) - * granulartiy (optional, named) -- set the granularity for the report - """ - start = utils.date(start) - stop = utils.date(stop) - - if days or months: - stop = start + relativedelta(days=days-1, months=months) - else: - stop = stop or start - - if start == stop: - self.raw['date'] = start.isoformat() - else: - self.raw.update({ - 'dateFrom': start.isoformat(), - 'dateTo': stop.isoformat(), - }) - - if granularity: - self.raw = self.granularity(granularity).raw - - return self - - @immutable - def granularity(self, granularity): - """ - Set the granulartiy for the report. - - Values are one of the following - 'hour', 'day', 'week', 'month', 'quarter', 'year' - """ - if granularity not in self.GRANULARITY_LEVELS: - levels = ", ".join(self.GRANULARITY_LEVELS) - raise ValueError("Granularity should be one of: " + levels) - - self.raw['dateGranularity'] = granularity - - return self - - @immutable - def set(self, key=None, value=None, **kwargs): - """ - Set a custom property in the report - - `set` is a way to add raw properties to the request, - for features that python-omniture does not support but the - SiteCatalyst API does support. For convenience's sake, - it will serialize Value and Element objects but will - leave any other kind of value alone. - """ - - if key and value: - self.raw[key] = self._serialize(value) - elif key or kwargs: - properties = key or kwargs - for key, value in properties.items(): - self.raw[key] = self._serialize(value) - else: - raise ValueError("Query#set requires a key and value, \ - a properties dictionary or keyword arguments.") - - return self - - @immutable - def filter(self, segment=None, segments=None, disable_validation=False, **kwargs): - """ Set Add a segment to the report. """ - # It would appear to me that 'segment_id' has a strict subset - # of the functionality of 'segments', but until I find out for - # sure, I'll provide both options. - if 'segments' not in self.raw: - self.raw['segments'] = [] - - if disable_validation == False: - if segments: - self.raw['segments'].extend(self._serialize_values(segments, 'segments')) - elif segment: - self.raw['segments'].append({"id":self._normalize_value(segment, - 'segments').id}) - elif kwargs: - self.raw['segments'].append(kwargs) - else: - raise ValueError() - - else: - if segments: - self.raw['segments'].extend([{"id":segment} for segment in segments]) - elif segment: - self.raw['segments'].append({"id":segment}) - elif kwargs: - self.raw['segments'].append(kwargs) - else: - raise ValueError() - return self - - @immutable - def element(self, element, disable_validation=False, **kwargs): - """ - Add an element to the report. - - This method is intended to be called multiple time. Each time it will - add an element as a breakdown - After the first element, each additional element is considered - a breakdown - """ - - if self.raw.get('elements', None) == None: - self.raw['elements'] = [] - - if disable_validation == False: - element = self._serialize_value(element, 'elements') - else: - element = {"id":element} - - if kwargs != None: - element.update(kwargs) - self.raw['elements'].append(deepcopy(element)) - - #TODO allow this method to accept a list - return self - - - def breakdown(self, element, **kwargs): - """ Pass through for element. Adds an element to the report. """ - return self.element(element, **kwargs) - - - def elements(self, *args, **kwargs): - """ Shortcut for adding multiple elements. Doesn't support arguments """ - obj = self - for e in args: - obj = obj.element(e, **kwargs) - - return obj - - @immutable - def metric(self, metric, disable_validation=False): - """ - Add an metric to the report. - - This method is intended to be called multiple time. - Each time a metric will be added to the report - """ - if self.raw.get('metrics', None) == None: - self.raw['metrics'] = [] - if disable_validation == False: - self.raw['metrics'].append(self._serialize_value(metric, 'metrics')) - else: - self.raw['metrics'].append({"id":metric}) - #self.raw['metrics'] = self._serialize_values(metric, 'metrics') - #TODO allow this metric to accept a list - return self - - def metrics(self, *args, **kwargs): - """ Shortcut for adding multiple metrics """ - obj = self - for m in args: - obj = obj.metric(m, **kwargs) - - return obj - - @immutable - def sortBy(self, metric): - """ Specify the sortBy Metric """ - self.raw['sortBy'] = metric - return self - - @immutable - def currentData(self): - """ Set the currentData flag """ - self.raw['currentData'] = True - return self - - - def build(self): - """ Return the report descriptoin as an object """ - return {'reportDescription': self.raw} - - def queue(self): - """ Submits the report to the Queue on the Adobe side. """ - q = self.build() - self.log.debug("Suite Object: %s Method: %s, Query %s", - self.suite, self.report.method, q) - self.id = self.suite.request('Report', - self.report.method, - q)['reportID'] - self.status = self.STATUSES[1] - return self - - def probe(self, heartbeat=None, interval=1, soak=False): - """ Keep checking until the report is done""" - #Loop until the report is done - while self.is_ready() == False: - if heartbeat: - heartbeat() - time.sleep(interval) - #Use a back off up to 30 seconds to play nice with the APIs - if interval < 1: - interval = 1 - elif interval < 30: - interval = round(interval * 1.5) - else: - interval = 30 - self.log.debug("Check Interval: %s seconds", interval) - - def is_ready(self): - """ inspects the response to see if the report is ready """ - if self.status == self.STATUSES[0]: - raise ReportNotSubmittedError('{"message":"Doh! the report needs to be submitted first"}') - elif self.status == self.STATUSES[1]: - try: - # the request method catches the report and populates it automatically - response = self.suite.request('Report','Get',{'reportID': self.id}) - self.status = self.STATUSES[2] - self.unprocessed_response = response - self.processed_response = self.report(response, self) - return True - except reports.ReportNotReadyError: - self.status = self.STATUSES[1] - #raise reports.InvalidReportError(response) - return False - elif self.status == self.STATUSES[2]: - return True - - - def sync(self, heartbeat=None, interval=0.01): - """ Run the report synchronously,""" - print("sync called") - if self.status == self.STATUSES[0]: - print("Queing Report") - self.queue() - self.probe(heartbeat, interval) - if self.status == self.STATUSES[1]: - self.probe() - return self.processed_response - - def async(self, callback=None, heartbeat=None, interval=1): - """ Run the Report Asynchrnously """ - if self.status == self.STATUSES[0]: - self.queue() - return self - - def get_report(self): - self.is_ready() - if self.status == self.STATUSES[2]: - return self.processed_response - else: - raise reports.ReportNotReadyError('{"message":"Doh! the report is not ready yet"}') - - def run(self, defaultheartbeat=True, heartbeat=None, interval=0.01): - """Shortcut for sync(). Runs the current report synchronously. """ - if defaultheartbeat == True: - rheartbeat = self.heartbeat - else: - rheartbeat = heartbeat - - return self.sync(rheartbeat, interval) - - def heartbeat(self): - """ A default heartbeat method that prints a dot for each request """ - sys.stdout.write('.') - sys.stdout.flush() - - - def check(self): - """ - Basically an alias to is ready to make the interface a bit better - """ - return self.is_ready() - - def cancel(self): - """ Cancels a the report from the Queue on the Adobe side. """ - return self.suite.request('Report', - 'CancelReport', - {'reportID': self.id}) - def json(self): - """ Return a JSON string of the Request """ - return str(json.dumps(self.build(), indent=4, separators=(',', ': '), sort_keys=True)) - - def __str__(self): - return self.json() - - def _repr_html_(self): - """ Format in HTML for iPython Users """ - report = { str(key):value for key,value in self.raw.items() } - html = "Current Report Settings
" - for k,v in sorted(list(report.items())): - html += "{0}: {1}
".format(k,v) - if self.id: - html += "This report has been submitted
" - html += "{0}: {1}
".format("ReportId", self.id) - return html - - - def __dir__(self): - """ Give sensible options for Tab Completion mostly for iPython """ - return ['async','breakdown','cancel','clone','currentData', 'element', - 'filter', 'granularity', 'id','json' ,'metric', 'queue', 'range', 'raw', 'report', - 'request', 'run', 'set', 'sortBy', 'suite'] diff --git a/build/lib/omniture/reports.py b/build/lib/omniture/reports.py deleted file mode 100644 index 40c1c01..0000000 --- a/build/lib/omniture/reports.py +++ /dev/null @@ -1,235 +0,0 @@ -# encoding: utf-8 -from __future__ import absolute_import -from __future__ import print_function - -import logging -from datetime import datetime -import json - -from .elements import Value - - -class InvalidReportError(Exception): - """ - Exception raised when the API says a report defintion is - invalid - """ - def normalize(self, error): - print ('error', error) - return { - 'error': error.get('error'), - 'error_description': error.get('error_description'), - 'error_uri': error.get('error_uri', ''), - } - - def __init__(self, error): - self.log = logging.getLogger(__name__) - error = self.normalize(error) - self.message = "{error}: {error_description} ({error_uri})".format(**error) - super(InvalidReportError, self).__init__(self.message) - -class ReportNotReadyError(Exception): - """ Exception that is raised when a report is not ready to be downloaded - """ - def __init__(self,error): - self.log = logging.getLogger(__name__) - self.log.debug("Report Not Ready") - super(ReportNotReadyError, self).__init__("Report Not Ready") - - -# TODO: also make this iterable (go through rows) -class Report(object): - """ - Object to parse the responses of the report - - To get the data use - >>> report.data - - To get a Pandas DataFrame use - >>> report.dataframe - - To get the raw response use - >>> print report - - """ - def process(self): - """ Parse out the relevant data from the report and store it for easy access - Should only be used internally to the class - """ - self.timing = { - 'queue': float(self.raw['waitSeconds']), - 'execution': float(self.raw['runSeconds']), - } - self.log.debug("Report Wait Time: %s, Report Execution Time: %s", self.timing['queue'], self.timing['execution']) - self.report = report = self.raw['report'] - self.metrics = Value.list('metrics', report['metrics'], self.suite, 'name', 'id') - self.elements = Value.list('elements', report['elements'], self.suite, 'name', 'id') - self.period = str(report['period']) - self.type = str(report['type']) - - segments = report.get('segments') - if segments: - self.segments = [] - for s in segments: - self.segments.append(self.query.suite.segments[s['id']]) - else: - self.segments = None - - #Set as none until it is actually used - self.dict_data = None - self.pandas_data = None - - @property - def data(self): - """ Returns the report data as a set of dicts for easy quering - It generates the dicts on the 1st call then simply returns the reference to the data in subsequent calls - """ - #If the data hasn't been generate it generate the data - if self.dict_data == None: - self.dict_data = self.parse_rows(self.report['data']) - - return self.dict_data - - def parse_rows(self,row, level=0, upperlevels=None): - """ - Parse through the data returned by a repor. Return a list of dicts. - - This method is recursive. - """ - #self.log.debug("Level %s, Upperlevels %s, Row Type %s, Row: %s", level,upperlevels, type(row), row) - data = {} - data_set = [] - - #merge in the upper levels - if upperlevels != None: - data.update(upperlevels) - - - if type(row) == list: - for r in row: - #on the first call set add to the empty list - pr = self.parse_rows(r,level, data.copy()) - if type(pr) == dict: - data_set.append(pr) - #otherwise add to the existing list - else: - data_set.extend(pr) - - #pull out the metrics from the lowest level - if type(row) == dict: - #pull out any relevant data from the current record - #Handle datetime isn't in the elements list for trended reports - if level == 0 and self.type == "trended": - element = "datetime" - elif self.type == "trended": - if hasattr(self.elements[level-1], 'classification'): - #handle the case where there are multiple classifications - element = str(self.elements[level-1].id) + ' | ' + str(self.elements[level-1].classification) - else: - element = str(self.elements[level-1].id) - else: - if hasattr(self.elements[level], 'classification'): - #handle the case where there are multiple classifications - element = str(self.elements[level].id) + ' | ' + str(self.elements[level].classification) - else: - element = str(self.elements[level].id) - - - if element == "datetime": - data[element] = datetime(int(row.get('year',0)),int(row.get('month',0)),int(row.get('day',0)),int(row.get('hour',0))) - data["datetime_friendly"] = str(row['name']) - else: - try: - data[element] = str(row['name']) - - # If the name value is Null or non-encodable value, return null - except: - data[element] = "null" - #parse out any breakdowns and add to the data set - if 'breakdown' in row and len(row['breakdown']) > 0: - data_set.extend(self.parse_rows(row['breakdown'], level+1, data)) - elif 'counts' in row: - for index, metric in enumerate(row['counts']): - #decide what type of event - if self.metrics[index].decimals > 0 or metric.find('.') >-1: - data[str(self.metrics[index].id)] = float(metric) - else: - data[str(self.metrics[index].id)] = int(metric) - - - - if len(data_set)>0: - return data_set - else: - return data - - @property - def dataframe(self): - """ - Returns pandas DataFrame for additional analysis. - - Will generate the data the first time it is called otherwise passes a cached version - """ - - if self.pandas_data is None: - self.pandas_data = self.to_dataframe() - - return self.pandas_data - - - def to_dataframe(self): - import pandas as pd - return pd.DataFrame.from_dict(self.data) - - def __init__(self, raw, query): - self.log = logging.getLogger(__name__) - self.raw = raw - self.query = query - self.suite = query.suite - self.process() - - def __repr__(self): - info = { - 'metrics': ", ".join(map(str, self.metrics)), - 'elements': ", ".join(map(str, self.elements)), - } - return "".format(**info) - - def __div__(self): - """ Give sensible options for Tab Completion mostly for iPython """ - return ['data','dataframe', 'metrics','elements', 'segments', 'period', 'type', 'timing'] - - def _repr_html_(self): - """ Format in HTML for iPython Users """ - html = "" - for index, item in enumerate(self.data): - html += "" - #populate header Row - if index < 1: - html += "" - if 'datetime' in item: - html += "".format('datetime') - for key in sorted(list(item.keys())): - if key != 'datetime': - html += "".format(key) - html += "" - - #Make sure date time is alway listed first - if 'datetime' in item: - html += "".format(item['datetime']) - for key, value in sorted(list(item.items())): - if key != 'datetime': - html += "".format(value) - html += "" - return html - - def __str__(self): - return json.dumps(self.raw,indent=4, separators=(',', ': '),sort_keys=True) - -Report.method = "Queue" - - -class DataWarehouseReport(object): - pass - -DataWarehouseReport.method = 'Request' diff --git a/build/lib/omniture/utils.py b/build/lib/omniture/utils.py deleted file mode 100644 index 3152f64..0000000 --- a/build/lib/omniture/utils.py +++ /dev/null @@ -1,119 +0,0 @@ -from __future__ import absolute_import - -from copy import copy -import datetime -from dateutil.parser import parse as parse_date -import six - - -class memoize: - def __init__(self, function): - self.function = function - self.memoized = {} - - def __call__(self, *args): - try: - return self.memoized[args] - except KeyError: - self.memoized[args] = self.function(*args) - return self.memoized[args] - - -class AddressableList(list): - """ List of items addressable either by id or by name """ - def __init__(self, items, name='items'): - super(AddressableList, self).__init__(items) - self.name = name - - def __getitem__(self, key): - if isinstance(key, int): - return super(AddressableList, self).__getitem__(key) - else: - matches = [item for item in self if item.title == key or item.id == key] - count = len(matches) - if count > 1: - matches = list(map(repr, matches)) - error = "Found multiple matches for {key}: {matches}. ".format( - key=key, matches=", ".join(matches)) - advice = "Use the identifier instead." - raise KeyError(error + advice) - elif count == 1: - return matches[0] - else: - raise KeyError("Cannot find {key} among the available {name}" - .format(key=key, name=self.name)) - - def _repr_html_(self): - """ HTML formating for iPython users """ - html = "
{0}{0}
{0}{0}
" - html += "".format("ID", "Title") - for i in self: - html += "" - html += i._repr_html_() - html += "" - html +="
{0}{1}
" - return html - - def __str__(self): - string = "" - for i in self: - string += i.__str__() - return string - - def __repr__(self): - return "" - - -def date(obj): - #used to ensure compatibility with Python3 without having to user six - try: - basestring - except NameError: - basestring = str - - if obj is None: - return None - elif isinstance(obj, datetime.date): - if hasattr(obj, 'date'): - return obj.date() - else: - return obj - elif isinstance(obj, six.string_types): - return parse_date(obj).date() - elif isinstance(obj, six.text_type): - return parse_date(str(obj)).date() - else: - raise ValueError("Can only convert strings into dates, received {}" - .format(obj.__class__)) - - -def wrap(obj): - if isinstance(obj, list): - return obj - else: - return [obj] - - -def affix(prefix=None, base=None, suffix=None, connector='_'): - if prefix: - prefix = prefix + connector - else: - prefix = '' - - if suffix: - suffix = connector + suffix - else: - suffix = '' - - return prefix + base + suffix - - -def translate(d, mapping): - d = copy(d) - - for src, dest in mapping.items(): - if src in d: - d[dest] = d[src] - del d[src] - - return d diff --git a/build/lib/omniture/version.py b/build/lib/omniture/version.py deleted file mode 100644 index 8b7c71c..0000000 --- a/build/lib/omniture/version.py +++ /dev/null @@ -1,4 +0,0 @@ -# 1) we don't load dependencies by storing it in __init__.py -# 2) we can import it in setup.py for the same reason -# 3) we can import it into your module module -__version__ = '0.5.2' diff --git a/build/lib/tests/__init__.py b/build/lib/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/build/lib/tests/testAccount.py b/build/lib/tests/testAccount.py deleted file mode 100644 index bff9754..0000000 --- a/build/lib/tests/testAccount.py +++ /dev/null @@ -1,137 +0,0 @@ -#!/usr/bin/python - -import unittest -import requests_mock -import omniture -import os - -creds = {} -creds['username'] = os.environ['OMNITURE_USERNAME'] -creds['secret'] = os.environ['OMNITURE_SECRET'] -test_report_suite = "omniture.api-gateway" - - -class AccountTest(unittest.TestCase): - def setUp(self): - with requests_mock.mock() as m: - path = os.path.dirname(__file__) - #read in mock response for Company.GetReportSuites to make tests faster - with open(path+'/mock_objects/Company.GetReportSuites.json') as get_report_suites_file: - report_suites = get_report_suites_file.read() - - with open(path+'/mock_objects/Report.GetMetrics.json') as get_metrics_file: - metrics = get_metrics_file.read() - - with open(path+'/mock_objects/Report.GetElements.json') as get_elements_file: - elements = get_elements_file.read() - - with open(path+'/mock_objects/Segments.Get.json') as get_segments_file: - segments = get_segments_file.read() - - #setup mock responses - m.post('https://api.omniture.com/admin/1.4/rest/?method=Company.GetReportSuites', text=report_suites) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetMetrics', text=metrics) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetElements', text=elements) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Segments.Get', text=segments) - - - self.analytics = omniture.authenticate(creds['username'], creds['secret']) - #force requests to happen in this method so they are cached - self.analytics.suites[test_report_suite].metrics - self.analytics.suites[test_report_suite].elements - self.analytics.suites[test_report_suite].segments - - - def test_os_environ(self): - test = omniture.authenticate({'OMNITURE_USERNAME':creds['username'], - 'OMNITURE_SECRET':creds['secret']}) - self.assertEqual(test.username,creds['username'], - "The username isn't getting set right: {}" - .format(test.username)) - - self.assertEqual(test.secret,creds['secret'], - "The secret isn't getting set right: {}" - .format(test.secret)) - - def test_suites(self): - self.assertIsInstance(self.analytics.suites, omniture.utils.AddressableList, "There are no suites being returned") - self.assertIsInstance(self.analytics.suites[test_report_suite], omniture.account.Suite, "There are no suites being returned") - - def test_simple_request(self): - """ simplest request possible. Company.GetEndpoint is not an authenticated method - """ - urls = ["https://api.omniture.com/admin/1.4/rest/", - "https://api2.omniture.com/admin/1.4/rest/", - "https://api3.omniture.com/admin/1.4/rest/", - "https://api4.omniture.com/admin/1.4/rest/", - "https://api5.omniture.com/admin/1.4/rest/"] - self.assertIn(self.analytics.request('Company', 'GetEndpoint'),urls, "Company.GetEndpoint failed" ) - - def test_authenticated_request(self): - """ Request that requires authentication to make sure the auth is working - """ - reportsuites = self.analytics.request('Company','GetReportSuites') - self.assertIsInstance(reportsuites, dict, "Didn't get a valid response back") - self.assertIsInstance(reportsuites['report_suites'], list, "Response doesn't contain the list of report suites might be an authentication issue") - - def test_metrics(self): - """ Makes sure the suite properties can get the list of metrics - """ - self.assertIsInstance(self.analytics.suites[test_report_suite].metrics, omniture.utils.AddressableList) - - def test_elements(self): - """ Makes sure the suite properties can get the list of elements - """ - self.assertIsInstance(self.analytics.suites[test_report_suite].elements, omniture.utils.AddressableList) - - def test_basic_report(self): - """ Make sure a basic report can be run - """ - report = self.analytics.suites[test_report_suite].report - queue = [] - queue.append(report) - response = omniture.sync(queue) - self.assertIsInstance(response, list) - - def test_json_report(self): - """Make sure reports can be generated from JSON objects""" - report = self.analytics.suites[test_report_suite].report\ - .element('page')\ - .metric('pageviews')\ - .sortBy('pageviews')\ - .filter("s4157_55b1ba24e4b0a477f869b912")\ - .range("2016-08-01","2016-08-31")\ - .set('sortMethod',"top")\ - .json() - self.assertEqual(report, self.analytics.jsonReport(report).json(), "The reports aren't serializating or de-serializing correctly in JSON") - - - def test_account_repr_html_(self): - """Make sure the account are printing out in - HTML correctly for ipython notebooks""" - html = self.analytics._repr_html_() - test_html = "Username: jgrover:Justin Grover Demo
Secret: ***************
Report Suites: 2
Endpoint: https://api.omniture.com/admin/1.4/rest/
" - self.assertEqual(html, test_html) - - def test_account__str__(self): - """ Make sure the custom str works """ - mystr = self.analytics.__str__() - test_str = "Analytics Account -------------\n Username: jgrover:Justin Grover Demo \n Report Suites: 2 \n Endpoint: https://api.omniture.com/admin/1.4/rest/" - self.assertEqual(mystr, test_str) - - def test_suite_repr_html_(self): - """Make sure the Report Suites are printing okay for - ipython notebooks """ - html = self.analytics.suites[0]._repr_html_() - test_html = "omniture.api-gatewaytest_suite" - self.assertEqual(html, test_html) - - def test_suite__str__(self): - """Make sure the str represntation is working """ - mystr = self.analytics.suites[0].__str__() - test_str = "ID omniture.api-gateway | Name: test_suite \n" - self.assertEqual(mystr,test_str) - - -if __name__ == '__main__': - unittest.main() diff --git a/build/lib/tests/testAll.py b/build/lib/tests/testAll.py deleted file mode 100644 index 7964291..0000000 --- a/build/lib/tests/testAll.py +++ /dev/null @@ -1,27 +0,0 @@ -from __future__ import absolute_import - -import unittest - -from omniture.tests import AccountTest -from .testQuery import QueryTest -from .testReports import ReportTest -from .testElement import ElementTest -import sys - - -def test_suite(): - """ Test Suite for omnitue module """ - - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(AccountTest)) - test_suite.addTest(unittest.makeSuite(QueryTest)) - test_suite.addTest(unittest.makeSuite(ReportTest)) - test_suite.addTest(unittest.makeSuite(AccountTest)) - - return test_suite - -mySuite = test_suite() - -runner = unittest.TextTestRunner() -ret = runner.run(mySuite).wasSuccessful() -sys.exit(not ret) diff --git a/build/lib/tests/testElement.py b/build/lib/tests/testElement.py deleted file mode 100644 index 8360509..0000000 --- a/build/lib/tests/testElement.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/python - -import unittest -import omniture -import os - -creds = {} -creds['username'] = os.environ['OMNITURE_USERNAME'] -creds['secret'] = os.environ['OMNITURE_SECRET'] - -class ElementTest(unittest.TestCase): - def setUp(self): - fake_list = [{"id":"123","title":"ABC"},{"id":"456","title":"DEF"}] - self.valueList = omniture.elements.Value.list("metrics",fake_list,"test") - - def test__repr__(self): - self.assertEqual(self.valueList.__repr__(),"",\ - "The value for __repr__ on the AddressableList was {}"\ - .format(self.valueList.__repr__())) - - def test_value__repr__(self): - self.assertEqual(self.valueList[0].__repr__(),"", \ - "The value of the first item in the AddressableList \ - was {}".format(self.valueList[0].__repr__())) - - def test_value__copy__(self): - value = self.valueList[0].copy() - self.assertEqual(value.__repr__(), self.valueList[0].__repr__(),\ - "The copied value was: {} the original was: {}"\ - .format(value, self.valueList[0])) - - def test_repr_html_(self): - self.assertEqual(self.valueList[0]._repr_html_(),\ - "123ABC",\ - "The html value was: {}"\ - .format(self.valueList[0]._repr_html_())) - - def test__str__(self): - self.assertEqual(self.valueList[0].__str__(),\ - "ID 123 | Name: ABC \n",\ - "__str__ returned: {}"\ - .format(self.valueList[0].__str__())) - -if __name__ == '__main__': - unittest.main() diff --git a/build/lib/tests/testQuery.py b/build/lib/tests/testQuery.py deleted file mode 100644 index 116ca31..0000000 --- a/build/lib/tests/testQuery.py +++ /dev/null @@ -1,552 +0,0 @@ -#!/usr/bin/python - -import unittest -import requests_mock -import omniture -import os - -creds = {} -creds['username'] = os.environ['OMNITURE_USERNAME'] -creds['secret'] = os.environ['OMNITURE_SECRET'] -test_report_suite = 'omniture.api-gateway' -dateTo = "2015-06-02" -dateFrom = "2015-06-01" -date = dateTo - - -class QueryTest(unittest.TestCase): - def setUp(self): - with requests_mock.mock() as m: - path = os.path.dirname(__file__) - #read in mock response for Company.GetReportSuites to make tests faster - with open(path+'/mock_objects/Company.GetReportSuites.json') as get_report_suites_file: - report_suites = get_report_suites_file.read() - - with open(path+'/mock_objects/Report.GetMetrics.json') as get_metrics_file: - metrics = get_metrics_file.read() - - with open(path+'/mock_objects/Report.GetElements.json') as get_elements_file: - elements = get_elements_file.read() - - with open(path+'/mock_objects/Segments.Get.json') as get_segments_file: - segments = get_segments_file.read() - - #setup mock responses - m.post('https://api.omniture.com/admin/1.4/rest/?method=Company.GetReportSuites', text=report_suites) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetMetrics', text=metrics) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetElements', text=elements) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Segments.Get', text=segments) - - - self.analytics = omniture.authenticate(creds['username'], creds['secret']) - #force requests to happen in this method so they are cached - self.analytics.suites[test_report_suite].metrics - self.analytics.suites[test_report_suite].elements - self.analytics.suites[test_report_suite].segments - - def tearDown(self): - self.analytics = None - - def test_ranked(self): - """Test that a basic query can be generated """ - basic_report = self.analytics.suites[test_report_suite].report.element("page") - self.assertEqual(basic_report.raw['elements'][0]['id'], "page", "The element is wrong: {}".format(basic_report.raw['elements'][0]['id'])) - self.assertEqual(len(basic_report.raw['elements']), 1, "There are too many elements: {}".format(basic_report.raw['elements'])) - - @requests_mock.mock() - def test_report_run(self,m): - """Make sure that are report can actually be run """ - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - - self.assertIsInstance(self.analytics.suites[test_report_suite].report.run(), omniture.Report, "The run method doesn't work to create a report") - - @requests_mock.mock() - def test_report_run(self,m): - """Make sure that the interval gets passed down. Needs a bit of work to make the test usefule """ - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - - self.assertIsInstance(self.analytics.suites[test_report_suite].report.run(interval=0.01), omniture.Report, "The run method doesn't work to create a report") - self.assertIsInstance(self.analytics.suites[test_report_suite].report.run(interval=2), omniture.Report, "The run method doesn't work to create a report") - self.assertIsInstance(self.analytics.suites[test_report_suite].report.run(interval=31), omniture.Report, "The run method doesn't work to create a report") - - @requests_mock.mock() - def test_report_async(self,m): - """Make sure that are report are run Asynchrnously """ - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - query = self.analytics.suites[test_report_suite].report.async() - self.assertIsInstance(query, omniture.Query, "The Async method doesn't work") - self.assertTrue(query.check(), "The check method is weird") - self.assertIsInstance(query.get_report(), omniture.Report, "The check method is weird") - - @requests_mock.mock() - def test_report_bad_async(self,m): - """Make sure that are report can't be checked on out of order """ - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/Report.Get.NotReady.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - - with self.assertRaises(omniture.query.ReportNotSubmittedError): - self.analytics.suites[test_report_suite].report.get_report() - query = self.analytics.suites[test_report_suite].report.async() - self.assertIsInstance(query, omniture.Query, "The Async method doesn't work") - self.assertFalse(query.check(), "The check method is weird") - with self.assertRaises(omniture.reports.ReportNotReadyError): - query.get_report() - - #@unittest.skip("skip") - def test_bad_element(self): - """Test to make sure the element validation is working""" - self.assertRaises(KeyError,self.analytics.suites[test_report_suite].report.element, "pages") - - @unittest.skip("Test Not Finished") - def test_overtime(self): - basic_report = self.analytics.suites[test_report_suite].report.metric("orders").granularity("hour") - queue = [] - queue.append(basic_report) - response = omniture.sync(queue) - - #@unittest.skip("skip") - def test_double_element(self): - """Test to make sure two elements will work in the report""" - basic_report = self.analytics.suites[test_report_suite].report.element("page").element("browser") - self.assertEqual(basic_report.raw['elements'][0]['id'],"page", "The 1st element is wrong") - self.assertEqual(basic_report.raw['elements'][1]['id'], - "browser", "The 2nd element is wrong: {}" - .format(basic_report.raw['elements'][1]['id'])) - self.assertEqual(len(basic_report.raw['elements']), 2, "The number of elements is wrong: {}".format(basic_report.raw['elements'])) - - - #@unittest.skip("skip") - def test_elements(self): - """ Make sure the Elements method works as a shortcut for adding multiple - elements""" - basic_report = self.analytics.suites[test_report_suite].report.elements("page","browser") - self.assertEqual(basic_report.raw['elements'][0]['id'],"page", "The 1st element is wrong: {}".format(basic_report.raw['elements'][0]['id'])) - self.assertEqual(basic_report.raw['elements'][1]['id'],"browser", "The 2nd element is wrong: {}".format(basic_report.raw['elements'][1]['id'])) - self.assertEqual(len(basic_report.raw['elements']), 2, "The number of elements is wrong: {}".format(basic_report.raw['elements'])) - - #@unittest.skip("skip") - def test_double_metric(self): - """ Make sure multiple metric calls get set correcly """ - basic_report = self.analytics.suites[test_report_suite].report.metric("pageviews").metric("visits") - - self.assertEqual(basic_report.raw['metrics'][0]['id'],"pageviews", "The 1st element is wrong") - self.assertEqual(basic_report.raw['metrics'][1]['id'],"visits", "The 2nd element is wrong") - self.assertEqual(len(basic_report.raw['metrics']), 2, "The number of elements is wrong") - - #@unittest.skip("skip") - def test_metrics(self): - """ Make sure the metrics method works as a shortcut for multiple - metrics""" - basic_report = self.analytics.suites[test_report_suite].report.metrics("pageviews", "visits") - - self.assertEqual(basic_report.raw['metrics'][0]['id'],"pageviews", "The 1st element is wrong") - self.assertEqual(basic_report.raw['metrics'][1]['id'],"visits", "The 2nd element is wrong") - self.assertEqual(len(basic_report.raw['metrics']), 2, "The number of elements is wrong") - - - #@unittest.skip("skip") - def test_element_parameters(self): - """Test the top and startingWith parameters - """ - basic_report = self.analytics.suites[test_report_suite].report.element("page", top=5, startingWith=5) - - self.assertEqual(basic_report.raw['elements'][0]['id'], - "page" , - "The parameters might have screwed this up: {}" - .format(basic_report.raw['elements'][0]['id'])) - self.assertEqual(basic_report.raw['elements'][0]['top'], - 5 , - "The top parameter isn't 5: {}" - .format(basic_report.raw['elements'][0]['top'])) - self.assertEqual(basic_report.raw['elements'][0]['startingWith'], - 5 , - "The startingWith parameter isn't 5: {}" - .format(basic_report.raw['elements'][0]['startingWith'])) - - def test_breakown_parameters(self): - """Test the top and startingWith parameters - """ - basic_report = self.analytics.suites[test_report_suite].report.breakdown("page", top=5, startingWith=5) - - self.assertEqual(basic_report.raw['elements'][0]['id'], - "page" , - "The parameters might have screwed this up: {}" - .format(basic_report.raw['elements'][0]['id'])) - self.assertEqual(basic_report.raw['elements'][0]['top'], - 5 , - "The top parameter isn't 5: {}" - .format(basic_report.raw['elements'][0]['top'])) - self.assertEqual(basic_report.raw['elements'][0]['startingWith'], - 5 , - "The startingWith parameter isn't 5: {}" - .format(basic_report.raw['elements'][0]['startingWith'])) - - def test_set(self): - """ Make sure the set parameter can create custom parameters okay """ - report = self.analytics.suites[test_report_suite].report\ - .set('anomalyDetection',True)\ - .set({"test":"abc","currentData":True}) - - self.assertEqual(report.raw['anomalyDetection'], True) - self.assertEqual(report.raw['test'], "abc") - self.assertEqual(report.raw['currentData'], True) - - with self.assertRaises(ValueError): - report.set() - - @unittest.skip("don't have this one done yet") - def test_anamoly_detection(self): - basic_report = self.analytics.suites[test_report_suite].report.metric("pageviews").range(dateFrom, dateTo).anomaly_detection() - - self.assertEqual(basic_report.raw['anomalyDetection'],"True", "anomalyDetection isn't getting set: {}".format(basic_report.raw)) - - #@unittest.skip("skip") - def test_sortBy(self): - """ Make sure sortBy gets put in report description """ - basic_report = self.analytics.suites[test_report_suite].report.element('page').metric('pageviews').metric('visits').sortBy('visits') - self.assertEqual(basic_report.raw['sortBy'], "visits") - - #@unittest.skip("skip") - def test_current_data(self): - """ Make sure the current data flag gets set correctly """ - basic_report = self.analytics.suites[test_report_suite].report.element('page').metric('pageviews').metric('visits').currentData() - self.assertEqual(basic_report.raw['currentData'], True) - - #@unittest.skip("skip") - def test_inline_segments(self): - """ Make sure inline segments work """ - report = self.analytics.suites[test_report_suite].report\ - .element('page')\ - .metric('pageviews')\ - .metric('visits')\ - .filter(element='page', selected=["test","test1"]) - self.assertEqual(report.raw['segments'][0]['element'], "page", "The inline segment element isn't getting set") - self.assertEqual(report.raw['segments'][0]['selected'], ["test","test1"], "The inline segment selected field isn't getting set") - - def test_inline_segments_disable_validation(self): - """ Make sure inline segments work with disable_validation = True """ - report = self.analytics.suites[test_report_suite].report\ - .element('page')\ - .metric('pageviews')\ - .metric('visits')\ - .filter(element='page', selected=["test","test1"], disable_validation=True) - self.assertEqual(report.raw['segments'][0]['element'], "page", "The inline segment element isn't getting set") - self.assertEqual(report.raw['segments'][0]['selected'], ["test","test1"], "The inline segment selected field isn't getting set") - - - def test_filter(self): - """ Make sure the filter command sets the segments right """ - report1 = self.analytics.suites[test_report_suite].report\ - .filter("s4157_55b1ba24e4b0a477f869b912")\ - .filter(segment = "s4157_56097427e4b0ff9bcc064952") - reportMultiple = self.analytics.suites[test_report_suite].report\ - .filter(segments = ["s4157_55b1ba24e4b0a477f869b912","s4157_56097427e4b0ff9bcc064952"]) - - self.assertIn({'id': u's4157_55b1ba24e4b0a477f869b912'}\ - ,report1.raw['segments'], "Report1 failing") - self.assertIn({'id': u's4157_56097427e4b0ff9bcc064952'}\ - ,report1.raw['segments'], "report1 failing") - - self.assertIn({'id': u's4157_55b1ba24e4b0a477f869b912'}\ - ,reportMultiple.raw['segments'], "reportMultiple failing") - self.assertIn({'id': u's4157_56097427e4b0ff9bcc064952'}\ - ,reportMultiple.raw['segments'], "reportMultiple failing") - - with self.assertRaises(ValueError): - report1.filter() - - - def test_filter_disable_validation(self): - """ Make sure the filter command sets the segments - right when validation is disabled""" - report1 = self.analytics.suites[test_report_suite].report\ - .filter("s4157_55b1ba24e4b0a477f869b912", disable_validation=True)\ - .filter(segment = "s4157_56097427e4b0ff9bcc064952",\ - disable_validation=True) - reportMultiple = self.analytics.suites[test_report_suite].report\ - .filter(segments = ["s4157_55b1ba24e4b0a477f869b912","s4157_56097427e4b0ff9bcc064952"],\ - disable_validation=True) - - self.assertIn({'id': u's4157_55b1ba24e4b0a477f869b912'}\ - ,report1.raw['segments'], "Report1 failing") - self.assertIn({'id': u's4157_56097427e4b0ff9bcc064952'}\ - ,report1.raw['segments'], "report1 failing") - - self.assertIn({'id': u's4157_55b1ba24e4b0a477f869b912'}\ - ,reportMultiple.raw['segments'], "reportMultiple failing") - self.assertIn({'id': u's4157_56097427e4b0ff9bcc064952'}\ - ,reportMultiple.raw['segments'], "reportMultiple failing") - - with self.assertRaises(ValueError): - report1.filter(disable_validation=True) - - - #@unittest.skip("skip") - def test_hour_granularity(self): - """ Make sure granularity works """ - report = self.analytics.suites[test_report_suite].report.granularity('hour') - self.assertEqual(report.raw['dateGranularity'], "hour", "Hourly granularity can't be set via the granularity method") - - #@unittest.skip("skip") - def test_day_granularity(self): - """ Make sure granularity works """ - report = self.analytics.suites[test_report_suite].report.granularity('day') - self.assertEqual(report.raw['dateGranularity'], "day", "daily granularity can't be set via the granularity method") - - #@unittest.skip("skip") - def test_week_granularity(self): - """ Make sure granularity works """ - report = self.analytics.suites[test_report_suite].report.granularity('day') - self.assertEqual(report.raw['dateGranularity'], "day", "Weekly granularity can't be set via the granularity method") - - #@unittest.skip("skip") - def test_quarter_granularity(self): - """ Make sure granularity works """ - report = self.analytics.suites[test_report_suite].report.granularity('quarter') - self.assertEqual(report.raw['dateGranularity'], "quarter", "Quarterly granularity can't be set via the granularity method") - - #@unittest.skip("skip") - def test_year_granularity(self): - """ Make sure granularity works """ - report = self.analytics.suites[test_report_suite].report.granularity('year') - self.assertEqual(report.raw['dateGranularity'], "year", "Yearly granularity can't be set via the granularity method") - - def test_bad_granularity(self): - """ Make sure granularity works """ - with self.assertRaises(ValueError): - self.analytics.suites[test_report_suite].report.granularity('bad') - - #@unittest.skip("skip") - def test_single_date_range(self): - """ Make sure date range works with a single date """ - report = self.analytics.suites[test_report_suite].report.range(date) - self.assertEqual(report.raw['date'], date, "Can't set a single date") - - #@unittest.skip("skip") - def test_date_range(self): - """ Make sure date range works with two dates """ - report = self.analytics.suites[test_report_suite].report.range(dateFrom,dateTo) - self.assertEqual(report.raw['dateFrom'], dateFrom, "Start date isn't getting set correctly") - self.assertEqual(report.raw['dateTo'], dateTo, "End date isn't getting set correctly") - - def test_date_range_days(self): - """Make sure the dayes can be passed into the range function and that they work correctly""" - cDateFrom = "2017-01-01" - cDateTo = "2017-01-02" - report = self.analytics.suites[test_report_suite].report.range(cDateFrom,days=2) - self.assertEqual(report.raw['dateFrom'],cDateFrom, "Start Data isnt' working") - self.assertEqual(report.raw['dateTo'], cDateTo,"Check the days param of the range function") - - def test_date_range_months(self): - """Make sure the dayes can be passed into the range function and that they work correctly""" - cDateFrom = "2017-01-01" - cDateTo = "2017-03-31" - report = self.analytics.suites[test_report_suite].report.range(cDateFrom,months=3) - self.assertEqual(report.raw['dateFrom'],cDateFrom, "Start Data isnt' working") - self.assertEqual(report.raw['dateTo'], cDateTo,"Check the days param of the range function") - - - #@unittest.skip("skip") - def test_granularity_date_range(self): - """ Make sure granularity works in the date range app """ - report = self.analytics.suites[test_report_suite].report.range(dateFrom,dateTo, granularity='hour') - self.assertEqual(report.raw['dateFrom'], dateFrom, "Start date isn't getting set correctly") - self.assertEqual(report.raw['dateTo'], dateTo, "End date isn't getting set correctly") - self.assertEqual(report.raw['dateGranularity'], "hour", "Hourly granularity can't be set via the range method") - - ##@unittest.skip("skip") - def test_jsonReport(self): - """Check the JSON deserializer""" - report = self.analytics.suites[test_report_suite].report.range(dateFrom,dateTo,granularity='day')\ - .set("source","standard")\ - .metric("pageviews")\ - .metric("visits")\ - .element("page")\ - .element("sitesection", top=100, startingWith=1)\ - .set("locale","en_US")\ - .sortBy("visits")\ - .set("anomalyDetection",True)\ - .set("currentData", True)\ - .set("elementDataEncoding","utf8") - - testreport = self.analytics.suites[test_report_suite].jsonReport(report.json()) - self.assertEqual(report.json(),testreport.json(), "The reportings aren't deserializing from JSON the same old:{} new:{}".format(report.json(),testreport.json())) - self.assertEqual(report.json(),testreport.__str__(), "The reportings aren't deserializing to string __str__ the same old:{} new:{}".format(report.json(),testreport.__str__())) - - @requests_mock.mock() - def test_disable_validate_metric(self,m): - """checks that the no validate flag works for metrics""" - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/invalid_metric.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - with self.assertRaises(omniture.InvalidReportError) as e: - report = self.analytics.suites[test_report_suite].report\ - .metric("bad_metric", disable_validation=True)\ - .run() - - self.assertTrue(("metric_id_invalid" in e.exception.message),"The API is returning an error that might mean this is broken") - - @requests_mock.mock() - def test_disable_validate_element(self,m): - """checks that the no validate flag works for elements""" - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/invalid_element.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - with self.assertRaises(omniture.InvalidReportError) as e: - report = self.analytics.suites[test_report_suite].report\ - .element("bad_element", disable_validation=True)\ - .run() - - self.assertTrue(("element_id_invalid" in e.exception.message),"The API is returning an error that might mean this is broken") - - @requests_mock.mock() - def test_disable_validate_segments(self,m): - """checks that the no validate flag works for segments""" - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/invalid_segment.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - with self.assertRaises(omniture.InvalidReportError) as e: - report = self.analytics.suites[test_report_suite].report\ - .filter("bad_segment", disable_validation=True)\ - .run() - - self.assertTrue(("segment_invalid" in e.exception.message), - "The API is returning an error that might mean this is broken: {}" - .format(e.exception.message)) - - def test_multiple_classifications(self): - """Checks to make sure that multiple classificaitons are handled correctly """ - report = self.analytics.suites[test_report_suite].report\ - .element("page", classification="test")\ - .element("page", classification= "test2") - - self.assertEqual("test", report.raw['elements'][0]['classification'],"The classifications aren't getting set right") - self.assertEqual("test2", report.raw['elements'][1]['classification'],"The second classification isn't getting set right") - - def test__dir__(self): - valid_value = ['async', 'breakdown', 'cancel', 'clone', 'currentData', - 'element', 'filter', 'granularity', 'id', 'json', - 'metric', 'queue', 'range', 'raw', 'report', 'request', - 'run', 'set', 'sortBy', 'suite'] - test_value = self.analytics.suites[test_report_suite].report.__dir__() - - self.assertEqual(test_value,valid_value, - "The __dir__ method isn't returning right: {}" - .format(test_value)) - - def test_repr_html_(self): - """Make sure the HTML representation fo iPython is working corretly""" - valid_html = "Current Report Settings
elements: [{'id': 'page'}]
metrics: [{'id': 'pageviews'}]
reportSuiteID: "+test_report_suite+"
" - test_html = self.analytics.suites[test_report_suite]\ - .report\ - .element('page')\ - .metric('pageviews')\ - ._repr_html_() - - - self.assertEqual(test_html,valid_html, - "the HTML isn't generating correctly: {}" - .format(test_html)) - - def test_repr_html_report_id(self): - """Make sure the HTML representation fo iPython is working corretly - with a report id""" - valid_html = "Current Report Settings
elements: [{'id': 'page'}]
metrics: [{'id': 'pageviews'}]
reportSuiteID: "+test_report_suite+"
This report has been submitted
ReportId: 123
" - test_html = self.analytics.suites[test_report_suite]\ - .report\ - .element('page')\ - .metric('pageviews') - test_html.id = "123" - test_html = test_html._repr_html_() - - - self.assertEqual(str(test_html),valid_html, - "the HTML isn't generating correctly: {}" - .format(test_html)) - - def test_serialize_values(self): - """Test the serialize method """ - - - single = self.analytics.suites[test_report_suite].report\ - ._serialize_values("s4157_55b1ba24e4b0a477f869b912", 'segments') - - double = self.analytics.suites[test_report_suite].report\ - ._serialize_values(["s4157_56097427e4b0ff9bcc064952"\ - ,"s4157_55b1ba24e4b0a477f869b912"], 'segments') - - self.assertEqual(single, [{'id':"s4157_55b1ba24e4b0a477f869b912"}]) - self.assertEqual(double, [{'id':"s4157_56097427e4b0ff9bcc064952"}, - {'id':"s4157_55b1ba24e4b0a477f869b912"}]) - - def test_serialize(self): - l = self.analytics.suites[test_report_suite].report\ - ._serialize(['1','2']) - obj = self.analytics.suites[test_report_suite].report\ - ._serialize(omniture.Value('title',"id",{})) - - self.assertEqual(l, ['1','2']) - self.assertEqual(list(obj), ["id"]) - - -if __name__ == '__main__': - unittest.main() diff --git a/build/lib/tests/testReports.py b/build/lib/tests/testReports.py deleted file mode 100644 index fe8f196..0000000 --- a/build/lib/tests/testReports.py +++ /dev/null @@ -1,320 +0,0 @@ -#!/usr/bin/python -from __future__ import print_function - - -import unittest -import omniture -import os -from datetime import date -import pandas -import datetime -import requests_mock - - -creds = {} -creds['username'] = os.environ['OMNITURE_USERNAME'] -creds['secret'] = os.environ['OMNITURE_SECRET'] -test_report_suite = 'omniture.api-gateway' - - -class ReportTest(unittest.TestCase): - def setUp(self): - self.maxDiff = None - with requests_mock.mock() as m: - path = os.path.dirname(__file__) - #read in mock response for Company.GetReportSuites to make tests faster - with open(path+'/mock_objects/Company.GetReportSuites.json') as get_report_suites_file: - report_suites = get_report_suites_file.read() - - with open(path+'/mock_objects/Report.GetMetrics.json') as get_metrics_file: - metrics = get_metrics_file.read() - - with open(path+'/mock_objects/Report.GetElements.json') as get_elements_file: - elements = get_elements_file.read() - - with open(path+'/mock_objects/Segments.Get.json') as get_segments_file: - segments = get_segments_file.read() - - #setup mock responses - m.post('https://api.omniture.com/admin/1.4/rest/?method=Company.GetReportSuites', text=report_suites) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetMetrics', text=metrics) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.GetElements', text=elements) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Segments.Get', text=segments) - - - self.analytics = omniture.authenticate(creds['username'], creds['secret']) - #force requests to happen in this method so they are cached - self.analytics.suites[test_report_suite].metrics - self.analytics.suites[test_report_suite].elements - self.analytics.suites[test_report_suite].segments - - def tearDown(self): - self.analytics = None - - @requests_mock.mock() - def test_basic_report(self,m): - """ Make sure a basic report can be run - """ - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - response = self.analytics.suites[test_report_suite].report.run() - - self.assertIsInstance(response.data, list, "Something went wrong with the report") - - #Timing Info - self.assertIsInstance(response.timing['queue'], float, "waitSeconds info is missing") - self.assertIsInstance(response.timing['execution'], float, "Execution info is missing") - #Raw Reports - self.assertIsInstance(response.report, dict, "The raw report hasn't been populated") - #Check Metrics - self.assertIsInstance(response.metrics, list, "The metrics weren't populated") - self.assertEqual(response.metrics[0].id,"pageviews", "Wrong Metric") - #Check Elements - self.assertIsInstance(response.elements, list, "The elements is the wrong type") - self.assertEqual(response.elements[0].id,"datetime", "There are elements when there shouldn't be") - - #check time range - checkdate = date(2016,9,4).strftime("%a. %e %h. %Y") - self.assertEqual(response.period, checkdate) - - #check segmetns - self.assertIsNone(response.segments) - - #Check Data - self.assertIsInstance(response.data, list, "Data isn't getting populated right") - self.assertIsInstance(response.data[0] , dict, "The data isn't getting into the dict") - self.assertIsInstance(response.data[0]['datetime'], datetime.datetime, "The date isn't getting populated in the data") - self.assertIsInstance(response.data[0]['pageviews'], int, "The pageviews aren't getting populated in the data") - - @requests_mock.mock() - def test_ranked_report(self, m): - """ Make sure the ranked report is being processed - """ - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/ranked_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - ranked = self.analytics.suites[test_report_suite].report.element("page").metric("pageviews").metric("visits") - queue = [] - queue.append(ranked) - response = omniture.sync(queue) - - for report in response: - #Check Data - self.assertIsInstance(report.data, list, "Data isn't getting populated right") - self.assertIsInstance(report.data[0] , dict, "The data isn't getting into the dict") - self.assertIsInstance(report.data[0]['page'], str, "The page isn't getting populated in the data") - self.assertIsInstance(report.data[0]['pageviews'], int, "The pageviews aren't getting populated in the data") - self.assertIsInstance(report.data[0]['visits'], int, "The visits aren't getting populated in the data") - - @requests_mock.mock() - def test_trended_report(self,m): - """Make sure the trended reports are being processed corretly""" - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/trended_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - - trended = self.analytics.suites[test_report_suite].report.element("page").metric("pageviews").granularity('hour').run() - self.assertIsInstance(trended.data, list, "Treneded Reports don't work") - self.assertIsInstance(trended.data[0] , dict, "The data isn't getting into the dict") - self.assertIsInstance(trended.data[0]['datetime'], datetime.datetime, "The date isn't getting propulated correctly") - self.assertIsInstance(trended.data[0]['page'], str, "The page isn't getting populated in the data") - self.assertIsInstance(trended.data[0]['pageviews'], int, "The pageviews aren't getting populated in the data") - - @requests_mock.mock() - def test_dataframe(self,m): - """Make sure the pandas data frame object can be generated""" - - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/trended_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - trended = self.analytics.suites[test_report_suite].report.element("page").metric("pageviews").granularity('hour').run() - self.assertIsInstance(trended.dataframe, pandas.DataFrame, "Data Frame Object doesn't work") - - @requests_mock.mock() - def test_segments_id(self,m): - """ Make sure segments can be added """ - - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/segmented_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - suite = self.analytics.suites[test_report_suite] - report = suite.report.filter(suite.segments[0]).run() - - self.assertEqual(report.segments[0], suite.segments[0], "The segments don't match") - - @unittest.skip("skip inline segments because checked in Query") - def test_inline_segment(self): - """ Make sure inline segments work """ - #pretty poor check but need to make it work with any report suite - report = self.analytics.suites[0].report.element('page').metric('pageviews').metric('visits').filter(element='browser', selected=["::unspecified::"]).run() - self.assertIsInstance(report.data, list, "inline segments don't work") - - @requests_mock.mock() - def test_multiple_classifications(self, m): - """Makes sure the report can parse multiple classifications correctly since they have the same element ID""" - #load sample file - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/multi_classifications.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - ReportQueue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=ReportQueue) - - report = self.analytics.suites[0].report\ - .element('evar2',classification="Classification 1", disable_validation=True)\ - .element('evar2',classification="Classification 2", disable_validation=True)\ - - report = report.run() - - self.assertTrue('evar2 | Classification 1' in report.data[0], "The Value of report.data[0] was:{}".format(report.data[0])) - self.assertTrue('evar2 | Classification 2' in report.data[0], "The Value of report.data[0] was:{}".format(report.data[0])) - - @requests_mock.mock() - def test_mixed_classifications(self, m): - """Makes sure the report can parse reports with classifications and - regular dimensionscorrectly since they have the same element ID""" - #load sample files with responses for mock objects - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/mixed_classifications.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - ReportQueue = queue_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=ReportQueue) - - report = self.analytics.suites[0].report\ - .element('evar3',classification="Classification 1", disable_validation=True)\ - .element('evar5', disable_validation=True)\ - - report = report.run() - - self.assertTrue('evar3 | Classification 1' in report.data[0], "The Value of report.data[0] was:{}".format(report.data[0])) - self.assertTrue('evar5' in report.data[0], "The Value of report.data[0] was:{}".format(report.data[0])) - - @requests_mock.mock() - def test_repr_html_(self,m): - """Test the _repr_html_ method used by iPython for notebook display""" - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/trended_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - with open(path+'/mock_objects/trended_report.html') as basic_html_file: - basic_html = basic_html_file.read() - - #setup mock object - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - trended = self.analytics.suites[test_report_suite].report\ - .element("page").metric("pageviews").granularity('hour').run() - - - self.assertEqual(trended._repr_html_(),basic_html) - - @requests_mock.mock() - def test__div__(self,m): - """Test the __div__ method for tab autocompletion""" - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - response = self.analytics.suites[test_report_suite].report.run() - self.assertEqual(response.__div__(), \ - ['data','dataframe', 'metrics','elements', 'segments', 'period', 'type', 'timing'], - "the __dir__ method broke: {}".format(response.__div__())) - - @requests_mock.mock() - def test__repr__(self,m): - path = os.path.dirname(__file__) - - with open(path+'/mock_objects/basic_report.json') as data_file: - json_response = data_file.read() - - with open(path+'/mock_objects/Report.Queue.json') as queue_file: - report_queue = queue_file.read() - - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Get', text=json_response) - m.post('https://api.omniture.com/admin/1.4/rest/?method=Report.Queue', text=report_queue) - - response = self.analytics.suites[test_report_suite].report.run() - test_string = """""" - self.assertEqual(response.__repr__(),test_string) - - -if __name__ == '__main__': - unittest.main() diff --git a/build/lib/tests/testUtils.py b/build/lib/tests/testUtils.py deleted file mode 100644 index 7538289..0000000 --- a/build/lib/tests/testUtils.py +++ /dev/null @@ -1,87 +0,0 @@ -import datetime - -import unittest -import omniture - - - - -class UtilsTest(unittest.TestCase): - def setUp(self): - fakelist = [{"id":"123", "title":"abc"},{"id":"456","title":"abc"}] - - self.alist = omniture.Value.list("segemnts",fakelist,{}) - - - def tearDown(self): - del self.alist - - def test_addressable_list_repr_html_(self): - """Test the _repr_html_ for AddressableList this is used in ipython """ - outlist = '
IDTitle
123abc
456abc
' - self.assertEqual(self.alist._repr_html_(),outlist,\ - "The _repr_html_ isn't working: {}"\ - .format(self.alist._repr_html_())) - - def test_addressable_list_str_(self): - """Test _str_ method """ - outstring = 'ID 123 | Name: abc \nID 456 | Name: abc \n' - self.assertEqual(self.alist.__str__(),outstring,\ - "The __str__ isn't working: {}"\ - .format(self.alist.__str__())) - - def test_addressable_list_get_time(self): - """ Test the custom get item raises a problem when there are duplicate names """ - with self.assertRaises(KeyError): - self.alist['abc'] - - def test_wrap(self): - """Test the wrap method """ - self.assertIsInstance(omniture.utils.wrap("test"),list) - self.assertIsInstance(omniture.utils.wrap(["test"]),list) - self.assertEqual(omniture.utils.wrap("test"),["test"]) - self.assertEqual(omniture.utils.wrap(["test"]),["test"]) - - def test_date(self): - """Test the Date Method""" - test_date = "2016-09-01" - self.assertEqual(omniture.utils.date(None), None) - self.assertEqual(omniture.utils.date(test_date).strftime("%Y-%m-%d"), - test_date) - d = datetime.date(2016,9,1) - self.assertEqual(omniture.utils.date(d).strftime("%Y-%m-%d"), - test_date) - - t = datetime.datetime(2016,9,1) - self.assertEqual(omniture.utils.date(t).strftime("%Y-%m-%d"), - test_date) - - self.assertEqual(omniture.utils.date(u"2016-09-01").strftime("%Y-%m-%d"), - test_date) - with self.assertRaises(ValueError): - omniture.utils.date({}) - - def test_affix(self): - """Test the Affix method to make sure it handles things correctly""" - p = "pre" - s = "suf" - v = "val" - con = "+" - - self.assertEqual(omniture.utils.affix(p,v,connector=con), - con.join([p,v])) - self.assertEqual(omniture.utils.affix(base=v,suffix=s,connector=con), - con.join([v,s])) - self.assertEqual(omniture.utils.affix(p,v,s,connector=con), - con.join([p,v,s])) - self.assertEqual(omniture.utils.affix(base=v,connector=con), - con.join([v])) - - def test_translate(self): - """Test the translate method """ - t = {"product":"cat_collar", "price":100, "location":"no where"} - m = {"product":"Product_Name","price":"Cost","date":"Date"} - s = {"Product_Name":"cat_collar", "Cost":100, "location":"no where"} - self.assertEqual(omniture.utils.translate(t,m),s) - - diff --git a/omniture/reports.py b/omniture/reports.py index 40c1c01..8b1fbd5 100755 --- a/omniture/reports.py +++ b/omniture/reports.py @@ -150,9 +150,14 @@ def parse_rows(self,row, level=0, upperlevels=None): data_set.extend(self.parse_rows(row['breakdown'], level+1, data)) elif 'counts' in row: for index, metric in enumerate(row['counts']): - #decide what type of event - if self.metrics[index].decimals > 0 or metric.find('.') >-1: + if self.metrics[index].decimals > 0 or metric.find('.') >-1: + if metric == 'INF': + data[str(self.metrics[index].id)] = float('inf') + else: data[str(self.metrics[index].id)] = float(metric) + else: + if metric == 'INF': + data[str(self.metrics[index].id)] = float('inf') else: data[str(self.metrics[index].id)] = int(metric) diff --git a/setup.py b/setup.py index de6d70d..23c7be7 100755 --- a/setup.py +++ b/setup.py @@ -6,8 +6,8 @@ long_description=open('README.md').read(), author='Stijn Debrouwere', author_email='stijn@stdout.be', - url='http://stdbrouw.github.com/python-omniture/', - download_url='http://www.github.com/stdbrouw/python-omniture/tarball/master', + url='http://github.com/Dotmodus/python-omniture/', + #download_url='http://www.github.com/Dotmodus/python-omniture/tarball/master', version=__version__, license='MIT', packages=find_packages(),