-
Notifications
You must be signed in to change notification settings - Fork 13
/
Copy pathquery_async.py
87 lines (72 loc) · 2.22 KB
/
query_async.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
import asyncio
import random
import time
import pandas
from influxdb_client_3 import InfluxDBClient3
from config import Config
async def fibio(iterations, grit=0.5):
"""
example coroutine to run parallel with query_async
:param iterations:
:param grit:
:return:
"""
n0 = 1
n1 = 1
vals = [n0, n1]
for _ in range(iterations):
val = n0 + n1
n0 = n1
n1 = val
print(val)
vals.append(val)
await asyncio.sleep(grit)
return vals
def write_data(client: InfluxDBClient3, measurement):
"""
Synchronous write - only for preparing data
:param client:
:param measurement:
:return:
"""
ids = ['s3b1', 'dq41', 'sgw22']
lp_template = f"{measurement},id=%s speed=%f,alt=%f,bearing=%f %d"
data_size = 10
data = []
interval = 10 * 1_000_000_000
ts = time.time_ns() - (interval * data_size)
for _ in range(data_size):
data.append(lp_template % (ids[random.randint(0, len(ids) - 1)],
random.random() * 300,
random.random() * 2000,
random.random() * 360, ts))
ts += interval
client.write(data)
async def query_data(client: InfluxDBClient3, measurement):
"""
Query asynchronously - should not block other coroutines
:param client:
:param measurement:
:return:
"""
query = f"SELECT * FROM \"{measurement}\" WHERE time >= now() - interval '5 minutes' ORDER BY time DESC"
print(f"query start: {pandas.Timestamp(time.time_ns())}")
table = await client.query_async(query)
print(f"query returned: {pandas.Timestamp(time.time_ns())}")
return table.to_pandas()
async def main():
config = Config()
client = InfluxDBClient3(
host=config.host,
token=config.token,
database=config.database,
org=config.org
)
measurement = 'example_uav'
write_data(client, measurement)
# run both coroutines simultaneously
result = await asyncio.gather(fibio(10, 0.2), query_data(client, measurement))
print(f"fibio sequence = {result[0]}")
print(f"data set =\n{result[1]}")
if __name__ == "__main__":
asyncio.run(main())