Многонишково програмиране
14 ноември 2019
Преговор
Преговор
- Често срещани trait-ове (
Copy
,Drop
,Display
/Debug
и т.н.)
Преговор
- Често срещани trait-ове (
Copy
,Drop
,Display
/Debug
и т.н.) - предифиниране на оператори (
PartialEq
/Eq
,PartialOrd
/Ord
и други)
Преговор
- Често срещани trait-ове (
Copy
,Drop
,Display
/Debug
и т.н.) - предифиниране на оператори (
PartialEq
/Eq
,PartialOrd
/Ord
и други) - Iterator (for-цикли, адаптери)
Преговор
- Често срещани trait-ове (
Copy
,Drop
,Display
/Debug
и т.н.) - предифиниране на оператори (
PartialEq
/Eq
,PartialOrd
/Ord
и други) - Iterator (for-цикли, адаптери)
- Closures (анонимни функции и още нещо!)
Преговор
- Често срещани trait-ове (
Copy
,Drop
,Display
/Debug
и т.н.) - предифиниране на оператори (
PartialEq
/Eq
,PartialOrd
/Ord
и други) - Iterator (for-цикли, адаптери)
- Closures (анонимни функции и още нещо!)
- Fn trait-ове (когато искаме да подаваме closures на функции)
Fearless concurrency
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)- и изпълнява подадената функция в нея
Нишки
use std::thread;
fn main() {
thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
}
use std::thread; fn main() { thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); }
thread::spawn
пуска нова нишка (на операционната система)- и изпълнява подадената функция в нея
- когато функцията завърши, нишката се спира
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
spawn
връщаJoinHandle
Нишки
use std::thread;
fn main() {
let handle = thread::spawn(|| println!("hi from spawned thread"));
println!("hi from main thread");
let _ = handle.join();
}
hi from main thread hi from spawned thread
use std::thread; fn main() { let handle = thread::spawn(|| println!("hi from spawned thread")); println!("hi from main thread"); let _ = handle.join(); }
- програмата приключва когато главната нишка завърши
spawn
връщаJoinHandle
- можем да използваме
join
за да изчакаме пуснатите нишки
Panic
Panic
panic!
в главната нишка спира програмата
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишката
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултат
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешно
Panic
panic!
в главната нишка спира програматаpanic!
в друга нишка спира нишкатаJoinHandle::join
връща резултатOk(T)
ако функцията е завършила успешноErr(Box<Any>)
ако е имало паника
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
Споделяне на стойности
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(|| {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
error[E0373]: closure may outlive the current function, but it borrows `nums`, which is owned by the current function --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:32 | 6 | let handle = thread::spawn(|| { | ^^ may outlive borrowed value `nums` 7 | for i in &nums { | ---- `nums` is borrowed here | note: function requires argument type to outlive `'static` --> src/bin/main_70d75afc36316cf9348e1b9a1b43baada82fd5b2.rs:6:18 | 6 | let handle = thread::spawn(|| { | __________________^ 7 | | for i in &nums { 8 | | println!("number {}", i); 9 | | } 10| | }); | |______^ help: to force the closure to take ownership of `nums` (and any other referenced variables), use the `move` keyword | 6 | let handle = thread::spawn(move || { | ^^^^^^^
use std::thread; fn main() { let nums = (0..5).collect::>(); let handle = thread::spawn(|| { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
Споделяне на стойности
- новосъздадената нишка може да надживее функцията в която е извикана
- затова rust не позволява да подадем референции към локални променливи
- това се налага от ограничението на
spawn
че приемаF: 'static
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Споделяне на стойности
Можем да преместим стойността в новата нишка
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handle = thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
});
let _ = handle.join();
}
number 0 number 1 number 2 number 3 number 4
use std::thread; fn main() { let nums = (0..5).collect::>(); let handle = thread::spawn(move || { for i in &nums { println!("number {}", i); } }); let _ = handle.join(); }
Споделяне между няколко нишки
Как бихме споделили стойност между няколко нишки?
use std::thread;
fn main() {
let nums = (0..5).collect::<Vec<_>>();
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in &nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Очевидно прехвърляне на собственост няма да работи - трябва ни референция!
Споделяне между няколко нишки
Вариант едно: референция, но статична!
use std::thread;
fn main() {
let boxed_nums = (0..5).collect::<Box<[_]>>();
let nums: &'static [_] = Box::leak(boxed_nums);
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
use std::thread; fn main() { let boxed_nums = (0..5).collect::>(); let nums: &'static [_] = Box::leak(boxed_nums); let handles = (0..2).map(|_| { thread::spawn(move || { for i in nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Споделяне между няколко нишки
Вариант едно: референция, но статична!
use std::thread;
fn main() {
let boxed_nums = (0..5).collect::<Box<[_]>>();
let nums: &'static [_] = Box::leak(boxed_nums);
let handles = (0..2).map(|_| {
thread::spawn(move || {
for i in nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
use std::thread; fn main() { let boxed_nums = (0..5).collect::>(); let nums: &'static [_] = Box::leak(boxed_nums); let handles = (0..2).map(|_| { thread::spawn(move || { for i in nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Забележка: не правете това!
Споделяне между няколко нишки
Можем да пробваме с Rc
use std::rc::Rc;
use std::thread;
fn main() {
let nums_vec = (0..5).collect::<Vec<_>>();
let nums = Rc::new(nums_vec);
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
Споделяне между няколко нишки
Можем да пробваме с Rc
use std::rc::Rc;
use std::thread;
fn main() {
let nums_vec = (0..5).collect::<Vec<_>>();
let nums = Rc::new(nums_vec);
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely --> src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:13 | 12 | thread::spawn(move || { | ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely | = help: within `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_a64edfb8be3b487ac4411ab31257712271d4ea64.rs:12:27: 16:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`
use std::rc::Rc; use std::thread; fn main() { let nums_vec = (0..5).collect::>(); let nums = Rc::new(nums_vec); let handles = (0..2) .map(|_| { let nums = Rc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Send и Sync
pub fn spawn<F, T>(f: F) -> JoinHandle<T> where
F: FnOnce() -> T,
F: Send + 'static,
T: Send + 'static
Send и Sync
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
)
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
Send и Sync
- Send - позволява прехвърляне на собственост между нишки
- Sync - позволява споделяне между нишки през референция (
&T
) - marker traits
- имплементирани са за повечето типове
- auto traits
- unsafe traits
Send и Sync
Аuto traits
- имплементират се автоматично ако всичките полета са съответно
Send
иSync
pub struct Token(u32);
pub struct Token(u32); fn main() {}
Send и Sync
Аuto traits
- имплементират се автоматично ако всичките полета са съответно
Send
иSync
pub struct Token(u32);
pub struct Token(u32); fn main() {}
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
Send и Sync
Send
- позволява прехвърляне на собственост между нишки
- пример за типове, които не са
Send
:Rc
*const T
и*mut T
- thread local типове, напр.
rand::rngs::ThreadRng
- и други
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
*const T
и*mut T
Send + Sync
Sync
- позволява споделяне на референция
&T
между нишки T: Sync
⟺&T: Send
- пример за типове, които не са
Sync
:Cell
,RefCell
Rc
*const T
и*mut T
- и други
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора
Send + Sync
Sync
Това значи ли че Vec<T>
е Sync
?
- да, ако
T: Sync
- ако нашата нишка има
&Vec<_>
значи никой не може да модифицира вектора - ако нашата нишка има
&mut Vec<_>
значи никой друг няма референция до вектора - типове, които не са
Sync
, обикновено имат internal mutability без синхронизация
Send + Sync
Unsafe traits
- unsafe са за ръчна имплементация
struct MyBox(*mut u8);
unsafe impl Send for MyBox {}
unsafe impl Sync for MyBox {}
fn main() {} struct MyBox(*mut u8); unsafe impl Send for MyBox {} unsafe impl Sync for MyBox {}
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
Send + Sync
Деимплементация
// Само на nightly
#![feature(optin_builtin_traits)]
struct SpecialToken(u8);
impl !Send for SpecialToken {}
impl !Sync for SpecialToken {}
- автоматичната имплементация никога няма да е грешна от само себе си
- но може да пишем код, който разчита, че определен тип не може да се прехвърля / споделя
Send + Sync
Деимплементация
Хак за stable
use std::marker::PhantomData;
struct SpecialToken(u8, PhantomData<*const ()>);
fn main() {} use std::marker::PhantomData; struct SpecialToken(u8, PhantomData<*const ()>);
Arc
Да се върнем на кода, който не се компилираше
use std::rc::Rc;
use std::thread;
fn main() {
let nums = Rc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Rc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
error[E0277]: `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely --> src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:13 | 11 | thread::spawn(move || { | ^^^^^^^^^^^^^ `std::rc::Rc<std::vec::Vec<i32>>` cannot be sent between threads safely | = help: within `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`, the trait `std::marker::Send` is not implemented for `std::rc::Rc<std::vec::Vec<i32>>` = note: required because it appears within the type `[closure@src/bin/main_8e08fc7cb2794a1557ea5e32c268dd5d930f61fb.rs:11:27: 15:14 nums:std::rc::Rc<std::vec::Vec<i32>>]`
use std::rc::Rc; use std::thread; fn main() { let nums = Rc::new((0..5).collect::>()); let handles = (0..2) .map(|_| { let nums = Rc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Arc
Решението е да заменим std::rc::Rc
с std::sync::Arc
use std::sync::Arc;
use std::thread;
fn main() {
let nums = Arc::new((0..5).collect::<Vec<_>>());
let handles = (0..2)
.map(|_| {
let nums = Arc::clone(&nums);
thread::spawn(move || {
for i in &*nums {
println!("number {}", i);
}
})
})
.collect::<Vec<_>>();
for h in handles {
let _ = h.join();
}
}
number 0 number 1 number 2 number 3 number 4 number 0 number 1 number 2 number 3 number 4
use std::sync::Arc; use std::thread; fn main() { let nums = Arc::new((0..5).collect::>()); let handles = (0..2) .map(|_| { let nums = Arc::clone(&nums); thread::spawn(move || { for i in &*nums { println!("number {}", i); } }) }) .collect:: >(); for h in handles { let _ = h.join(); } }
Arc
Arc
- Atomic Reference Counter
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността)
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
Arc
- Atomic Reference Counter
- аналогично на Rc (споделена собственост, позволява само взимане на
&T
към вътрешността) - но използва атомарни операции за броене на референциите
- може да се използва за споделяне на стойности между нишки, ако
T: Send + Sync
Примитиви за синхронизация
Примитиви за синхронизация
Стандартния пример за грешен многонишков алгоритъм:
let v = Arc::new((0..100).collect::<Vec<_>>());
let mut sum = 0;
let t1 = {
let v = Arc::clone(&v);
thread::spawn(|| for i in &v[0..50] { sum += i; })
};
let t2 = {
let v = Arc::clone(&v);
thread::spawn(|| for i in &v[51..100] { sum += i; })
};
let _ = t1.join();
let _ = t2.join();
println!("sum: {}", sum);
error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:9:19 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | ^^ - `v` is borrowed here | | | may outlive borrowed value `v` | note: function requires argument type to outlive `'static` --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:9:5 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword | 9 | thread::spawn(move || for i in &v[0..50] { sum += i; }) | ^^^^^^^ error[E0373]: closure may outlive the current function, but it borrows `sum`, which is owned by the current function --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:9:19 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | ^^ --- `sum` is borrowed here | | | may outlive borrowed value `sum` | note: function requires argument type to outlive `'static` --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:9:5 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: to force the closure to take ownership of `sum` (and any other referenced variables), use the `move` keyword | 9 | thread::spawn(move || for i in &v[0..50] { sum += i; }) | ^^^^^^^ error[E0499]: cannot borrow `sum` as mutable more than once at a time --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:14:19 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | -------------------------------------------------- | | | | | | | first borrow occurs due to use of `sum` in closure | | first mutable borrow occurs here | argument requires that `sum` is borrowed for `'static` ... 14 | thread::spawn(|| for i in &v[51..100] { sum += i; }) | ^^ --- second borrow occurs due to use of `sum` in closure | | | second mutable borrow occurs here error[E0373]: closure may outlive the current function, but it borrows `v`, which is owned by the current function --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:14:19 | 14 | thread::spawn(|| for i in &v[51..100] { sum += i; }) | ^^ - `v` is borrowed here | | | may outlive borrowed value `v` | note: function requires argument type to outlive `'static` --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:14:5 | 14 | thread::spawn(|| for i in &v[51..100] { sum += i; }) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: to force the closure to take ownership of `v` (and any other referenced variables), use the `move` keyword | 14 | thread::spawn(move || for i in &v[51..100] { sum += i; }) | ^^^^^^^ error[E0373]: closure may outlive the current function, but it borrows `sum`, which is owned by the current function --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:14:19 | 14 | thread::spawn(|| for i in &v[51..100] { sum += i; }) | ^^ --- `sum` is borrowed here | | | may outlive borrowed value `sum` | note: function requires argument type to outlive `'static` --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:14:5 | 14 | thread::spawn(|| for i in &v[51..100] { sum += i; }) | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ help: to force the closure to take ownership of `sum` (and any other referenced variables), use the `move` keyword | 14 | thread::spawn(move || for i in &v[51..100] { sum += i; }) | ^^^^^^^ error[E0502]: cannot borrow `sum` as immutable because it is also borrowed as mutable --> src/bin/main_9ab818289d8f7ec0e1d87f97c7a3c68bc6ef4b11.rs:19:21 | 9 | thread::spawn(|| for i in &v[0..50] { sum += i; }) | -------------------------------------------------- | | | | | | | first borrow occurs due to use of `sum` in closure | | mutable borrow occurs here | argument requires that `sum` is borrowed for `'static` ... 19 | println!("sum: {}", sum); | ^^^ immutable borrow occurs here
use std::sync::Arc; use std::thread; fn main() { let v = Arc::new((0..100).collect::>()); let mut sum = 0; let t1 = { let v = Arc::clone(&v); thread::spawn(|| for i in &v[0..50] { sum += i; }) }; let t2 = { let v = Arc::clone(&v); thread::spawn(|| for i in &v[51..100] { sum += i; }) }; let _ = t1.join(); let _ = t2.join(); println!("sum: {}", sum); }
Примитиви за синхронизация
Можем ли да го накараме да работи?
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържанието
Примитиви за синхронизация
Можем ли да го накараме да работи?
&mut i32
- не можем да имаме два пъти&mut
, а иspawn
очаква'static
Arc<i32>
- нямаме как да модифицираме съдържаниетоArc<Cell<i32>>
,Arc<RefCell<i32>>
-Cell
иRefCell
не саSync
Примитиви за синхронизация
Можем да го накараме да работи
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- atomic integers
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- atomic integers
- да връщаме резултат от нишката
Примитиви за синхронизация
Можем да го накараме да работи
Arc<Mutex<i32>>
- atomic integers
- да връщаме резултат от нишката
- …
Mutex
use std::sync::Mutex;
fn main() {
// мутекса опакова стойността, която предпазва
let mutex = Mutex::new(10);
{
// заключваме мутекса
// `lock` е умен указател с deref до `&T` и `&mut T`
let mut lock = mutex.lock().unwrap();
*lock += 32;
// мутекса се отключва когато `lock` се деалокира
}
}
use std::sync::Mutex; fn main() { // мутекса опакова стойността, която предпазва let mutex = Mutex::new(10); { // заключваме мутекса // `lock` е умен указател с deref до `&T` и `&mut T` let mut lock = mutex.lock().unwrap(); *lock += 32; // мутекса се отключва когато `lock` се деалокира } }
Mutex
- mutual exclusion
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира
Mutex
- mutual exclusion
- използва се за да ни даде ексклузивен достъп до някакъв общ ресурс
- scope-а за който имаме ексклузивен достъп се нарича критична секция
- работи по следния начин
- съдържа флаг - дали мутекса е заключен или свободен
- ако мутекса е отключен и извикаме
lock
- заключваме го - ако мутекса е заключен и извикаме
lock
- нишката ни се спира - операционната система ще я събуди когато мутекса е свободен
Mutex
Mutex<()>
може да се използва като флаг
Mutex
Mutex<()>
може да се използва като флаг- дали е наш ред да изпълним някаква операция
Mutex
Mutex<()>
може да се използва като флаг- дали е наш ред да изпълним някаква операция
- понякога
Condvar
е по-подходящ за този случай
Mutex
Panic
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат Ok(MutexGuard)
Mutex
Panic
- ако нишка е заключила мутекс и влезе в
panic!
по това време, мутекса се зачита за отровен - може данните пазени от мутекса да са останали в невалидно състояние
- затова
Mutex::lock()
иMutex::try_lock()
връщат резултат Ok(MutexGuard)
Err(PoisonError)
RwLock
- Reader-writer lock
RwLock
- Reader-writer lock
- позволява четене от много места
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
RwLock
- Reader-writer lock
- позволява четене от много места
- или писане от едно място
- подобно на
RefCell
в многонишков контекст
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
- ако спазваме това няма да имаме много съперничество и мутекса е добра идея
Mutex срещу RwLock
- Mutex е по-бърз и по-лек от RwLock
- добра практика е критичните секции да са възможно най-кратки
- ако спазваме това няма да имаме много съперничество и мутекса е добра идея
RwLock
може да се използва да опаковаме древни C++ библиотеки
Condvar
- Conditional variable
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
- нишката ни ще заспи докато това не се изпълни
Condvar
- Conditional variable
- позволява да изчакаме за някакво събитие или състояние
- нишката ни ще заспи докато това не се изпълни
- и ще бъде събудена от операционната система
Condvar
let pair = Arc::new((Mutex::new(false), Condvar::new()));
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); }
// Thread A
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
while !*started {
started = cvar.wait(started).unwrap();
}
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); // Thread A let (lock, cvar) = &*pair; let mut started = lock.lock().unwrap(); while !*started { started = cvar.wait(started).unwrap(); } }
// Thread B
let (lock, cvar) = &*pair;
let mut started = lock.lock().unwrap();
*started = true;
cvar.notify_one();
use std::sync::{Arc, Mutex, Condvar}; fn main() { let pair = Arc::new((Mutex::new(false), Condvar::new())); // Thread B let (lock, cvar) = &*pair; let mut started = lock.lock().unwrap(); *started = true; cvar.notify_one(); }
Други примитиви за синхронизация
Други примитиви за синхронизация
- вижте модула
std::sync
Други примитиви за синхронизация
- вижте модула
std::sync
- ако се интересувате можете да погледнете и интересна имплементация - parking_lot
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
- стоят в основата на много алгоритми
Атомарни числа
- AtomicBool, AtomicUsize, AtomicIsize, AtomicPtr
- AtomicU8, AtomicU16, …
- имплементират се чрез специални инструкции на процесора
- стоят в основата на много алгоритми
- удобни са за създаване на различни броячи и подобни
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, …
Атомарни числа
- препоръчително да се използват пред
Mutex<{integer}>
- интерфейса наподобява
Cell<{integer}>
, но саSend + Sync
- т.е. модифицират се през
&T
и връщат копие на числото - аритметични операции:
fetch_add
,fetch_xor
, … - oперации по паметта:
load
,store
,compare_and_swap
, …
Канали
Канали
Go-lang motto
Don't communicate by sharing memory,
share memory by communicating
Канали в стандартната библиотека
use std::sync::mpsc;
use std::thread;
fn main() {
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(10).unwrap();
});
println!("received {}", receiver.recv().unwrap());
}
received 10
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(10).unwrap(); }); println!("received {}", receiver.recv().unwrap()); }
Типове канали
Неограничен канал
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
Типове канали
Неограничен канал
- unbounded / infinitely buffered / "asynchronous"
std::sync::mpsc::channel()
(Sender, Receiver)
- изпращане на съобщение никога не блокира
Типове канали
Неограничен канал
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
assert_eq!(receiver.recv().unwrap(), 3);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); sender.send(3).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); assert_eq!(receiver.recv().unwrap(), 3); }
Типове канали
Oграничен канал
- bounded / "synchronous"
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения
Типове канали
Oграничен канал
- bounded / "synchronous"
std::sync::mpsc::sync_channel(k)
(SyncSender, Receiver)
- има буфер за
k
съобщения - изпращане на съобщения ще блокира ако буфера е пълен
Типове канали
Ограничен канал
let (sender, receiver) = mpsc::sync_channel(1);
thread::spawn(move || {
// записва съобщението и връща веднага
sender.send(1).unwrap();
// ще блокира докато главната нишка не извика `receiver.recv()`
sender.send(2).unwrap();
});
assert_eq!(receiver.recv().unwrap(), 1);
assert_eq!(receiver.recv().unwrap(), 2);
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::sync_channel(1); thread::spawn(move || { // записва съобщението и връща веднага sender.send(1).unwrap(); // ще блокира докато главната нишка не извика `receiver.recv()` sender.send(2).unwrap(); }); assert_eq!(receiver.recv().unwrap(), 1); assert_eq!(receiver.recv().unwrap(), 2); }
Множество изпращачи
let (sender, receiver) = mpsc::channel();
let sender2 = sender.clone();
thread::spawn(move || {
sender.send(1).unwrap();
sender.send(2).unwrap();
});
thread::spawn(move || {
sender2.send(3).unwrap();
sender2.send(4).unwrap();
});
println!("{} {} {} {}",
receiver.recv().unwrap(), receiver.recv().unwrap(),
receiver.recv().unwrap(), receiver.recv().unwrap());
3 4 1 2
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); let sender2 = sender.clone(); thread::spawn(move || { sender.send(1).unwrap(); sender.send(2).unwrap(); }); thread::spawn(move || { sender2.send(3).unwrap(); sender2.send(4).unwrap(); }); println!("{} {} {} {}", receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap(), receiver.recv().unwrap()); }
Sender
Методи
// изпраща `t`
// връща грешка ако получателят е бил унищожен
fn send(&self, t: T) -> Result<(), SendError<T>>
Sender
Методи
let (sender, receiver) = mpsc::channel();
assert_eq!(sender.send(12), Ok(()));
// унищожаваме получателя
// съобщението `12` никога няма да бъде получено
mem::drop(receiver);
// грешка - получателя е унищожен
// можем да си върнем съобщението `23` от грешката
assert_eq!(sender.send(23), Err(SendError(23)));
use std::mem; use std::sync::mpsc::{self, SendError}; fn main() { let (sender, receiver) = mpsc::channel(); assert_eq!(sender.send(12), Ok(())); // унищожаваме получателя // съобщението `12` никога няма да бъде получено mem::drop(receiver); // грешка - получателя е унищожен // можем да си върнем съобщението `23` от грешката assert_eq!(sender.send(23), Err(SendError(23))); }
SyncSender
Методи
// блокира ако буфера е пълен
fn send(&self, t: T) -> Result<(), SendError<T>>
// връща грешка ако буфера е пълен или получателят е бил унищожен
fn try_send(&self, t: T) -> Result<(), TrySendError<T>>
SyncSender
Методи
let (sender, receiver) = mpsc::sync_channel(1);
assert_eq!(sender.try_send(12), Ok(()));
assert_eq!(sender.try_send(23), Err(TrySendError::Full(23)));
mem::drop(receiver);
assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23)));
use std::mem; use std::sync::mpsc::{self, TrySendError}; fn main() { let (sender, receiver) = mpsc::sync_channel(1); assert_eq!(sender.try_send(12), Ok(())); assert_eq!(sender.try_send(23), Err(TrySendError::Full(23))); mem::drop(receiver); assert_eq!(sender.try_send(23), Err(TrySendError::Disconnected(23))); }
Множество получатели
Множество получатели
- не може - каналите са multi-producer, single-consumer
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонира
Множество получатели
- не може - каналите са multi-producer, single-consumer
Receiver
не може да се клонираReceiver
eSend
, но не еSync
Receiver
Методи
// блокира докато не получи съобщение
// връща грешка ако всички изпращачи са унищожени
fn recv(&self) -> Result<T, RecvError>
// не блокира
// връща грешка ако всички изпращачи са унищожени или няма съобщение в опашката
fn try_recv(&self) -> Result<T, TryRecvError>
// блокира за определено време
// връща грешка ако всички изпращачи са унищожени или е изтекло времето
fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>
Receiver
Методи
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
while let Ok(msg) = receiver.recv() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); while let Ok(msg) = receiver.recv() { println!("received {}", msg); } }
Receiver
Итератори
let (sender, receiver) = mpsc::channel();
thread::spawn(move || {
for i in (0..50).rev() {
sender.send(i).unwrap();
}
});
for msg in receiver.iter() {
println!("received {}", msg);
}
use std::sync::mpsc; use std::thread; fn main() { let (sender, receiver) = mpsc::channel(); thread::spawn(move || { for i in (0..50).rev() { sender.send(i).unwrap(); } }); for msg in receiver.iter() { println!("received {}", msg); } }
Други структури
- crossbeam
- https://crates.io/crates/crossbeam
- MPMC channel с опция за select по няколко канала
- lock-free структури от данни - опашка, стек, deque
- scoped threads
- и доста utilities
Други библиотеки
- rayon
- https://crates.io/crates/rayon
- библиотека за паралелизъм по данни
- threadpool
- parallel iterators
- split/join