-
Notifications
You must be signed in to change notification settings - Fork 28.5k
/
Copy pathlogistic_regression.py
executable file
·89 lines (69 loc) · 3.23 KB
/
logistic_regression.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""
A logistic regression implementation that uses NumPy (http://www.numpy.org)
to act on batches of input data using efficient matrix operations.
In practice, one may prefer to use the LogisticRegression algorithm in
ML, as shown in examples/src/main/python/ml/logistic_regression_with_elastic_net.py.
"""
import sys
from typing import Iterable, List
import numpy as np
from pyspark.sql import SparkSession
D = 10  # Number of dimensions


# Read a batch of points from the input file into a NumPy matrix object. We operate on batches to
# make further computations faster.
# The data file contains lines of the form <label> <x1> <x2> ... <xD>. We load each block of these
# into a NumPy array of size numLines * (D + 1) and pull out column 0 vs the others in gradient().
def readPointBatch(iterator: Iterable[str]) -> List[np.ndarray]:
    """Parse one partition of text lines into a single (numLines, D + 1) matrix.

    Returning a one-element list makes mapPartitions yield exactly one matrix
    per partition, so downstream math runs as batched NumPy operations.
    """
    strs = list(iterator)
    matrix = np.zeros((len(strs), D + 1))
    for i, s in enumerate(strs):
        # np.fromstring(text, sep=...) is deprecated (and removed in NumPy 2.0);
        # normalize comma separators, split into tokens, and let np.array parse
        # them. Parse straight to float64 to match `matrix`'s dtype — the old
        # float32 round-trip discarded precision for no benefit.
        matrix[i] = np.array(s.replace(',', ' ').split(), dtype=np.float64)
    return [matrix]
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: logistic_regression <file> <iterations>", file=sys.stderr)
        sys.exit(-1)

    print("""WARN: This is a naive implementation of Logistic Regression and is
given as an example!
Please refer to examples/src/main/python/ml/logistic_regression_with_elastic_net.py
to see how ML's implementation is used.""", file=sys.stderr)

    spark = (
        SparkSession.builder
        .appName("PythonLR")
        .getOrCreate()
    )

    # Each partition is parsed into one (numLines, D + 1) matrix; cache the
    # parsed batches so every gradient-descent pass reuses them.
    points = (
        spark.read.text(sys.argv[1])
        .rdd.map(lambda r: r[0])
        .mapPartitions(readPointBatch)
        .cache()
    )
    iterations = int(sys.argv[2])

    # Initialize w to a random value
    w = 2 * np.random.ranf(size=D) - 1
    print("Initial w: " + str(w))

    def gradient(matrix: np.ndarray, w: np.ndarray) -> np.ndarray:
        """Summed logistic-loss gradient contribution of one batch of points."""
        y = matrix[:, 0]   # point labels (first column of input file)
        x = matrix[:, 1:]  # point coordinates
        # Per-point gradient terms, summed across the batch. The elementwise
        # form (rather than a matmul) keeps the float summation order stable.
        return ((1.0 / (1.0 + np.exp(-y * x.dot(w))) - 1.0) * y * x.T).sum(1)

    def add(x: np.ndarray, y: np.ndarray) -> np.ndarray:
        """In-place elementwise sum, used as the reduce combiner."""
        x += y
        return x

    for i in range(iterations):
        print("On iteration %i" % (i + 1))
        # Closure captures the current w; Spark ships it to executors each pass.
        w -= points.map(lambda m: gradient(m, w)).reduce(add)

    print("Final w: " + str(w))

    spark.stop()