-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathcloudsession.py
358 lines (292 loc) · 13.2 KB
/
cloudsession.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
from __future__ import absolute_import, print_function, unicode_literals
import logging
from wolframclient.evaluation.base import WolframEvaluator
from wolframclient.evaluation.cloud.base import WolframAPICallBase
from wolframclient.evaluation.cloud.oauth import OAuth1RequestsSyncSession as OAuthSession
from wolframclient.evaluation.cloud.oauth import XAuthRequestsSyncSession as XAuthSession
from wolframclient.evaluation.cloud.server import WOLFRAM_PUBLIC_CLOUD_SERVER
from wolframclient.evaluation.result import (
WolframAPIResponseBuilder,
WolframCloudEvaluationWXFResponse,
)
from wolframclient.exception import AuthenticationException
from wolframclient.serializers import export
from wolframclient.utils import six
from wolframclient.utils.api import futures, json, requests
from wolframclient.utils.decorators import to_dict
from wolframclient.utils.url import evaluation_api_url, user_api_url
logger = logging.getLogger(__name__)
__all__ = ["WolframCloudSession", "WolframAPICall"]
class WolframCloudSession(WolframEvaluator):
"""Represent a session to a given cloud enabling simple API call.
This is the central class of the cloud evaluation package. It is initialized with a server instance
representing a given cloud. By default, a session targets the Wolfram Public Cloud.
Most of the time it is necessary to authenticate with the server before issuing requests. A session supports two
forms of authentication:
* 2-legged oauth using a secured authentication key
* xauth using the user ID and password
Calling an API is done through the method
:func:`~wolframclient.evaluation.cloud.cloudsession.WolframCloudSession.call`, which will return an instance of
:class:`~wolframclient.evaluation.result.WolframAPIResponse`.
It is strongly advised to reuse a session to make multiple calls to mitigate the cost of initialization.
`max_workers` can be specified and is passed to the :class:`~concurrent.futures.ThreadPoolExecutor` used for future methods.
"""
def __init__(
self,
credentials=None,
server=None,
inputform_string_evaluation=True,
oauth_session_class=None,
xauth_session_class=None,
http_sessionclass=None,
max_workers=4,
):
super().__init__(inputform_string_evaluation=inputform_string_evaluation)
self.server = server or WOLFRAM_PUBLIC_CLOUD_SERVER
self.evaluation_api_url = evaluation_api_url(http://github.com/self.server)
self.http_sessionclass = http_sessionclass or requests.Session
self.http_session = None
self.credentials = credentials
self.evaluation_api_url = evaluation_api_url(http://github.com/self.server)
self.xauth_session_class = xauth_session_class or XAuthSession
self.oauth_session_class = oauth_session_class or OAuthSession
self.oauth_session = None
self.verify = self.server.certificate
self._pool = None
self._max_workers = max_workers
def duplicate(self):
return self.__class__(
credentials=self.credentials,
server=self.server,
inputform_string_evaluation=self.inputform_string_evaluation,
oauth_session_class=self.oauth_session_class,
xauth_session_class=self.xauth_session_class,
http_sessionclass=self.http_sessionclass,
max_workers=self._max_workers,
)
@property
def started(self):
return self.http_session is not None and (self.anonymous() or self.authorized())
def start(self):
self.stopped = False
if not self.started:
if self.http_session is None:
self.http_session = self.http_sessionclass()
self.http_session.headers = {"User-Agent": "WolframClientForPython/1.0"}
if not self.anonymous():
self._authenticate()
def stop(self):
self._stop(gracefully=True)
def terminate(self):
self._stop(gracefully=False)
def _stop(self, gracefully=True):
self.stopped = True
if self.http_session:
self.http_session.close()
self.http_session = None
if self._pool is not None:
self._pool.shutdown(wait=gracefully)
self._pool = None
self.oauth_session = None
def anonymous(self):
return self.credentials is None
def authorized(self):
return self.oauth_session is not None and self.oauth_session.authorized()
def _authenticate(self):
"""Authenticate with the server using the credentials.
This method supports both oauth and xauth methods. It is not necessary
to call it, since the session will try to authenticate when the first
request is issued."""
logger.info("Authenticating to the server.")
if self.credentials is None:
raise AuthenticationException("Missing credentials.")
if self.credentials.is_xauth:
self.oauth_session = self.xauth_session_class(
self.credentials,
self.http_session,
self.server,
self.server.xauth_consumer_key,
self.server.xauth_consumer_secret,
)
else:
self.oauth_session = self.oauth_session_class(
self.http_session,
self.server,
self.credentials.consumer_key,
self.credentials.consumer_secret,
)
self.oauth_session.authenticate()
def _post(self, url, headers={}, body={}, files={}, params={}):
"""Do a POST request, signing the content only if authentication has been successful."""
self.ensure_started()
headers["User-Agent"] = "WolframClientForPython/1.0"
if self.authorized():
logger.info("Authenticated call to api %s", url)
return self.oauth_session.signed_request(
url, headers=headers, body=body, files=files
)
else:
logger.info("Anonymous call to api %s", url)
return self.http_session.post(
url, params=params, headers=headers, data=body, files=files, verify=self.verify
)
def ensure_started(self):
if not self.started:
self.start()
if self.stopped:
self.restart()
def call(
self,
api,
input_parameters={},
files={},
target_format="wl",
permissions_key=None,
**kwargv,
):
"""Call a given API using the provided input parameters.
`api` can be a string url or a :class:`tuple` (`username`, `api name`). The username is generally the Wolfram
Language symbol ``$UserName``. The API name can be a UUID or a relative path, e.g. *myapi/foo/bar*.
The input parameters are provided as a dictionary with string keys being the name of the parameters associated
to their value.
Files are passed in a dictionary. Values can have multiple forms::
{'parameter name': file_pointer}
It is possible to explicitly specify a filename and a content type::
{'parameter name': ('filename', file_pointer, 'content-type')}
String can also be passed as files::
{'parameter name': ('filename', '...string...data...', 'content-type')}
It is possible to pass a ``PermissionsKey`` to the server alongside the query and get access to a given
resource.
"""
url = user_api_url(http://github.com/self.server,%20api)
params = {"_key": permissions_key} if permissions_key is not None else {}
is_multipart = isinstance(files, dict) and len(files) > 0
encoded_inputs = encode_api_inputs(
input_parameters, target_format=target_format, multipart=is_multipart, **kwargv
)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Encoded input %s", encoded_inputs)
# in multipart requests we have to merge input parameters with files.
# and use the same format for both.
if is_multipart:
encoded_inputs.update(files)
response = self._post(url, files=encoded_inputs, params=params)
else:
response = self._post(url, body=encoded_inputs, params=params)
return WolframAPIResponseBuilder.build(response)
def _call_evaluation_api(self, expr, **kwargs):
data = export(expr, target_format="wl", **kwargs)
if logger.isEnabledFor(logging.DEBUG):
logger.debug("Sending expression to cloud server for evaluation: %s", data)
response = self._post(self.evaluation_api_url, body=data)
return WolframCloudEvaluationWXFResponse(response)
def evaluate(self, expr, **kwargs):
"""Send `expr` to the cloud for evaluation and return the result.
`expr` can be a Python object serializable by :func:`~wolframclient.serializers.export` or the string
:wl:`InputForm` of an expression to evaluate.
"""
return self._call_evaluation_api(self.normalize_input(expr), **kwargs).get()
def evaluate_wrap(self, expr, **kwargs):
"""Similar to :func:`~wolframclient.evaluation.cloud.cloudsession.WolframCloudSession.evaluate` but return the
result as a :class:`~wolframclient.evaluation.result.WolframCloudEvaluationResponse`.
"""
return self._call_evaluation_api(self.normalize_input(expr), **kwargs)
# Future methods
@property
def pool(self):
if self._pool is None:
self._pool = futures.ThreadPoolExecutor(max_workers=self._max_workers)
return self._pool
def call_future(
self, api, input_parameters={}, target_format="wl", permissions_key=None, **kwargv
):
"""Call a given API asynchronously and return a :class:`~concurrent.futures.Future` object.
See :func:`WolframCloudSession.call` for more details about input parameters.
"""
return self.pool.submit(
self.call,
api,
input_parameters=input_parameters,
target_format=target_format,
permissions_key=permissions_key,
**kwargv,
)
def evaluate_future(self, expr, **kwargs):
"""Send `expr` to the cloud for asynchronous evaluation and return a :class:`~concurrent.futures.Future` object.
`expr` can be a Python object serializable by :func:`~wolframclient.serializers.export` or the string
:wl:`InputForm` of an expression to evaluate.
"""
return self.pool.submit(self.evaluate, expr, **kwargs)
def evaluate_wrap_future(self, expr, **kwargs):
return self.pool.submit(self.evaluate_wrap, expr, **kwargs)
def wolfram_api_call(self, api, **kwargs):
"""Build an helper class instance to call a given API."""
return WolframAPICall(self, api, **kwargs)
def __repr__(self):
return "<{}:base={}, authorized={}>".format(
self.__class__.__name__, self.server.cloudbase, self.authorized()
)
class WolframAPICall(WolframAPICallBase):
"""Helper class to perform an API call using a cloud session."""
def perform(self, **kwargs):
"""Make the API call and return the result."""
return self.target.call(
self.api,
input_parameters=self.parameters,
files=self.files,
permissions_key=self.permission_key,
**kwargs,
)
def perform_future(self, **kwargs):
return self.target.call_future(
self.api,
input_parameters=self.parameters,
files=self.files,
permissions_key=self.permission_key,
**kwargs,
)
@to_dict
def _encode_inputs_as_wxf(inputs, multipart, **kwargs):
for name, value in inputs.items():
yield "%s__wxf" % name, _to_multipart(
name, export(value, target_format="wxf", **kwargs), multipart=multipart
)
@to_dict
def _encode_inputs_as_json(inputs, multipart, **kwargs):
for name, value in inputs.items():
yield "%s__json" % name, _to_multipart(
name, json.dumps(value, **kwargs), multipart=multipart
)
@to_dict
def _encode_inputs_as_wl(inputs, multipart, **kwargs):
for name, value in inputs.items():
# avoid double encoding of strings '\"string\"'.
if isinstance(value, six.string_types):
yield name, _to_multipart(name, value, multipart=multipart)
else:
yield name, _to_multipart(
name, export(value, target_format="wl", **kwargs), multipart=multipart
)
def _to_multipart(name, value, multipart=False):
"""Update the given :class:`~parameters` with a new inputs using the appropriate form based on `multipart`."""
if multipart:
return ("tmp_file_%s" % name, value)
else:
return value
SUPPORTED_ENCODING_FORMATS = {
"json": _encode_inputs_as_json,
"wxf": _encode_inputs_as_wxf,
"wl": _encode_inputs_as_wl,
}
def encode_api_inputs(inputs, target_format="wl", multipart=False, **kwargs):
if not inputs:
return {}
try:
encoder = SUPPORTED_ENCODING_FORMATS[target_format]
except KeyError:
raise ValueError(
"Invalid encoding format {}. Choices are: {}".format(
target_format, ", ".join(SUPPORTED_ENCODING_FORMATS.keys())
)
)
return encoder(inputs, multipart, **kwargs)