| 
13 | 13 | import logging  | 
14 | 14 | import optparse  | 
15 | 15 | import queue  | 
 | 16 | +import socket  | 
16 | 17 | 
 
  | 
17 | 18 | #   | 
18 | 19 | parser = optparse.OptionParser()  | 
 | 
22 | 23 | 
 
  | 
23 | 24 | (options, args) = parser.parse_args()  | 
24 | 25 | 
 
  | 
 | 26 | +# hack to figure out my ip address  | 
 | 27 | +s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  | 
 | 28 | +s.connect(("8.8.8.8", 80))  | 
 | 29 | +myaddr = s.getsockname()[0]  | 
 | 30 | +s.close()  | 
 | 31 | + | 
25 | 32 | if options.admin_port is None:  | 
26 | 33 |     options.admin_port = "5561"  | 
27 | 34 | 
 
  | 
28 | 35 | if options.pub_port is None:  | 
29 | 36 |     options.pub_port = "5556"  | 
30 | 37 | 
 
  | 
31 |  | -options.admin_interface = "tcp://*:{}".format(options.admin_port)  | 
32 |  | -options.pub_interface = "tcp://*:{}".format(options.pub_port)  | 
 | 38 | +options.admin_interface = "tcp://{}:{}".format(myaddr,options.admin_port)  | 
 | 39 | +options.pub_interface = "tcp://{}:{}".format(myaddr,options.pub_port)  | 
 | 40 | +myinterfaces = ( options.admin_interface, options.pub_interface )  | 
33 | 41 | 
 
  | 
34 | 42 | if options.debug is None:  | 
35 | 43 |     options.stream_log_level = logging.INFO  | 
@@ -71,37 +79,57 @@ def decode_command(message):  | 
71 | 79 |         command, key, value = (None, None, None)  | 
72 | 80 |     return ( command, key, value )  | 
73 | 81 | 
 
  | 
 | 82 | +def blob(config):  | 
 | 83 | +    return repr( (myinterfaces, config.data, config.peers) )  | 
 | 84 | + | 
74 | 85 | def admin(config):  | 
75 | 86 |     context = zmq.Context()  | 
76 | 87 |     admin = context.socket(zmq.REP)  | 
77 | 88 |     admin.bind(options.admin_interface)  | 
78 | 89 |     log.info("admin bound on {}".format(options.admin_interface))  | 
 | 90 | + | 
 | 91 | +    talkback = context.socket(zmq.REQ)  | 
 | 92 | + | 
79 | 93 |     while True:  | 
80 | 94 |         command, key, value = decode_command(admin.recv_string())  | 
81 | 95 |         response = ""  | 
82 | 96 |         log.info("received command {}, key {}, value {}".format(command, key, value))  | 
83 | 97 |         if command == "dump":  | 
84 |  | -            log.info("DUMP: " + str(config.data))  | 
85 |  | -            response = str(config.data)  | 
 | 98 | +            log.debug("dump request, responding {}".format(blob(config)))  | 
 | 99 | +            log.info("dump request")  | 
 | 100 | +            response = blob(config)  | 
 | 101 | +            admin.send_string(response)  | 
86 | 102 |         if command == "put":  | 
87 | 103 |             log.info("putting {} {}".format(key, value))  | 
88 | 104 |             config.pub_queue.put((key, value))  | 
89 | 105 |             config.sub_queue.put(key)  | 
90 | 106 |             response = "ack"  | 
 | 107 | +            admin.send_string(response)  | 
91 | 108 |         if command == "get":  | 
92 | 109 |             log.info("getting {}".format(key))  | 
93 | 110 |             try:  | 
94 | 111 |                 config.sub_queue.put(key)  | 
95 | 112 |                 value = config.get_value(key)  | 
96 |  | -                reponse = value  | 
 | 113 | +                response = value  | 
 | 114 | +                admin.send_string(response)  | 
97 | 115 |             except KeyError:  | 
98 | 116 |                 config.sub_queue.put(key)  | 
99 | 117 |                 response = ""  | 
 | 118 | +                admin.send_string(response)  | 
100 | 119 |         if command == "link":  | 
101 |  | -            log.info("linking {}".format(value))  | 
102 |  | -            config.link_queue.put(value)  | 
103 |  | -            response = "ack"  | 
104 |  | -        admin.send_string(response)  | 
 | 120 | +            response = "initiating a link request to remote admin port at {}".format(value)  | 
 | 121 | +            log.debug(response)  | 
 | 122 | +            admin.send_string(response)  | 
 | 123 | +            log.debug("opening connection to " + str(value))  | 
 | 124 | +            talkback.connect(value)  | 
 | 125 | +            cmd = ("('dump', None, None)")  | 
 | 126 | +            log.debug("sending command " + cmd )  | 
 | 127 | +            talkback.send_string(cmd)    | 
 | 128 | +            message = talkback.recv_string()  | 
 | 129 | +            log.debug("got " + str(message) )  | 
 | 130 | +            remote_ports, data, peers = eval(message)  | 
 | 131 | +            remote_admin, remote_pub = remote_ports  | 
 | 132 | +            log.debug("remote_admin {}, remote_pub {}, data {}, peers {}".format(remote_admin, remote_pub, data, peers))  | 
105 | 133 | 
 
  | 
106 | 134 | def pub(config):  | 
107 | 135 |     context = zmq.Context()  | 
@@ -151,6 +179,7 @@ def __init__(self):  | 
151 | 179 |         self.sub_queue = queue.Queue()  | 
152 | 180 |         self.link_queue = queue.Queue()  | 
153 | 181 |         self.data = {}  | 
 | 182 | +        self.peers = {}  | 
154 | 183 |     def set_value(self, key, value):  | 
155 | 184 |         self.data[key] = value  | 
156 | 185 |     def get_value(self, key):  | 
 | 
0 commit comments