# batching_example.py
#
# Example script: writes randomly generated sensor readings to InfluxDB in
# batches using the influxdb_client_3 client and reports the outcome.
import datetime
import random
import time
from bson import ObjectId
import influxdb_client_3 as InfluxDBClient3
from influxdb_client_3 import write_client_options, WritePrecision, WriteOptions, InfluxDBError
from config import Config
class BatchingCallback:
    """Record the outcome of batched writes.

    The ``success``, ``error`` and ``retry`` methods are intended to be handed
    to the write client as callbacks. Each one prints what happened and
    updates the bookkeeping attributes below.

    Attributes:
        write_status_msg: Message describing the most recent success or
            failure; ``None`` until the first callback fires.
        write_count: Number of batches reported as successfully written.
        error_count: Number of batches reported as failed.
        retry_count: Number of retry notifications received.
        start: Wall-clock creation time from ``time.time_ns()``.
    """

    def __init__(self):
        self.write_status_msg = None
        self.write_count = 0
        self.error_count = 0
        self.retry_count = 0
        # Kept as a wall-clock timestamp for anyone reading ``start`` directly.
        self.start = time.time_ns()
        # Durations are measured on the monotonic clock so that system clock
        # adjustments cannot produce negative or skewed elapsed times.
        self._start_monotonic = time.monotonic_ns()

    def success(self, conf, data: str):
        """Report a written batch and bump the success counter."""
        print(f"Written batch: {conf}, data: {data}")
        self.write_count += 1
        self.write_status_msg = f"SUCCESS: {self.write_count} writes"

    def error(self, conf, data: str, exception: InfluxDBError):
        """Report a batch that could not be written and count the failure."""
        print(f"Cannot write batch: {conf}, data: {data} due: {exception}")
        self.error_count += 1
        self.write_status_msg = f"FAILURE - cause: {exception}"

    def retry(self, conf, data: str, exception: InfluxDBError):
        """Report a retryable error and bump the retry counter."""
        print(f"Retryable error occurs for batch: {conf}, data: {data} retry: {exception}")
        self.retry_count += 1

    def elapsed(self) -> int:
        """Return the nanoseconds elapsed since this object was created."""
        return time.monotonic_ns() - self._start_monotonic
def _random_reading(precision: int) -> dict:
    """Return random values for the 12 fields of one "stream" point.

    Integer fields come from ``random.randrange``; the remaining fields are
    ``random.uniform`` floats rounded to *precision* decimal places.
    """
    return {
        "bcW": random.randrange(1501),
        "bcWh": round(random.uniform(0, 4.17), precision),
        "bdW": random.randrange(71),
        "bdWh": round(random.uniform(0, 0.12), precision),
        "cPvWh": round(random.uniform(0.51, 27.78), precision),
        "cW": random.randrange(172, 10001),
        "cWh": round(random.uniform(0.51, 27.78), precision),
        "eWh": round(random.uniform(0, 41.67), precision),
        "iWh": round(random.uniform(0, 16.67), precision),
        "pW": random.randrange(209, 20001),
        "pWh": round(random.uniform(0.58, 55.56), precision),
        "scWh": round(random.uniform(0.58, 55.56), precision),
    }


def main() -> None:
    """Write seven days of random sensor readings to InfluxDB in batches.

    One reading per gateway is generated for every 10-second step of the
    sample window, written through a batching client, and a short summary
    (last status message, retry count, point count, elapsed time) is printed.
    Connection settings (token, host, org, database) are read from ``Config``.
    """
    conf = Config()

    # 100 gateway ids, each a freshly generated MongoDB ObjectId.
    gateway_ids = [ObjectId() for _ in range(100)]

    # Decimal places kept for the floating-point fields.
    precision = 2

    # The first reading lies one sample window in the past. The timestamp is
    # formatted with a trailing "Z" below, so it must really be UTC rather
    # than naive local time.
    sample_window_days = 7
    sample_time = (datetime.datetime.now(datetime.timezone.utc)
                   - datetime.timedelta(days=sample_window_days))

    # 6 readings per minute (one every 10 seconds) over the whole window.
    target_sample_count = sample_window_days * 24 * 60 * 6
    # Every sample step produces one point per gateway.
    total_points = target_sample_count * len(gateway_ids)

    callback = BatchingCallback()

    write_options = WriteOptions(batch_size=5_000,
                                 flush_interval=10_000,
                                 jitter_interval=2_000,
                                 retry_interval=5_000,
                                 max_retries=5,
                                 max_retry_delay=30_000,
                                 max_close_wait=600_000,
                                 exponential_base=2)

    wco = write_client_options(success_callback=callback.success,
                               error_callback=callback.error,
                               retry_callback=callback.retry,
                               write_options=write_options)

    # Opening InfluxDB client with a batch size of 5k points or flush interval
    # of 10k ms and gzip compression
    with InfluxDBClient3.InfluxDBClient3(token=conf.token,
                                         host=conf.host,
                                         org=conf.org,
                                         database=conf.database,
                                         enable_gzip=True,
                                         write_client_options=wco) as _client:
        print(f"Writing {total_points} data points.")
        for _ in range(target_sample_count):
            # Each sample step is 10 seconds after the previous one.
            sample_time += datetime.timedelta(seconds=10)
            # The timestamp is shared by all gateways in this step, so it is
            # formatted once here instead of once per point.
            timestamp = sample_time.strftime('%Y-%m-%dT%H:%M:%SZ')

            for gateway_id in gateway_ids:
                point = InfluxDBClient3.Point("stream").tag(
                    "gatewayId", str(gateway_id))
                for field_name, value in _random_reading(precision).items():
                    point = point.field(field_name, value)
                point = point.time(timestamp, WritePrecision.S)

                # Writing point (InfluxDB automatically batches writes into
                # sets of 5k points)
                _client.write(record=point)

    # Summary is printed after the client context has exited.
    # NOTE(review): assumes leaving the `with` block flushes pending batches
    # before these values are read - confirm against the client library.
    print(callback.write_status_msg)
    print(f"Write retries: {callback.retry_count}")
    print(f"Wrote {total_points} data points.")
    print(f"Elapsed time ms: {int(callback.elapsed() / 1_000_000)}")
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()