diff --git a/.gitignore b/.gitignore index 088ba6b..c0eb52b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk +.history \ No newline at end of file diff --git a/README.md b/README.md index d40898a..c29a3c6 100644 --- a/README.md +++ b/README.md @@ -25,41 +25,47 @@ Introduces [async feature](asyncwait/src/main.rs). ### chapter 4: synchronization primitives -Introduces synchronization primitives contains [containers](container_primitive/src/main.rs) and [primitives](sync_primitive/src/main.rs) in std lib. +Introduces synchronization primitives contains [containers](container_primitive/src/main.rs) -### chapter 5: concurrency collections +### chapter 5: basic concurrency primitives +Introduction of basic concurrency [primitives](sync_primitive/src/main.rs) in std lib. + +### chapter 6: concurrency collections Introduces [concurrency collections](collections/src/main.rs) in std lib. -### chapter 6: process +### chapter 7: process Introduces starting and executing a new [process](process/src/main.rs) in the easy way. -### chapter 7: channel +### chapter 8: channel Introduces each [channels](channel/src/main.rs) such as mpsc, mpmc and broadcasters. -### chapter 8: timer/ticker +### chapter 9: timer/ticker Introduces [timer and ticker](timer_examples/src/main.rs). -### chapter 9: parking_lot +### chapter 10: parking_lot Introduces [parking_lot](parking_lot_examples/src/main.rs). -### chapter 10: crossbeam +### chapter 11: crossbeam Introduces [crossbeam](crossbeam_examples/src/main.rs). -### chapter 11: rayon +### chapter 12: rayon Introduces [rayon](rayon_examples/src/main.rs). -### chapter 12: tokio +### chapter 13: tokio Introduces [tokio](tokio_examples/src/main.rs). - -### chapter n: special +### chapter 14: special some special synchronization primitives and concurrency libs only for special single purpose. + + +- replace std::mpsc with crossbeam-channel: https://github.com/rust-lang/rust/pull/93563 + diff --git a/asyncwait/Cargo.toml b/asyncwait/Cargo.toml index 88d2245..2efeee9 100644 --- a/asyncwait/Cargo.toml +++ b/asyncwait/Cargo.toml @@ -14,5 +14,7 @@ bytes = "1.2.1" futures = { version = "0.3.24", features = ["executor", "thread-pool"] } futures-lite = "1.12.0" futures-util = "0.3.24" +monoio = "0.1.9" smol = "1.2.5" tokio = { version = "1.21.2", features = ["full"] } +value-bag = "1.4.1" diff --git a/asyncwait/src/lib.rs b/asyncwait/src/lib.rs index 9ee509c..0adc478 100644 --- a/asyncwait/src/lib.rs +++ b/asyncwait/src/lib.rs @@ -1,15 +1,21 @@ #![feature(type_alias_impl_trait)] +#![feature(impl_trait_in_assoc_type)] + + // #![feature(return_position_impl_trait_in_trait)] -pub mod asyncio; -pub mod future; -pub mod runtimes; -pub mod gat; -pub mod async_trait_example; +mod asyncio; +mod future; +mod runtimes; +mod gat; +mod async_trait_example; +mod monoio_example; pub use asyncio::*; pub use future::*; pub use runtimes::*; pub use gat::*; pub use async_trait_example::*; +pub use monoio_example::*; + diff --git a/asyncwait/src/main.rs b/asyncwait/src/main.rs index 855d159..f1f6e46 100644 --- a/asyncwait/src/main.rs +++ b/asyncwait/src/main.rs @@ -5,17 +5,25 @@ fn main() { futures_async(); futures_lite_async(); async_std(); + async_std_task(); smol_async(); timefuture_async(); + try_join(); join(); select(); futures_select(); + smol_zip(); stream(); kviterator_example(); async_trait_example(); + + match monoio_example() { + Ok(_) => println!("monoio_example: Ok"), + Err(e) => println!("monoio_example: Err: {}", e), + } } diff --git a/asyncwait/src/monoio_example.rs b/asyncwait/src/monoio_example.rs new file mode 100644 index 0000000..bea1c8f --- /dev/null +++ b/asyncwait/src/monoio_example.rs @@ -0,0 +1,25 @@ + +use monoio::fs::File; + + +pub fn monoio_example() -> Result<(), Box>{ + monoio::start::(async { + println!("monoio_example: Hello world!"); + + // Open a file + let file = File::open("LICENSE").await?; + + let buf = vec![0; 4096]; + // Read some data, the buffer is passed by ownership and + // submitted to the kernel. When the operation completes, + // we get the buffer back. + let (res, buf) = file.read_at(buf, 0).await; + let n = res?; + + // Display the contents + println!("monoio_example: {:?}", &buf[..n]); + + Ok(()) + }) +} + diff --git a/asyncwait/src/runtimes.rs b/asyncwait/src/runtimes.rs index 1817dc7..02014ee 100644 --- a/asyncwait/src/runtimes.rs +++ b/asyncwait/src/runtimes.rs @@ -4,10 +4,13 @@ use futures::try_join; use futures::StreamExt; use futures::{ future::FutureExt, // for `.fuse()` + join, pin_mut, select, }; +use async_std::task; + pub fn tokio_async() { let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async { @@ -54,11 +57,22 @@ pub fn async_std() { async_std::task::block_on(async { println!("Hello from async_std") }) } +pub fn async_std_task() { + task::block_on(async { + task::spawn(get_book()); + task::spawn(get_music()); + + println!("in async_std_task"); + }); +} + pub fn smol_async() { - smol::block_on(async { println!("Hello from smol") }) + smol::block_on(async { println!("Hello from smol") }); } +#[derive(Debug)] struct Book(); +#[derive(Debug)] struct Music(); async fn get_book() -> Result { @@ -69,14 +83,24 @@ async fn get_music() -> Result { println!("in get_music"); Ok(Music()) } -async fn get_book_and_music() -> Result<(Book, Music), String> { - let book_fut = get_book(); - let music_fut = get_music(); - try_join!(book_fut, music_fut) + +pub fn try_join() { + futures_lite::future::block_on(async { + let book_fut = get_book(); + let music_fut = get_music(); + println!("try_join: {:?}", try_join!(book_fut, music_fut)); + }); } pub fn join() { - futures_lite::future::block_on(async { get_book_and_music().await }).unwrap(); + let a = async { 1 }; + let b = async { 2 }; + let c = async { 3 }; + + futures_lite::future::block_on(async { + println!("join: {:?}", join!(get_book(), get_music())); + println!("join: {:?}", join!(a, b, c)); + }); } pub fn select() { @@ -112,3 +136,21 @@ pub fn futures_select() { assert_eq!(total, 10); }); } + +pub fn smol_zip() { + smol::block_on(async { + use smol::future::{try_zip, zip}; + + let future1 = async { 1 }; + let future2 = async { 2 }; + + let result = zip(future1, future2); + println!("smol_zip: {:?}", result.await); + + let future1 = async { Ok::(1) }; + let future2 = async { Err::(2) }; + + let result = try_zip(future1, future2).await; + println!("smol_try_zip: {:?}", result); + }); +} diff --git a/book/images/gear.png b/book/images/gear.png deleted file mode 100644 index eb5c21d..0000000 Binary files a/book/images/gear.png and /dev/null differ diff --git a/book_cn/ch01 Thread.pdf b/book_cn/ch01 Thread.pdf deleted file mode 100644 index 407c317..0000000 Binary files a/book_cn/ch01 Thread.pdf and /dev/null differ diff --git a/channel/Cargo.toml b/channel/Cargo.toml index 30b2991..d3f5505 100644 --- a/channel/Cargo.toml +++ b/channel/Cargo.toml @@ -8,11 +8,11 @@ edition = "2021" [dependencies] async-channel = "1.7.1" async-priority-channel = "0.1.0" -atomic_mpmc = "0.2.0" broadcaster = "1.0.0" crossfire = "0.1.7" flume = "0.10.14" futures = "0.3.24" futures-channel = "0.3.24" futures-util = "0.3.24" +kanal = "0.1.0-pre6" tokio = { version = "1.21.2", features = ["full"] } diff --git a/channel/src/lib.rs b/channel/src/lib.rs index 1ed29f6..f6eb83d 100644 --- a/channel/src/lib.rs +++ b/channel/src/lib.rs @@ -4,6 +4,7 @@ pub use others::*; use std::sync::mpsc; use std::sync::mpsc::sync_channel; use std::thread; +use std::time::Duration; pub fn mpsc_example1() { let (tx, rx) = mpsc::channel(); @@ -60,3 +61,27 @@ pub fn mpsc_example4() { println!("mpsc_example4 completed"); } + +pub fn mpsc_drop_example() { + // 创建一个有边界的多生产者、单消费者的通道 + let (sender, receiver) = mpsc::channel::(); // 指定通道中传递的数据类型为 i32 + + // 启动三个生产者线程 + for i in 0..3 { + let tx = sender.clone(); // 克隆发送端,每个线程都拥有独立的发送端 + thread::spawn(move || { + thread::sleep(Duration::from_secs(1)); // 等待所有线程启动完毕 + tx.send(i).expect("Failed to send message"); + }); + } + + + // 丢弃发送端,不影响clone + drop(sender); + + + // 主线程作为消费者,接收来自生产者线程的消息 + for received_message in receiver { + println!("Received message: {}", received_message); + } +} \ No newline at end of file diff --git a/channel/src/main.rs b/channel/src/main.rs index 0d325eb..f61d97d 100644 --- a/channel/src/main.rs +++ b/channel/src/main.rs @@ -5,13 +5,20 @@ fn main() { mpsc_example2(); mpsc_example3(); mpsc_example4(); + mpsc_drop_example(); crossfire_mpsc(); crossfire_mpmc(); - atomic_mpmc(); + flume_example(); + flume_select(); + flume_async(); + async_channel_example(); async_priority_channel_example(); futures_channel_mpsc_example(); futures_channel_oneshot_example(); + kanal_example(); + kanal_async_example(); + kanal_oneshot_example(); } diff --git a/channel/src/others.rs b/channel/src/others.rs index c3e5820..c94fb4d 100644 --- a/channel/src/others.rs +++ b/channel/src/others.rs @@ -74,50 +74,6 @@ pub fn crossfire_mpmc() { }); } -// rx can't aware tx close -pub fn atomic_mpmc() { - // let (tx, rx) = channel::(10); - - // let mut sender_handles = vec![]; - // for v in 0..4 { - // let tx = tx.clone(); - // let handle = thread::spawn(move || { - // for i in 0i32..10 { - // if i % 5 != v { - // continue; - // } - // let _ = tx.send(i).unwrap(); - // println!("thread {} atomic_mpmc sent {}", v, i); - // } - // }); - - // sender_handles.push(handle); - // } - - // let mut handles = vec![]; - // for i in 0..4 { - // let rx = rx.clone(); - // let handle = thread::spawn(move || loop { - // if let Ok(_i) = rx.recv() { - // println!("atomic_mpmc thread {} recv {}", i, _i); - // } else { - // println!("atomic_mpmc rx closed"); - // break; - // } - // }); - // handles.push(handle); - // } - - // for handle in sender_handles { - // handle.join().unwrap(); - // } - // drop((tx,rx)); - - // for handle in handles { - // handle.join().unwrap(); - // } -} - // has issues. pub fn broadcaster() { // let rt = tokio::runtime::Runtime::new().unwrap(); @@ -149,6 +105,35 @@ pub fn flume_example() { assert_eq!((0..10).sum::(), received); } +pub fn flume_select() { + let (tx0, rx0) = flume::unbounded(); + let (tx1, rx1) = flume::unbounded(); + + std::thread::spawn(move || { + tx0.send(true).unwrap(); + tx1.send(42).unwrap(); + }); + + flume::Selector::new() + .recv(&rx0, |b| println!("Received {:?}", b)) + .recv(&rx1, |n| println!("Received {:?}", n)) + .wait(); +} + +pub fn flume_async() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (tx, rx) = flume::unbounded(); + + rt.block_on(async move { + tokio::spawn(async move { + tx.send_async(5).await.unwrap(); + }); + + println!("flume async rx: {}", rx.recv_async().await.unwrap()); + }); +} + pub fn async_channel_example() { let rt = tokio::runtime::Runtime::new().unwrap(); @@ -223,3 +208,43 @@ pub fn async_priority_channel_example() { assert_eq!(r.recv().await, Ok(("Bar", 2))); }); } + +pub fn kanal_example() { + let (tx, rx) = kanal::unbounded(); + + thread::spawn(move || { + (0..10).for_each(|i| { + tx.send(i).unwrap(); + }); + + drop(tx) + }); + + let received: u32 = rx.sum(); + + println!("received sum: {}", received); +} + +pub fn kanal_async_example() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + let (tx, rx) = kanal::unbounded_async(); + + rt.block_on(async move { + tokio::spawn(async move { + tx.send(5).await.unwrap(); + }); + + println!("rx: {}", rx.recv().await.unwrap()); + }); +} + +pub fn kanal_oneshot_example() { + let (tx, rx) = kanal::oneshot(); + + thread::spawn(move || { + tx.send(5).unwrap(); + }); + + println!("kanal oneshot rx: {}", rx.recv().unwrap()); +} \ No newline at end of file diff --git a/collections/Cargo.toml b/collections/Cargo.toml index 482838c..4b4d586 100644 --- a/collections/Cargo.toml +++ b/collections/Cargo.toml @@ -6,3 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +cuckoofilter = "0.5.0" +dashmap = "5.5.3" +evmap = "10.0.2" diff --git a/collections/src/lib.rs b/collections/src/lib.rs index c9444e1..3fe0ee2 100644 --- a/collections/src/lib.rs +++ b/collections/src/lib.rs @@ -1,5 +1,10 @@ -use std::{sync::{Arc, Mutex}, collections::HashMap}; +use cuckoofilter::CuckooFilter; +use dashmap::DashMap; use std::collections::LinkedList; +use std::{ + collections::HashMap, + sync::{Arc, Mutex}, +}; pub fn common_thread_safe_collections() { let map: HashMap = HashMap::new(); @@ -21,7 +26,6 @@ pub fn common_thread_safe_collections() { println!("HashMap: {:?}", *m.lock().unwrap()); } - pub fn common_thread_safe_vec() { let vec1 = vec![]; let vec2 = Arc::new(Mutex::new(vec1)); @@ -39,7 +43,7 @@ pub fn common_thread_safe_vec() { handle.join().unwrap(); } - println!("vec: {:?}", vec2.lock().unwrap()); + println!("vec: {:?}", vec2.lock().unwrap()); } pub fn common_thread_safe_linkedlist() { @@ -59,5 +63,74 @@ pub fn common_thread_safe_linkedlist() { handle.join().unwrap(); } - println!("LinkedList: {:?}", list2.lock().unwrap()); + println!("LinkedList: {:?}", list2.lock().unwrap()); +} + +pub fn dashmap_example() { + let map = Arc::new(DashMap::new()); + let mut handles = vec![]; + + for i in 0..10 { + let map = Arc::clone(&map); + handles.push(std::thread::spawn(move || { + map.insert(i, i); + })); + } + + for handle in handles { + handle.join().unwrap(); + } + + println!("DashMap: {:?}", map); +} + +pub fn cuckoofilter_example() { + let value: &str = "hello world"; + + // Create cuckoo filter with default max capacity of 1000000 items + let mut cf = CuckooFilter::new(); + + // Add data to the filter + cf.add(value).unwrap(); + + // Lookup if data is in the filter + let success = cf.contains(value); + assert!(success); + + // Test and add to the filter (if data does not exists then add) + let success = cf.test_and_add(value).unwrap(); + assert!(!success); + + // Remove data from the filter. + let success = cf.delete(value); + assert!(success); +} + +pub fn evmap_example() { + let (book_reviews_r, book_reviews_w) = evmap::new(); + + // start some writers. + // since evmap does not support concurrent writes, we need + // to protect the write handle by a mutex. + let w = Arc::new(Mutex::new(book_reviews_w)); + let writers: Vec<_> = (0..4) + .map(|i| { + let w = w.clone(); + std::thread::spawn(move || { + let mut w = w.lock().unwrap(); + w.insert(i, true); + w.refresh(); + }) + }) + .collect(); + + // eventually we should see all the writes + while book_reviews_r.len() < 4 { + std::thread::yield_now(); + } + + // all the threads should eventually finish writing + for w in writers.into_iter() { + assert!(w.join().is_ok()); + } } diff --git a/collections/src/main.rs b/collections/src/main.rs index 8ab2429..ead74d3 100644 --- a/collections/src/main.rs +++ b/collections/src/main.rs @@ -5,4 +5,7 @@ fn main() { common_thread_safe_vec(); common_thread_safe_linkedlist(); + dashmap_example(); + cuckoofilter_example(); + evmap_example(); } diff --git a/container_primitive/Cargo.toml b/container_primitive/Cargo.toml index a57f436..dc79ed8 100644 --- a/container_primitive/Cargo.toml +++ b/container_primitive/Cargo.toml @@ -6,3 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +beef = "0.5.2" +once_cell = "1.18.0" diff --git a/container_primitive/src/box.rs b/container_primitive/src/box.rs index e7bfe95..973e92d 100644 --- a/container_primitive/src/box.rs +++ b/container_primitive/src/box.rs @@ -1,3 +1,4 @@ +use std::boxed::ThinBox; // Box, casually referred to as a ‘box’, provides the simplest form of heap allocation in Rust. // Boxes provide ownership for this allocation, and drop their contents when they go out of scope. // Boxes also ensure that they never allocate more than isize::MAX bytes. @@ -17,3 +18,18 @@ pub fn box_example2() { println!("{list:?}"); } +pub fn thin_box_example() { + use std::mem::{size_of, size_of_val}; + let size_of_ptr = size_of::<*const ()>(); + + let box_five = Box::new(5); + let box_slice = Box::<[i32]>::new_zeroed_slice(5); + assert_eq!(size_of_ptr, size_of_val(&box_five)); + assert_eq!(size_of_ptr * 2, size_of_val(&box_slice)); + + + let five = ThinBox::new(5); + let thin_slice = ThinBox::<[i32]>::new_unsize([1, 2, 3, 4]); + assert_eq!(size_of_ptr, size_of_val(&five)); + assert_eq!(size_of_ptr, size_of_val(&thin_slice)); +} \ No newline at end of file diff --git a/container_primitive/src/cell.rs b/container_primitive/src/cell.rs index 885f56e..0a02fff 100644 --- a/container_primitive/src/cell.rs +++ b/container_primitive/src/cell.rs @@ -2,7 +2,7 @@ use std::cell::*; use std::collections::HashMap; -use std::rc::Rc; +use std::sync::LazyLock; // Shareable mutable containers. // Shareable mutable containers exist to permit mutability in a controlled manner, even in the presence of aliasing. @@ -27,6 +27,8 @@ pub fn cell_example() { let _ = my_struct.regular_field; // my_struct.regular_field = 100; my_struct.special_field.set(100); + + my_struct.special_field.update(|v| v + 1); } pub fn refcell_example() { @@ -53,23 +55,7 @@ pub fn refcell_example() { println!("special_field = {}", my_struct.special_field.borrow()); } -pub fn rc_refcell_example() { - let shared_map: Rc> = Rc::new(RefCell::new(HashMap::new())); - // Create a new block to limit the scope of the dynamic borrow - { - let mut map: RefMut<_> = shared_map.borrow_mut(); - map.insert("africa", 92388); - map.insert("kyoto", 11837); - map.insert("piccadilly", 11826); - map.insert("marbles", 38); - } - // Note that if we had not let the previous borrow of the cache fall out - // of scope then the subsequent borrow would cause a dynamic thread panic. - // This is the major hazard of using `RefCell`. - let total: i32 = shared_map.borrow().values().sum(); - println!("{total}"); -} pub fn once_cell_example() { let cell = OnceCell::new(); @@ -90,78 +76,18 @@ pub fn lazy_cell_example() { println!("{}", *lazy); } -pub fn myrc_example() { - let s = example::Rc::new("hello world"); - let s1 = s.clone(); - - let v = s1.value(); - println!("myrc value: {}", v); -} - -pub mod example { - use std::cell::Cell; - use std::marker::PhantomData; - use std::process::abort; - use std::ptr::NonNull; - - pub struct Rc { - ptr: NonNull>, - phantom: PhantomData>, - } - - impl Rc { - pub fn new(t: T) -> Self { - let ptr = Box::new(RcBox { - strong: Cell::new(1), - refcount: Cell::new(1), - value: t, - }); - let ptr = NonNull::new(Box::into_raw(ptr)).unwrap(); - Self { - ptr: ptr, - phantom: PhantomData, - } - } - - pub fn value(&self) -> &T { - &self.inner().value - } - } - - - struct RcBox { - strong: Cell, - refcount: Cell, - value: T, - } - - impl Clone for Rc { - fn clone(&self) -> Rc { - self.inc_strong(); - Rc { - ptr: self.ptr, - phantom: PhantomData, - } - } - } - - trait RcBoxPtr { - fn inner(&self) -> &RcBox; - - fn strong(&self) -> usize { - self.inner().strong.get() - } - - fn inc_strong(&self) { - self.inner() - .strong - .set(self.strong().checked_add(1).unwrap_or_else(|| abort())); - } - } - - impl RcBoxPtr for Rc { - fn inner(&self) -> &RcBox { - unsafe { self.ptr.as_ref() } - } - } -} +static HASHMAP: LazyLock> = LazyLock::new(|| { + println!("initializing"); + let mut m = HashMap::new(); + m.insert(13, "Spica".to_string()); + m.insert(74, "Hoyten".to_string()); + m +}); + +pub fn lazy_lock() { + println!("ready"); + std::thread::spawn(|| { + println!("{:?}", HASHMAP.get(&13)); + }).join().unwrap(); + println!("{:?}", HASHMAP.get(&74)); +} \ No newline at end of file diff --git a/container_primitive/src/cow.rs b/container_primitive/src/cow.rs index bc662e7..1f415d1 100644 --- a/container_primitive/src/cow.rs +++ b/container_primitive/src/cow.rs @@ -1,17 +1,20 @@ use std::borrow::Cow; +use std::mem::size_of; -// The type Cow is a smart pointer providing clone-on-write functionality: -// it can enclose and provide immutable access to borrowed data, -// and clone the data lazily when mutation or ownership is required. +// The type Cow is a smart pointer providing clone-on-write functionality: +// it can enclose and provide immutable access to borrowed data, +// and clone the data lazily when mutation or ownership is required. pub fn cow_example() { let origin = "hello world"; - let mut cow = Cow::from(origin); + let mut cow = Cow::from(origin); // Cow::Borrowed assert_eq!(cow, "hello world"); // Cow can be borrowed as a str let s: &str = &cow; assert_eq!(s, "hello world"); + assert_eq!(s.len(), cow.len()); + // Cow can be borrowed as a mut str let s: &mut str = cow.to_mut(); s.make_ascii_uppercase(); @@ -54,4 +57,19 @@ pub fn cow_example2() { // No clone occurs because `input` is already owned. let mut input = Cow::from(vec![-1, 0, 1]); abs_all(&mut input); -} \ No newline at end of file + +} + +pub fn beef_cow() { + let borrowed: beef::Cow = beef::Cow::borrowed("Hello"); + let owned: beef::Cow = beef::Cow::owned(String::from("World")); + let _ = beef::Cow::from("Hello"); + + assert_eq!(format!("{} {}!", borrowed, owned), "Hello World!",); + + const WORD: usize = size_of::(); + + assert_eq!(size_of::>(), 3 * WORD); + assert_eq!(size_of::>(), 3 * WORD); + assert_eq!(size_of::>(), 2 * WORD); +} diff --git a/container_primitive/src/lib.rs b/container_primitive/src/lib.rs index 68fad7f..19e8054 100644 --- a/container_primitive/src/lib.rs +++ b/container_primitive/src/lib.rs @@ -1,4 +1,7 @@ -#![feature(once_cell)] +#![feature(lazy_cell)] +#![feature(thin_box)] +#![feature(new_uninit)] +#![feature(cell_update)] pub mod cow; pub mod r#box; diff --git a/container_primitive/src/main.rs b/container_primitive/src/main.rs index 448b1ed..300ca9c 100644 --- a/container_primitive/src/main.rs +++ b/container_primitive/src/main.rs @@ -3,16 +3,19 @@ use container_primitive::*; fn main() { cow_example(); cow_example2(); + beef_cow(); box_example(); box_example2(); + thin_box_example(); cell_example(); refcell_example(); rc_refcell_example(); once_cell_example(); lazy_cell_example(); - myrc_example(); + lazy_lock(); rc_example(); + myrc_example(); } \ No newline at end of file diff --git a/container_primitive/src/rc.rs b/container_primitive/src/rc.rs index a57d798..4200bb9 100644 --- a/container_primitive/src/rc.rs +++ b/container_primitive/src/rc.rs @@ -1,7 +1,9 @@ // Single-threaded reference-counting pointers. ‘Rc’ stands for ‘Reference Counted’. +use std::collections::HashMap; use std::rc::Rc; +use std::cell::{RefCell,RefMut}; pub fn rc_example() { let rc = Rc::new(1); @@ -17,4 +19,98 @@ pub fn rc_example() { println!("my_weak: {}", my_weak.upgrade().is_none()); -} \ No newline at end of file +} + +pub fn rc_refcell_example() { + let shared_map: Rc> = Rc::new(RefCell::new(HashMap::new())); + // Create a new block to limit the scope of the dynamic borrow + { + let mut map: RefMut<_> = shared_map.borrow_mut(); + map.insert("africa", 92388); + map.insert("kyoto", 11837); + map.insert("piccadilly", 11826); + map.insert("marbles", 38); + } + + // Note that if we had not let the previous borrow of the cache fall out + // of scope then the subsequent borrow would cause a dynamic thread panic. + // This is the major hazard of using `RefCell`. + let total: i32 = shared_map.borrow().values().sum(); + println!("{total}"); +} + +pub fn myrc_example() { + let s = example::Rc::new("hello world"); + let s1 = s.clone(); + + let v = s1.value(); + println!("myrc value: {}", v); +} + +pub mod example { + use std::cell::Cell; + use std::marker::PhantomData; + use std::process::abort; + use std::ptr::NonNull; + + pub struct Rc { + ptr: NonNull>, + phantom: PhantomData>, + } + + impl Rc { + pub fn new(t: T) -> Self { + let ptr = Box::new(RcBox { + strong: Cell::new(1), + refcount: Cell::new(1), + value: t, + }); + let ptr = NonNull::new(Box::into_raw(ptr)).unwrap(); + Self { + ptr: ptr, + phantom: PhantomData, + } + } + + pub fn value(&self) -> &T { + &self.inner().value + } + } + + + struct RcBox { + strong: Cell, + refcount: Cell, + value: T, + } + + impl Clone for Rc { + fn clone(&self) -> Rc { + self.inc_strong(); + Rc { + ptr: self.ptr, + phantom: PhantomData, + } + } + } + + trait RcBoxPtr { + fn inner(&self) -> &RcBox; + + fn strong(&self) -> usize { + self.inner().strong.get() + } + + fn inc_strong(&self) { + self.inner() + .strong + .set(self.strong().checked_add(1).unwrap_or_else(|| abort())); + } + } + + impl RcBoxPtr for Rc { + fn inner(&self) -> &RcBox { + unsafe { self.ptr.as_ref() } + } + } +} diff --git a/crossbeam_examples/src/atomics.rs b/crossbeam_examples/src/atomics.rs index f1dc9c0..6592648 100644 --- a/crossbeam_examples/src/atomics.rs +++ b/crossbeam_examples/src/atomics.rs @@ -1,12 +1,10 @@ use crossbeam::atomic::AtomicCell; - pub fn atomic_cell_example() { let a = AtomicCell::new(0i32); a.store(1); assert_eq!(a.load(), 1); - assert_eq!(a.compare_exchange(1, 2), Ok(1)); assert_eq!(a.fetch_add(1), 2); @@ -20,6 +18,3 @@ pub fn atomic_cell_example() { assert_eq!(v, 100); assert_eq!(a.load(), 0); } - - - diff --git a/crossbeam_examples/src/main.rs b/crossbeam_examples/src/main.rs index f68af94..c7227d8 100644 --- a/crossbeam_examples/src/main.rs +++ b/crossbeam_examples/src/main.rs @@ -1,6 +1,8 @@ use crossbeam_examples::*; + fn main() { atomic_cell_example(); + atomic_consume_example(); crossbeam_deque_example(); arrayqueue_example(); diff --git a/parking_lot_examples/src/lib.rs b/parking_lot_examples/src/lib.rs index fe9b58a..4b926a0 100644 --- a/parking_lot_examples/src/lib.rs +++ b/parking_lot_examples/src/lib.rs @@ -53,6 +53,47 @@ pub fn mutex_example2() { println!("mutex_example2: done"); } +pub fn mutex_example3() { + const N: usize = 10; + + let mutex = Arc::new(Mutex::new(())); + + let handles: Vec<_> = (0..N) + .map(|i| { + let mutex = Arc::clone(&mutex); + thread::spawn(move || match mutex.try_lock() { + Some(_guard) => println!("thread {} got the lock", i), + None => println!("thread {} did not get the lock", i), + }) + }) + .collect(); + + for handle in handles { + handle.join().unwrap(); + } + + println!("mutex_example3: done"); +} + +pub fn mutex_example4() { + use parking_lot::Mutex; + use std::mem; + + let mutex = Mutex::new(1); + + // 使用mem::forget持有锁直到结束 + let _guard = mem::forget(mutex.lock()); + + // 一些访问受mutex保护的数据的代码 + + // 在结束前解锁mutex + unsafe { + mutex.force_unlock(); + } + + println!("mutex_example4: done"); +} + pub fn fairmutex_example() { const N: usize = 10; @@ -136,7 +177,6 @@ pub fn once_example() { } } - let handle = thread::spawn(|| { println!("thread 1 get_cached_val: {}", get_cached_val()); }); @@ -146,7 +186,6 @@ pub fn once_example() { handle.join().unwrap(); } - pub fn condvar_example() { use std::sync::Condvar; use std::sync::Mutex; @@ -167,4 +206,4 @@ pub fn condvar_example() { started = cvar.wait(started).unwrap(); // block until notified } println!("condvar_example: done"); -} \ No newline at end of file +} diff --git a/parking_lot_examples/src/main.rs b/parking_lot_examples/src/main.rs index a96484f..73cd8c7 100644 --- a/parking_lot_examples/src/main.rs +++ b/parking_lot_examples/src/main.rs @@ -3,6 +3,9 @@ use parking_lot_examples::*; fn main() { mutex_example(); mutex_example2(); + mutex_example3(); + mutex_example4(); + fairmutex_example(); rwmutex_example(); diff --git a/pool/Cargo.toml b/pool/Cargo.toml index d8fb62b..b1c9cf7 100644 --- a/pool/Cargo.toml +++ b/pool/Cargo.toml @@ -6,11 +6,18 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +executor-service = "0.2.2" +executors = "0.9.0" fast-threadpool = "0.3.0" futures-lite = "1.12.0" +poolite = "0.7.1" rayon = "1.5.3" rusty_pool = "0.7.0" scheduled-thread-pool = "0.2.6" +scoped-tls = "1.0.1" scoped_threadpool = "0.1.9" smol = "1.2.5" threadpool = "1.8.1" +threadpool-executor = "0.3.2" +tokio = { version = "1.32.0", features = ["full"] } +workerpool-rs = "0.2.1" diff --git a/pool/src/lib.rs b/pool/src/lib.rs index 01aced9..3f77ddc 100644 --- a/pool/src/lib.rs +++ b/pool/src/lib.rs @@ -1,10 +1,14 @@ +use std::sync::atomic::{AtomicUsize, AtomicI32, Ordering}; use std::sync::mpsc::channel; +use std::sync::{Arc, Barrier, Mutex}; +use std::thread; +use std::thread::sleep; use std::time::Duration; -use futures_lite::*; -use rayon; -use threadpool::ThreadPool; use fast_threadpool::ThreadPoolConfig; +use rayon; +use rusty_pool; +use tokio; fn fib(n: usize) -> usize { if n == 0 || n == 1 { @@ -23,10 +27,35 @@ pub fn rayon_threadpool() { println!("{}", n); } +scoped_tls::scoped_thread_local!(static POOL_DATA: Vec); +pub fn rayon_threadpool2() { + let pool_data = vec![1, 2, 3]; + + // We haven't assigned any TLS data yet. + assert!(!POOL_DATA.is_set()); + + rayon::ThreadPoolBuilder::new() + .build_scoped( + // Borrow `pool_data` in TLS for each thread. + |thread| POOL_DATA.set(&pool_data, || thread.run()), + // Do some work that needs the TLS data. + |pool| { + pool.install(|| { + assert!(POOL_DATA.is_set()); + assert_eq!(POOL_DATA.with(|data| data.len()), 3); + }) + }, + ) + .unwrap(); + + // Once we've returned, `pool_data` is no longer borrowed. + drop(pool_data); +} + pub fn threadpool_example() { let n_workers = 4; let n_jobs = 8; - let pool = ThreadPool::new(n_workers); + let pool = threadpool::ThreadPool::new(n_workers); let (tx, rx) = channel(); for _ in 0..n_jobs { @@ -40,8 +69,37 @@ pub fn threadpool_example() { assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8); } +pub fn threadpool_example2() { + // create at least as many workers as jobs or you will deadlock yourself + let n_workers = 42; + let n_jobs = 23; + let pool = threadpool::ThreadPool::new(n_workers); + let an_atomic = Arc::new(AtomicUsize::new(0)); + + assert!(n_jobs <= n_workers, "too many jobs, will deadlock"); + + // create a barrier that waits for all jobs plus the starter thread + let barrier = Arc::new(Barrier::new(n_jobs + 1)); + for _ in 0..n_jobs { + let barrier = barrier.clone(); + let an_atomic = an_atomic.clone(); + + pool.execute(move || { + // do the heavy work + an_atomic.fetch_add(1, Ordering::Relaxed); + + // then wait for the other threads + barrier.wait(); + }); + } + + // wait for the threads to finish the work + barrier.wait(); + assert_eq!(an_atomic.load(Ordering::SeqCst), /* n_jobs = */ 23); +} + pub fn rusty_pool_example() { - let pool = ThreadPool::new(4); + let pool = rusty_pool::ThreadPool::default(); for _ in 1..10 { pool.execute(|| { @@ -49,14 +107,82 @@ pub fn rusty_pool_example() { }); } - pool.join(); + pool.join(); + + let handle = pool.evaluate(|| { + thread::sleep(Duration::from_secs(5)); + return 4; + }); + let result = handle.await_complete(); + assert_eq!(result, 4); +} + +async fn some_async_fn(x: i32, y: i32) -> i32 { + x + y +} + +async fn other_async_fn(x: i32, y: i32) -> i32 { + x - y } -pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected>{ - let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler(); +pub fn rusty_pool_example2() { + let pool = rusty_pool::ThreadPool::default(); + let handle = pool.complete(async { + let a = some_async_fn(4, 6).await; // 10 + let b = some_async_fn(a, 3).await; // 13 + let c = other_async_fn(b, a).await; // 3 + some_async_fn(c, 5).await // 8 + }); + assert_eq!(handle.await_complete(), 8); + + let count = Arc::new(AtomicI32::new(0)); + let clone = count.clone(); + pool.spawn(async move { + let a = some_async_fn(3, 6).await; // 9 + let b = other_async_fn(a, 4).await; // 5 + let c = some_async_fn(b, 7).await; // 12 + clone.fetch_add(c, Ordering::SeqCst); + }); + pool.join(); + assert_eq!(count.load(Ordering::SeqCst), 12); +} + +pub fn rusty_pool_example3() { + let pool = rusty_pool::ThreadPool::default(); + for _ in 0..10 { + pool.execute(|| thread::sleep(Duration::from_secs(10))) + } + + // 等待所有线程变得空闲,即所有任务都完成,包括此线程调用join()后由其他线程添加的任务,或者等待超时 + pool.join_timeout(Duration::from_secs(5)); + + let count = Arc::new(AtomicI32::new(0)); + for _ in 0..15 { + let clone = count.clone(); + pool.execute(move || { + thread::sleep(Duration::from_secs(5)); + clone.fetch_add(1, Ordering::SeqCst); + }); + } + + // 关闭并删除此“ ThreadPool”的唯一实例(无克隆),导致通道被中断,从而导致所有worker在完成当前工作后退出 + pool.shutdown_join(); + assert_eq!(count.load(Ordering::SeqCst), 15); +} +pub fn fast_threadpool_example() -> Result<(), fast_threadpool::ThreadPoolDisconnected> { + let threadpool = + fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_sync_handler(); assert_eq!(4, threadpool.execute(|_| { 2 + 2 })?); + + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let threadpool = fast_threadpool::ThreadPool::start(ThreadPoolConfig::default(), ()).into_async_handler(); + assert_eq!(4, threadpool.execute(|_| { 2 + 2 }).await.unwrap()); + }); + + Ok(()) } @@ -84,31 +210,150 @@ pub fn scheduled_thread_pool() { let (sender, receiver) = channel(); let pool = scheduled_thread_pool::ScheduledThreadPool::new(4); - let handle = pool.execute_after(Duration::from_millis(1000), move ||{ + let handle = pool.execute_after(Duration::from_millis(1000), move || { println!("Hello from a scheduled thread!"); sender.send("done").unwrap(); }); - let _ = handle; receiver.recv().unwrap(); + let handle = pool.execute_at_fixed_rate(Duration::from_millis(1000), Duration::from_millis(1000), || { + println!("Hello from a scheduled thread!"); + }); + + sleep(Duration::from_secs(5)); + handle.cancel() } -pub fn unblocking_smol() -> io::Result<()> { - smol::block_on(async { - let mut stream = smol::net::TcpStream::connect("example.com:80").await?; - let req = b"GET / HTTP/1.1\r\nHost: example.com\r\nConnection: close\r\n\r\n"; - stream.write_all(req).await?; +// workerpool-rs +pub fn workerpool_rs_example() { + use workerpool_rs::pool::WorkerPool; - let mut stdout = smol::Unblock::new(std::io::stdout()); - io::copy(stream, &mut stdout).await?; - Ok(()) - }) + let n_workers = 4; + let n_jobs = 8; + let pool = WorkerPool::new(n_workers); + + let (tx, rx) = channel(); + let atx = Arc::new(Mutex::new(tx)); + for _ in 0..n_jobs { + let atx = atx.clone(); + pool.execute(move || { + let tx = atx.lock().unwrap(); + tx.send(1) + .expect("channel will be there waiting for the pool"); + }); + } + + // assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8); + println!("{}", rx.iter().take(n_jobs).fold(0, |a, b| a + b)) +} + +fn test(msg: usize) { + println!("key: {}\tvalue: {}", msg, fib(msg)); } -// threads_pool -// workerpool // poolite +pub fn poolite_example() { + let pool = poolite::Pool::new().unwrap(); + for i in 0..10 { + pool.push(move || test(i)); + } + + pool.join(); //wait for the pool +} + +pub fn poolite_example2() { + let pool = poolite::Pool::new().unwrap(); + let mut array = (0..10usize).into_iter().map(|i| (i, 0)).collect::>(); + + // scoped method will waiting scoped's task running finish. + pool.scoped(|scope| { + for i in array.iter_mut() { + // have to move + scope.push(move || i.1 = i.0 * i.0); + } + }); + + for (i, j) in array { + println!("key: {}\tvalue: {}", i, j); + } +} + +pub fn executor_service_example() { + use executor_service::Executors; + + let mut executor_service = + Executors::new_fixed_thread_pool(10).expect("Failed to create the thread pool"); + + let counter = Arc::new(AtomicUsize::new(0)); + + for _ in 0..10 { + let counter = counter.clone(); + executor_service.execute(move || { + thread::sleep(Duration::from_millis(100)); + counter.fetch_add(1, Ordering::SeqCst); + }).unwrap(); + } + + thread::sleep(Duration::from_millis(1000)); + assert_eq!(counter.load(Ordering::SeqCst), 10); + + let mut executor_service = + Executors::new_fixed_thread_pool(2).expect("Failed to create the thread pool"); + + let some_param = "Mr White"; + let res = executor_service + .submit_sync(move || { + sleep(Duration::from_secs(5)); + println!("Hello {:}", some_param); + println!("Long computation finished"); + 2 + }) + .expect("Failed to submit function"); + + println!("Result: {:#?}", res); + assert_eq!(res, 2); +} + +pub fn threadpool_executor_example() { + let pool = threadpool_executor::ThreadPool::new(1); + let mut expectation = pool.execute(|| "hello, thread pool!").unwrap(); + assert_eq!(expectation.get_result().unwrap(), "hello, thread pool!"); + + let pool = threadpool_executor::threadpool::Builder::new() + .core_pool_size(1) + .maximum_pool_size(3) + .keep_alive_time(std::time::Duration::from_secs(300)) + .exeed_limit_policy(threadpool_executor::threadpool::ExceedLimitPolicy::Wait) + .build(); + + pool.execute(|| { + std::thread::sleep(std::time::Duration::from_secs(3)); + }) + .unwrap(); + let mut exp = pool.execute(|| {}).unwrap(); + exp.cancel().unwrap(); +} + +pub fn executors_example() { + use executors::*; + + let n_workers = 4; + let n_jobs = 8; + let pool = crossbeam_workstealing_pool::small_pool(n_workers); + + let (tx, rx) = channel(); + for _ in 0..n_jobs { + let tx = tx.clone(); + pool.execute(move || { + tx.send(1) + .expect("channel will be there waiting for the pool"); + }); + } + + assert_eq!(rx.iter().take(n_jobs).fold(0, |a, b| a + b), 8); +} +// slave-pool diff --git a/pool/src/main.rs b/pool/src/main.rs index a11a74c..fabb200 100644 --- a/pool/src/main.rs +++ b/pool/src/main.rs @@ -2,12 +2,17 @@ use pool::*; fn main() { rayon_threadpool(); + rayon_threadpool2(); threadpool_example(); + threadpool_example2(); scoped_threadpool(); rusty_pool_example(); fast_threadpool_example().unwrap(); - scheduled_thread_pool(); - - unblocking_smol().unwrap(); + workerpool_rs_example(); + poolite_example(); + poolite_example2(); + executor_service_example(); + threadpool_executor_example(); + executors_example(); } diff --git a/process/src/lib.rs b/process/src/lib.rs index ac43a3f..238924e 100644 --- a/process/src/lib.rs +++ b/process/src/lib.rs @@ -56,3 +56,75 @@ pub fn easy_process_example() { } } + +pub fn pipe() { + // 创建两个子进程,一个作为生产者,一个作为消费者 + + // 生产者进程 + let producer = Command::new("echo") + .arg("Hello, Rust!") + .stdout(Stdio::piped()) + .spawn() + .expect("Failed to start producer command"); + + // 消费者进程 + let consumer = Command::new("grep") + .arg("Rust") + .stdin(producer.stdout.unwrap()) + .output() + .expect("Failed to start consumer command"); + + // 获取消费者的输出 + let output = String::from_utf8_lossy(&consumer.stdout); + println!("Output: {:?}", output); +} + +pub fn spawn_a_process() { + let output = Command::new("echo") + .arg("Hello world") + .output() + .expect("Failed to execute command"); + + assert_eq!(b"Hello world\n", output.stdout.as_slice()); +} + +pub fn process_io() { + let echo_child = Command::new("echo") + .arg("Oh no, a tpyo!") + .stdout(Stdio::piped()) + .spawn() + .expect("Failed to start echo process"); + + let echo_out = echo_child.stdout.expect("Failed to open echo stdout"); + + let sed_child = Command::new("sed") + .arg("s/tpyo/typo/") + .stdin(Stdio::from(echo_out)) + .stdout(Stdio::piped()) + .spawn() + .expect("Failed to start sed process"); + + let output = sed_child.wait_with_output().expect("Failed to wait on sed"); + assert_eq!(b"Oh no, a typo!\n", output.stdout.as_slice()); +} + +pub fn child() { + let mut child = Command::new("/bin/cat") + .arg("Cargo.toml") + .spawn() + .expect("failed to execute child"); + + let ecode = child.wait().expect("failed to wait on child"); + + assert!(ecode.success()); +} + +pub fn kill() { + let mut command = Command::new("yes"); + if let Ok(mut child) = command.spawn() { + println!("Child's ID is {}", child.id()); + child.kill().expect("command wasn't running"); + } else { + println!("yes command didn't start"); + } +} diff --git a/process/src/main.rs b/process/src/main.rs index debebd2..88c7c17 100644 --- a/process/src/main.rs +++ b/process/src/main.rs @@ -1,4 +1,3 @@ -use std::process::{Command, Stdio}; use process::*; fn main() { @@ -6,58 +5,10 @@ fn main() { process_io(); child(); kill(); + pipe(); async_process_example(); process_control_example(); easy_process_example(); } -fn spawn_a_process() { - let output = Command::new("echo") - .arg("Hello world") - .output() - .expect("Failed to execute command"); - - assert_eq!(b"Hello world\n", output.stdout.as_slice()); -} - -fn process_io() { - let echo_child = Command::new("echo") - .arg("Oh no, a tpyo!") - .stdout(Stdio::piped()) - .spawn() - .expect("Failed to start echo process"); - - let echo_out = echo_child.stdout.expect("Failed to open echo stdout"); - - let sed_child = Command::new("sed") - .arg("s/tpyo/typo/") - .stdin(Stdio::from(echo_out)) - .stdout(Stdio::piped()) - .spawn() - .expect("Failed to start sed process"); - - let output = sed_child.wait_with_output().expect("Failed to wait on sed"); - assert_eq!(b"Oh no, a typo!\n", output.stdout.as_slice()); -} - -fn child() { - let mut child = Command::new("/bin/cat") - .arg("Cargo.toml") - .spawn() - .expect("failed to execute child"); - - let ecode = child.wait().expect("failed to wait on child"); - - assert!(ecode.success()); -} - -fn kill() { - let mut command = Command::new("yes"); - if let Ok(mut child) = command.spawn() { - println!("Child's ID is {}", child.id()); - child.kill().expect("command wasn't running"); - } else { - println!("yes command didn't start"); - } -} diff --git a/rayon_examples/src/lib.rs b/rayon_examples/src/lib.rs index 6d2c6eb..7fce8da 100644 --- a/rayon_examples/src/lib.rs +++ b/rayon_examples/src/lib.rs @@ -84,4 +84,8 @@ pub fn rayon_threadpool_example() { let n = pool.install(|| fib(20)); println!("{}", n); +} + +pub fn rayon_global_thread_pool_example() { + rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); } \ No newline at end of file diff --git a/rayon_examples/src/main.rs b/rayon_examples/src/main.rs index ae721a4..18ed96a 100644 --- a/rayon_examples/src/main.rs +++ b/rayon_examples/src/main.rs @@ -8,4 +8,5 @@ fn main() { rayon_scope_example2(); rayon_scopefifo_example(); rayon_threadpool_example(); + rayon_global_thread_pool_example(); } diff --git a/special/Cargo.toml b/special/Cargo.toml index 2ba33be..f3211fb 100644 --- a/special/Cargo.toml +++ b/special/Cargo.toml @@ -6,4 +6,39 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +arc-swap = "1.6.0" +async-lock = "2.5.0" +async-oneshot = "0.5.0" +async-weighted-semaphore = "0.2.1" +async_singleflight = "0.5.0" +atomic-waker = "1.1.2" +atomic_float = "0.1.0" +atomicbox = "0.4.0" +atomig = "0.4.0" +awaitgroup = "0.6.0" +barrage = "0.2.3" +catty = "0.1.5" +concurrent-queue = "1.2.4" +crossbeam-utils = "0.8.14" +dashmap = "5.4.0" +event-listener = "2.5.3" +evmap = "10.0.2" +flurry = "0.4.0" +futures = "0.3.25" +oneshot = "0.1.5" +portable-atomic = { version = "0.3", features=["float"] } process_lock = "0.1.0" +scc = "1.5.0" +sharded-slab = "0.1.4" +simple-mutex = "1.1.5" +singleflight = "0.2.0" +singleflight-async = "0.1.1" +slab = "0.4.7" +smol = "1.2.5" +sync_cow = "0.1.1" +tokio = { version = "1.21.2", features = ["full"] } +triggered = "0.1.2" +triple_buffer = "6.2.0" +try-lock = "0.2.3" +waitgroup = "0.1.2" +wg = "0.3.1" diff --git a/special/src/arcswap.rs b/special/src/arcswap.rs new file mode 100644 index 0000000..1e7f15e --- /dev/null +++ b/special/src/arcswap.rs @@ -0,0 +1,24 @@ + +use arc_swap::ArcSwap; +use std::sync::Arc; +use crossbeam_utils::thread; + +pub fn arc_swap_example() { + let value = ArcSwap::from(Arc::new(5)); + thread::scope(|scope| { + scope.spawn(|_| { + let new_value = Arc::new(4); + value.store(new_value); + }); + for _ in 0..10 { + scope.spawn(|_| { + loop { + let v = value.load(); + println!("value is {}", v); + return; + } + }); + } + }).unwrap() + +} \ No newline at end of file diff --git a/special/src/lib.rs b/special/src/lib.rs index 14f3d13..bbf44d1 100644 --- a/special/src/lib.rs +++ b/special/src/lib.rs @@ -1,7 +1,23 @@ -pub mod oslock; -pub mod oneshot; -pub mod map; +mod oslock; +mod oneshots; +mod map; +mod primitive; +mod notify; +mod queue; +mod scc_examples; +mod sema_examples; +mod singleflight_example; +mod synccow; +mod arcswap; pub use oslock::*; -pub use oneshot::*; -pub use map::*; \ No newline at end of file +pub use oneshots::*; +pub use map::*; +pub use primitive::*; +pub use notify::*; +pub use queue::*; +pub use scc_examples::*; +pub use sema_examples::*; +pub use singleflight_example::*; +pub use synccow::*; +pub use arcswap::*; \ No newline at end of file diff --git a/special/src/main.rs b/special/src/main.rs index d0070a1..4187220 100644 --- a/special/src/main.rs +++ b/special/src/main.rs @@ -1,5 +1,68 @@ use special::*; + fn main() { process_lock(); + + try_lock_example1(); + + sharded_slab_read(); + sharded_slab_write(); + sharded_slab_pool(); + slab_example(); + + event_listener_example(); + triggered_example(); + barrage_example(); + + hashmap_example(); + flurry_hashmap(); + flurry_hashset(); + evmap_example(); + + concurrent_queue_example(); + triple_buffer_example(); + + async_lock_mutex(); + async_lock_rwlock(); + async_lock_barrier(); + + portable_atomic_i128(); + portable_atomic_u128(); + portable_atomic_f64(); + atomic_float_example(); + atomig_example(); + atomicbox_examples(); + + simple_mutex_example(); + + oneshot_example(); + async_oneshot_example(); + catty_example(); + + waitgroup_example(); + wg_example(); + awaitgroup_example(); + + scc_hashmap(); + scc_hashindex(); + scc_treeindex(); + scc_hashset(); + scc_queue(); + + + async_lock_semaphore(); + async_weighted_semaphore_example(); + tokio_semaphore_example(); + + singleflight_example(); + async_singleflight_example(); + + sync_cow_example().unwrap(); + arc_swap_example(); + + atomic_waker_example(); } + + +// lockfree \ No newline at end of file diff --git a/special/src/map.rs b/special/src/map.rs index e69de29..5f6b4ad 100644 --- a/special/src/map.rs +++ b/special/src/map.rs @@ -0,0 +1,107 @@ +use dashmap::DashMap; +use std::sync::Arc; +use std::thread; + +pub fn hashmap_example() { + let map = Arc::new(DashMap::new()); + + let map1 = map.clone(); + let whandle = thread::spawn(move || { + map1.insert(1, 2); + map1.insert(2, 3); + }); + + let map2 = map.clone(); + let rhandle = thread::spawn(move || { + loop { + if let Some(v) = map2.get(&1) { + println!("get value {} for key 1", *v); + break; + } + } + + loop { + if let Some(v) = map2.get(&2) { + println!("get value {} for key 2", *v); + break; + } + } + }); + + whandle.join().unwrap(); + rhandle.join().unwrap(); +} + +pub fn flurry_hashmap() { + let map = flurry::HashMap::new(); + + assert_eq!(map.pin().insert(37, "a"), None); + assert_eq!(map.pin().is_empty(), false); +} + +pub fn flurry_hashset() { + // Initialize a new hash set. + let books = flurry::HashSet::new(); + let guard = books.guard(); + + // Add some books + books.insert("Fight Club", &guard); + books.insert("Three Men In A Raft", &guard); + books.insert("The Book of Dust", &guard); + books.insert("The Dry", &guard); + + // Check for a specific one. + if !books.contains(&"The Drunken Botanist", &guard) { + println!("We don't have The Drunken Botanist."); + } + + // Remove a book. + books.remove(&"Three Men In A Raft", &guard); + + // Iterate over everything. + for book in books.iter(&guard) { + println!("{}", book); + } +} + +pub fn evmap_example() { + let (book_reviews_r, mut book_reviews_w) = evmap::new(); + + let readers: Vec<_> = (0..4) + .map(|_| { + let r = book_reviews_r.clone(); + thread::spawn(move || { + loop { + let l = r.len(); + if l == 0 { + thread::yield_now(); + } else { + // the reader will either see all the reviews, + // or none of them, since refresh() is atomic. + assert_eq!(l, 4); + break; + } + } + }) + }) + .collect(); + + // do some writes + book_reviews_w.insert("Adventures of Huckleberry Finn", "My favorite book."); + book_reviews_w.insert("Grimms' Fairy Tales", "Masterpiece."); + book_reviews_w.insert("Pride and Prejudice", "Very enjoyable."); + book_reviews_w.insert("The Adventures of Sherlock Holmes", "Eye lyked it alot."); + // expose the writes + book_reviews_w.refresh(); + + // you can read through the write handle + assert_eq!(book_reviews_w.len(), 4); + + // the original read handle still works too + assert_eq!(book_reviews_r.len(), 4); + + // all the threads should eventually see .len() == 4 + for r in readers.into_iter() { + assert!(r.join().is_ok()); + } +} diff --git a/special/src/mco.rs b/special/src/mco.rs deleted file mode 100644 index 94d37aa..0000000 --- a/special/src/mco.rs +++ /dev/null @@ -1,2 +0,0 @@ -// singleflight -// async-singleflight \ No newline at end of file diff --git a/special/src/notify.rs b/special/src/notify.rs new file mode 100644 index 0000000..60bd4bf --- /dev/null +++ b/special/src/notify.rs @@ -0,0 +1,76 @@ +use event_listener::Event; +use std::sync::atomic::{AtomicBool, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::Duration; + +pub fn event_listener_example() { + let flag = Arc::new(AtomicBool::new(false)); + let event = Arc::new(Event::new()); + + // Spawn a thread that will set the flag after 1 second. + thread::spawn({ + let flag = flag.clone(); + let event = event.clone(); + move || { + // Wait for a second. + thread::sleep(Duration::from_secs(1)); + + // Set the flag. + flag.store(true, Ordering::SeqCst); + + // Notify all listeners that the flag has been set. + event.notify(usize::MAX); + } + }); + + // Wait until the flag is set. + loop { + // Check the flag. + if flag.load(Ordering::SeqCst) { + break; + } + + // Start listening for events. + let listener = event.listen(); + + // Check the flag again after creating the listener. + if flag.load(Ordering::SeqCst) { + break; + } + + // Wait for a notification and continue the loop. + listener.wait(); + } + + println!("flag is set"); +} + +pub fn triggered_example() { + smol::block_on(async { + let (trigger, listener) = triggered::trigger(); + + let task = smol::spawn(async { + // Blocks until `trigger.trigger()` below + listener.await; + + println!("Triggered async task"); + }); + + // This will make any thread blocked in `Listener::wait()` or async task awaiting the + // listener continue execution again. + trigger.trigger(); + + let _ = task.await; + }) +} + +pub fn barrage_example() { + smol::block_on(async { + let (tx, rx) = barrage::unbounded(); + let rx2 = rx.clone(); + tx.send_async("Hello!").await.unwrap(); + assert_eq!(rx.recv_async().await, Ok("Hello!")); + assert_eq!(rx2.recv_async().await, Ok("Hello!")); + }); +} diff --git a/special/src/oneshot.rs b/special/src/oneshot.rs deleted file mode 100644 index 08c2ed8..0000000 --- a/special/src/oneshot.rs +++ /dev/null @@ -1,2 +0,0 @@ -// oneshot -// catty \ No newline at end of file diff --git a/special/src/oneshots.rs b/special/src/oneshots.rs new file mode 100644 index 0000000..30c1b9d --- /dev/null +++ b/special/src/oneshots.rs @@ -0,0 +1,42 @@ +use std::thread; + +pub fn oneshot_example() { + let (sender, receiver) = oneshot::channel::(); + let sender = thread::spawn(move || { + sender.send(1).unwrap(); + }); + let receiver = thread::spawn(move || { + let v = receiver.recv().unwrap(); + println!("get value {}", v); + }); + sender.join().unwrap(); + receiver.join().unwrap(); +} + +pub fn async_oneshot_example() { + let (mut sender, receiver) = async_oneshot::oneshot(); + smol::block_on(async { + sender.send(1).unwrap(); + }); + + smol::block_on(async { + let v = receiver.try_recv().unwrap(); + println!("get value {}", v); + }); +} + +pub fn catty_example() { + let (sender, mut receiver) = ::oneshot(); + let sender = thread::spawn(move || { + sender.send(1).unwrap(); + }); + let receiver = thread::spawn(move || { + let v = receiver.try_recv().unwrap(); + if v.is_some() { + println!("get value {}", v.unwrap()); + } + + }); + sender.join().unwrap(); + receiver.join().unwrap(); +} \ No newline at end of file diff --git a/special/src/oslock.rs b/special/src/oslock.rs index fcf391c..46b63e9 100644 --- a/special/src/oslock.rs +++ b/special/src/oslock.rs @@ -11,12 +11,12 @@ pub fn process_lock() { println!("lock success"); break; } - if start.elapsed() > Duration::from_secs(10) { + if start.elapsed() > Duration::from_millis(500) { println!("lock timeout"); break; } std::thread::sleep(Duration::from_millis(100)); } - std::thread::sleep(Duration::from_secs(10)); + std::thread::sleep(Duration::from_millis(500)); } \ No newline at end of file diff --git a/special/src/primitive.rs b/special/src/primitive.rs deleted file mode 100644 index abd36c6..0000000 --- a/special/src/primitive.rs +++ /dev/null @@ -1,12 +0,0 @@ -// Mutex -// RWLock -// wg -// async-xxx -// awaitgroup -// usync - - -// waitfor - -// atomig -// atomicbox \ No newline at end of file diff --git a/special/src/primitive/async_lock_examples.rs b/special/src/primitive/async_lock_examples.rs new file mode 100644 index 0000000..f9a2e04 --- /dev/null +++ b/special/src/primitive/async_lock_examples.rs @@ -0,0 +1,52 @@ +use async_lock::*; +use std::sync::Arc; +use std::thread; + +pub fn async_lock_mutex() { + let lock = Arc::new(Mutex::new(0)); + + let lock1 = lock.clone(); + smol::block_on(async { + let mut guard = lock1.lock().await; + *guard += 1; + }); + + let lock2 = lock.clone(); + smol::block_on(async { + let guard = lock2.lock().await; + println!("lock2 {}", *guard); + }); +} + +pub fn async_lock_rwlock() { + let lock = Arc::new(RwLock::new(0)); + + let lock1 = lock.clone(); + smol::block_on(async { + let mut guard = lock1.write().await; + *guard += 1; + }); + + let lock2 = lock.clone(); + smol::block_on(async { + let guard = lock2.read().await; + println!("lock2 {}", *guard); + }); +} + +pub fn async_lock_barrier() { + let barrier = Arc::new(Barrier::new(5)); + + thread::scope(|s| { + for _ in 0..5 { + let barrier = barrier.clone(); + s.spawn(move || { + smol::block_on(async { + println!("before wait"); + barrier.wait().await; + println!("after wait"); + }); + }); + } + }); +} diff --git a/special/src/primitive/atomic_examples.rs b/special/src/primitive/atomic_examples.rs new file mode 100644 index 0000000..686c751 --- /dev/null +++ b/special/src/primitive/atomic_examples.rs @@ -0,0 +1,74 @@ +use atomicbox::AtomicBox; +use atomig::Atomic; +use portable_atomic::*; +use std::sync::atomic::Ordering; +use std::sync::atomic::Ordering::Relaxed; + +pub fn portable_atomic_i128() { + let mut some_var = AtomicI128::new(10); + assert_eq!(*some_var.get_mut(), 10); + *some_var.get_mut() = 5; + assert_eq!(some_var.load(Ordering::SeqCst), 5); + + assert_eq!(some_var.load(Ordering::Relaxed), 5); +} + +pub fn portable_atomic_u128() { + let mut some_var = AtomicU128::new(10); + assert_eq!(*some_var.get_mut(), 10); + *some_var.get_mut() = 5; + assert_eq!(some_var.load(Ordering::SeqCst), 5); + + assert_eq!(some_var.load(Ordering::Relaxed), 5); +} + +pub fn portable_atomic_f32() { + let mut some_var = AtomicF32::new(10.0); + assert_eq!(*some_var.get_mut(), 10.0); + *some_var.get_mut() = 5.0; + assert_eq!(some_var.load(Ordering::SeqCst), 5.0); + + assert_eq!(some_var.load(Ordering::Relaxed), 5.0); +} + +pub fn portable_atomic_f64() { + let mut some_var = AtomicF64::new(10.0f64); + assert_eq!(*some_var.get_mut(), 10.0); + *some_var.get_mut() = 5.0; + assert_eq!(some_var.load(Ordering::SeqCst), 5.0); + + assert_eq!(some_var.load(Ordering::Relaxed), 5.0); +} + +pub fn atomic_float_example() { + let some_var = atomic_float::AtomicF32::new(800.0f32); + some_var.fetch_add(30.0, Relaxed); + some_var.fetch_sub(-55.0, Relaxed); + some_var.fetch_neg(Relaxed); + + assert_eq!(some_var.load(Relaxed), -885.0); + + let some_var = atomic_float::AtomicF64::new(800.0f64); + some_var.fetch_add(30.0, Relaxed); + some_var.fetch_sub(-55.0, Relaxed); + some_var.fetch_neg(Relaxed); + + assert_eq!(some_var.load(Relaxed), -885.0); +} + +pub fn atomig_example() { + let some_var = Atomic::new(0); + some_var.store(800, Relaxed); + + some_var.fetch_add(30, Relaxed); + some_var.fetch_sub(-55, Relaxed); + + assert_eq!(some_var.load(Relaxed), 885); +} + +pub fn atomicbox_examples() { + let atom = AtomicBox::new(Box::new("one")); + let mut boxed = Box::new("two"); + atom.swap_mut(&mut boxed, Ordering::AcqRel); + assert_eq!(*boxed, "one"); +} diff --git a/special/src/primitive/atomic_waker_examples.rs b/special/src/primitive/atomic_waker_examples.rs new file mode 100644 index 0000000..7490b7b --- /dev/null +++ b/special/src/primitive/atomic_waker_examples.rs @@ -0,0 +1,65 @@ +use futures::future::Future; +use futures::task::{Context, Poll, AtomicWaker}; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::Relaxed; +use std::pin::Pin; + +struct Inner { + waker: AtomicWaker, + set: AtomicBool, +} + +#[derive(Clone)] +struct Flag(Arc); + +impl Flag { + pub fn new() -> Self { + Flag(Arc::new(Inner { + waker: AtomicWaker::new(), + set: AtomicBool::new(false), + })) + } + + pub fn signal(&self) { + self.0.set.store(true, Relaxed); + self.0.waker.wake(); + } +} + +impl Future for Flag { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // quick check to avoid registration if already done. + if self.0.set.load(Relaxed) { + return Poll::Ready(true); + } + + self.0.waker.register(cx.waker()); + + // Need to check condition **after** `register` to avoid a race + // condition that would result in lost notifications. + if self.0.set.load(Relaxed) { + Poll::Ready(true) + } else { + Poll::Pending + } + } +} + +// extraced from futures::task::AtomicWaker +pub fn atomic_waker_example() { + smol::block_on(async { + let flag = Flag::new(); + let flag2 = flag.clone(); + + smol::spawn(async move { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + flag2.signal(); + }) + .detach(); + + println!("Waiting for flag: {}", flag.await); + }); +} \ No newline at end of file diff --git a/special/src/primitive/mod.rs b/special/src/primitive/mod.rs new file mode 100644 index 0000000..330cc51 --- /dev/null +++ b/special/src/primitive/mod.rs @@ -0,0 +1,24 @@ +// usync + + +// waitfor + +// atomig +// atomicbox + +mod try_lock_examples; +mod sharded_slab_example; +mod async_lock_examples; +mod atomic_examples; +mod simple_mutex_examples; +mod waitgroup_examples; +mod atomic_waker_examples; + +pub use try_lock_examples::*; +pub use sharded_slab_example::*; +pub use async_lock_examples::*; +pub use atomic_examples::*; +pub use simple_mutex_examples::*; +pub use waitgroup_examples::*; +pub use atomic_waker_examples::*; + diff --git a/special/src/primitive/sharded_slab_example.rs b/special/src/primitive/sharded_slab_example.rs new file mode 100644 index 0000000..ab3d2d7 --- /dev/null +++ b/special/src/primitive/sharded_slab_example.rs @@ -0,0 +1,78 @@ +use sharded_slab::Pool; +use sharded_slab::Slab; + +use std::sync::{Arc, Mutex}; + +// Slabs provide pre-allocated storage for many instances of a single data type. +// When a large number of values of a single type are required, this can be more efficient than allocating each item individually. +// Since the allocated items are the same size, memory fragmentation is reduced, and creating and removing new items can be very cheap. + +pub fn sharded_slab_read() { + let slab = Arc::new(Slab::new()); + + let slab2 = slab.clone(); + let thread2 = std::thread::spawn(move || { + let key = slab2.insert("hello from thread two").unwrap(); + assert_eq!(slab2.get(key).unwrap(), "hello from thread two"); + key + }); + + let key1 = slab.insert("hello from thread one").unwrap(); + assert_eq!(slab.get(key1).unwrap(), "hello from thread one"); + + // Wait for thread 2 to complete. + let key2 = thread2.join().unwrap(); + + // The item inserted by thread 2 remains in the slab. + assert_eq!(slab.get(key2).unwrap(), "hello from thread two"); +} + +pub fn sharded_slab_write() { + let slab = Arc::new(Slab::new()); + + let key = slab + .insert(Mutex::new(String::from("hello world"))) + .unwrap(); + + let slab2 = slab.clone(); + let thread2 = std::thread::spawn(move || { + let hello = slab2.get(key).expect("item missing"); + let mut hello = hello.lock().expect("mutex poisoned"); + *hello = String::from("hello everyone!"); + }); + + thread2.join().unwrap(); + + let hello = slab.get(key).expect("item missing"); + let hello = hello.lock().expect("mutex poisoned"); + assert_eq!(hello.as_str(), "hello everyone!"); +} + +pub fn sharded_slab_pool() { + let pool: Pool = Pool::new(); + + let mut guard = pool.create().unwrap(); + let key = guard.key(); + guard.push_str("hello world"); + + drop(guard); // release the guard, allowing immutable access. + assert_eq!(pool.get(key).unwrap(), String::from("hello world")); + + // Mark this entry to be cleared. + pool.clear(key); + // The cleared entry is no longer available in the pool + assert!(pool.get(key).is_none()); +} + +pub fn slab_example() { + let mut slab = slab::Slab::new(); + + let hello = slab.insert("hello"); + let world = slab.insert("world"); + + assert_eq!(slab[hello], "hello"); + assert_eq!(slab[world], "world"); + + slab[world] = "earth"; + assert_eq!(slab[world], "earth"); +} diff --git a/special/src/primitive/simple_mutex_examples.rs b/special/src/primitive/simple_mutex_examples.rs new file mode 100644 index 0000000..89c0063 --- /dev/null +++ b/special/src/primitive/simple_mutex_examples.rs @@ -0,0 +1,21 @@ +use simple_mutex::Mutex; +use std::sync::Arc; +use std::thread; + +pub fn simple_mutex_example() { + let m = Arc::new(Mutex::new(0)); + let mut handles = vec![]; + + for _ in 0..10 { + let m = m.clone(); + handles.push(thread::spawn(move || { + *m.lock() += 1; + })); + } + + for handle in handles { + handle.join().unwrap(); + } + + println!("m = {:?}", m); +} \ No newline at end of file diff --git a/special/src/primitive/try_lock_examples.rs b/special/src/primitive/try_lock_examples.rs new file mode 100644 index 0000000..33c1784 --- /dev/null +++ b/special/src/primitive/try_lock_examples.rs @@ -0,0 +1,31 @@ +use std::sync::Arc; +use try_lock::TryLock; + +pub fn try_lock_example1() { + // a thing we want to share + struct Widget { + name: String, + } + + // lock it up! + let widget1 = Arc::new(TryLock::new(Widget { + name: "Spanner".into(), + })); + + let widget2 = widget1.clone(); + + // mutate the widget + let mut locked = widget1.try_lock().expect("example isn't locked yet"); + locked.name.push_str(" Bundle"); + + // hands off, buddy + let not_locked = widget2.try_lock(); + assert!(not_locked.is_none(), "widget1 has the lock"); + + // ok, you can have it + drop(locked); + + let locked2 = widget2.try_lock().expect("widget1 lock is released"); + + assert_eq!(locked2.name, "Spanner Bundle"); +} diff --git a/special/src/primitive/waitgroup_examples.rs b/special/src/primitive/waitgroup_examples.rs new file mode 100644 index 0000000..eb8c484 --- /dev/null +++ b/special/src/primitive/waitgroup_examples.rs @@ -0,0 +1,65 @@ +use waitgroup::WaitGroup; + +pub fn waitgroup_example() { + smol::block_on(async { + let wg = WaitGroup::new(); + for _ in 0..100 { + let w = wg.worker(); + let _ = smol::spawn(async move { + // do work + drop(w); // drop w means task finished + }); + } + + wg.wait().await; + }) +} + +pub fn wg_example() { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + use std::thread::{sleep, spawn}; + use std::time::Duration; + use wg::WaitGroup; + + let wg = WaitGroup::new(); + let ctr = Arc::new(AtomicUsize::new(0)); + + for _ in 0..5 { + let ctrx = ctr.clone(); + let t_wg = wg.add(1); + spawn(move || { + // mock some time consuming task + sleep(Duration::from_millis(50)); + ctrx.fetch_add(1, Ordering::Relaxed); + + // mock task is finished + t_wg.done(); + }); + } + + wg.wait(); + assert_eq!(ctr.load(Ordering::Relaxed), 5); +} + +pub fn awaitgroup_example() { + use awaitgroup::WaitGroup; + + smol::block_on(async { + let mut wg = WaitGroup::new(); + for _ in 0..5 { + // Create a new worker. + let worker = wg.worker(); + + let _ = smol::spawn(async { + // Do some work... + + // This task is done all of its work. + worker.done(); + }); + } + + // Block until all other tasks have finished their work. + wg.wait().await; + }); +} diff --git a/special/src/queue.rs b/special/src/queue.rs index e69de29..fe45398 100644 --- a/special/src/queue.rs +++ b/special/src/queue.rs @@ -0,0 +1,45 @@ +use concurrent_queue::ConcurrentQueue; +use std::sync::Arc; +use std::thread; + +use triple_buffer::triple_buffer; + +pub fn concurrent_queue_example() { + let q = Arc::new(ConcurrentQueue::unbounded()); + + let q1 = q.clone(); + let whandle = thread::spawn(move || { + for i in 0..10 { + q1.push(i).unwrap(); + } + }); + + let q2 = q.clone(); + let rhandle = thread::spawn(move || loop { + if let Ok(v) = q2.pop() { + println!("get value {}", v); + } else { + println!("queue closed"); + break; + } + }); + + whandle.join().unwrap(); + rhandle.join().unwrap(); +} + +pub fn triple_buffer_example() { + let (mut buf_input, mut buf_output) = triple_buffer(&0); + + // The producer thread can move a value into the buffer at any time + let producer = std::thread::spawn(move || buf_input.write(42)); + + // The consumer thread can read the latest value at any time + let consumer = std::thread::spawn(move || { + let latest = buf_output.read(); + assert!(*latest == 42 || *latest == 0); + }); + + producer.join().unwrap(); + consumer.join().unwrap(); +} diff --git a/special/src/scc.rs b/special/src/scc.rs deleted file mode 100644 index a809607..0000000 --- a/special/src/scc.rs +++ /dev/null @@ -1 +0,0 @@ -// scc \ No newline at end of file diff --git a/special/src/scc_examples.rs b/special/src/scc_examples.rs new file mode 100644 index 0000000..88ffaa5 --- /dev/null +++ b/special/src/scc_examples.rs @@ -0,0 +1,61 @@ +use scc::*; +use std::collections::hash_map::RandomState; + +pub fn scc_hashmap() { + let hashmap: HashMap = HashMap::with_capacity(1000); + assert_eq!(hashmap.capacity(), 1024); + + let ticket = hashmap.reserve(10000); + assert!(ticket.is_some()); + assert_eq!(hashmap.capacity(), 16384); + for i in 0..16 { + assert!(hashmap.insert(i, i).is_ok()); + } + drop(ticket); + + assert_eq!(hashmap.capacity(), 1024); +} + +pub fn scc_hashindex() { + let hashindex: HashIndex = HashIndex::default(); + + assert!(!hashindex.remove(&1)); + assert!(hashindex.insert(1, 0).is_ok()); + assert!(hashindex.remove(&1)); +} + +pub fn scc_treeindex() { + let treeindex: TreeIndex = TreeIndex::new(); + + assert!(treeindex.insert(1, 10).is_ok()); + assert_eq!(treeindex.insert(1, 11).err().unwrap(), (1, 11)); + assert_eq!(treeindex.read(&1, |_k, v| *v).unwrap(), 10); +} + +pub fn scc_hashset() { + let hashset: HashSet = HashSet::with_capacity(1000); + assert_eq!(hashset.capacity(), 1024); + + let ticket = hashset.reserve(10000); + assert!(ticket.is_some()); + assert_eq!(hashset.capacity(), 16384); + for i in 0..16 { + assert!(hashset.insert(i).is_ok()); + } + drop(ticket); + + assert_eq!(hashset.capacity(), 1024); +} + +pub fn scc_queue() { + let queue: Queue = Queue::default(); + + queue.push(37); + queue.push(3); + queue.push(1); + + assert_eq!(queue.pop().map(|e| **e), Some(37)); + assert_eq!(queue.pop().map(|e| **e), Some(3)); + assert_eq!(queue.pop().map(|e| **e), Some(1)); + assert!(queue.pop().is_none()); +} diff --git a/special/src/sema_examples.rs b/special/src/sema_examples.rs new file mode 100644 index 0000000..5c69e6e --- /dev/null +++ b/special/src/sema_examples.rs @@ -0,0 +1,52 @@ +use futures::pin_mut; +use futures::poll; +use std::sync::Arc; + +pub fn tokio_semaphore_example() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let semaphore = Arc::new(tokio::sync::Semaphore::new(3)); + let mut join_handles = Vec::new(); + + for _ in 0..5 { + let permit = semaphore.clone().acquire_owned().await.unwrap(); + join_handles.push(tokio::spawn(async move { + // perform task... + // explicitly own `permit` in the task + drop(permit); + })); + } + + for handle in join_handles { + handle.await.unwrap(); + } + }); +} + +pub fn async_weighted_semaphore_example() { + smol::block_on(async { + let sem = async_weighted_semaphore::Semaphore::new(1); + let a = sem.acquire(2); + let b = sem.acquire(1); + pin_mut!(a); + pin_mut!(b); + assert!(poll!(&mut a).is_pending()); + assert!(poll!(&mut b).is_pending()); + + sem.release(1); + assert!(poll!(&mut a).is_ready()); + assert!(poll!(&mut b).is_ready()); + }); +} + +pub fn async_lock_semaphore() { + let s = Arc::new(async_lock::Semaphore::new(2)); + + let _g1 = s.try_acquire_arc().unwrap(); + let g2 = s.try_acquire_arc().unwrap(); + + assert!(s.try_acquire_arc().is_none()); + drop(g2); + assert!(s.try_acquire_arc().is_some()); +} diff --git a/special/src/singleflight_example.rs b/special/src/singleflight_example.rs new file mode 100644 index 0000000..5cf3c6d --- /dev/null +++ b/special/src/singleflight_example.rs @@ -0,0 +1,50 @@ +use futures::future::join_all; +use singleflight_async::SingleFlight; +use std::sync::Arc; + +use async_singleflight::Group; + +pub fn singleflight_example() { + smol::block_on(async { + let group = SingleFlight::new(); + let mut futures = Vec::new(); + for _ in 0..10 { + futures.push(group.work("key", || async { + println!("will sleep to simulate async task"); + smol::Timer::after(std::time::Duration::from_millis(100)).await; + println!("real task done"); + "my-result" + })); + } + + for fut in futures.into_iter() { + assert_eq!(fut.await, "my-result"); + println!("task finished"); + } + }); +} + +const RES: usize = 7; + +async fn expensive_fn() -> Result { + smol::Timer::after(std::time::Duration::from_millis(100)).await; + Ok(RES) +} + +pub fn async_singleflight_example() { + smol::block_on(async { + let g = Arc::new(Group::<_, ()>::new()); + let mut handlers = Vec::new(); + for _ in 0..10 { + let g = g.clone(); + handlers.push(smol::spawn(async move { + let res = g.work("key", expensive_fn()).await.0; + let r = res.unwrap(); + println!("{}", r); + })); + + } + + join_all(handlers).await; + }); +} diff --git a/special/src/synccow.rs b/special/src/synccow.rs new file mode 100644 index 0000000..98aae22 --- /dev/null +++ b/special/src/synccow.rs @@ -0,0 +1,36 @@ +use sync_cow::SyncCow; +use std::sync::Arc; +use std::any::Any; + + +pub fn sync_cow_example() -> Result<(),Box> { + let cow = Arc::new(SyncCow::new(5)); + + // Arc is only needed to pass the ref to the threads + let cow_write_arc = cow.clone(); + let cow_read_arc = cow.clone(); + let cow_result_arc = cow.clone(); + + let writer = std::thread::spawn(move || { + let cow = &*cow_write_arc; // unpack immediately to avoid Arc deref + let mut val = 0; + cow.edit(|x| { + val = *x; + *x = 4; + }); + println!("Cow was {} when writing", val); + }); + + let reader = std::thread::spawn(move || { + let cow = &*cow_read_arc; // unpack immediately to avoid Arc deref + println!("Cow was {} when reading", cow.read()); + }); + + writer.join()?; + reader.join()?; + + let cow = &*cow_result_arc; // unpack immediately to avoid Arc deref + println!("Cow was {} when result", cow.read()); + + Ok(()) +} \ No newline at end of file diff --git a/sync_primitive/Cargo.toml b/sync_primitive/Cargo.toml index d8ae5e2..a81ad40 100644 --- a/sync_primitive/Cargo.toml +++ b/sync_primitive/Cargo.toml @@ -6,4 +6,5 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -rand = "0.8.5" \ No newline at end of file +once_cell = "1.18.0" +rand = "0.8.5" diff --git a/sync_primitive/src/barrier.rs b/sync_primitive/src/barrier.rs index e9c9fdf..21d8e4c 100644 --- a/sync_primitive/src/barrier.rs +++ b/sync_primitive/src/barrier.rs @@ -1,3 +1,4 @@ +use core::time; use std::sync::Arc; use std::sync::Barrier; use std::thread; @@ -21,6 +22,33 @@ pub fn barrier_example() { })); } + for handle in handles { + handle.join().unwrap(); + } +} + +pub fn barrier_recycle_example() { + let barrier = Arc::new(Barrier::new(10)); + let mut handles = vec![]; + + for _ in 0..10 { + let barrier = barrier.clone(); + handles.push(thread::spawn(move || { + println!("before wait1"); + let dur = rand::thread_rng().gen_range(100..1000); + thread::sleep(std::time::Duration::from_millis(dur)); + + //step1 + barrier.wait(); + println!("after wait1"); + thread::sleep(time::Duration::from_secs(1)); + + //step2 + barrier.wait(); + println!("after wait2"); + })); + } + for handle in handles { handle.join().unwrap(); } diff --git a/sync_primitive/src/exclusive.rs b/sync_primitive/src/exclusive.rs new file mode 100644 index 0000000..d1248e8 --- /dev/null +++ b/sync_primitive/src/exclusive.rs @@ -0,0 +1,12 @@ +use std::sync::Exclusive; + +pub fn exclusive_lock_example() { + let mut exclusive = Exclusive::new(92); + println!("ready"); + std::thread::spawn(move || { + let counter = exclusive.get_mut(); + println!("{}", *counter); + *counter = 100; + }).join().unwrap(); + +} \ No newline at end of file diff --git a/sync_primitive/src/lazy.rs b/sync_primitive/src/lazy.rs new file mode 100644 index 0000000..d2d100c --- /dev/null +++ b/sync_primitive/src/lazy.rs @@ -0,0 +1,29 @@ +use std::cell::LazyCell; +use std::sync::LazyLock; + +pub fn lazy_cell_example() { + let lazy: LazyCell = LazyCell::new(|| { + println!("initializing"); + 92 + }); + println!("ready"); + println!("{}", *lazy); + println!("{}", *lazy); +} + +use std::collections::HashMap; +static HASHMAP: LazyLock> = LazyLock::new(|| { + println!("initializing"); + let mut m = HashMap::new(); + m.insert(13, "Spica".to_string()); + m.insert(74, "Hoyten".to_string()); + m +}); + +pub fn lazy_lock_example() { + println!("ready"); + std::thread::spawn(|| { + println!("{:?}", HASHMAP.get(&13)); + }).join().unwrap(); + println!("{:?}", HASHMAP.get(&74)); +} \ No newline at end of file diff --git a/sync_primitive/src/lib.rs b/sync_primitive/src/lib.rs index 0f41318..4b1b645 100644 --- a/sync_primitive/src/lib.rs +++ b/sync_primitive/src/lib.rs @@ -1,4 +1,6 @@ #![feature(sync_unsafe_cell)] +#![feature(lazy_cell)] +#![feature(exclusive_wrapper)] pub mod arc; pub mod mutex; @@ -8,6 +10,8 @@ pub mod barrier; pub mod cond; pub mod mpsc; pub mod atomic; +pub mod lazy; +pub mod exclusive; pub use arc::*; pub use mutex::*; @@ -16,4 +20,6 @@ pub use once::*; pub use barrier::*; pub use cond::*; pub use mpsc::*; -pub use atomic::*; \ No newline at end of file +pub use atomic::*; +pub use lazy::*; +pub use exclusive::*; \ No newline at end of file diff --git a/sync_primitive/src/main.rs b/sync_primitive/src/main.rs index 5864f62..00b77de 100644 --- a/sync_primitive/src/main.rs +++ b/sync_primitive/src/main.rs @@ -8,12 +8,17 @@ fn main() { mutex_example1(); mutex_example2_poison(); mutex_example3_drop(); - + rwlock_example(); + read_after_write(); once_example(); + oncecell_example(); + oncelock_example(); + once_cell_example(); barrier_example(); + barrier_recycle_example(); condvar_example(); condvar_example2(); @@ -23,4 +28,9 @@ fn main() { atomic_example(); atomic_example2(); + + lazy_cell_example(); + lazy_lock_example(); + + exclusive_lock_example(); } diff --git a/sync_primitive/src/mutex.rs b/sync_primitive/src/mutex.rs index 7f65eb6..d105947 100644 --- a/sync_primitive/src/mutex.rs +++ b/sync_primitive/src/mutex.rs @@ -84,3 +84,4 @@ pub fn mutex_example3_drop() { println!("Result: {:?}", res_mutex.lock().unwrap()); } + diff --git a/sync_primitive/src/once.rs b/sync_primitive/src/once.rs index a21f76a..cb76a1e 100644 --- a/sync_primitive/src/once.rs +++ b/sync_primitive/src/once.rs @@ -2,7 +2,10 @@ #![allow(unused_assignments)] use std::sync::Once; - +use std::sync::OnceLock; +use std::cell::OnceCell; +use std::thread::sleep; +use std::time::Duration; pub fn once_example() { let once = Once::new(); @@ -16,4 +19,51 @@ pub fn once_example() { println!("Once: {}", val); } +} + +pub fn oncecell_example() { + let cell = OnceCell::new(); + assert!(cell.get().is_none()); + + let value: &String = cell.get_or_init(|| { + "Hello, World!".to_string() + }); + assert_eq!(value, "Hello, World!"); + assert!(cell.get().is_some()); + + println!("OnceCell: {}", cell.get().is_some()) +} + +pub fn oncelock_example() { + static CELL: OnceLock = OnceLock::new(); + assert!(CELL.get().is_none()); + + std::thread::spawn(|| { + let value: &String = CELL.get_or_init(|| { + "Hello, World!".to_string() + }); + assert_eq!(value, "Hello, World!"); + }).join().unwrap(); + + + sleep(Duration::from_secs(1)); + + let value: Option<&String> = CELL.get(); + assert!(value.is_some()); + assert_eq!(value.unwrap().as_str(), "Hello, World!"); + + println!("OnceLock: {}", value.is_some()) +} + +use std::{sync::Mutex, collections::HashMap}; +use once_cell::sync::Lazy; +static GLOBAL_DATA: Lazy>> = Lazy::new(|| { + let mut m = HashMap::new(); + m.insert(13, "Spica".to_string()); + m.insert(74, "Hoyten".to_string()); + Mutex::new(m) +}); + +pub fn once_cell_example() { + println!("{:?}", GLOBAL_DATA.lock().unwrap()); } \ No newline at end of file diff --git a/sync_primitive/src/rwlock.rs b/sync_primitive/src/rwlock.rs index d8f8076..3ce6977 100644 --- a/sync_primitive/src/rwlock.rs +++ b/sync_primitive/src/rwlock.rs @@ -35,4 +35,53 @@ pub fn rwlock_example() { } println!("RwLock: {}", *rwlock.read().unwrap()); -} \ No newline at end of file +} +pub fn read_after_write() { + // 创建一个可共享的可变整数,使用RwLock包装 + let counter = Arc::new(RwLock::new(0)); + + // 创建一个线程持有读锁 + let read_handle = { + let counter = counter.clone(); + thread::spawn(move || { + // 获取读锁 + let num = counter.read().unwrap(); + println!("Reader#1: {}", *num); + + // 休眠模拟读取操作 + thread::sleep(std::time::Duration::from_secs(10)); + }) + }; + + // 创建一个线程请求写锁 + let write_handle = { + let counter = counter.clone(); + thread::spawn(move || { + // 休眠一小段时间,确保读锁已经被获取 + thread::sleep(std::time::Duration::from_secs(1)); + + // 尝试获取写锁 + let mut num = counter.write().unwrap(); + *num += 1; + println!("Writer : Incremented counter to {}", *num); + }) + }; + + // 创建一个线程请求读锁 + let read_handle_2 = { + let counter = counter.clone(); + thread::spawn(move || { + // 休眠一小段时间,确保写锁已经被获取 + thread::sleep(std::time::Duration::from_secs(2)); + + // 尝试获取读锁 + let num = counter.read().unwrap(); + println!("Reader#2: {}", *num); + }) + }; + + // 等待读取线程和写入线程完成 + read_handle.join().unwrap(); + write_handle.join().unwrap(); + read_handle_2.join().unwrap(); +} \ No newline at end of file diff --git a/thread/Cargo.toml b/thread/Cargo.toml index b4790cc..71b62e7 100644 --- a/thread/Cargo.toml +++ b/thread/Cargo.toml @@ -16,6 +16,7 @@ go-spawn = "0.1.2" num_threads = "0.1.6" parking = "2.0.0" num_cpus = "1.13.1" +scopeguard = "1.2.0" [target.'cfg(any(windows, linux))'.dependencies] affinity = "0.1.2" diff --git a/thread/src/main.rs b/thread/src/main.rs index c057196..ce74514 100644 --- a/thread/src/main.rs +++ b/thread/src/main.rs @@ -36,11 +36,14 @@ fn main() { use_affinity(); go_thread(); - + scopeguard_defer(); + park_thread(); panic_example(); panic_caught_example(); info(); + + join_all_example(); } diff --git a/thread/src/threads.rs b/thread/src/threads.rs index ece69c0..b1e851e 100644 --- a/thread/src/threads.rs +++ b/thread/src/threads.rs @@ -15,6 +15,7 @@ use send_wrapper::SendWrapper; use thread_amount::thread_amount; use thread_control::*; use thread_priority::*; +use scopeguard::{guard, defer,defer_on_unwind, defer_on_success}; #[cfg(not(target_os = "macos"))] use affinity::*; @@ -56,6 +57,7 @@ pub fn start_two_threads() { handle2.join().unwrap(); } + pub fn start_n_threads() { const N: isize = 10; @@ -176,7 +178,7 @@ pub fn start_one_thread_with_move() { handle.join().unwrap(); - let handle = thread::spawn(move|| { + let handle = thread::spawn(move || { println!("Hello from a thread with move again, x={}!", x); }); handle.join().unwrap(); @@ -185,7 +187,6 @@ pub fn start_one_thread_with_move() { println!("Hello from a thread without move"); }); handle.join().unwrap(); - } // pub fn start_one_thread_with_move2() { @@ -360,7 +361,6 @@ pub fn rayon_scope() { assert_eq!(x, a.len()); } - // pub fn wrong_send() { // let counter = Rc::new(42); @@ -398,7 +398,9 @@ pub fn print_thread_amount() { let handle = thread::spawn(move || { thread::sleep(Duration::from_millis(1000)); - println!("thread amount: {}", amount.unwrap()); + if !amount.is_none() { + println!("thread amount: {}", amount.unwrap()); + } }); handles.push(handle); @@ -441,7 +443,9 @@ pub fn use_affinity() { affinity::get_thread_affinity().unwrap() ); } - +fn foo() { + println!("foo"); +} pub fn go_thread() { let counter = Arc::new(AtomicI64::new(0)); let counter_cloned = counter.clone(); @@ -453,6 +457,8 @@ pub fn go_thread() { } } + go!(foo()); + // Join the most recent thread spawned by `go_spawn` that has not yet been joined. assert!(join!().is_ok()); assert_eq!(counter.load(Ordering::SeqCst), 100); @@ -490,8 +496,41 @@ pub fn info() { } let count = thread_amount::thread_amount(); - println!("thread_amount: {}", count.unwrap()); + if !count.is_none() { + println!("thread_amount: {}", count.unwrap()); + } let count = num_cpus::get(); println!("num_cpus: {}", count); } + +pub fn scopeguard_defer() { + defer! { + println!("scopeguard: Called at return or panic"); + } + println!("scopeguard: Called first before panic"); + // panic!(); + println!("scopeguard: Called first after panic"); +} + + + + +macro_rules! join_all { + ($($x:ident),*) => { + $($x.join().unwrap();)* + } +} + + +pub fn join_all_example() { + let handle1 = thread::spawn(|| { + println!("Hello from a thread1!"); + }); + + let handle2 = thread::spawn(|| { + println!("Hello from a thread2!"); + }); + + join_all!(handle1,handle2); +} \ No newline at end of file diff --git a/timer_examples/Cargo.toml b/timer_examples/Cargo.toml index 4b1b49f..07f23cc 100644 --- a/timer_examples/Cargo.toml +++ b/timer_examples/Cargo.toml @@ -6,7 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-io = "2.1.0" chrono = "0.4.22" +futures-lite = "2.0.0" +futures-timer = "3.0.2" safina-timer = "0.1.11" smol = "1.2.5" ticker = "0.1.1" diff --git a/timer_examples/src/main.rs b/timer_examples/src/main.rs index 80759d4..0458674 100644 --- a/timer_examples/src/main.rs +++ b/timer_examples/src/main.rs @@ -6,4 +6,9 @@ fn main() { timer_repeat(); safina_timer_example(); + + futures_timer_example(); + + async_io_timer_example(); + async_io_interval(); } diff --git a/timer_examples/src/tickers.rs b/timer_examples/src/tickers.rs index 3f47f20..568a974 100644 --- a/timer_examples/src/tickers.rs +++ b/timer_examples/src/tickers.rs @@ -1,5 +1,5 @@ -use ticker::Ticker; use std::time::Duration; +use ticker::Ticker; pub fn ticker_example() { let ticker = Ticker::new(0..10, Duration::from_secs(1)); @@ -7,3 +7,23 @@ pub fn ticker_example() { println!("{:?}", i) } } + +pub fn async_io_interval() { + use async_io::Timer; + use futures_lite::StreamExt; + + let mut count = 0; + + smol::block_on(async { + let mut tick = Timer::interval(Duration::from_secs(1)); + + while let Some(_) = tick.next().await { + println!("第{}秒", count); + count += 1; + + if count >= 10 { + break; + } + } + }); +} diff --git a/timer_examples/src/timers.rs b/timer_examples/src/timers.rs index 11ca426..d5af7ec 100644 --- a/timer_examples/src/timers.rs +++ b/timer_examples/src/timers.rs @@ -84,3 +84,27 @@ pub fn safina_timer_example() { safina_timer::sleep_until(deadline).await; }); } + +pub fn futures_timer_example() { + use futures_timer::Delay; + use std::time::Duration; + + smol::block_on(async { + for _ in 0..5 { + Delay::new(Duration::from_secs(1)).await; + println!("重复定时任务触发!"); + } + }); +} + +pub fn async_io_timer_example() { + use async_io::Timer; + use std::time::Duration; + + let timer = Timer::after(Duration::from_secs(1)); + + smol::block_on(async { + timer.await; + println!("一秒过去了"); + }); +} diff --git a/tokio_examples/Cargo.toml b/tokio_examples/Cargo.toml index ac4bb3c..3330b4c 100644 --- a/tokio_examples/Cargo.toml +++ b/tokio_examples/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" [dependencies] tokio = { version = "1.21.2", features = ["full"] } +tokio-rayon = "2.1.0" diff --git a/tokio_examples/src/lib.rs b/tokio_examples/src/lib.rs index dbd53ca..a6f4ced 100644 --- a/tokio_examples/src/lib.rs +++ b/tokio_examples/src/lib.rs @@ -175,4 +175,26 @@ pub fn watch_example() { Ok(()) }); +} + +/// 实现fib +pub fn fib(n: usize) -> usize { + if n == 0 || n == 1 { + return n; + } + + return fib(n-1) + fib(n-2); +} + +pub fn tokio_rayon_example() { + let rt = tokio::runtime::Runtime::new().unwrap(); + + rt.block_on(async { + let nft = tokio_rayon::spawn(|| { + fib(20) + }).await; + + assert_eq!(nft, 6765); + }) + } \ No newline at end of file diff --git a/tokio_examples/src/main.rs b/tokio_examples/src/main.rs index 65733a3..f660448 100644 --- a/tokio_examples/src/main.rs +++ b/tokio_examples/src/main.rs @@ -18,4 +18,5 @@ fn main() { notify_example(); notify_example2(); + tokio_rayon_example(); }