Skip to content

Commit 69761b0

Browse files
author
Kenneth Chan
committed
Event API JSON file exporter
1 parent d9b97dd commit 69761b0

File tree

2 files changed

+102
-17
lines changed

2 files changed

+102
-17
lines changed

examples/event_fileexport.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
import predictionio
2+
from datetime import datetime
3+
import pytz
4+
5+
# Example: write two events to "test.json" with predictionio.FileExporter,
# one JSON object per line, for later batch import.
exporter = predictionio.FileExporter(file_name="test.json")

# try/finally guarantees the output file is closed (and therefore flushed)
# even if one of the create_event calls raises.
try:
    # First event: exercises several property value types.
    first_event_properties = {
        "prop1": 1,
        "prop2": "value2",
        "prop3": [1, 2, 3],
        "prop4": True,
        "prop5": ["a", "b", "c"],
        "prop6": 4.56,
    }
    # Use localize() rather than passing the pytz zone as tzinfo to the
    # datetime constructor: per the pytz documentation, the constructor form
    # does not pick the UTC offset in effect on the given date for zones
    # whose offset has changed over time.
    first_event_time = pytz.timezone('US/Mountain').localize(
        datetime(2004, 12, 13, 21, 39, 45, 618000))
    exporter.create_event(
        event="my_event",
        entity_type="user",
        entity_id="uid",
        properties=first_event_properties,
        event_time=first_event_time,
    )

    # Second event: also carries a target entity.
    second_event_properties = {
        "someProperty": "value1",
        "anotherProperty": "value2",
    }
    exporter.create_event(
        event="my_event",
        entity_type="user",
        entity_id="uid",
        target_entity_type="item",
        target_entity_id="iid",
        properties=second_event_properties,
        # pytz.utc has a fixed offset, so passing it as tzinfo is safe.
        event_time=datetime(2014, 12, 13, 21, 38, 45, 618000, pytz.utc))
finally:
    exporter.close()

