|
16 | 16 | * |
17 | 17 | */ |
18 | 18 |
|
| 19 | +use std::thread; |
| 20 | + |
19 | 21 | use super::ingest::ingestor_logstream; |
20 | 22 | use super::ingest::ingestor_rbac; |
21 | 23 | use super::ingest::ingestor_role; |
@@ -54,7 +56,6 @@ use once_cell::sync::Lazy; |
54 | 56 | use relative_path::RelativePathBuf; |
55 | 57 | use serde_json::Value; |
56 | 58 | use tokio::sync::oneshot; |
57 | | -use tracing::error; |
58 | 59 | use tracing::info; |
59 | 60 |
|
60 | 61 | /// Metadata associated with this ingestor server |
@@ -199,65 +200,21 @@ impl ParseableServer for IngestServer { |
199 | 200 |
|
200 | 201 | migration::run_migration(&CONFIG).await?; |
201 | 202 |
|
202 | | - let (localsync_handler, mut localsync_outbox, localsync_inbox) = |
203 | | - sync::run_local_sync().await; |
204 | | - let (mut remote_sync_handler, mut remote_sync_outbox, mut remote_sync_inbox) = |
205 | | - sync::object_store_sync().await; |
206 | | - let ( |
207 | | - mut remote_conversion_handler, |
208 | | - mut remote_conversion_outbox, |
209 | | - mut remote_conversion_inbox, |
210 | | - ) = sync::arrow_conversion().await; |
| 203 | + // Run sync on a background thread |
| 204 | + let (cancel_tx, cancel_rx) = oneshot::channel(); |
| 205 | + thread::spawn(|| sync::handler(cancel_rx)); |
211 | 206 |
|
212 | 207 | tokio::spawn(airplane::server()); |
213 | 208 |
|
214 | 209 | // set the ingestor metadata |
215 | 210 | set_ingestor_metadata().await?; |
216 | 211 |
|
217 | 212 | // Ingestors shouldn't have to deal with OpenId auth flow |
218 | | - let app = self.start(shutdown_rx, prometheus.clone(), None); |
219 | | - |
220 | | - tokio::pin!(app); |
221 | | - loop { |
222 | | - tokio::select! { |
223 | | - e = &mut app => { |
224 | | - // actix server finished .. stop other threads and stop the server |
225 | | - remote_sync_inbox.send(()).unwrap_or(()); |
226 | | - localsync_inbox.send(()).unwrap_or(()); |
227 | | - remote_conversion_inbox.send(()).unwrap_or(()); |
228 | | - if let Err(e) = localsync_handler.await { |
229 | | - error!("Error joining remote_sync_handler: {:?}", e); |
230 | | - } |
231 | | - if let Err(e) = remote_sync_handler.await { |
232 | | - error!("Error joining remote_sync_handler: {:?}", e); |
233 | | - } |
234 | | - if let Err(e) = remote_conversion_handler.await { |
235 | | - error!("Error joining remote_conversion_handler: {:?}", e); |
236 | | - } |
237 | | - return e |
238 | | - }, |
239 | | - _ = &mut localsync_outbox => { |
240 | | - // crash the server if localsync fails for any reason |
241 | | - // panic!("Local Sync thread died. Server will fail now!") |
242 | | - return Err(anyhow::Error::msg("Failed to sync local data to drive. Please restart the Parseable server.\n\nJoin us on Parseable Slack if the issue persists after restart : https://launchpass.com/parseable")) |
243 | | - }, |
244 | | - _ = &mut remote_sync_outbox => { |
245 | | - // remote_sync failed, this is recoverable by just starting remote_sync thread again |
246 | | - if let Err(e) = remote_sync_handler.await { |
247 | | - error!("Error joining remote_sync_handler: {:?}", e); |
248 | | - } |
249 | | - (remote_sync_handler, remote_sync_outbox, remote_sync_inbox) = sync::object_store_sync().await; |
250 | | - }, |
251 | | - _ = &mut remote_conversion_outbox => { |
252 | | - // remote_conversion failed, this is recoverable by just starting remote_conversion thread again |
253 | | - if let Err(e) = remote_conversion_handler.await { |
254 | | - error!("Error joining remote_conversion_handler: {:?}", e); |
255 | | - } |
256 | | - (remote_conversion_handler, remote_conversion_outbox, remote_conversion_inbox) = sync::arrow_conversion().await; |
257 | | - } |
258 | | - |
259 | | - } |
260 | | - } |
| 213 | + let result = self.start(shutdown_rx, prometheus.clone(), None).await; |
| 214 | + // Cancel sync jobs |
| 215 | + cancel_tx.send(()).expect("Cancellation should not fail"); |
| 216 | + |
| 217 | + result |
261 | 218 | } |
262 | 219 | } |
263 | 220 |
|
|
0 commit comments