# mlflow.py
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
MLflow-related functions to load models and apply them to pandas-on-Spark dataframes.
"""
from typing import Any, List, Union

import numpy as np
import pandas as pd

from pyspark.sql.types import DataType
from pyspark.pandas._typing import Label, Dtype
from pyspark.pandas.utils import lazy_property, default_session
from pyspark.pandas.frame import DataFrame
from pyspark.pandas.series import Series, first_series
from pyspark.pandas.typedef import as_spark_type

__all__ = ["PythonModelWrapper", "load_model"]


class PythonModelWrapper:
"""
A wrapper around MLflow's Python object model.
This wrapper acts as a predictor on pandas-on-Spark
"""
def __init__(self, model_uri: str, return_type_hint: Union[str, type, Dtype]):
self._model_uri = model_uri
self._return_type_hint = return_type_hint
@lazy_property
def _return_type(self) -> DataType:
hint = self._return_type_hint
# The logic is simple for now, because it corresponds to the default
# case: continuous predictions
# TODO: do something smarter, for example when there is a sklearn.Classifier (it should
# return an integer or a categorical)
# We can do the same for pytorch/tensorflow/keras models by looking at the output types.
# However, this is probably better done in mlflow than here.
if hint == "infer" or not hint:
hint = np.float64
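        # For example, as_spark_type(np.float64) resolves to DoubleType and
        # as_spark_type(str) to StringType, so the default "infer" case
        # produces a double-typed prediction column.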
return as_spark_type(hint)
@lazy_property
def _model(self) -> Any:
"""
The return object has to follow the API of mlflow.pyfunc.PythonModel.
"""
from mlflow import pyfunc
return pyfunc.load_model(model_uri=self._model_uri)
@lazy_property
def _model_udf(self) -> Any:
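        # The UDF returned by mlflow.pyfunc.spark_udf runs the model's predict on
        # batches of rows, which lets the wrapper score pandas-on-Spark frames
        # without collecting the data to the driver.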
from mlflow import pyfunc
spark = default_session()
return pyfunc.spark_udf(spark, model_uri=self._model_uri, result_type=self._return_type)
def __str__(self) -> str:
return "PythonModelWrapper({})".format(str(self._model))
def __repr__(self) -> str:
return "PythonModelWrapper({})".format(repr(self._model))
def predict(self, data: Union[DataFrame, pd.DataFrame]) -> Union[Series, pd.Series]:
"""
Returns a prediction on the data.
If the data is a pandas-on-Spark DataFrame, the return is a pandas-on-Spark Series.
If the data is a pandas Dataframe, the return is the expected output of the underlying
pyfunc object (typically a pandas Series or a numpy array).
"""
if isinstance(data, pd.DataFrame):
return self._model.predict(data)
elif isinstance(data, DataFrame):
return_col = self._model_udf(*data._internal.data_spark_columns)
# TODO: the columns should be named according to the mlflow spec
# However, this is only possible with spark >= 3.0
# s = F.struct(*data.columns)
# return_col = self._model_udf(s)
column_labels: List[Label] = [
(col,) for col in data._internal.spark_frame.select(return_col).columns
]
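            # Rebuilding the InternalFrame with only the prediction column keeps the
            # input frame's index, so the returned Series can be assigned back onto
            # the original pandas-on-Spark DataFrame.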
internal = data._internal.copy(
column_labels=column_labels, data_spark_columns=[return_col], data_fields=None
)
return first_series(DataFrame(internal))
else:
raise ValueError("unknown data type: {}".format(type(data).__name__))


def load_model(
    model_uri: str, predict_type: Union[str, type, Dtype] = "infer"
) -> PythonModelWrapper:
"""
Loads an MLflow model into a wrapper that can be used both for pandas and pandas-on-Spark
DataFrame.
Parameters
----------
model_uri : str
URI pointing to the model. See MLflow documentation for more details.
predict_type : a python basic type, a numpy basic type, a Spark type or 'infer'.
This is the return type that is expected when calling the predict function of the model.
If 'infer' is specified, the wrapper will attempt to automatically determine the return type
based on the model type.
Returns
-------
PythonModelWrapper
A wrapper around MLflow PythonModel objects. This wrapper is expected to adhere to the
interface of mlflow.pyfunc.PythonModel.
    Examples
    --------
    Here is a full example that creates a model with scikit-learn and saves the model with
    MLflow. The model is then loaded as a predictor that can be applied on a pandas-on-Spark
    DataFrame.

    We first initialize our MLflow environment:

    >>> from mlflow.tracking import MlflowClient, set_tracking_uri
    >>> import mlflow.sklearn
    >>> from tempfile import mkdtemp
    >>> d = mkdtemp("pandas_on_spark_mlflow")
    >>> set_tracking_uri("file:%s" % d)
    >>> client = MlflowClient()
    >>> exp_id = mlflow.create_experiment("my_experiment")
    >>> exp = mlflow.set_experiment("my_experiment")

    We aim to learn this numerical function using a simple linear regressor:

    >>> from sklearn.linear_model import LinearRegression
    >>> train = pd.DataFrame({"x1": np.arange(8), "x2": np.arange(8) ** 2,
    ...                       "y": np.log(2 + np.arange(8))})
    >>> train_x = train[["x1", "x2"]]
    >>> train_y = train[["y"]]
    >>> with mlflow.start_run():
    ...     lr = LinearRegression()
    ...     lr.fit(train_x, train_y)
    ...     mlflow.sklearn.log_model(lr, "model")
    LinearRegression...

    Now that our model is logged using MLflow, we load it back and apply it on a pandas-on-Spark
    dataframe:

    >>> from pyspark.pandas.mlflow import load_model
    >>> run_info = client.search_runs(exp_id)[-1].info
    >>> model = load_model("runs:/{run_id}/model".format(run_id=run_info.run_id))
    >>> prediction_df = ps.DataFrame({"x1": [2.0], "x2": [4.0]})
    >>> prediction_df["prediction"] = model.predict(prediction_df)
    >>> prediction_df
        x1   x2  prediction
    0  2.0  4.0    1.355551

    The model also works on pandas DataFrames as expected:

    >>> model.predict(prediction_df[["x1", "x2"]].to_pandas())
    array([[1.35555142]])

    Notes
    -----
    Currently, the model prediction can only be merged back with the existing dataframe.
    Other columns must be manually joined.

    For example, this code will not work:

    >>> df = ps.DataFrame({"x1": [2.0], "x2": [3.0], "z": [-1]})
    >>> features = df[["x1", "x2"]]
    >>> y = model.predict(features)
    >>> # Works:
    >>> features["y"] = y  # doctest: +SKIP
    >>> # Will fail with a message about dataframes not aligned.
    >>> df["y"] = y  # doctest: +SKIP

    A current workaround is to use the .merge() function, using the feature values
    as merging keys.

    >>> features['y'] = y
    >>> everything = df.merge(features, on=['x1', 'x2'])
    >>> everything
        x1   x2  z         y
    0  2.0  3.0 -1  1.376932
"""
return PythonModelWrapper(model_uri, predict_type)


def _test() -> None:
import os
import doctest
import sys
from pyspark.sql import SparkSession
import pyspark.pandas.mlflow
os.chdir(os.environ["SPARK_HOME"])
globs = pyspark.pandas.mlflow.__dict__.copy()
globs["ps"] = pyspark.pandas
spark = (
SparkSession.builder.master("local[4]").appName("pyspark.pandas.mlflow tests").getOrCreate()
)
(failure_count, test_count) = doctest.testmod(
pyspark.pandas.mlflow,
globs=globs,
optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE,
)
spark.stop()
if failure_count:
sys.exit(-1)
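

# Running this module directly requires SPARK_HOME to be set and both mlflow and
# scikit-learn to be importable; if either package is missing, the doctests are
# silently skipped by the guard below.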
if __name__ == "__main__":
try:
import mlflow # noqa: F401
import sklearn # noqa: F401
_test()
except ImportError:
pass