Skip to content

Commit 5b0eb89

Browse files
committed
wip
1 parent f8969d2 commit 5b0eb89

File tree

2 files changed

+98
-26
lines changed

2 files changed

+98
-26
lines changed

examples/pagerank.rs

Lines changed: 60 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,25 @@ use timely::logging::{LogManager, LoggerConfig};
1515

1616
use timely::logging::LogFilter;
1717

18+
fn ridiculous_shuffle(from: u64) -> u64 {
19+
from
20+
// if from % 16 < 8 {
21+
// 0
22+
// } else {
23+
// (from % 16) - 7
24+
// }
25+
}
26+
1827
fn main() {
1928

2029
let mut log_manager = LogManager::new();
2130
let logger_config = LoggerConfig::new(&mut log_manager);
2231

23-
log_manager.workers().to_tcp_socket();
24-
log_manager.comms().to_tcp_socket();
32+
let start = time::precise_time_ns();
33+
34+
// let buf: Option<::std::sync::Arc<::std::sync::Mutex<Vec<u8>>>> = None;
35+
let buf: Option<Vec<::std::sync::Arc<::std::sync::Mutex<Vec<u8>>>>> = Some(log_manager.workers().to_bufs());
36+
// log_manager.workers().to_tcp_socket();
2537

2638
timely::execute_from_args_logging(std::env::args().skip(3), logger_config, move |worker| {
2739

@@ -39,8 +51,8 @@ fn main() {
3951
// bring edges and ranks together!
4052
let changes = edge_stream.binary_frontier(
4153
&rank_stream,
42-
Exchange::new(|x: &((usize, usize), i64)| (x.0).0 as u64),
43-
Exchange::new(|x: &(usize, i64)| x.0 as u64),
54+
Exchange::new(|x: &((usize, usize), i64)| ridiculous_shuffle((x.0).0 as u64)),
55+
Exchange::new(|x: &(usize, i64)| ridiculous_shuffle(x.0 as u64)),
4456
"PageRank",
4557
|_capability| {
4658

@@ -175,42 +187,64 @@ fn main() {
175187
worker.step();
176188
}
177189

190+
let mut prev_time = time::precise_time_ns();
178191
for i in 1 .. 1000 {
179192
input.send(((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)), 1));
180193
input.send(((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)), -1));
181194
input.advance_to(1 + i);
182195
while probe.less_than(input.time()) {
183196
worker.step();
184197
}
198+
// eprintln!("EPOCH TIME {}", (time::precise_time_ns() - prev_time) as f64 / 1_000_000_000f64);
199+
prev_time = time::precise_time_ns();
185200
}
186201

187202
}).unwrap(); // asserts error-free execution;
188-
}
189203

190-
fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
191-
if list.len() > 0 {
192-
list.sort_by(|x,y| x.0.cmp(&y.0));
193-
for i in 0 .. list.len() - 1 {
194-
if list[i].0 == list[i+1].0 {
195-
list[i+1].1 += list[i].1;
196-
list[i].1 = 0;
204+
eprintln!("duration: {} sec", (time::precise_time_ns() - start) as f64 / 1_000_000_000f64);
205+
if let Some(buf) = buf {
206+
eprintln!("computation done, writing to socket");
207+
use std::io::Write;
208+
//let mut file = ::std::fs::File::create("log.out").expect("cannot create logging file");
209+
let mut threads = Vec::new();
210+
for v in buf.into_iter() {
211+
threads.push(std::thread::spawn(move || {
212+
let target: String = ::std::env::var("TIMELY_LOG_TARGET").expect("no $TIMELY_LOG_TARGET, e.g. 127.0.0.1:34254");
213+
let mut writer = std::io::BufWriter::with_capacity(4096, std::net::TcpStream::connect(target).expect("failed to connect to logging destination"));
214+
eprintln!("buf len: {}", v.lock().unwrap().len());
215+
writer.write(&(*v.lock().unwrap())[..]).expect("ABASDF");
216+
}));
217+
}
218+
for t in threads.into_iter() {
219+
t.join().unwrap();
197220
}
198221
}
199-
list.retain(|x| x.1 != 0);
200-
}
201-
}
222+
}
223+
224+
fn compact<T: Ord>(list: &mut Vec<(T, i64)>) {
225+
if list.len() > 0 {
226+
list.sort_by(|x,y| x.0.cmp(&y.0));
227+
for i in 0 .. list.len() - 1 {
228+
if list[i].0 == list[i+1].0 {
229+
list[i+1].1 += list[i].1;
230+
list[i].1 = 0;
231+
}
232+
}
233+
list.retain(|x| x.1 != 0);
234+
}
235+
}
202236

203-
// this method allocates some rank between elements of `edges`.
204-
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
205-
if edges.len() > 0 {
206-
assert!(rank >= 0);
207-
assert!(edges.iter().all(|x| x.1 > 0));
208-
209-
let degree: i64 = edges.iter().map(|x| x.1 as i64).sum();
210-
let share = ((rank * 5) / 6) / degree;
211-
for i in 0 .. edges.len() {
212-
if (i as i64) < (share % (edges.len() as i64)) {
213-
send.push((edges[i].0, edges[i].1 * (share + 1)));
237+
// this method allocates some rank between elements of `edges`.
238+
fn allocate(rank: i64, edges: &[(usize, i64)], send: &mut Vec<(usize, i64)>) {
239+
if edges.len() > 0 {
240+
assert!(rank >= 0);
241+
assert!(edges.iter().all(|x| x.1 > 0));
242+
243+
let degree: i64 = edges.iter().map(|x| x.1 as i64).sum();
244+
let share = ((rank * 5) / 6) / degree;
245+
for i in 0 .. edges.len() {
246+
if (i as i64) < (share % (edges.len() as i64)) {
247+
send.push((edges[i].0, edges[i].1 * (share + 1)));
214248
}
215249
else {
216250
send.push((edges[i].0, edges[i].1 * share));

src/logging.rs

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,29 @@ impl LogManager {
7979
}
8080
}
8181

82+
struct SharedVec {
83+
inner: Arc<Mutex<Vec<u8>>>,
84+
}
85+
86+
impl SharedVec {
87+
pub fn new(inner: Arc<Mutex<Vec<u8>>>) -> Self {
88+
SharedVec {
89+
inner: inner,
90+
}
91+
}
92+
}
93+
94+
impl Write for SharedVec {
95+
fn write(&mut self, data: &[u8]) -> Result<usize, ::std::io::Error> {
96+
self.inner.lock().unwrap().extend_from_slice(data);
97+
Ok(data.len())
98+
}
99+
100+
fn flush(&mut self) -> Result<(), ::std::io::Error> {
101+
Ok(())
102+
}
103+
}
104+
82105
struct SharedEventWriter<T, D, W: Write> {
83106
inner: Mutex<EventWriter<T, D, W>>,
84107
}
@@ -116,6 +139,21 @@ impl FilteredLogManager<EventsSetup, LogEvent> {
116139

117140
self.log_manager.lock().unwrap().add_timely_subscription(self.filter.clone(), pusher);
118141
}
142+
143+
/// TODO(andreal)
144+
pub fn to_bufs(&mut self) -> Vec<Arc<Mutex<Vec<u8>>>> {
145+
let mut vecs = Vec::new();
146+
147+
for i in 0..4 {
148+
let buf = Arc::new(Mutex::new(Vec::<u8>::with_capacity(4_000_000_000)));
149+
let writer = SharedEventWriter::new(SharedVec::new(buf.clone()));
150+
let pusher: Arc<EventPusher<Product<RootTimestamp, u64>, LogMessage>+Send+Sync> = Arc::new(writer);
151+
self.log_manager.lock().unwrap().add_timely_subscription(Arc::new(move |s| s.index == i), pusher);
152+
vecs.push(buf);
153+
}
154+
155+
vecs
156+
}
119157
}
120158

121159
impl FilteredLogManager<CommsSetup, CommsEvent> {

0 commit comments

Comments
 (0)