diff --git a/resources/session02/homework/http_server.py b/resources/session02/homework/http_server.py index 84ceffe1..2794e1d2 100644 --- a/resources/session02/homework/http_server.py +++ b/resources/session02/homework/http_server.py @@ -1,14 +1,16 @@ import socket import sys +import pathlib +import mimetypes -def response_ok(body=b"this is a pretty minimal response", mimetype=b"text/plain"): +def response_ok(body, mimetype): """returns a basic HTTP response""" resp = [] resp.append(b"HTTP/1.1 200 OK") - resp.append(b"Content-Type: text/plain") + resp.append(b"Content-Type: " + mimetype) # couldn't call format() on byte string resp.append(b"") - resp.append(b"this is a pretty minimal response") + resp.append(body) return b"\r\n".join(resp) @@ -22,7 +24,10 @@ def response_method_not_allowed(): def response_not_found(): """returns a 404 Not Found response""" - return b"" + resp = [] + resp.append("HTTP/1.1 404 Not Found") + resp.append("") + return "\r\n".join(resp).encode('utf8') def parse_request(request): @@ -35,7 +40,17 @@ def parse_request(request): def resolve_uri(uri): """This method should return appropriate content and a mime type""" - return b"still broken", b"text/plain" + path = pathlib.Path('webroot{}'.format(uri)) + if path.is_dir(): + resources = [item.name.encode('utf8') for item in path.iterdir()] + contents = b'\r\n'.join(resources) + mime_type = b'text/plain' + elif path.is_file(): + contents = path.read_bytes() + mime_type = mimetypes.guess_type(uri)[0].encode('utf8') + else: + raise NameError('No such file or directory. Please try again.') + return contents, mime_type def server(log_buffer=sys.stderr): diff --git a/resources/session03/calculator/calculator.py b/resources/session03/calculator/calculator.py new file mode 100644 index 00000000..bb6f0051 --- /dev/null +++ b/resources/session03/calculator/calculator.py @@ -0,0 +1,101 @@ +import re +import operator +from urllib.parse import parse_qsl + + +def header(): + header = """ + +Calculator + + +

Python Does Math For You


""" + return header + + +def footer(): + footer = """ +

+ + +""" + return footer + + +def calculator(): + calculator = """ +
+ Enter a calculation (e.g. '5+2'):

+

+ +
""" + return calculator + + +def calculate(num1, op_str, num2): + ops = {'+': operator.add, + '-': operator.sub, + '*': operator.mul, + '/': operator.truediv} + result = ops[op_str](int(num1), int(num2)) + calculation = """

{} {} {} equals {}



+ Make another calculation.""" + return calculation.format(num1, op_str, num2, result) + + +def html_doc(doc_body): + doc_header = header() + doc_footer = footer() + return doc_header + doc_body + doc_footer + + +def resolve_path(path): + urls = [(r'^$', calculator), + (r'^([\d]+)(\+|\-|\*|\/)([\d]+)$', calculate)] + matchpath = path.lstrip('/') + for regexp, func in urls: + match = re.match(regexp, matchpath) + if match is None: + continue + args = match.groups([]) + return func, args + # we get here if no url matches + raise NameError + + +def application(environ, start_response): + headers = [("Content-type", "text/html")] + try: + request = environ.get('PATH_INFO', None) + qs = environ.get('QUERY_STRING', None) + if request is None: + raise NameError + if qs: + qsl = parse_qsl(qs) # urllib function to convert query string to list of keys and values + request = qsl[0][1].replace(' ', '') # grab the calculation from the query string, strip whitespace and store it in request + func, args = resolve_path(request) + body = func(*args) + status = "200 OK" + except NameError: + status = "400 Bad Request" + body = """

Please re-enter your calculation using only digits and the following operands: +, -, *, /. Thanks!

+ Try another calculation.""" + except ZeroDivisionError: + status = "400 Bad Request" + body = """

You can't divide by zero!

+ Let's try something more reasonable.""" + except Exception: + status = "500 Internal Server Error" + body = """

Something bad has happened, but it's not your fault. Sorry.

