Skip to content

Commit 65bca2f

Browse files
committed
Updated with tokio usage
1 parent a9884d4 commit 65bca2f

File tree

3 files changed

+224
-16
lines changed

3 files changed

+224
-16
lines changed

Lesson_12/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,3 +4,5 @@ version = "0.1.0"
44
edition = "2024"
55

66
[dependencies]
7+
rand = "0.9.1"
8+
tokio = { version = "1.45.0", features = ["full"] }

Lesson_12/README.MD

Lines changed: 193 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,4 +93,196 @@ pub fn multi_producer() {
9393
}
9494
```
9595

96-
Bu örnekte 10 farklı **thread** kanala mesaj bırakır. Thread'ler **spawn** çağırısı ile ayağa kaldırılmadan önce **transmitter** nesnesinin bir klonunun oluşturulduğunda dikkat edilmelidir. Her bir **thread** kendi **transmitter** klonunu kullanarak kanala mesaj bırakır. Mesajlar kanala senkron sırada bırakılır. İlerleyen satırlarda bir **for** döngüsü ile kanala gelen mesajların **Receiver** nesnesi üzerinden yakalanması işlemi gerçekleştirilir. Dikkat edilmesi gereken noktalardan birisi de **drop** çağrısıdır. Açık bir şekilde **transmitter** nesnesi açıkça **drop** edilmiştir. Bu yapılmadığı durumda receiver dan mesajlar dinlenmeye devam eder ve program sonlanmaz. Bu bazı durumlarda zaten istenen bir durumdur. Sürekli dinlemede kalması gereken bir receiver gerektiren senaryolar buna örnek verilebilir.
96+
Bu örnekte 10 farklı **thread** kanala mesaj bırakır. Thread'ler **spawn** çağırısı ile ayağa kaldırılmadan önce **transmitter** nesnesinin bir klonunun oluşturulduğunda dikkat edilmelidir. Her bir **thread** kendi **transmitter** klonunu kullanarak kanala mesaj bırakır. Mesajlar kanala senkron sırada bırakılır. İlerleyen satırlarda bir **for** döngüsü ile kanala gelen mesajların **Receiver** nesnesi üzerinden yakalanması işlemi gerçekleştirilir. Dikkat edilmesi gereken noktalardan birisi de **drop** çağrısıdır. Açık bir şekilde **transmitter** nesnesi açıkça **drop** edilmiştir. Bu yapılmadığı durumda receiver dan mesajlar dinlenmeye devam eder ve program sonlanmaz. Zire klonlanan receiver örnekler halen daha yaşamaktadır. Bazı durumlarda zaten istenen bir durumdur. Sürekli dinlemede kalması gereken bir receiver gerektiren senaryolar buna örnek verilebilir. Farklı bir örnekle devam edip otomatik kapanma durumunu ele alalım.
97+
98+
```rust
99+
use std::sync::mpsc::channel;
100+
use std::thread;
101+
use std::time::Duration;
102+
103+
fn main() {
104+
multi_producer();
105+
}
106+
107+
pub fn multi_producer() {
108+
let (transmitter, receiver) = channel();
109+
let transmitter_clone = transmitter.clone();
110+
111+
thread::spawn(move || {
112+
transmitter.send(String::from("Hello there!")).unwrap();
113+
thread::sleep(Duration::from_secs(2));
114+
});
115+
116+
thread::spawn(move || {
117+
transmitter_clone
118+
.send(String::from("Why are you so serious!"))
119+
.unwrap();
120+
});
121+
122+
for received in receiver {
123+
println!("Incoming message: {}", received);
124+
}
125+
126+
println!("End of program");
127+
}
128+
```
129+
130+
Bu örnekte transmitter ve transmitter_clone nesneleri tanımlandıkları thread'ler sonlandığında düşürülürler ve dolayısıyla receiver üzerinden yakalanacak kanal mesajlarının sayısı bellidir. Dolayısıyla program beklendiği şekilde tüm kanal mesajları işlendikten sonra sonlanır.
131+
132+
## Örnek Senaryo
133+
134+
Kanal kullanımları ile ilgili örnek bir senaryo ele alalım. Bu senaryoda sistemdeki n sayıda rapor dosyasının n thread ile işlenmesi söz konusudur. Her bir thread ele aldığı dosyayı işledikten sonra kanal üzerine bilgi bırakır. En sonunda tüm bu bilgiler receiver üzerinden toplanır. İlk versiyonda standart kütüphanenin Sender ve Receiver yapıları kullanılmaktadır.
135+
136+
```rust
137+
use std::sync::mpsc::channel;
138+
use std::thread;
139+
use std::time::Duration;
140+
141+
use rand::Rng;
142+
143+
fn main() {
144+
process_reports();
145+
}
146+
147+
pub fn process_reports() {
148+
let (transmitter, receiver) = channel();
149+
150+
let reports = [
151+
"salary.json",
152+
"invoices.json",
153+
"summary.json",
154+
"personnel.json",
155+
];
156+
157+
for report in reports {
158+
let transmitter = transmitter.clone();
159+
thread::spawn(move || {
160+
let mut rng = rand::thread_rng();
161+
let sleep_time = rng.gen_range(2..=5);
162+
transmitter
163+
.send(format!("Processing '{}' report...", report))
164+
.unwrap();
165+
166+
// Rapor dosyalarının işlendiği bazı business'lar çağırdığımızı düşünelim
167+
168+
thread::sleep(Duration::from_secs(sleep_time));
169+
170+
transmitter
171+
.send(format!(
172+
"Finished processing '{}' in {} seconds",
173+
report, sleep_time
174+
))
175+
.unwrap();
176+
});
177+
}
178+
179+
drop(transmitter);
180+
println!("Started the processing reports");
181+
for result in receiver {
182+
println!("Status {}", result);
183+
}
184+
println!("Completed the processing reports");
185+
}
186+
```
187+
188+
Her dosya sıralı bir şekilde döngüye girer ve herbirisi için ayrı bir **thread** açılır. Bu thread'lerde dosyalar ile ilgili farklı iş süreçlerinin işletildiğini düşünebiliriz. Dosya işleyişlerinin farklı sürelerde gerçekleştiğini simüle etmek için rand kütüphanesi ile üretilen rastgele değerlerde beklemeler yapılmaktadır.
189+
190+
## Asenkronluk
191+
192+
Rust'ın MPSC modülü aslında gerçek anlamda bir asenkronluk sağlamaz. Bir başka deyişle Sender'dan mesajlar asenkron olarak ilerletilebilse de Receiver tarafı bunları senkron olarak işler. Tam bir asenkron çalışma sağlayabilmek için yardımcı küfelerden _(crates)_ yararlanılabilir. Aşağıdaki ilk senaryoda ana thread'in bloklanmasına neden olan bir döngü kullanımı söz konusudur.
193+
194+
```rust
195+
use std::sync::mpsc::channel;
196+
use std::thread;
197+
use std::time::Duration;
198+
199+
fn main() {
200+
do_with_standard();
201+
}
202+
203+
pub fn do_with_standard() {
204+
let (transmitter, receiver) = channel();
205+
206+
for i in 1..=5 {
207+
let tx_clone = transmitter.clone();
208+
thread::spawn(move || {
209+
thread::sleep(Duration::from_secs(5));
210+
tx_clone.send(format!("Task {} completed", i)).unwrap();
211+
});
212+
}
213+
214+
drop(transmitter);
215+
216+
println!("Waiting for all tasks...");
217+
218+
// Aşağıdaki döngü main thread üzerine çalışıp buradaki akışı bloklamaya neden olacak
219+
for i in 0..10 {
220+
thread::sleep(Duration::from_secs(1));
221+
println!("Main task is working...Counting {}", i);
222+
}
223+
224+
while let Ok(message) = receiver.recv() {
225+
println!("{}", message);
226+
}
227+
228+
println!("All tasks completed!");
229+
}
230+
```
231+
232+
// ÇALIŞMA ZAMANI GÖRÜNTÜSÜ EKLENECEK
233+
234+
Aynı örneği **tokio** küfesini kullanarak ele aldığımızda ise ana thread'in bloklanmadan for döngüsünün işletildiğini görebiliriz. Bu receiver tarafında mesajların asenkron ele alınabildiğinin de ispatıdır. tokio küfesini kullanmak için **Full feature** seti ile eklenmesi gerekmektedir.
235+
236+
```bash
237+
cargo add tokio -F full
238+
```
239+
240+
Örnek kod;
241+
242+
```rust
243+
use std::time::Duration;
244+
use tokio::sync::mpsc;
245+
246+
#[tokio::main]
247+
pub async fn main() {
248+
do_with_tokio().await;
249+
}
250+
251+
pub async fn do_with_tokio() {
252+
let (transmitter, mut receiver) = mpsc::channel(10);
253+
254+
for i in 1..=5 {
255+
let tx_clone = transmitter.clone();
256+
tokio::spawn(async move {
257+
tokio::time::sleep(Duration::from_secs(5)).await;
258+
tx_clone
259+
.send(format!("Task {} completed", i))
260+
.await
261+
.unwrap();
262+
});
263+
}
264+
265+
drop(transmitter);
266+
267+
println!("Waiting for all tasks...");
268+
269+
/*
270+
Standart mpsc örneğinden farklı olarak burada ana thread bloklanmadan
271+
döngünün asenkron olarak çalıştırılması sağlanır.
272+
*/
273+
tokio::spawn(async {
274+
for i in 0..10 {
275+
tokio::time::sleep(Duration::from_secs(1)).await;
276+
println!("Main task is working...Counting {}", i);
277+
}
278+
});
279+
280+
while let Some(message) = receiver.recv().await {
281+
println!("{}", message);
282+
}
283+
284+
println!("All tasks completed!");
285+
}
286+
```
287+
288+
// ÇALIŞMA ZAMANI ÇIKTISI EKLENECEK

Lesson_12/src/main.rs

Lines changed: 29 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,43 @@
1-
use std::sync::mpsc::channel;
2-
use std::thread;
31
use std::time::Duration;
2+
use tokio::sync::mpsc;
43

5-
fn main() {
6-
multi_producer();
4+
#[tokio::main]
5+
pub async fn main() {
6+
do_with_tokio().await;
77
}
88

9-
pub fn multi_producer() {
10-
let (transmitter, receiver) = channel();
9+
pub async fn do_with_tokio() {
10+
let (transmitter, mut receiver) = mpsc::channel(10);
1111

12-
for i in 0..10 {
13-
let transmitter_clone = transmitter.clone();
14-
thread::spawn(move || {
15-
transmitter_clone
16-
.send(format!("Sending message is {}", i))
12+
for i in 1..=5 {
13+
let tx_clone = transmitter.clone();
14+
tokio::spawn(async move {
15+
tokio::time::sleep(Duration::from_secs(5)).await;
16+
tx_clone
17+
.send(format!("Task {} completed", i))
18+
.await
1719
.unwrap();
18-
thread::sleep(Duration::from_secs(2));
1920
});
2021
}
2122

2223
drop(transmitter);
2324

24-
for received in receiver {
25-
println!("Incoming message is '{}'", received);
25+
println!("Waiting for all tasks...");
26+
27+
/*
28+
Standart mpsc örneğinden farklı olarak burada ana thread bloklanmadan
29+
döngünün asenkron olarak çalıştırılması sağlanır.
30+
*/
31+
tokio::spawn(async {
32+
for i in 0..10 {
33+
tokio::time::sleep(Duration::from_secs(1)).await;
34+
println!("Main task is working...Counting {}", i);
35+
}
36+
});
37+
38+
while let Some(message) = receiver.recv().await {
39+
println!("{}", message);
2640
}
2741

28-
println!("End of program");
42+
println!("All tasks completed!");
2943
}

0 commit comments

Comments
 (0)