require 'concurrent-edge'
# This slightly more complicated, commented example aims to
# demonstrate some of the capabilities of concurrent-ruby's new abstractions.
#
# It is a concurrent processing pipeline which on one side has several web crawlers.
# They search the web for data and fill a buffer.
# On the other side there are data processors which pop the data from the buffer.
# They process the data and store the results in a DB
# which has a limited concurrency level.
# Some of the parts, like the Web and the DB, are just stubs.
# Each part logs and increments counters to keep some stats about the pipeline.
# A periodic readout of the stats into the log is also scheduled.
#
# Schema of the pipeline:
#
# web-crawlers -> buffer -> data-processing -> DB
# \____________________________\_____\___> logging
# TODO (pitr-ch 10-Mar-2019): replace with a better more realistic example using
# * actors for limited concurrency with state - local DB connection
# * throttled futures for REST API - limiting server load
# The central logger is defined first.
# It has state (the logger instance), therefore an actor is used.
# It is better to define the communication protocol of the logging actor exactly;
# it will only understand these messages.
Log = Concurrent::ImmutableStruct.new :severity, :message
SetLevel = Concurrent::ImmutableStruct.new :level
require 'logger'
require 'stringio'
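# A quick look at the message protocol, on a throwaway value that is not
# part of the pipeline: an ImmutableStruct is constructed with [] and
# exposes its fields through readers.
sample_log = Log[Logger::WARN, 'low disk space']
sample_log.severity # => 2, the integer value of Logger::WARN
sample_log.message  # => "low disk space"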
# Including actor constants so this scope understands ANY etc.
include Concurrent::ErlangActor::EnvironmentConstants
# The logger does not need a dedicated thread, so let's run it on a pool.
LOGGING = Concurrent::ErlangActor.spawn Logger::FATAL,
type: :on_pool,
name: 'logger' do |level|
# a Logger instance with nicer formatting is created
@logger = Logger.new($captured_out)
@logger.level = level
@logger.formatter = lambda do |severity, datetime, progname, msg|
formatted_message = case msg
when String
msg
when Exception
format "%s (%s)\n%s",
msg.message, msg.class, (msg.backtrace || []).join("\n")
else
msg.inspect
end
format "[%s] %5s -- %s: %s\n",
datetime.strftime('%Y-%m-%d %H:%M:%S.%L'),
severity,
progname,
formatted_message
end
# definition of the logging actor behaviour
receive(
# log messages
on(Log) { |message| @logger.log message.severity, message.message },
# change level
on(SetLevel) { |message| @logger.level = message.level },
# It is a good practice to read and log bad messages,
# otherwise they would accumulate in the inbox.
on(ANY) { |message| @logger.error bad_message: message },
      # The logger has static behaviour, therefore keep can be used and the actor
      # will behave the same for every message received.
keep: true)
end
# testing that the logger works as expected
LOGGING.tell Log[Logger::FATAL, :tornado]
LOGGING.tell Log[Logger::INFO, :wind]
LOGGING.tell SetLevel[Logger::DEBUG]
LOGGING.tell Log[Logger::INFO, :breeze]
sleep 0.05 # the logging is asynchronous, we need to wait a bit until it's written
get_captured_output
# the logging could be wrapped in a method
def log(severity, message)
LOGGING.tell Log[severity, message]
true
end
include Logger::Severity
log INFO, 'alive'
sleep 0.05
get_captured_output
# The stub which will represent the web
module Web
@counter = Concurrent::AtomicFixnum.new
def self.search
sleep 0.01
@counter.increment.to_s(16)
end
end #
# The cancellation which will be used to cancel the whole processing pipeline.
@cancellation, origin = Concurrent::Cancellation.new
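# How the cancellation works, shown on a separate throwaway instance:
# resolving the origin flips canceled? everywhere the cancellation is shared.
demo_cancellation, demo_origin = Concurrent::Cancellation.new
demo_cancellation.canceled? # => false
demo_origin.resolve
demo_cancellation.canceled? # => true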
# Buffer for work
buffer_capacity = 10
@buffer = Concurrent::Promises::Channel.new buffer_capacity
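# A sketch of the Channel calls used below, on a throwaway channel of
# capacity 1. The exact return values are assumptions; what the pipeline
# relies on is that push with a timeout is truthy on success and falsy
# when the full buffer makes it time out.
demo_channel = Concurrent::Promises::Channel.new 1
demo_channel.push :a, 0.1 # truthy, there was capacity
demo_channel.push :b, 0.1 # falsy, the size-1 buffer was full and the timeout elapsed
demo_channel.pop          # => :a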
web_crawler_count = 4
# Track the amount of data provided by each crawler
crawler_data_counter = Array.new(web_crawler_count) do
  # this is accessed by multiple threads so it should be a thread-safe counter
  Concurrent::AtomicFixnum.new
end #
# The array is frozen, which makes it immutable
# and therefore safe to use when concurrently accessed.
# Otherwise, if it were being modified, it would have to be
# a Concurrent::Array to be safe.
crawler_data_counter.freeze
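# AtomicFixnum in a nutshell, on a demo counter (not one of the crawler
# counters): increment is atomic and returns the new value.
demo_counter = Concurrent::AtomicFixnum.new
demo_counter.increment # => 1
demo_counter.value     # => 1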
# The web crawlers are defined directly with threads to keep the example simple.
# They search the web and push the data into the buffer
# as soon as they find something.
# The push will block if the buffer is full,
# regulating how fast work is being found.
# This is called backpressure.
crawlers = Array.new web_crawler_count do |i|
Thread.new do
while true
# crawl the web until cancelled
break if @cancellation.canceled?
      data = Web.search
      # the push will block and slow down the crawler if the buffer is full
until @buffer.push data, 0.1
        # It is good practice to use timeouts on all blocking operations.
        # If the pipeline is cancelled and the data processors finish
        # before taking data from the buffer, a crawler could get stuck on this push.
break if @cancellation.canceled?
end
# it pushed data, increment its counter
crawler_data_counter[i].increment
log DEBUG, "crawler #{i} found #{data}"
end
end
end.freeze
# So far only the crawlers looking for data are defined,
# pushing the data into the buffer.
# The data-processing definition follows.
# This time threads are not used directly; instead the data processing
# is defined using futures.
# Even though that makes the definition more complicated,
# it has the big advantage that the data processors do not require a thread each
# but share and run on a thread pool.
# That removes an important limitation on the total number of threads a process can have,
# which can be an issue in larger systems.
# This example would be fine using threads,
# however it would not demonstrate the more advanced usage.
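# The basic building block used below, in one line: a future runs its block
# on a thread pool and its value can be awaited.
Concurrent::Promises.future { 21 * 2 }.value! # => 42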
# The data processing stores results in a DB,
# therefore the stub definition of a database precedes the data processing.
module DB
@data = Concurrent::Map.new
# increment a counter for char
def self.add(char, count)
@data.compute char do |old|
(old || 0) + count
end
true
end
  # return the stored data as a Hash
def self.data
@data.each_pair.reduce({}) { |h, (k, v)| h.update k => v }
end
end
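# Concurrent::Map#compute, used by DB.add above, atomically updates one key.
# A tiny demo on a scratch map so the DB contents stay untouched
# (the shown return values are assumptions):
scratch_map = Concurrent::Map.new
scratch_map.compute(:hits) { |old| (old || 0) + 1 } # => 1
scratch_map.compute(:hits) { |old| (old || 0) + 1 } # => 2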
# Let's assume that instead of having this DB
# we have a limited number of connections
# and therefore there is a limit on
# how many threads can communicate with the DB at the same time.
# A throttle is created to limit the number of concurrent accesses to the DB.
@db_throttle = Concurrent::Throttle.new 4
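# What a throttle does, sketched on a separate throwaway instance with
# a single permit: a future executed on throttle.on(executor) has to
# acquire a permit before its block runs.
demo_throttle = Concurrent::Throttle.new 1
Concurrent::Promises.future_on(demo_throttle.on(:io)) { :limited_work }.value!
# => :limited_work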
# The data processing definition follows
data_processing_count = 20 # this could actually be thousands if required
# track the amount of data received by each data processor
@data_processing_counters = Array.new data_processing_count do
Concurrent::AtomicFixnum.new
end.freeze
def data_processing(i)
  # pop_op returns a future which is fulfilled with a message from the buffer
  # when a message is available.
  @buffer.pop_op.then_on(:fast) do |data|
    # we then process the message on the :fast pool since this part does no blocking
log DEBUG, "data-processor #{i} got #{data}"
@data_processing_counters[i].increment
    sleep 0.1 # simulate it actually doing something which takes some time
# find the most frequent char
data.chars.
group_by { |v| v }.
map { |ch, arr| [ch, arr.size] }.
max_by { |ch, size| size }
end.then_on(@db_throttle.on(:io)) do |char, count|
    # the DB access has to be limited, therefore the db_throttle is used
    # DBs do IO, therefore this part is executed on the global thread pool for :io
DB.add char, count
end.then_on(:fast) do |_|
    # The last section executes back on the :fast executor.
    # It checks whether the processing was cancelled;
    # if not, it calls itself recursively,
    # which in combination with #run turns this into infinite data processing
    # (until cancelled).
    # #run will keep flattening to the inner future as long as the value is a future.
if @cancellation.canceled?
      # return something other than a future and #run will stop executing
:done
else
# continue running with a future returned by data_processing
data_processing i
end
end
end #
# create the data processors
data_processors = Array.new data_processing_count do |i|
data_processing(i).run
end
# Some statistics are collected in crawler_data_counter
# and @data_processing_counters.
# Schedule a periodic readout to the log.
def readout(crawler_data_counter)
# schedule readout in 0.4 sec or on cancellation
(@cancellation.origin | Concurrent::Promises.schedule(0.4)).then do
log INFO,
"\ncrawlers found: #{crawler_data_counter.map(&:value).join(', ')}\n" +
"data processors consumed: #{@data_processing_counters.map(&:value).join(', ')}"
end.then do
# reschedule if not cancelled
readout crawler_data_counter unless @cancellation.canceled?
end
end
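# The | combinator used above resolves with whichever side resolves first;
# here an already-fulfilled future wins the race against a scheduled one.
(Concurrent::Promises.fulfilled_future(:first) |
    Concurrent::Promises.schedule(1) { :later }).value! # => :first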
# start the periodic readouts
readouts = readout(crawler_data_counter).run
sleep 2 # let the whole processing pipeline work
# cancel everything
origin.resolve
# wait for everything to stop
crawlers.each(&:join)
data_processors.each(&:wait!)[0..10]
readouts.wait!
# terminate the logger
Concurrent::ErlangActor.terminate LOGGING, :cancelled
LOGGING.terminated.wait
# inspect collected char frequencies
DB.data
# see the logger output
get_captured_output