+ Give us another chance.""" + finally: + html = html_doc(body) + headers.append(('Content-length', str(len(html)))) + start_response(status, headers) + return [html.encode('utf8')] + + +if __name__ == '__main__': + from wsgiref.simple_server import make_server + srv = make_server('localhost', 8080, application) + srv.serve_forever() diff --git a/resources/session04/soup/README.md b/resources/session04/soup/README.md new file mode 100644 index 00000000..382440f1 --- /dev/null +++ b/resources/session04/soup/README.md @@ -0,0 +1,29 @@ +MASHUP.PY +========= + +Instructions: +------------- + +A program to view restaurant health inspection data for an admitedly +limited geographical area of Seattle via a color-coded map. + +Type `python mashup.py` from the command line to run the program. + +The first prompt will ask for sorting criteria. You may choose from +the following selections: + +* Average Score +* High Score +* Total Inspections + +The second prompt will ask for a count value to determine the number of +results to display on the map. + +Optional command line parameters when running program: + +* `python mashup.py --help`: display help screen. +* `python mashup.py --low-to-high`: display the lowest health inspection +scores for your selected sorting criteria. Defaults to 'high-to-low' + + + diff --git a/resources/session04/soup/mashup.py b/resources/session04/soup/mashup.py new file mode 100644 index 00000000..ec4fd350 --- /dev/null +++ b/resources/session04/soup/mashup.py @@ -0,0 +1,222 @@ +import sys +from pprint import pprint # used for debugging +from bs4 import BeautifulSoup +import geocoder +import json +import pathlib +import re +import requests +import click +import webbrowser +import urllib.parse + + +INSPECTION_DOMAIN = 'http://info.kingcounty.gov/' +INSPECTION_PATH = '/health/ehs/foodsafety/inspections/Results.aspx' +INSPECTION_PARAMS = { + 'Output': 'W', + 'Business_Name': '', + 'Business_Address': '', + 'Longitude': '', + 'Latitude': '', + 'City': '', + 'Zip_Code': '', + 'Inspection_Type': 'All', + 'Inspection_Start': '', + 'Inspection_End': '', + 'Inspection_Closed_Business': 'A', + 'Violation_Points': '', + 'Violation_Red_Points': '', + 'Violation_Descr': '', + 'Fuzzy_Search': 'N', + 'Sort': 'H' +} + + +def get_inspection_page(**kwargs): + url = INSPECTION_DOMAIN + INSPECTION_PATH + params = INSPECTION_PARAMS.copy() + for key, val in kwargs.items(): + if key in INSPECTION_PARAMS: + params[key] = val + resp = requests.get(url, params=params) + resp.raise_for_status() # raise python exception based on http status + return resp.text + + +def parse_source(html): + parsed = BeautifulSoup(html) + return parsed + + +def load_inspection_page(name): + file_path = pathlib.Path(name) + return file_path.read_text(encoding='utf8') + + +def restaurant_data_generator(html): + id_finder = re.compile(r'PR[\d]+~') + return html.find_all('div', id=id_finder) + + +def has_two_tds(elem): + is_tr = elem.name == 'tr' + td_children = elem.find_all('td', recursive=False) + has_two = len(td_children) == 2 + return is_tr and has_two + + +def clean_data(td): + return td.text.strip(" \n:-") + + +def extract_restaurant_metadata(elem): + restaurant_data_rows = elem.find('tbody').find_all( + has_two_tds, recursive=False + ) + rdata = {} + current_label = '' + for data_row in restaurant_data_rows: + key_cell, val_cell = data_row.find_all('td', recursive=False) + new_label = clean_data(key_cell) + current_label = new_label if new_label else current_label + rdata.setdefault(current_label, []).append(clean_data(val_cell)) + return rdata + + +def is_inspection_data_row(elem): + is_tr = elem.name == 'tr' + if not is_tr: + return False + td_children = elem.find_all('td', recursive=False) + has_four = len(td_children) == 4 + this_text = clean_data(td_children[0]).lower() + contains_word = 'inspection' in this_text + does_not_start = not this_text.startswith('inspection') + return is_tr and has_four and contains_word and does_not_start + + +def get_score_data(elem): + inspection_rows = elem.find_all(is_inspection_data_row) + samples = len(inspection_rows) + total = 0 + high_score = 0 + average = 0 + for row in inspection_rows: + strval = clean_data(row.find_all('td')[2]) + try: + intval = int(strval) + except (ValueError, TypeError): + samples -= 1 + else: + total += intval + high_score = intval if intval > high_score else high_score + + if samples: + average = total/float(samples) + data = { + u'Average Score': average, + u'High Score': high_score, + u'Total Inspections': samples + } + return data + + +def result_generator(sort_by, high_to_low, count): + use_params = { + 'Inspection_Start': '2/1/2013', + 'Inspection_End': '2/1/2015', + 'Zip_Code': '98101' + } + # html = get_inspection_page(**use_params) + html = load_inspection_page('inspection_page.html') + parsed = parse_source(html) + content_col = parsed.find("td", id="contentcol") + data_list = restaurant_data_generator(content_col) + restaurant_list = [] + for data_div in data_list: + metadata = extract_restaurant_metadata(data_div) + inspection_data = get_score_data(data_div) + metadata.update(inspection_data) + restaurant_list.append(metadata) + if sort_by: + restaurant_list = sorted(restaurant_list, + key=lambda k: k[sort_by], + reverse=high_to_low) + for restaurant in restaurant_list[:count]: + yield restaurant + + +def get_geojson(result): + address = " ".join(result.get('Address', '')) + if not address: + return None + geocoded = geocoder.google(address) + geojson = geocoded.geojson + inspection_data = {} + use_keys = ( + 'marker-color', + 'Business Name', + 'Average Score', + 'Total Inspections', + 'High Score' + ) + for key, val in result.items(): + if key not in use_keys: + continue + if isinstance(val, list): + val = " ".join(val) + inspection_data[key] = val + geojson['properties'] = inspection_data + return geojson + + +def set_marker_color(sort_by, results): + # calculate the average score for this sample size and sorting criteria + scores = [result['properties'][sort_by] for result in results] + avg_score = sum(scores)/len(scores) + # set marker-color property for results based on relationship to avg. score + # green=go, yellow=proceed with caution, red=stop + for result in results: + if result['properties'][sort_by] >= (avg_score+5): + result['properties']['marker-color'] = '#00ff00' + elif result['properties'][sort_by] <= (avg_score-5): + result['properties']['marker-color'] = '#ff0000' + else: + result['properties']['marker-color'] = '#ffff00' + return results + + +def open_map(total_result): + # open map in geojson.io with your data + map_url = 'http://geojson.io/#data=data:application/json,' + escaped_geojson = urllib.parse.quote(json.dumps(total_result)) # use json to convert dict to string for escaping + geojson_url = map_url + escaped_geojson + webbrowser.open(geojson_url) + + +@click.command() +@click.option('--sort-by', + type=click.Choice(['Average Score', 'High Score', 'Total Inspections']), + prompt=True, + help='Sorting options: Average Score, High Score, Total Inspections') +@click.option('--high-to-low/--low-to-high', + default=True, + help='Select sort order.') +@click.option('--count', + default=10, + prompt=True, + help='Select number of results.') +def display_results(sort_by, high_to_low, count): + """A program to display the results of restaurant health inspections on a map.""" + total_result = {'type': 'FeatureCollection', 'features': []} + for result in result_generator(sort_by, high_to_low, count): + geojson = get_geojson(result) + total_result['features'].append(geojson) + total_result['features'] = set_marker_color(sort_by, total_result['features']) + # with open('my_map.json', 'w') as fh: + # json.dump(total_result, fh) + open_map(total_result) + +if __name__ == '__main__': + display_results() diff --git a/resources/session04/soup/test_mashup.py b/resources/session04/soup/test_mashup.py new file mode 100644 index 00000000..2816522e --- /dev/null +++ b/resources/session04/soup/test_mashup.py @@ -0,0 +1,44 @@ +import pytest + +from mashup import result_generator, get_geojson, set_marker_color + + +def test_result_generator(): + result_list = [] + for result in result_generator('Average Score', True, 5): + result_list.append(result) + for i in range(len(result_list)-1): + assert result_list[i]['Average Score'] >= result_list[i+1]['Average Score'] + assert len(result_list) == 5 + result_list = [] + for result in result_generator('High Score', False, 5): + result_list.append(result) + for i in range(len(result_list)-1): + assert result_list[i]['High Score'] <= result_list[i+1]['High Score'] + assert len(result_list) == 5 + result_list = [] + for result in result_generator('Total Inspections', True, 1): + result_list.append(result) + assert len(result_list) == 1 + + +def test_set_marker_color(): + # generate geojson result set + total_result = {'type': 'FeatureCollection', 'features': []} + for result in result_generator('Average Score', True, 10): + geojson = get_geojson(result) + total_result['features'].append(geojson) + total_result['features'] = set_marker_color('Average Score', total_result['features']) + # calculate avg. score of result set + scores = [result['properties']['Average Score'] for result in total_result['features']] + avg_score = sum(scores)/len(scores) + # assert color values set according to score + for i in range(len(total_result)-1): + if total_result['features'][i]['properties']['Average Score'] >= (avg_score+5): + assert total_result['features'][i]['properties']['marker-color'] == '#00ff00' + elif total_result['features'][i]['properties']['Average Score'] <= (avg_score-5): + assert total_result['features'][i]['properties']['marker-color'] == '#ff0000' + else: + assert total_result['features'][i]['properties']['marker-color'] == '#ffff00' + +