-
-
Notifications
You must be signed in to change notification settings - Fork 25
/
Copy pathclient.py
157 lines (141 loc) · 5.37 KB
/
client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
from collections import defaultdict
import json
import re
import redis
import threading
import time
import traceback
import uuid
import base64
import binascii
# Lifetime, in seconds, of an echo-suppression entry (compared against
# time.time() differences).
TTL = 2

# Fields that always take part in the history de-duplication key.
hash_keys = ('cmd', 'user')

# Additional de-duplication fields, looked up by the message's 'cmd' value.
cmd_hash_keys = {
    'comment': ('addr',),
    'extra_comment': ('addr',),
    'area_comment': ('addr',),
    'rename': ('addr',),
    'stackvar_renamed': ('addr', 'offset', 'name',),
    'struc_created': ('struc_name', 'is_union',),
    'struc_deleted': ('struc_name',),
    'struc_renamed': ('old_name', 'new_name',),
    'struc_member_created': ('struc_name', 'offset', 'member_name', 'size', 'flag',),
    'struc_member_deleted': ('struc_name', 'offset',),
    'struc_member_renamed': ('struc_name', 'offset', 'member_name',),
    'struc_member_changed': ('struc_name', 'offset', 'size',),
}

# Abbreviated key used in the serialized JSON -> full key used in code.
key_dec = {
    'c': 'cmd',
    'a': 'addr',
    'u': 'user',
    't': 'text',
    'i': 'uuid',
    'b': 'blocks'
}

# Inverse of key_dec: full key -> abbreviated key.
key_enc = {long_name: short_name for short_name, long_name in key_dec.items()}

# Matches every character that is not an ASCII letter, digit, '_' or '-'.
nick_filter = re.compile(r'[^a-zA-Z0-9_\-]')
def decode(data):
    """Parse a JSON object and expand its abbreviated keys.

    Every key found in ``key_dec`` is replaced by its full name; keys that
    are not listed there are kept unchanged. Values are passed through as-is.
    """
    expanded = {}
    for short, value in json.loads(data).items():
        expanded[key_dec.get(short, short)] = value
    return expanded
def dtokey(d):
    """Build a comparable identity tuple for a message dict.

    The result is a tuple of ``(key, value)`` pairs taken from ``d`` in
    sorted order, leaving out the ``user``, ``ts`` and ``uuid`` entries.
    """
    ignored = ('user', 'ts', 'uuid')
    pairs = []
    for item in sorted(d.items()):
        if item[0] not in ignored:
            pairs.append(item)
    return tuple(pairs)
def remove_ttl(a):
    """Return a new list with the entries of ``a`` younger than ``TTL``.

    The first element of each entry is read as its timestamp and compared
    against the current ``time.time()``. The input list is not modified.
    """
    current = time.time()
    fresh = []
    for entry in a:
        if current - entry[0] < TTL:
            fresh.append(entry)
    return fresh
