-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathencoder.py
229 lines (164 loc) · 8.33 KB
/
encoder.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
from __future__ import absolute_import, print_function, unicode_literals

import logging
import sys
import threading
from collections import defaultdict
from functools import partial

from wolframclient.serializers.utils import safe_len
from wolframclient.utils.api import multiprocessing, pkg_resources
from wolframclient.utils.dispatch import Dispatch
from wolframclient.utils.environment import installation_version
from wolframclient.utils.functional import composition, is_iterable, iterate, map
from wolframclient.utils.importutils import safe_import_string
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Names exported by a star-import of this module.
__all__ = ["wolfram_encoder", "Encoder"]
class WolframDispatch(Dispatch):
    """Dispatch whose handlers are registered as import strings and loaded lazily.

    Handlers are declared per module name with :meth:`register_modules`, or
    discovered from an entry point with :meth:`register_plugins`. Nothing is
    imported at registration time: :meth:`update_dispatch` imports and merges the
    handlers of a module only once that module is present in :data:`sys.modules`,
    and merges every pending plugin.
    """

    # A single re-entrant lock shared by all instances and all calls. A lock
    # created inside `update_dispatch` would be private to that call and could
    # never exclude another thread, so it has to live here. It is re-entrant so
    # that a handler import that ends up calling `update_dispatch` again on the
    # same thread cannot deadlock.
    _update_lock = threading.RLock()

    def __init__(self, *args, **opts):
        super().__init__(*args, **opts)
        # module name -> import strings of the handlers still to be merged.
        self.registry = defaultdict(list)
        # Module names that still have pending handlers in `registry`.
        self.modules = set()
        # Entry point name -> import path(s) of the plugin still to be merged.
        self.plugins_registry = defaultdict(list)

    def register_modules(self, **handlers):
        """Declare handlers to load once the named modules have been imported.

        Each keyword is a module name; its value is one handler import string, or
        an iterable of them.
        """
        for module, _handlers in handlers.items():
            self.modules.add(module)
            self.registry[module].extend(iterate(_handlers))

    def register_plugins(self, name="wolframclient_serializers_encoder"):
        """Record the plugins advertised under the entry point group `name`.

        Only the import path is stored; the plugin itself is imported later by
        :meth:`update_dispatch`.
        """
        # Lazy %-style arguments: the message is only formatted when the record
        # is actually emitted, so no explicit level check is needed.
        logger.info(
            "Registering Wolfram encoders plugins associated with entrypoint %s.", name
        )
        for entry_point in pkg_resources.iter_entry_points(group=name):
            # `append`, not `extend`: extending a list with a string would store
            # the import path one character at a time.
            self.plugins_registry[entry_point.name].append(entry_point.module_name)

    def _update_dispatch(self):
        """Merge the handlers of every registered module that is now imported."""
        if not self.modules:
            return
        # Snapshot the ready modules first so `self.modules` can be mutated in the
        # loop. Membership tests avoid iterating over the whole of `sys.modules`.
        ready = [module for module in self.modules if module in sys.modules]
        for module in ready:
            for handler in self.registry[module]:
                self.update(safe_import_string(handler), keep_existing=True)
            # Handlers are merged only once per module.
            del self.registry[module]
            self.modules.remove(module)

    def _update_plugins(self):
        """Import and merge every pending plugin, then clear the pending list.

        :raises TypeError: re-raised from ``update`` after being logged; the
            pending list is left untouched in that case.
        """
        if not self.plugins_registry:
            return
        for plugins_name, handler in self.plugins_registry.items():
            # Also accepts a path stored as individual characters.
            handler = "".join(handler)
            try:
                self.update(safe_import_string(handler))
            except TypeError:
                logger.critical(
                    "Failed to load encoder associated to plugins %s.", plugins_name
                )
                raise
        self.plugins_registry = defaultdict(list)

    def update_dispatch(self):
        """Merge all pending module handlers and plugins.

        Runs under a lock shared by every instance, so that concurrent threads
        cannot update a dispatcher at the same time.
        """
        with self._update_lock:
            self._update_dispatch()
            self._update_plugins()
# Module-level dispatcher; `Encoder` falls back to it when no encoder is given.
wolfram_encoder = WolframDispatch()

# Each key is a module name and each value the import string(s) of the encoders
# to merge once that module has been imported by the running program.
wolfram_encoder.register_modules(
    **{
        # builtin libraries
        "sys": "wolframclient.serializers.encoders.builtin.encoder",
        "decimal": "wolframclient.serializers.encoders.decimal.encoder",
        "io": "wolframclient.serializers.encoders.io.encoder",
        "datetime": "wolframclient.serializers.encoders.datetime.encoder",
        "fractions": "wolframclient.serializers.encoders.fractions.encoder",
        "zoneinfo": "wolframclient.serializers.encoders.zoneinfo.encoder",
        # third-party libraries
        "numpy": "wolframclient.serializers.encoders.numpy.encoder",
        "pandas": "wolframclient.serializers.encoders.pandas.encoder",
        "astropy": "wolframclient.serializers.encoders.astropy.encoder",
        # PIL registers the NumPy encoders alongside its own.
        "PIL": (
            "wolframclient.serializers.encoders.pil.encoder",
            "wolframclient.serializers.encoders.numpy.encoder",
        ),
    }
)

