diff --git a/.DS_Store b/.DS_Store deleted file mode 100644 index d133349..0000000 Binary files a/.DS_Store and /dev/null differ diff --git a/.gitignore b/.gitignore index 088ba6b..c0eb52b 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,4 @@ Cargo.lock # These are backup files generated by rustfmt **/*.rs.bk +.history \ No newline at end of file diff --git a/README.md b/README.md index 9a94d6c..c29a3c6 100644 --- a/README.md +++ b/README.md @@ -67,4 +67,5 @@ Introduces [tokio](tokio_examples/src/main.rs). some special synchronization primitives and concurrency libs only for special single purpose. -- replace std::mpsc with crossbeam-channel: https://github.com/rust-lang/rust/pull/93563 \ No newline at end of file +- replace std::mpsc with crossbeam-channel: https://github.com/rust-lang/rust/pull/93563 + diff --git a/book/images/gear.png b/book/images/gear.png deleted file mode 100644 index eb5c21d..0000000 Binary files a/book/images/gear.png and /dev/null differ diff --git a/book_cn/rust_concurrency_cookbook.pdf b/book_cn/rust_concurrency_cookbook.pdf deleted file mode 100644 index ba66741..0000000 Binary files a/book_cn/rust_concurrency_cookbook.pdf and /dev/null differ diff --git a/crossbeam_examples/src/atomics.rs b/crossbeam_examples/src/atomics.rs index f1dc9c0..6592648 100644 --- a/crossbeam_examples/src/atomics.rs +++ b/crossbeam_examples/src/atomics.rs @@ -1,12 +1,10 @@ use crossbeam::atomic::AtomicCell; - pub fn atomic_cell_example() { let a = AtomicCell::new(0i32); a.store(1); assert_eq!(a.load(), 1); - assert_eq!(a.compare_exchange(1, 2), Ok(1)); assert_eq!(a.fetch_add(1), 2); @@ -20,6 +18,3 @@ pub fn atomic_cell_example() { assert_eq!(v, 100); assert_eq!(a.load(), 0); } - - - diff --git a/crossbeam_examples/src/main.rs b/crossbeam_examples/src/main.rs index f68af94..c7227d8 100644 --- a/crossbeam_examples/src/main.rs +++ b/crossbeam_examples/src/main.rs @@ -1,6 +1,8 @@ use crossbeam_examples::*; + fn main() { atomic_cell_example(); + atomic_consume_example(); crossbeam_deque_example(); arrayqueue_example(); diff --git a/parking_lot_examples/src/lib.rs b/parking_lot_examples/src/lib.rs index fe9b58a..4b926a0 100644 --- a/parking_lot_examples/src/lib.rs +++ b/parking_lot_examples/src/lib.rs @@ -53,6 +53,47 @@ pub fn mutex_example2() { println!("mutex_example2: done"); } +pub fn mutex_example3() { + const N: usize = 10; + + let mutex = Arc::new(Mutex::new(())); + + let handles: Vec<_> = (0..N) + .map(|i| { + let mutex = Arc::clone(&mutex); + thread::spawn(move || match mutex.try_lock() { + Some(_guard) => println!("thread {} got the lock", i), + None => println!("thread {} did not get the lock", i), + }) + }) + .collect(); + + for handle in handles { + handle.join().unwrap(); + } + + println!("mutex_example3: done"); +} + +pub fn mutex_example4() { + use parking_lot::Mutex; + use std::mem; + + let mutex = Mutex::new(1); + + // 使用mem::forget持有锁直到结束 + let _guard = mem::forget(mutex.lock()); + + // 一些访问受mutex保护的数据的代码 + + // 在结束前解锁mutex + unsafe { + mutex.force_unlock(); + } + + println!("mutex_example4: done"); +} + pub fn fairmutex_example() { const N: usize = 10; @@ -136,7 +177,6 @@ pub fn once_example() { } } - let handle = thread::spawn(|| { println!("thread 1 get_cached_val: {}", get_cached_val()); }); @@ -146,7 +186,6 @@ pub fn once_example() { handle.join().unwrap(); } - pub fn condvar_example() { use std::sync::Condvar; use std::sync::Mutex; @@ -167,4 +206,4 @@ pub fn condvar_example() { started = cvar.wait(started).unwrap(); // block until notified } println!("condvar_example: done"); -} \ No newline at end of file +} diff --git a/parking_lot_examples/src/main.rs b/parking_lot_examples/src/main.rs index a96484f..73cd8c7 100644 --- a/parking_lot_examples/src/main.rs +++ b/parking_lot_examples/src/main.rs @@ -3,6 +3,9 @@ use parking_lot_examples::*; fn main() { mutex_example(); mutex_example2(); + mutex_example3(); + mutex_example4(); + fairmutex_example(); rwmutex_example(); diff --git a/rayon_examples/src/lib.rs b/rayon_examples/src/lib.rs index 6d2c6eb..7fce8da 100644 --- a/rayon_examples/src/lib.rs +++ b/rayon_examples/src/lib.rs @@ -84,4 +84,8 @@ pub fn rayon_threadpool_example() { let n = pool.install(|| fib(20)); println!("{}", n); +} + +pub fn rayon_global_thread_pool_example() { + rayon::ThreadPoolBuilder::new().num_threads(22).build_global().unwrap(); } \ No newline at end of file diff --git a/rayon_examples/src/main.rs b/rayon_examples/src/main.rs index ae721a4..18ed96a 100644 --- a/rayon_examples/src/main.rs +++ b/rayon_examples/src/main.rs @@ -8,4 +8,5 @@ fn main() { rayon_scope_example2(); rayon_scopefifo_example(); rayon_threadpool_example(); + rayon_global_thread_pool_example(); } diff --git a/special/Cargo.toml b/special/Cargo.toml index 10e71c5..f3211fb 100644 --- a/special/Cargo.toml +++ b/special/Cargo.toml @@ -11,6 +11,7 @@ async-lock = "2.5.0" async-oneshot = "0.5.0" async-weighted-semaphore = "0.2.1" async_singleflight = "0.5.0" +atomic-waker = "1.1.2" atomic_float = "0.1.0" atomicbox = "0.4.0" atomig = "0.4.0" diff --git a/special/src/main.rs b/special/src/main.rs index 1864e80..4187220 100644 --- a/special/src/main.rs +++ b/special/src/main.rs @@ -61,6 +61,7 @@ fn main() { sync_cow_example().unwrap(); arc_swap_example(); + atomic_waker_example(); } diff --git a/special/src/oneshots.rs b/special/src/oneshots.rs index b785abc..30c1b9d 100644 --- a/special/src/oneshots.rs +++ b/special/src/oneshots.rs @@ -26,7 +26,7 @@ pub fn async_oneshot_example() { } pub fn catty_example() { - let (sender, mut receiver) = catty::oneshot(); + let (sender, mut receiver) = ::oneshot(); let sender = thread::spawn(move || { sender.send(1).unwrap(); }); diff --git a/special/src/primitive/atomic_waker_examples.rs b/special/src/primitive/atomic_waker_examples.rs new file mode 100644 index 0000000..7490b7b --- /dev/null +++ b/special/src/primitive/atomic_waker_examples.rs @@ -0,0 +1,65 @@ +use futures::future::Future; +use futures::task::{Context, Poll, AtomicWaker}; +use std::sync::Arc; +use std::sync::atomic::AtomicBool; +use std::sync::atomic::Ordering::Relaxed; +use std::pin::Pin; + +struct Inner { + waker: AtomicWaker, + set: AtomicBool, +} + +#[derive(Clone)] +struct Flag(Arc); + +impl Flag { + pub fn new() -> Self { + Flag(Arc::new(Inner { + waker: AtomicWaker::new(), + set: AtomicBool::new(false), + })) + } + + pub fn signal(&self) { + self.0.set.store(true, Relaxed); + self.0.waker.wake(); + } +} + +impl Future for Flag { + type Output = bool; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + // quick check to avoid registration if already done. + if self.0.set.load(Relaxed) { + return Poll::Ready(true); + } + + self.0.waker.register(cx.waker()); + + // Need to check condition **after** `register` to avoid a race + // condition that would result in lost notifications. + if self.0.set.load(Relaxed) { + Poll::Ready(true) + } else { + Poll::Pending + } + } +} + +// extraced from futures::task::AtomicWaker +pub fn atomic_waker_example() { + smol::block_on(async { + let flag = Flag::new(); + let flag2 = flag.clone(); + + smol::spawn(async move { + smol::Timer::after(std::time::Duration::from_secs(1)).await; + flag2.signal(); + }) + .detach(); + + println!("Waiting for flag: {}", flag.await); + }); +} \ No newline at end of file diff --git a/special/src/primitive/mod.rs b/special/src/primitive/mod.rs index cb6d23e..330cc51 100644 --- a/special/src/primitive/mod.rs +++ b/special/src/primitive/mod.rs @@ -12,6 +12,7 @@ mod async_lock_examples; mod atomic_examples; mod simple_mutex_examples; mod waitgroup_examples; +mod atomic_waker_examples; pub use try_lock_examples::*; pub use sharded_slab_example::*; @@ -19,4 +20,5 @@ pub use async_lock_examples::*; pub use atomic_examples::*; pub use simple_mutex_examples::*; pub use waitgroup_examples::*; +pub use atomic_waker_examples::*; diff --git a/thread/Cargo.toml b/thread/Cargo.toml index b4790cc..71b62e7 100644 --- a/thread/Cargo.toml +++ b/thread/Cargo.toml @@ -16,6 +16,7 @@ go-spawn = "0.1.2" num_threads = "0.1.6" parking = "2.0.0" num_cpus = "1.13.1" +scopeguard = "1.2.0" [target.'cfg(any(windows, linux))'.dependencies] affinity = "0.1.2" diff --git a/thread/src/main.rs b/thread/src/main.rs index c057196..ce74514 100644 --- a/thread/src/main.rs +++ b/thread/src/main.rs @@ -36,11 +36,14 @@ fn main() { use_affinity(); go_thread(); - + scopeguard_defer(); + park_thread(); panic_example(); panic_caught_example(); info(); + + join_all_example(); } diff --git a/thread/src/threads.rs b/thread/src/threads.rs index 86cb56d..b1e851e 100644 --- a/thread/src/threads.rs +++ b/thread/src/threads.rs @@ -15,6 +15,7 @@ use send_wrapper::SendWrapper; use thread_amount::thread_amount; use thread_control::*; use thread_priority::*; +use scopeguard::{guard, defer,defer_on_unwind, defer_on_success}; #[cfg(not(target_os = "macos"))] use affinity::*; @@ -56,6 +57,7 @@ pub fn start_two_threads() { handle2.join().unwrap(); } + pub fn start_n_threads() { const N: isize = 10; @@ -441,7 +443,9 @@ pub fn use_affinity() { affinity::get_thread_affinity().unwrap() ); } - +fn foo() { + println!("foo"); +} pub fn go_thread() { let counter = Arc::new(AtomicI64::new(0)); let counter_cloned = counter.clone(); @@ -453,6 +457,8 @@ pub fn go_thread() { } } + go!(foo()); + // Join the most recent thread spawned by `go_spawn` that has not yet been joined. assert!(join!().is_ok()); assert_eq!(counter.load(Ordering::SeqCst), 100); @@ -497,3 +503,34 @@ pub fn info() { let count = num_cpus::get(); println!("num_cpus: {}", count); } + +pub fn scopeguard_defer() { + defer! { + println!("scopeguard: Called at return or panic"); + } + println!("scopeguard: Called first before panic"); + // panic!(); + println!("scopeguard: Called first after panic"); +} + + + + +macro_rules! join_all { + ($($x:ident),*) => { + $($x.join().unwrap();)* + } +} + + +pub fn join_all_example() { + let handle1 = thread::spawn(|| { + println!("Hello from a thread1!"); + }); + + let handle2 = thread::spawn(|| { + println!("Hello from a thread2!"); + }); + + join_all!(handle1,handle2); +} \ No newline at end of file diff --git a/timer_examples/Cargo.toml b/timer_examples/Cargo.toml index 4b1b49f..07f23cc 100644 --- a/timer_examples/Cargo.toml +++ b/timer_examples/Cargo.toml @@ -6,7 +6,10 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-io = "2.1.0" chrono = "0.4.22" +futures-lite = "2.0.0" +futures-timer = "3.0.2" safina-timer = "0.1.11" smol = "1.2.5" ticker = "0.1.1" diff --git a/timer_examples/src/main.rs b/timer_examples/src/main.rs index 80759d4..0458674 100644 --- a/timer_examples/src/main.rs +++ b/timer_examples/src/main.rs @@ -6,4 +6,9 @@ fn main() { timer_repeat(); safina_timer_example(); + + futures_timer_example(); + + async_io_timer_example(); + async_io_interval(); } diff --git a/timer_examples/src/tickers.rs b/timer_examples/src/tickers.rs index 3f47f20..568a974 100644 --- a/timer_examples/src/tickers.rs +++ b/timer_examples/src/tickers.rs @@ -1,5 +1,5 @@ -use ticker::Ticker; use std::time::Duration; +use ticker::Ticker; pub fn ticker_example() { let ticker = Ticker::new(0..10, Duration::from_secs(1)); @@ -7,3 +7,23 @@ pub fn ticker_example() { println!("{:?}", i) } } + +pub fn async_io_interval() { + use async_io::Timer; + use futures_lite::StreamExt; + + let mut count = 0; + + smol::block_on(async { + let mut tick = Timer::interval(Duration::from_secs(1)); + + while let Some(_) = tick.next().await { + println!("第{}秒", count); + count += 1; + + if count >= 10 { + break; + } + } + }); +} diff --git a/timer_examples/src/timers.rs b/timer_examples/src/timers.rs index 11ca426..d5af7ec 100644 --- a/timer_examples/src/timers.rs +++ b/timer_examples/src/timers.rs @@ -84,3 +84,27 @@ pub fn safina_timer_example() { safina_timer::sleep_until(deadline).await; }); } + +pub fn futures_timer_example() { + use futures_timer::Delay; + use std::time::Duration; + + smol::block_on(async { + for _ in 0..5 { + Delay::new(Duration::from_secs(1)).await; + println!("重复定时任务触发!"); + } + }); +} + +pub fn async_io_timer_example() { + use async_io::Timer; + use std::time::Duration; + + let timer = Timer::after(Duration::from_secs(1)); + + smol::block_on(async { + timer.await; + println!("一秒过去了"); + }); +}