class Client:
    """Redis-backed publish/subscribe client with echo suppression.

    Messages are JSON objects published on a Redis channel named ``key``;
    persistent ones are also appended to a Redis list of the same name and
    replayed to the callback when a subscription is confirmed.

    Every message delivered to a callback is remembered for ``TTL`` seconds
    in ``nosend``; a matching ``publish`` call within that window is dropped,
    so applying a remote change locally does not echo it back.
    """

    def __init__(self, host, port, nick, password=None):
        """Connect to Redis and initialise per-client state.

        ``nick`` is sanitised with ``nick_filter`` before it is stored.
        """
        self.r = redis.StrictRedis(host=host, port=port, password=password, socket_connect_timeout=5)
        # Issue a command right away so connection problems surface here.
        self.r.info()
        self.nick = nick_filter.sub('_', nick)
        # key -> PubSub object created by join().
        self.ps = {}
        # Guards every access to the lists stored in self.nosend.
        self.nolock = threading.Lock()
        # key -> list of (timestamp,) + dtokey(message) tuples.
        self.nosend = defaultdict(list)
        # Random per-client id (base64 of 16 random bytes) used to recognise
        # and drop our own messages when they come back from the channel.
        self.uuid = base64.b64encode(uuid.uuid4().bytes).decode('ascii')

    def debounce(self, no, data):
        """Return True if ``data`` matches a fresh entry in ``no``.

        ``no`` is one of the ``nosend`` lists. The first entry whose key part
        equals ``dtokey(data)`` and which is younger than ``TTL`` seconds is
        removed from ``no`` and True is returned; otherwise False.
        """
        dkey = dtokey(data)
        now = time.time()
        with self.nolock:
            for entry in no:
                if entry[1:] == dkey and now - entry[0] < TTL:
                    # Safe although we are iterating: we return immediately.
                    no.remove(entry)
                    return True
        return False

    @staticmethod
    def _decode_message(raw):
        """Decode a raw message and sanitise its 'user' field, if present."""
        data = decode(raw)
        if 'user' in data:
            data['user'] = nick_filter.sub('_', data['user'])
        return data

    @staticmethod
    def _encode(data):
        """Serialise ``data`` to compact JSON using the abbreviated keys."""
        short = dict((key_enc.get(k, k), v) for k, v in data.items())
        return json.dumps(short, separators=(',', ':'), sort_keys=True)

    def _handle_message(self, cb, key, raw):
        """Process one live channel message and hand it to ``cb``."""
        data = self._decode_message(raw)
        # reject our own messages
        if data.get('uuid') == self.uuid:
            return
        with self.nolock:
            pending = self.nosend[key]
            # Prune in place: publish() may already hold a reference to this
            # list, so it must not be replaced by a new object.
            pending[:] = remove_ttl(pending)
            pending.append((time.time(),) + dtokey(data))
        cb(key, data)

    def _replay_history(self, cb, key):
        """Replay the stored history list for ``key`` through ``cb``.

        Entries are de-duplicated so that, for each hash key, only the entry
        closest to the end of the list survives; survivors are delivered in
        their original list order with ``replay=True``.
        """
        decoded = []
        for raw in self.r.lrange(key, 0, -1):
            try:
                # History is sanitised exactly like live messages.
                decoded.append(self._decode_message(raw))
            except Exception:
                print('error decoding history', raw)
                traceback.print_exc()

        state = []
        dedup = set()
        # Walk from the end of the list so the last entry for a hash key is
        # seen first and wins.
        for data in reversed(decoded):
            cmd = data.get('cmd')
            if cmd:
                keys = hash_keys + cmd_hash_keys.get(cmd, ())
                hashkey = tuple(str(data.get(k)) for k in keys)
                # NOTE(review): every element is str(...), so a missing field
                # becomes 'None' (truthy); this check only fails when a field
                # stringifies to '' - confirm that is the intent.
                if all(hashkey):
                    if hashkey in dedup:
                        continue
                    dedup.add(hashkey)
            state.append(data)

        # state was collected back-to-front; reverse to restore list order.
        for data in reversed(state):
            try:
                with self.nolock:
                    self.nosend[key].append((time.time(),) + dtokey(data))
                cb(key, data, replay=True)
            except Exception:
                print('error replaying history', data)
                traceback.print_exc()

    def _sub_thread(self, ps, cb, key):
        """Listener loop: dispatch everything ``ps.listen()`` yields.

        Errors while handling a single item are printed and the loop goes on.
        """
        for item in ps.listen():
            try:
                kind = item['type']
                if kind == 'message':
                    self._handle_message(cb, key, item['data'])
                elif kind == 'subscribe':
                    self._replay_history(cb, key)
                elif kind == 'unsubscribe':
                    # Acknowledgement of leave(); expected, nothing to do.
                    pass
                else:
                    print('unknown redis push', item)
            except Exception:
                print('error processing item', item)
                traceback.print_exc()

    def join(self, key, cb):
        """Subscribe to ``key`` and deliver its messages to ``cb``.

        A daemon thread runs the listener loop. ``cb`` is called as
        ``cb(key, data)`` for live messages and ``cb(key, data, replay=True)``
        for replayed history. A non-persistent ``join`` message is published
        afterwards.
        """
        ps = self.r.pubsub()
        ps.subscribe(key)
        t = threading.Thread(target=self._sub_thread, args=(ps, cb, key))
        t.daemon = True
        t.start()
        self.ps[key] = ps
        self.publish(key, {'cmd': 'join'}, perm=False)

    def leave(self, key):
        """Unsubscribe from ``key``; a key that was never joined is ignored."""
        ps = self.ps.pop(key, None)
        if ps:
            ps.unsubscribe(key)

    def publish(self, key, data, perm=True, send_uuid=True):
        """Publish ``data`` on channel ``key`` unless it is a recent echo.

        The message is stamped with this client's nick, the Redis server time
        and (when ``send_uuid`` is true) this client's uuid. With ``perm`` it
        is also appended to the history list ``key``. The caller's ``data``
        dict is left unmodified.
        """
        if self.debounce(self.nosend[key], data):
            return
        # Work on a copy so the caller's dict is not altered.
        msg = dict(data)
        msg['user'] = self.nick
        msg['ts'] = self.r.time()[0]
        if send_uuid:
            msg['uuid'] = self.uuid
        payload = self._encode(msg)
        if perm:
            self.r.rpush(key, payload)
        self.r.publish(key, payload)

    def push(self, key, data, send_uuid=True):
        """Prepend ``data`` to the Redis list ``key`` without publishing it.

        Unlike ``publish`` no nick or timestamp is added. The caller's
        ``data`` dict is left unmodified.
        """
        msg = dict(data)
        if send_uuid:
            msg['uuid'] = self.uuid
        self.r.lpush(key, self._encode(msg))