# Record encoders advertised by installed plugins under the default entry point.
wolfram_encoder.register_plugins()
@wolfram_encoder.dispatch(object)
def encode(serializer, o):
    """Fallback encoder, registered for :class:`object`.

    Iterables are serialized element by element. Any other object is handed to
    the serializer's ``object_processor`` when one is set.

    :raises NotImplementedError: if `o` is not iterable and the serializer has no
        ``object_processor``.
    """
    if is_iterable(o):
        encoded_items = map(serializer.encode, o)
        return serializer.serialize_iterable(encoded_items, length=safe_len(o))

    processor = serializer.object_processor
    if not processor:
        raise NotImplementedError("Cannot serialize object of class %s" % o.__class__)
    return processor(serializer, o)
wolfram_encoder.__doc__ = """
Mapping between Python types and encoders used during serializations.
This instance of :class:`~wolframclient.utils.dispatch.Dispatch` is used in
:func:`~wolframclient.serializers.export` to serialize Python expressions and produce a stream of bytes.
**Register new encoders:**
The annotation :meth:`~wolframclient.utils.dispatch.Dispatch.dispatch` applied to a function, defines an encoder
and associates it to the types passed as argument of the annotation.
Define a new class::
class MyPythonClass(object):
def __init__(self, *arguments):
self.arguments = arguments
Specify its encoder::
from wolframclient.serializers import wolfram_encoder
from wolframclient.language import wl
from wolframclient.serializers import export
@wolfram_encoder.dispatch(MyPythonClass)
def my_encoder(serializer, o):
return serializer.encode(wl.MyWolframFunction(*o.arguments))
Serialize an expression::
>>> export(MyPythonClass(1,2))
b'MyWolframFunction[1, 2]'
Alternatively, applying :meth:`~wolframclient.utils.dispatch.Dispatch.register` to a function and its associated
type(s) achieves the same result.
It is not possible to associate two encoders with the same type, but it's possible to remove a mapping. First,
unregister the previous encoder::
wolfram_encoder.unregister(MyPythonClass)
And register it again with :meth:`~wolframclient.utils.dispatch.Dispatch.register`::
wolfram_encoder.register(my_encoder, MyPythonClass)
**Update with a dispatcher:**
Another way to extend supported types is to create a new :class:`~wolframclient.utils.dispatch.Dispatch`, map
various types and encoders and ultimately update :data:`wolfram_encoder` using
:meth:`~wolframclient.utils.dispatch.Dispatch.update`.
Create a new dispatcher and register :data:`MyPythonClass`::
from wolframclient.utils.dispatch import Dispatch
dispatch = Dispatch()
dispatch.register(my_encoder, MyPythonClass)
Update the main encoder with the new dispatch instance::
wolfram_encoder.update(dispatch)
Serialize an expression::
>>> export(MyPythonClass(1,2))
b'MyWolframFunction[1, 2]'
**Define plugins:**
The library supports an entry point dedicated to new encoders: `wolframclient_serializers_encoder`. The library uses
this entry point to load plugins at runtime as separate libraries. For more information about entry points, refer
to the documentation page about `entry points <https://packaging.python.org/specifications/entry-points/>`_.
The plugin name must be unique and the value must reference a dispatcher instance. This instance is loaded and used
to update :data:`wolfram_encoder`. A plugin is a simple way to distribute encoders as a separate library.
One type must have a unique encoder associated to it; as a consequence, two plugins registering an encoder for the
same type are incompatible. It is strongly advised to create one plugin for each existing Python library,
e.g. have one plugin dedicated to NumPy and one to Pandas, which makes heavy use of NumPy arrays.
"""
class Encoder:
    """A generic class exposing an :meth:`encode` method that applies an optional
    normalizer function, followed by the most relevant encoding available for a
    given type.

    Arbitrary named parameters passed during initialization are later accessible
    with :meth:`get_property`.
    """

    def __init__(
        self,
        normalizer=None,
        encoder=None,
        object_processor=None,
        target_kernel_version=None,
        **kwargs,
    ):
        """Build the encoding chain and store the encoder options.

        :param normalizer: optional function(s) chained before the encoder; each
            entry is passed through ``safe_import_string``.
        :param encoder: dispatcher to encode with, passed through
            ``safe_import_string``; defaults to :data:`wolfram_encoder`.
        :param object_processor: stored as-is on the instance.
        :param target_kernel_version: defaults to ``installation_version()``.
        :param kwargs: extra properties, readable with :meth:`get_property`.
        """
        dispatcher = safe_import_string(encoder if encoder else wolfram_encoder)
        self.encode = self.chain_normalizer(normalizer, encoder=dispatcher)
        self.object_processor = object_processor
        if target_kernel_version:
            self.target_kernel_version = target_kernel_version
        else:
            self.target_kernel_version = installation_version()
        self._properties = kwargs

    def chain_normalizer(self, func, encoder):
        """Return the composition of the normalizer(s) `func` and `encoder`.

        A :class:`WolframDispatch` encoder first gets its pending handlers loaded.
        The encoder is bound to this instance and placed last in the chain.
        """
        if isinstance(encoder, WolframDispatch):
            encoder.update_dispatch()
        bound_encoder = partial(encoder.as_method(), self)
        steps = iterate(func or (), bound_encoder)
        return composition(*(safe_import_string(step) for step in steps))

    def get_property(self, key, d=None):
        """Return the value of the named parameter passed during initialization.

        `d` is returned when `key` was not passed.
        """
        properties = self._properties
        return properties[key] if key in properties else d