Skip to content

Commit e58aa2f

Browse files
committed
add gat & tait
1 parent 384a300 commit e58aa2f

File tree

30 files changed

+477
-101
lines changed

30 files changed

+477
-101
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
11
[workspace]
2-
members = ["thread", "pool", "asyncwait", "primitive", "collections", "process", "syncs", "channel", "timer_examples", "parking_lot_examples", "crossbeam_examples", "rayon_examples", "tokio_examples", "special"]
2+
members = ["thread", "pool", "asyncwait", "container_primitive", "sync_primitive", "collections", "process", "channel", "timer_examples", "parking_lot_examples", "crossbeam_examples", "rayon_examples", "tokio_examples", "special"]

asyncwait/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ edition = "2021"
99
anyhow = "1.0.65"
1010
async-std = "1.12.0"
1111
async-stream = "0.3.3"
12+
async-trait = "0.1.57"
13+
bytes = "1.2.1"
1214
futures = { version = "0.3.24", features = ["executor", "thread-pool"] }
1315
futures-lite = "1.12.0"
1416
futures-util = "0.3.24"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
use async_trait::*; // the official feature is not stable yet
2+
3+
// https://blog.theincredibleholk.org/blog/2022/04/18/how-async-functions-in-traits-could-work-in-rustc/
4+
5+
#[async_trait]
6+
trait AsyncTrait {
7+
async fn get_string(&self) -> String;
8+
}
9+
10+
#[async_trait]
11+
impl AsyncTrait for i32 {
12+
async fn get_string(&self) -> String {
13+
self.to_string()
14+
}
15+
}
16+
17+
pub fn async_trait_example() {
18+
let rt = tokio::runtime::Runtime::new().unwrap();
19+
rt.block_on(async {
20+
let x = 10;
21+
let y = x.get_string().await;
22+
println!("y={}", y);
23+
});
24+
}

asyncwait/src/gat.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
use std::future::Future;
2+
use async_std::io::WriteExt;
3+
use bytes::Bytes;
4+
5+
// copy from https://www.sobyte.net/post/2022-04/rust-gat-async-trait/
6+
//
7+
// generic_associated_types
8+
// type_alias_impl_trait traits
9+
pub trait KvIterator {
10+
type NextFuture<'a>: Future<Output = Option<(&'a [u8], &'a [u8])>>
11+
where
12+
Self: 'a;
13+
14+
/// Get the next item from the iterator.
15+
fn next(&mut self) -> Self::NextFuture<'_>;
16+
}
17+
18+
pub struct TestIterator {
19+
idx: usize,
20+
to_idx: usize,
21+
key: Vec<u8>,
22+
value: Vec<u8>,
23+
}
24+
25+
impl TestIterator {
26+
pub fn new(from_idx: usize, to_idx: usize) -> Self {
27+
Self {
28+
idx: from_idx,
29+
to_idx,
30+
key: Vec::new(),
31+
value: Vec::new(),
32+
}
33+
}
34+
}
35+
36+
#[allow(deprecated_where_clause_location)]
37+
impl KvIterator for TestIterator {
38+
type NextFuture<'a>
39+
where
40+
Self: 'a,
41+
= impl Future<Output = Option<(&'a [u8], &'a [u8])>>;
42+
43+
fn next(&mut self) -> Self::NextFuture<'_> {
44+
async move {
45+
if self.idx >= self.to_idx {
46+
return None;
47+
}
48+
49+
// Zero-allocation key value manipulation
50+
51+
self.key.clear();
52+
53+
write!(&mut self.key, "key_{:05}", self.idx).await.unwrap();
54+
55+
self.value.clear();
56+
write!(&mut self.value, "value_{:05}", self.idx).await.unwrap();
57+
58+
self.idx += 1;
59+
Some((&self.key[..], &self.value[..]))
60+
}
61+
}
62+
}
63+
64+
pub fn kviterator_example() {
65+
let rt = tokio::runtime::Runtime::new().unwrap();
66+
rt.block_on(async {
67+
let mut iter = TestIterator::new(0, 10);
68+
while let Some((key, value)) = iter.next().await {
69+
println!(
70+
"{:?} {:?}",
71+
Bytes::copy_from_slice(key),
72+
Bytes::copy_from_slice(value)
73+
);
74+
}
75+
});
76+
}

asyncwait/src/lib.rs

Lines changed: 9 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -1,100 +1,15 @@
1+
#![feature(type_alias_impl_trait)]
2+
// #![feature(return_position_impl_trait_in_trait)]
3+
14
pub mod asyncio;
25
pub mod future;
6+
pub mod runtimes;
7+
pub mod gat;
8+
pub mod async_trait_example;
39

410
pub use asyncio::*;
511
pub use future::*;
12+
pub use runtimes::*;
13+
pub use gat::*;
14+
pub use async_trait_example::*;
615