predictionio/__init__.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ class EventClient(BaseClient):
167167
Default value is 5.
168168
"""
169169

170-
def __init__(self, access_key,
170+
def __init__(self, access_key,
171171
url="http://localhost:7070",
172172
threads=1, qsize=0, timeout=5):
173173
assert type(access_key) is str, ("access_key must be string. "
@@ -201,7 +201,7 @@ def acreate_event(self, event, entity_type, entity_id,
201201
:param target_entity_id: target entity id. type str.
202202
:param properties: a custom dict associated with an event. type dict.
203203
:param event_time: the time of the event. type datetime, must contain
204-
timezone info.
204+
timezone info.
205205
206206
:returns:
207207
AsyncRequest object. You can call the get_response() method using this
@@ -227,7 +227,7 @@ def acreate_event(self, event, entity_type, entity_id,
227227
# need to skip the last three digits.
228228
et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
229229
data["eventTime"] = et_str
230-
230+
231231
path = "/events.json?accessKey=%s" % (self.access_key, )
232232
request = AsyncRequest("POST", path, **data)
233233
request.set_rfunc(self._acreate_resp)
@@ -239,7 +239,7 @@ def create_event(self, event, entity_type, entity_id,
239239
event_time=None):
240240
"""Synchronously (blocking) create an event."""
241241
return self.acreate_event(event, entity_type, entity_id,
242-
target_entity_type, target_entity_id, properties,
242+
target_entity_type, target_entity_id, properties,
243243
event_time).get_response()
244244

245245
def aget_event(self, event_id):
@@ -249,7 +249,7 @@ def aget_event(self, event_id):
249249
event.
250250
251251
:returns:
252-
AsyncRequest object.
252+
AsyncRequest object.
253253
"""
254254
enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
255255
path = "/events/%s.json" % enc_event_id
@@ -269,7 +269,7 @@ def adelete_event(self, event_id):
269269
event.
270270
271271
:returns:
272-
AsyncRequest object.
272+
AsyncRequest object.
273273
"""
274274
enc_event_id = urllib.quote(event_id, "") # replace special char with %xx
275275
path = "/events/%s.json" % (enc_event_id, )
@@ -286,7 +286,7 @@ def delete_event(self, event_id):
286286

287287
def aset_user(self, uid, properties={}, event_time=None):
288288
"""Set properties of a user.
289-
289+
290290
Wrapper of acreate_event function, setting event to "$set" and entity_type
291291
to "user".
292292
"""
@@ -304,7 +304,7 @@ def set_user(self, uid, properties={}, event_time=None):
304304

305305
def aunset_user(self, uid, properties, event_time=None):
306306
"""Unset properties of a user.
307-
307+
308308
Wrapper of acreate_event function, setting event to "$unset" and entity_type
309309
to "user".
310310
"""
@@ -323,7 +323,7 @@ def unset_user(self, uid, properties, event_time=None):
323323

324324
def adelete_user(self, uid, event_time=None):
325325
"""Delete a user.
326-
326+
327327
Wrapper of acreate_event function, setting event to "$delete" and entity_type
328328
to "user".
329329
"""
@@ -339,7 +339,7 @@ def delete_user(self, uid, event_time=None):
339339

340340
def aset_item(self, iid, properties={}, event_time=None):
341341
"""Set properties of an item.
342-
342+
343343
Wrapper of acreate_event function, setting event to "$set" and entity_type
344344
to "item".
345345
"""
@@ -356,7 +356,7 @@ def set_item(self, iid, properties={}, event_time=None):
356356

357357
def aunset_item(self, iid, properties={}, event_time=None):
358358
"""Unset properties of an item.
359-
359+
360360
Wrapper of acreate_event function, setting event to "$unset" and entity_type
361361
to "item".
362362
"""
@@ -373,7 +373,7 @@ def unset_item(self, iid, properties={}, event_time=None):
373373

374374
def adelete_item(self, iid, event_time=None):
375375
"""Delete an item.
376-
376+
377377
Wrapper of acreate_event function, setting event to "$delete" and entity_type
378378
to "item".
379379
"""
@@ -413,7 +413,7 @@ def record_user_action_on_item(self, action, uid, iid, properties={},
413413
class EngineClient(BaseClient):
414414
"""Client for extracting prediction results from a PredictionIO Engine
415415
Instance.
416-
416+
417417
:param url: the url of the PredictionIO Engine Instance.
418418
:param threads: number of threads to handle PredictionIO API requests.
419419
Must be >= 1.
@@ -424,7 +424,7 @@ class EngineClient(BaseClient):
424424
:param timeout: timeout for HTTP connection attempts and requests in
425425
seconds (optional).
426426
Default value is 5.
427-
427+
428428
"""
429429
def __init__(self, url="http://localhost:8000", threads=1,
430430
qsize=0, timeout=5):
@@ -435,7 +435,7 @@ def asend_query(self, data):
435435
query.
436436
437437
:param data: the query. It is converted to a JSON object using json.dumps
438-
method. type dict.
438+
method. type dict.
439439
440440
:returns:
441441
AsyncRequest object. You can call the get_response() method using this
@@ -449,10 +449,56 @@ def asend_query(self, data):
449449

450450
def send_query(self, data):
451451
"""Synchronously send a request.
452-
452+
453453
:param data: the query. It is converted to a JSON object using json.dumps
454-
method. type dict.
454+
method. type dict.
455455
456456
:returns: the prediction.
457457
"""
458458
return self.asend_query(data).get_response()
459+
460+
class FileExporter(object):
    """File exporter to export events to a JSON file for batch import.

    Each call to create_event() appends one JSON object followed by a
    newline to the destination file. The exporter can also be used as a
    context manager, which closes the file on exit even if an exception is
    raised while writing:

        with FileExporter(file_name="events.json") as exporter:
            exporter.create_event(...)

    :param file_name: the destination file name. The file is opened in
        write mode ('w') when the exporter is constructed.
    """

    def __init__(self, file_name):
        """Open file_name for writing.

        :param file_name: the destination file name.
        """
        self._file = open(file_name, 'w')

    def __enter__(self):
        """Return the exporter itself for use in a ``with`` statement."""
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Close the file on leaving the ``with`` block.

        Returns None, so an exception raised inside the block propagates.
        """
        self.close()

    def create_event(self, event, entity_type, entity_id,
                     target_entity_type=None, target_entity_id=None,
                     properties=None, event_time=None):
        """Write one event to the file as a single line of JSON.

        :param event: event name, written under the "event" key.
        :param entity_type: written under the "entityType" key.
        :param entity_id: written under the "entityId" key.
        :param target_entity_type: written under "targetEntityType" when it
            is not None.
        :param target_entity_id: written under "targetEntityId" when it is
            not None.
        :param properties: written under "properties" when it is not None;
            it must be serializable by json.dumps.
        :param event_time: the time of the event. It is passed through
            event_time_validation() (defined elsewhere in this module)
            before being formatted.
        """
        data = {
            "event": event,
            "entityType": entity_type,
            "entityId": entity_id,
        }

        # Optional fields are omitted from the output when not supplied.
        if target_entity_type is not None:
            data["targetEntityType"] = target_entity_type

        if target_entity_id is not None:
            data["targetEntityId"] = target_entity_id

        if properties is not None:
            data["properties"] = properties

        et = event_time_validation(event_time)
        # EventServer uses milliseconds, but python datetime class uses
        # micro. Hence need to skip the last three digits.
        et_str = et.strftime("%Y-%m-%dT%H:%M:%S.%f")[:-3] + et.strftime("%z")
        data["eventTime"] = et_str

        # Serialize before writing so a serialization error cannot leave a
        # partial line in the file.
        line = json.dumps(data)
        self._file.write(line + "\n")

    def close(self):
        """Close the FileExporter.

        Call this method when you finish writing all events to the JSON
        file. Calling it more than once is harmless.
        """
        self._file.close()

0 commit comments

Comments
 (0)