Skip to content

Commit 384a300

Browse files
committed
add tokio/sync examples
1 parent 65a8620 commit 384a300

File tree

4 files changed

+356
-1
lines changed

4 files changed

+356
-1
lines changed

tokio_examples/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
tokio = { version = "1.21.2", features = ["full"] }

tokio_examples/src/lib.rs

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
pub mod primitives;
2+
3+
pub use primitives::*;
4+
5+
use std::process::Stdio;
6+
7+
use tokio;
8+
use tokio::process::Command;
9+
use tokio::io::{BufReader, AsyncBufReadExt};
10+
11+
pub fn process() {
12+
let rt = tokio::runtime::Runtime::new().unwrap();
13+
14+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
15+
let mut child = Command::new("echo")
16+
.arg("hello")
17+
.arg("world")
18+
.spawn()
19+
.expect("failed to spawn");
20+
21+
// Await until the command completes
22+
let status = child.wait().await?;
23+
println!("the command exited with: {}", status);
24+
25+
Ok(())
26+
});
27+
}
28+
29+
pub fn process2() {
30+
let rt = tokio::runtime::Runtime::new().unwrap();
31+
32+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
33+
let mut cmd = Command::new("cat");
34+
cmd.arg("Cargo.toml");
35+
36+
// Specify that we want the command's standard output piped back to us.
37+
// By default, standard input/output/error will be inherited from the
38+
// current process (for example, this means that standard input will
39+
// come from the keyboard and standard output/error will go directly to
40+
// the terminal if this process is invoked from the command line).
41+
cmd.stdout(Stdio::piped());
42+
43+
let mut child = cmd.spawn().expect("failed to spawn command");
44+
45+
let stdout = child
46+
.stdout
47+
.take()
48+
.expect("child did not have a handle to stdout");
49+
50+
let mut reader = BufReader::new(stdout).lines();
51+
52+
// Ensure the child process is spawned in the runtime so it can
53+
// make progress on its own while we await for any output.
54+
tokio::spawn(async move {
55+
let status = child
56+
.wait()
57+
.await
58+
.expect("child process encountered an error");
59+
60+
println!("child status was: {}", status);
61+
});
62+
63+
while let Some(line) = reader.next_line().await? {
64+
println!("Line: {}", line);
65+
}
66+
67+
Ok(())
68+
});
69+
70+
71+
}
72+
73+
74+
pub fn oneshot() {
75+
let rt = tokio::runtime::Runtime::new().unwrap();
76+
77+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
78+
let (tx, rx) = tokio::sync::oneshot::channel::<String>();
79+
80+
tokio::spawn(async move {
81+
tx.send("hello".to_string()).unwrap();
82+
});
83+
84+
let msg = rx.await?;
85+
println!("got = {}", msg);
86+
87+
Ok(())
88+
});
89+
}
90+
91+
pub fn async_with_oneshot() {
92+
let rt = tokio::runtime::Runtime::new().unwrap();
93+
94+
async fn some_computation() -> String {
95+
"the result of the computation".to_string()
96+
}
97+
98+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
99+
let join_handle = tokio::spawn(async move {
100+
some_computation().await
101+
});
102+
103+
// Do other work while the computation is happening in the background
104+
105+
// Wait for the computation result
106+
let res = join_handle.await?;
107+
println!("result = {}", res);
108+
109+
Ok(())
110+
});
111+
}
112+
113+
pub fn mpsc_example() {
114+
async fn some_computation(input: u32) -> String {
115+
format!("the result of computation {}", input)
116+
}
117+
118+
119+
let rt = tokio::runtime::Runtime::new().unwrap();
120+
121+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
122+
let (tx, mut rx) = tokio::sync::mpsc::channel::<String>(10);
123+
124+
tokio::spawn(async move {
125+
for i in 0..10 {
126+
let res = some_computation(i).await;
127+
tx.send(res).await.unwrap();
128+
}
129+
});
130+
131+
while let Some(res) = rx.recv().await {
132+
println!("got = {}", res);
133+
}
134+
135+
Ok(())
136+
});
137+
}
138+
139+
pub fn broadcast_example() {
140+
let rt = tokio::runtime::Runtime::new().unwrap();
141+
142+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
143+
let (tx, mut rx1) = tokio::sync::broadcast::channel::<String>(10);
144+
let mut rx2 = tx.subscribe();
145+
146+
tokio::spawn(async move {
147+
tx.send("hello".to_string()).unwrap();
148+
tx.send("world".to_string()).unwrap();
149+
});
150+
151+
println!("rx1 = {:?}", rx1.recv().await);
152+
println!("rx2 = {:?}", rx2.recv().await);
153+
println!("rx2 = {:?}", rx2.recv().await);
154+
155+
Ok(())
156+
});
157+
158+
}
159+
160+
pub fn watch_example() {
161+
let rt = tokio::runtime::Runtime::new().unwrap();
162+
163+
let _: Result<(), Box<dyn std::error::Error>> = rt.block_on(async {
164+
let (tx, rx1) = tokio::sync::watch::channel::<String>("hello".to_string());
165+
let mut rx2 = tx.subscribe();
166+
167+
tokio::spawn(async move {
168+
tx.send("world".to_string()).unwrap();
169+
});
170+
171+
println!("rx1 = {:?}", *rx1.borrow());
172+
println!("rx2 = {:?}", *rx2.borrow());
173+
println!("rx2 = {:?}", rx2.changed().await);
174+
175+
Ok(())
176+
});
177+
178+
}