7-
use futures::channel::mpsc;
8-
use futures::executor::{self, ThreadPool};
9-
use futures::try_join;
10-
use futures::StreamExt;
11-
use futures::{
12-
future::FutureExt, // for `.fuse()`
13-
pin_mut,
14-
select,
15-
};
16-
17-
pub fn tokio_async() {
18-
let rt = tokio::runtime::Runtime::new().unwrap();
19-
rt.block_on(async {
20-
println!("Hello from tokio!");
21-
22-
rt.spawn(async {
23-
println!("Hello from a tokio task!");
24-
println!("in spawn")
25-
})
26-
.await
27-
.unwrap();
28-
});
29-
30-
rt.spawn_blocking(|| println!("in spawn_blocking"));
31-
}
32-
33-
pub fn futures_async() {
34-
let pool = ThreadPool::new().expect("Failed to build pool");
35-
let (tx, rx) = mpsc::unbounded::<i32>();
36-
37-
let fut_values = async {
38-
let fut_tx_result = async move {
39-
(0..100).for_each(|v| {
40-
tx.unbounded_send(v).expect("Failed to send");
41-
})
42-
};
43-
pool.spawn_ok(fut_tx_result);
44-
45-
let fut_values = rx.map(|v| v * 2).collect();
46-
47-
fut_values.await
48-
};
49-
50-
let values: Vec<i32> = executor::block_on(fut_values);
51-
52-
println!("Values={:?}", values);
53-
}
54-
55-
pub fn futures_lite_async() {
56-
futures_lite::future::block_on(async { println!("Hello from futures_lite") })
57-
}
58-
59-
pub fn async_std() {
60-
async_std::task::block_on(async { println!("Hello from async_std") })
61-
}
62-
63-
pub fn smol_async() {
64-
smol::block_on(async { println!("Hello from smol") })
65-
}
66-
67-
struct Book();
68-
struct Music();
69-
70-
async fn get_book() -> Result<Book, String> {
71-
println!("in get_book");
72-
Ok(Book())
73-
}
74-
async fn get_music() -> Result<Music, String> {
75-
println!("in get_music");
76-
Ok(Music())
77-
}
78-
async fn get_book_and_music() -> Result<(Book, Music), String> {
79-
let book_fut = get_book();
80-
let music_fut = get_music();
81-
try_join!(book_fut, music_fut)
82-
}
83-
84-
pub fn join() {
85-
futures_lite::future::block_on(async { get_book_and_music().await }).unwrap();
86-
}
87-
88-
pub fn select() {
89-
futures_lite::future::block_on(async {
90-
let t1 = get_book().fuse();
91-
let t2 = get_music().fuse();
92-
93-
pin_mut!(t1, t2);
94-
95-
select! {
96-
_x = t1 => println!("select get_book"),
97-
_y = t2 => println!("select get_music"),
98-
}
99-
});
100-
}

asyncwait/src/main.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,5 +14,7 @@ fn main() {
1414

1515
stream();
1616

17+
kviterator_example();
1718

19+
async_trait_example();
1820
}

asyncwait/src/runtimes.rs

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
2+
use futures::channel::mpsc;
3+
use futures::executor::{self, ThreadPool};
4+
use futures::try_join;
5+
use futures::StreamExt;
6+
use futures::{
7+
future::FutureExt, // for `.fuse()`
8+
pin_mut,
9+
select,
10+
};
11+
12+
13+
pub fn tokio_async() {
14+
let rt = tokio::runtime::Runtime::new().unwrap();
15+
rt.block_on(async {
16+
println!("Hello from tokio!");
17+
18+
rt.spawn(async {
19+
println!("Hello from a tokio task!");
20+
println!("in spawn")
21+
})
22+
.await
23+
.unwrap();
24+
});
25+
26+
rt.spawn_blocking(|| println!("in spawn_blocking"));
27+
}
28+
29+
pub fn futures_async() {
30+
let pool = ThreadPool::new().expect("Failed to build pool");
31+
let (tx, rx) = mpsc::unbounded::<i32>();
32+
33+
let fut_values = async {
34+
let fut_tx_result = async move {
35+
(0..100).for_each(|v| {
36+
tx.unbounded_send(v).expect("Failed to send");
37+
})
38+
};
39+
pool.spawn_ok(fut_tx_result);
40+
41+
let fut_values = rx.map(|v| v * 2).collect();
42+
43+
fut_values.await
44+
};
45+
46+
let values: Vec<i32> = executor::block_on(fut_values);
47+
48+
println!("Values={:?}", values);
49+
}
50+
51+
pub fn futures_lite_async() {
52+
futures_lite::future::block_on(async { println!("Hello from futures_lite") })
53+
}
54+
55+
pub fn async_std() {
56+
async_std::task::block_on(async { println!("Hello from async_std") })
57+
}
58+
59+
pub fn smol_async() {
60+
smol::block_on(async { println!("Hello from smol") })
61+
}
62+
63+
struct Book();
64+
struct Music();
65+
66+
async fn get_book() -> Result<Book, String> {
67+
println!("in get_book");
68+
Ok(Book())
69+
}
70+
async fn get_music() -> Result<Music, String> {
71+
println!("in get_music");
72+
Ok(Music())
73+
}
74+
async fn get_book_and_music() -> Result<(Book, Music), String> {
75+
let book_fut = get_book();
76+
let music_fut = get_music();
77+
try_join!(book_fut, music_fut)
78+
}
79+
80+
pub fn join() {
81+
futures_lite::future::block_on(async { get_book_and_music().await }).unwrap();
82+
}
83+
84+
pub fn select() {
85+
futures_lite::future::block_on(async {
86+
let t1 = get_book().fuse();
87+
let t2 = get_music().fuse();
88+
89+
pin_mut!(t1, t2);
90+
91+
select! {
92+
_x = t1 => println!("select get_book"),
93+
_y = t2 => println!("select get_music"),
94+
}
95+
});
96+
}

channel/Cargo.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,11 @@ edition = "2021"
66
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
77

88
[dependencies]
9+
async-channel = "1.7.1"
10+
async-priority-channel = "0.1.0"
11+
atomic_mpmc = "0.2.0"
12+
broadcaster = "1.0.0"
13+
crossfire = "0.1.7"
14+
flume = "0.10.14"
15+
futures-util = "0.3.24"
16+
tokio = { version = "1.21.2", features = ["full"] }

0 commit comments

Comments
 (0)