Skip to content

Commit 014dc3d

Browse files
committed
test two separate list
1 parent ac5b09d commit 014dc3d

File tree

1 file changed

+69
-0
lines changed

1 file changed

+69
-0
lines changed

reactive-programming.org

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1859,3 +1859,72 @@ trait Actor {
18591859
}
18601860
}
18611861
#+end_src
1862+
** Persistent Actor State
1863+
Actors representing a stateful resource
1864+
- shall not lose important state due to (system) failure
1865+
- must persist state as needed
1866+
- must recover state at (re)start
1867+
Two possibilities for persisting state:
1868+
- in-place updates
1869+
- persist changes in append-only fashion
1870+
*** Changes vs. Current State
1871+
*** Snapshots
1872+
Immutable snapshots can be used to bound recovery time
1873+
*** Persistence Primitive
1874+
- being persistent means "taking notes"
1875+
#+begin_src scala
1876+
persist(MyEvent(...)) { event =>
1877+
// now <event> is persisted
1878+
do SomethingWidth(event)
1879+
}
1880+
#+end_src
1881+
*** Event Example
1882+
#+begin_src scala
1883+
case class NewPost(text: String, id: Long)
1884+
case class BlogPosted(id: Long)
1885+
case class BlogNotPosted(id: Long, reason: String)
1886+
1887+
sealed trait Event
1888+
case class PostCreated(text: String) extends Event
1889+
case object QuotaReached extends Event
1890+
1891+
case class State(posts: Vector[String], disabled: Boolean) {
1892+
def updated(e: Event): State = e match {
1893+
case PostCreated(text) => copy(posts = posts :+ text)
1894+
case QuotaReached => copy(disabled = true)
1895+
}
1896+
}
1897+
1898+
class UserProcessor extends PersistentActor {
1899+
var state = State(Vector.empty, false)
1900+
def receiveCommand = {
1901+
case NewPost(text, id) =>
1902+
if (state.disabled) sender() ! BlogNotPosted(id, "quota reached")
1903+
else { persist(PostCreated(text)) { e =>
1904+
updateState(e)
1905+
sender() ! BlogPosted(id) }
1906+
persist(QuotaReached) (updateState) }
1907+
}
1908+
def updateState(e: Event) { state = state.updated(e) }
1909+
def receiveRecover = { case e: Event => updateState(e) }
1910+
}
1911+
1912+
case NewPost(text, id) =>
1913+
if (!state.disabled) {
1914+
val created = PostCreated(text)
1915+
update(created)
1916+
update(QuotaReached)
1917+
persistAsync(created)(sender() ! BlogPosted(id))
1918+
persistAsync(QuotaReached)(_ => ())
1919+
} else sender() ! BlogNotPosted(id, "quota reached")
1920+
#+end_src
1921+
*** When to Apply the Events?
1922+
- Applying after persisting leaves actor in stale state.
1923+
*** At-Least-Once Delivery
1924+
- Guaranteeing delivery means retrying until successful
1925+
- Retries are the sender's responsibility
1926+
- The recipient needs to acknowledge receipt
1927+
- Lost receipts lead to duplicate deliveries => at-least-once
1928+
1929+
- Retrying means taking note that the message needs to be sent
1930+
- Acknowledgement means taking note of the receipt of the confirmation

0 commit comments

Comments
 (0)