Skip to content

Commit 681de03

Browse files
committed
add queue and pull patterns
1 parent e7fef59 commit 681de03

File tree

6 files changed

+185
-0
lines changed

6 files changed

+185
-0
lines changed

build.sbt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,3 +15,9 @@ lazy val chapter12 = project dependsOn (common)
1515
lazy val chapter13 = project dependsOn (common)
1616

1717
lazy val chapter14 = project dependsOn (common)
18+
19+
lazy val chapter15 = project dependsOn (common)
20+
21+
lazy val chapter16 = project dependsOn (common)
22+
23+
lazy val chapter17 = project dependsOn (common)

chapter15/build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import Build._
2+
3+
libraryDependencies ++= Seq(akkaActor, akkaTyped, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter15
5+
6+
import java.math.MathContext
7+
import java.math.RoundingMode
8+
9+
import akka.actor.Actor
10+
import akka.actor.ActorRef
11+
import akka.actor.ActorSystem
12+
import akka.actor.Props
13+
14+
object PullPattern {
15+
16+
case class Job(id: Long, input: Int, replyTo: ActorRef)
17+
case class Result(id: Long, report: BigDecimal)
18+
19+
case class WorkRequest(worker: ActorRef, items: Int)
20+
21+
class Manager extends Actor {
22+
23+
val workStream: Iterator[Job] =
24+
Iterator from 1 map (x => Job(x, x, self)) take 1000000
25+
26+
val aggregator = (x: BigDecimal, y: BigDecimal) => x + y
27+
var approximation = BigDecimal(0, new MathContext(10000, RoundingMode.HALF_EVEN))
28+
29+
var outstandingWork = 0
30+
31+
(1 to 8) foreach (_ => context.actorOf(Props(new Worker(self))))
32+
33+
def receive = {
34+
case WorkRequest(worker, items) =>
35+
workStream.take(items).foreach {
36+
job =>
37+
worker ! job
38+
outstandingWork += 1
39+
}
40+
case Result(id, report) =>
41+
approximation = aggregator(approximation, report)
42+
outstandingWork -= 1
43+
if (outstandingWork == 0 && workStream.isEmpty) {
44+
println("final result: " + approximation)
45+
context.system.terminate()
46+
}
47+
}
48+
}
49+
50+
class Worker(manager: ActorRef) extends Actor {
51+
val mc = new MathContext(100, RoundingMode.HALF_EVEN)
52+
val plus = BigDecimal(1, mc)
53+
val minus = BigDecimal(-1, mc)
54+
55+
var requested = 0
56+
def request(): Unit =
57+
if (requested < 5) {
58+
manager ! WorkRequest(self, 10)
59+
requested += 10
60+
}
61+
62+
request()
63+
64+
def receive = {
65+
case Job(id, data, replyTo) =>
66+
requested -= 1
67+
request()
68+
val sign = if ((data & 1) == 1) plus else minus
69+
val result = sign / data
70+
replyTo ! Result(id, result)
71+
}
72+
}
73+
74+
def main(args: Array[String]): Unit = {
75+
val sys = ActorSystem("pi")
76+
sys.actorOf(Props(new Manager), "manager")
77+
}
78+
79+
}
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/**
2+
* Copyright (C) 2015 Roland Kuhn <http://rolandkuhn.com>
3+
*/
4+
package com.reactivedesignpatterns.chapter15
5+
6+
import java.math.MathContext
7+
import java.math.RoundingMode
8+
import akka.actor.Actor
9+
import akka.actor.ActorRef
10+
import akka.actor.ActorSystem
11+
import akka.actor.Props
12+
import akka.pattern.extended.ask
13+
import scala.collection.immutable.Queue
14+
import scala.concurrent.duration._
15+
import akka.util.Timeout
16+
import scala.concurrent.Future
17+
import scala.concurrent.Await
18+
import java.util.concurrent.TimeoutException
19+
20+
object QueuePattern {
21+
22+
case class Job(id: Long, input: Int, replyTo: ActorRef)
23+
case class JobRejected(id: Long)
24+
case class Result(id: Long, report: BigDecimal)
25+
26+
case class WorkRequest(worker: ActorRef, items: Int)
27+
28+
class Manager extends Actor {
29+
30+
var workQueue = Queue.empty[Job]
31+
var requestQueue = Queue.empty[ActorRef]
32+
33+
(1 to 8) foreach (_ => context.actorOf(Props(new Worker(self))))
34+
35+
def receive = {
36+
case job @ Job(id, _, replyTo) =>
37+
if (requestQueue.isEmpty) {
38+
if (workQueue.size < 1000) workQueue :+= job
39+
else replyTo ! JobRejected(id)
40+
} else {
41+
requestQueue.head ! job
42+
requestQueue = requestQueue.drop(1)
43+
}
44+
case WorkRequest(worker, items) =>
45+
if (workQueue.isEmpty) {
46+
if (!requestQueue.contains(worker)) requestQueue :+= worker
47+
} else {
48+
workQueue.iterator.take(items).foreach(job => worker ! job)
49+
workQueue = workQueue.drop(items)
50+
}
51+
}
52+
}
53+
54+
class Worker(manager: ActorRef) extends Actor {
55+
val mc = new MathContext(100, RoundingMode.HALF_EVEN)
56+
val plus = BigDecimal(1, mc)
57+
val minus = BigDecimal(-1, mc)
58+
59+
manager ! WorkRequest(self, 1)
60+
61+
def receive = {
62+
case Job(id, data, replyTo) =>
63+
manager ! WorkRequest(self, 1)
64+
val sign = if ((data & 1) == 1) plus else minus
65+
val result = sign / data
66+
replyTo ! Result(id, result)
67+
}
68+
}
69+
70+
def main(args: Array[String]): Unit = {
71+
val sys = ActorSystem("pi")
72+
import sys.dispatcher
73+
val calculator = sys.actorOf(Props(new Manager), "manager")
74+
implicit val timeout = Timeout(10.seconds)
75+
val futures =
76+
(1 to 1000000).map(i =>
77+
(calculator ? (Job(i, i, _)))
78+
.collect {
79+
case Result(_, report) => report
80+
case _ => BigDecimal(0)
81+
})
82+
Future.reduce(futures)(_ + _)
83+
.map(x => println("final result: " + x * 4))
84+
.recover {
85+
case ex =>
86+
ex.printStackTrace()
87+
}
88+
.foreach(_ => sys.terminate())
89+
}
90+
91+
}

chapter16/build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import Build._
2+
3+
libraryDependencies ++= Seq(akkaActor, akkaTyped, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)

chapter17/build.sbt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
import Build._
2+
3+
libraryDependencies ++= Seq(akkaActor, akkaTyped, akkaPersistence, akkaPersistenceQuery, akkaStream, junit, scalatest, levelDb)

0 commit comments

Comments
 (0)