tokio_examples/src/main.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,21 @@
1+
use tokio_examples::*;
2+
13
fn main() {
2-
println!("Hello, world!");
4+
process();
5+
process2();
6+
7+
oneshot();
8+
async_with_oneshot();
9+
mpsc_example();
10+
broadcast_example();
11+
watch_example();
12+
13+
barrier_example();
14+
mutex_example();
15+
rwlock_example();
16+
semaphore_example();
17+
semaphore_example2();
18+
notify_example();
19+
notify_example2();
20+
321
}

tokio_examples/src/primitives.rs

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
use std::sync::Arc;
2+
use tokio::sync::Barrier;
3+
use tokio::sync::Mutex;
4+
use tokio::sync::RwLock;
5+
6+
use tokio::sync::Notify;
7+
use tokio::sync::{Semaphore, TryAcquireError};
8+
9+
pub fn barrier_example() {
10+
let rt = tokio::runtime::Runtime::new().unwrap();
11+
12+
rt.block_on(async {
13+
let mut handles = Vec::with_capacity(10);
14+
let barrier = Arc::new(Barrier::new(10));
15+
for _ in 0..10 {
16+
let c = barrier.clone();
17+
// The same messages will be printed together.
18+
// You will NOT see any interleaving.
19+
handles.push(tokio::spawn(async move {
20+
println!("before wait");
21+
let wait_result = c.wait().await;
22+
println!("after wait");
23+
wait_result
24+
}));
25+
}
26+
27+
// Will not resolve until all "after wait" messages have been printed
28+
let mut num_leaders = 0;
29+
for handle in handles {
30+
let wait_result = handle.await.unwrap();
31+
if wait_result.is_leader() {
32+
num_leaders += 1;
33+
}
34+
}
35+
36+
// Exactly one barrier will resolve as the "leader"
37+
assert_eq!(num_leaders, 1);
38+
});
39+
}
40+
41+
pub fn mutex_example() {
42+
let rt = tokio::runtime::Runtime::new().unwrap();
43+
44+
rt.block_on(async {
45+
let data1 = Arc::new(Mutex::new(0));
46+
let data2 = Arc::clone(&data1);
47+
48+
tokio::spawn(async move {
49+
let mut lock = data2.lock().await;
50+
*lock += 1;
51+
});
52+
53+
let mut lock = data1.lock().await;
54+
*lock += 1;
55+
});
56+
}
57+
58+
pub fn rwlock_example() {
59+
let rt = tokio::runtime::Runtime::new().unwrap();
60+
61+
rt.block_on(async {
62+
let lock = RwLock::new(5);
63+
64+
// many reader locks can be held at once
65+
{
66+
let r1 = lock.read().await;
67+
let r2 = lock.read().await;
68+
assert_eq!(*r1, 5);
69+
assert_eq!(*r2, 5);
70+
} // read locks are dropped at this point
71+
72+
// only one write lock may be held, however
73+
{
74+
let mut w = lock.write().await;
75+
*w += 1;
76+
assert_eq!(*w, 6);
77+
} // write lock is dropped here
78+
});
79+
}
80+
81+
pub fn semaphore_example() {
82+
let rt = tokio::runtime::Runtime::new().unwrap();
83+
84+
rt.block_on(async {
85+
let semaphore = Semaphore::new(3);
86+
87+
let _a_permit = semaphore.acquire().await.unwrap();
88+
let _two_permits = semaphore.acquire_many(2).await.unwrap();
89+
90+
assert_eq!(semaphore.available_permits(), 0);
91+
92+
let permit_attempt = semaphore.try_acquire();
93+
assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
94+
});
95+
}
96+
97+
pub fn semaphore_example2() {
98+
let rt = tokio::runtime::Runtime::new().unwrap();
99+
100+
rt.block_on(async {
101+
let semaphore = Arc::new(Semaphore::new(3));
102+
let mut join_handles = Vec::new();
103+
104+
for _ in 0..5 {
105+
let permit = semaphore.clone().acquire_owned().await.unwrap();
106+
join_handles.push(tokio::spawn(async move {
107+
// perform task...
108+
// explicitly own `permit` in the task
109+
drop(permit);
110+
}));
111+
}
112+
113+
for handle in join_handles {
114+
handle.await.unwrap();
115+
}
116+
});
117+
}
118+
119+
pub fn notify_example() {
120+
let rt = tokio::runtime::Runtime::new().unwrap();
121+
122+
rt.block_on(async {
123+
let notify = Arc::new(Notify::new());
124+
let notify2 = notify.clone();
125+
126+
let handle = tokio::spawn(async move {
127+
notify2.notified().await;
128+
println!("received notification");
129+
});
130+
131+
println!("sending notification");
132+
notify.notify_one();
133+
134+
// Wait for task to receive notification.
135+
handle.await.unwrap();
136+
});
137+
}
138+
139+
pub fn notify_example2() {
140+
let rt = tokio::runtime::Runtime::new().unwrap();
141+
142+
rt.block_on(async {
143+
let notify = Arc::new(Notify::new());
144+
let notify2 = notify.clone();
145+
146+
let notified1 = notify.notified();
147+
let notified2 = notify.notified();
148+
149+
let _handle = tokio::spawn(async move {
150+
println!("sending notifications");
151+
notify2.notify_waiters();
152+
});
153+
154+
notified1.await;
155+
notified2.await;
156+
println!("received notifications");
157+
});
158+
}

0 commit comments

Comments
 (0)