Skip to content

Commit 02b12a9

Browse files
fix: update hot tier
in case of no manifest for a particular date in the date range remove date from the range
1 parent 8b3d7a9 commit 02b12a9

File tree

3 files changed

+103
-48
lines changed

3 files changed

+103
-48
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -964,18 +964,19 @@ pub async fn put_stream_hot_tier(
964964

965965
STREAM_INFO.set_hot_tier(&stream_name, true)?;
966966
if let Some(hot_tier_manager) = HotTierManager::global() {
967-
let hottier = StreamHotTier {
967+
let mut hottier = StreamHotTier {
968968
start_date: hottier.start_date,
969969
end_date: hottier.end_date,
970970
size: hottier.size.clone(),
971971
used_size: Some("0GiB".to_string()),
972972
available_size: Some(hottier.size),
973+
updated_date_range: None,
973974
};
974975

975976
hot_tier_manager.validate(&stream_name, &hottier).await?;
976977

977978
hot_tier_manager
978-
.put_hot_tier(&stream_name, &hottier)
979+
.put_hot_tier(&stream_name, &mut hottier)
979980
.await?;
980981
let storage = CONFIG.storage().get_object_store();
981982
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;

server/src/hottier.rs

Lines changed: 99 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ pub struct StreamHotTier {
5858
pub start_date: String,
5959
#[serde(rename = "end_date")]
6060
pub end_date: String,
61+
#[serde(skip_serializing_if = "Option::is_none")]
62+
pub updated_date_range: Option<Vec<String>>,
6163
}
6264

6365
pub struct HotTierManager {
@@ -86,8 +88,11 @@ impl HotTierManager {
8688
stream: &str,
8789
stream_hot_tier: &StreamHotTier,
8890
) -> Result<(), HotTierError> {
89-
let date_list =
90-
self.get_date_list(&stream_hot_tier.start_date, &stream_hot_tier.end_date)?;
91+
let date_list = self.get_date_list(
92+
&stream_hot_tier.start_date,
93+
&stream_hot_tier.end_date,
94+
&None,
95+
)?;
9196
let object_store = CONFIG.storage().get_object_store();
9297
let s3_file_list = object_store.list_files(stream).await?;
9398
let mut manifest_list = Vec::new();
@@ -100,8 +105,6 @@ impl HotTierManager {
100105
.collect::<Vec<_>>();
101106

102107
for file in manifest_files_to_download {
103-
let path = self.hot_tier_path.join(file);
104-
fs::create_dir_all(path.parent().unwrap()).await?;
105108
let manifest_path: RelativePathBuf = RelativePathBuf::from(file);
106109
let manifest_file = object_store.get_object(&manifest_path).await?;
107110

@@ -119,16 +122,21 @@ impl HotTierManager {
119122
if human_size_to_bytes(&stream_hot_tier.size).unwrap() < total_size_to_download {
120123
return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(
121124
format!(
122-
"Total size required to download the files: {}. Provided hot tier size: {}. Not enough space in the hot tier. Please increase the hot tier size.",
125+
"Total size required to download the files: {}. Provided hot tier size: {}. Not enough space in the hot tier. Please increase the hot tier size and try again.",
123126
bytes_to_human_size(total_size_to_download),
124127
&stream_hot_tier.size
125128
),
126129
)));
127130
}
128131
if let Ok(mut existing_hot_tier) = self.get_hot_tier(stream).await {
129-
let available_date_list = self.get_hot_tier_date_list(stream).await?;
130-
self.delete_from_hot_tier(&mut existing_hot_tier, stream, &available_date_list, true)
131-
.await?;
132+
let available_date_list = self.fetch_hot_tier_dates(stream).await?;
133+
self.delete_files_from_hot_tier(
134+
&mut existing_hot_tier,
135+
stream,
136+
&available_date_list,
137+
true,
138+
)
139+
.await?;
132140
}
133141

134142
Ok(())
@@ -155,8 +163,21 @@ impl HotTierManager {
155163
pub async fn put_hot_tier(
156164
&self,
157165
stream: &str,
158-
hot_tier: &StreamHotTier,
166+
hot_tier: &mut StreamHotTier,
159167
) -> Result<(), HotTierError> {
168+
let date_list = if let Some(updated_date_range) = &hot_tier.updated_date_range {
169+
updated_date_range
170+
.iter()
171+
.map(|date| NaiveDate::parse_from_str(date, "%Y-%m-%d").unwrap())
172+
.collect()
173+
} else {
174+
self.get_date_list(&hot_tier.start_date, &hot_tier.end_date, &None)?
175+
};
176+
hot_tier.updated_date_range = date_list
177+
.iter()
178+
.map(|date| date.to_string())
179+
.collect::<Vec<String>>()
180+
.into();
160181
let path = hot_tier_file_path(&self.hot_tier_path, stream)?;
161182
let bytes = serde_json::to_vec(hot_tier)?.into();
162183
self.filesystem.put(&path, bytes).await?;
@@ -200,19 +221,16 @@ impl HotTierManager {
200221
let mut stream_hot_tier = self.get_hot_tier(&stream).await?;
201222
let mut parquet_file_size =
202223
human_size_to_bytes(stream_hot_tier.used_size.as_ref().unwrap()).unwrap();
203-
let date_list =
204-
self.get_date_list(&stream_hot_tier.start_date, &stream_hot_tier.end_date)?;
205-
let available_date_list = self.get_hot_tier_date_list(&stream).await?;
224+
let date_list = self.get_hot_tier_time_range(&stream_hot_tier).await?;
225+
let available_date_list = self.fetch_hot_tier_dates(&stream).await?;
206226
let dates_to_delete: Vec<NaiveDate> = available_date_list
207227
.into_iter()
208228
.filter(|available_date| !date_list.contains(available_date))
209229
.collect();
210-
211230
if !dates_to_delete.is_empty() {
212-
self.delete_from_hot_tier(&mut stream_hot_tier, &stream, &dates_to_delete, false)
231+
self.delete_files_from_hot_tier(&mut stream_hot_tier, &stream, &dates_to_delete, false)
213232
.await?;
214233
}
215-
216234
let object_store = CONFIG.storage().get_object_store();
217235
let s3_file_list = object_store.list_files(&stream).await?;
218236

@@ -226,7 +244,7 @@ impl HotTierManager {
226244
object_store.clone(),
227245
)
228246
.await?;
229-
self.put_hot_tier(&stream, &stream_hot_tier).await?;
247+
self.put_hot_tier(&stream, &mut stream_hot_tier).await?;
230248
}
231249

232250
Ok(())
@@ -236,15 +254,21 @@ impl HotTierManager {
236254
&self,
237255
start_date: &str,
238256
end_date: &str,
257+
updated_date_range: &Option<Vec<String>>,
239258
) -> Result<Vec<NaiveDate>, HotTierError> {
240-
let (start_date, end_date) = parse_human_date(start_date, end_date)?;
241-
let mut date_list = Vec::new();
242-
let mut current_date = start_date;
243-
244-
while current_date <= end_date {
245-
date_list.push(current_date);
246-
current_date += chrono::Duration::days(1);
247-
}
259+
let (dt_start_date, dt_end_date) = if let Some(updated_date_range) = updated_date_range {
260+
parse_human_date(
261+
updated_date_range.first().unwrap(),
262+
updated_date_range.last().unwrap(),
263+
)?
264+
} else {
265+
parse_human_date(start_date, end_date)?
266+
};
267+
268+
let date_list: Vec<NaiveDate> = (0..)
269+
.map(|i| dt_start_date + chrono::Duration::days(i))
270+
.take_while(|&date| date <= dt_end_date)
271+
.collect();
248272

249273
Ok(date_list)
250274
}
@@ -259,14 +283,18 @@ impl HotTierManager {
259283
object_store: Arc<dyn ObjectStorage + Send>,
260284
) -> Result<(), HotTierError> {
261285
let date_str = date.to_string();
262-
let available_date_list = self.get_hot_tier_date_list(stream).await?;
286+
let available_date_list = self.fetch_hot_tier_dates(stream).await?;
263287
if available_date_list.contains(&date) && !date.eq(&Utc::now().date_naive()) {
264288
return Ok(());
265289
}
266290
let manifest_files_to_download = s3_file_list
267291
.iter()
268292
.filter(|file| file.starts_with(&format!("{}/date={}", stream, date_str)))
269293
.collect::<Vec<_>>();
294+
if manifest_files_to_download.is_empty() {
295+
self.update_hot_tier_time_range(stream, stream_hot_tier, &date_str)
296+
.await?;
297+
}
270298
let mut manifest_list = Vec::new();
271299
for file in manifest_files_to_download {
272300
let path = self.hot_tier_path.join(file);
@@ -309,11 +337,10 @@ impl HotTierManager {
309337
if human_size_to_bytes(&stream_hot_tier.available_size.clone().unwrap()).unwrap()
310338
<= parquet_file.file_size
311339
{
312-
let date_list = self.get_hot_tier_date_list(stream).await?;
340+
let date_list = self.fetch_hot_tier_dates(stream).await?;
313341
let date_to_delete = vec![*date_list.first().unwrap()];
314-
self.delete_from_hot_tier(stream_hot_tier, stream, &date_to_delete, false)
342+
self.delete_files_from_hot_tier(stream_hot_tier, stream, &date_to_delete, false)
315343
.await?;
316-
self.update_hot_tier(stream_hot_tier).await?;
317344
*parquet_file_size =
318345
human_size_to_bytes(&stream_hot_tier.used_size.clone().unwrap()).unwrap();
319346
}
@@ -336,7 +363,7 @@ impl HotTierManager {
336363
Ok(())
337364
}
338365

339-
pub async fn delete_from_hot_tier(
366+
pub async fn delete_files_from_hot_tier(
340367
&self,
341368
stream_hot_tier: &mut StreamHotTier,
342369
stream: &str,
@@ -369,6 +396,8 @@ impl HotTierManager {
369396
}
370397
fs::remove_dir_all(path.clone()).await?;
371398
}
399+
self.update_hot_tier_time_range(stream, stream_hot_tier, &date.to_string())
400+
.await?;
372401
}
373402

374403
Ok(())
@@ -383,15 +412,12 @@ impl HotTierManager {
383412
let (_, end_date) =
384413
parse_human_date(&stream_hot_tier.start_date, &stream_hot_tier.end_date)?;
385414
let is_end_date_today = end_date == current_date;
386-
let available_date_list = self.get_hot_tier_date_list(stream).await?;
415+
let available_date_list = self.fetch_hot_tier_dates(stream).await?;
387416
let is_current_date_available = available_date_list.contains(&current_date);
388417
Ok(available_date_list.len() == 1 && is_current_date_available && is_end_date_today)
389418
}
390419

391-
pub async fn get_hot_tier_date_list(
392-
&self,
393-
stream: &str,
394-
) -> Result<Vec<NaiveDate>, HotTierError> {
420+
pub async fn fetch_hot_tier_dates(&self, stream: &str) -> Result<Vec<NaiveDate>, HotTierError> {
395421
let mut date_list = Vec::new();
396422
let path = self.hot_tier_path.join(stream);
397423
if path.exists() {
@@ -408,23 +434,53 @@ impl HotTierManager {
408434
);
409435
}
410436
}
437+
date_list.sort();
411438
Ok(date_list)
412439
}
413440

414-
pub async fn update_hot_tier(
441+
pub async fn update_hot_tier_time_range(
415442
&self,
443+
stream: &str,
416444
stream_hot_tier: &mut StreamHotTier,
445+
date: &str,
417446
) -> Result<(), HotTierError> {
418-
let start_date = &stream_hot_tier.start_date;
419-
let end_date = &stream_hot_tier.end_date;
420-
let mut date_list = self.get_date_list(start_date, end_date)?;
421-
422-
date_list.retain(|date: &NaiveDate| *date.to_string() != *start_date);
423-
stream_hot_tier.start_date = date_list.first().unwrap().to_string();
447+
let mut existing_date_range = stream_hot_tier.updated_date_range.as_ref().unwrap().clone();
448+
existing_date_range.retain(|d| d != date);
449+
stream_hot_tier.updated_date_range = Some(existing_date_range);
450+
self.put_hot_tier(stream, stream_hot_tier).await?;
424451
Ok(())
425452
}
426453

427-
pub async fn get_hot_tier_manifests(
454+
pub async fn get_hot_tier_time_range(
455+
&self,
456+
stream_hot_tier: &StreamHotTier,
457+
) -> Result<Vec<NaiveDate>, HotTierError> {
458+
let (start_date, end_date) =
459+
parse_human_date(&stream_hot_tier.start_date, &stream_hot_tier.end_date)?;
460+
let date_list: Vec<NaiveDate> = (0..)
461+
.map(|i| start_date + chrono::Duration::days(i))
462+
.take_while(|&date| date <= end_date)
463+
.collect();
464+
let mut existing_date_range: Vec<NaiveDate> = stream_hot_tier
465+
.updated_date_range
466+
.as_ref()
467+
.unwrap()
468+
.iter()
469+
.map(|date| NaiveDate::parse_from_str(date, "%Y-%m-%d").unwrap())
470+
.collect();
471+
existing_date_range.sort();
472+
let mut updated_date_range = vec![*date_list.last().unwrap()];
473+
updated_date_range.extend(
474+
date_list
475+
.into_iter()
476+
.filter(|date| existing_date_range.contains(date)),
477+
);
478+
updated_date_range.sort();
479+
updated_date_range.dedup();
480+
Ok(updated_date_range)
481+
}
482+
483+
pub async fn get_hot_tier_manifest_files(
428484
&self,
429485
stream: &str,
430486
manifest_files: Vec<File>,
@@ -447,9 +503,7 @@ impl HotTierManager {
447503
stream: &str,
448504
) -> Result<Vec<File>, HotTierError> {
449505
let mut hot_tier_parquet_files: Vec<File> = Vec::new();
450-
let stream_hot_tier = self.get_hot_tier(stream).await?;
451-
let date_list =
452-
self.get_date_list(&stream_hot_tier.start_date, &stream_hot_tier.end_date)?;
506+
let date_list = self.fetch_hot_tier_dates(stream).await?;
453507
for date in date_list {
454508
let date_str = date.to_string();
455509
let path = &self

server/src/query/stream_schema_provider.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ impl TableProvider for StandardTableProvider {
422422
if let Some(hot_tier_manager) = HotTierManager::global() {
423423
if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
424424
let (hot_tier_files, remainder) = hot_tier_manager
425-
.get_hot_tier_manifests(&self.stream, manifest_files)
425+
.get_hot_tier_manifest_files(&self.stream, manifest_files)
426426
.await
427427
.map_err(|err| DataFusionError::External(Box::new(err)))?;
428428
// Assign remaining entries back to manifest list

0 commit comments

Comments
 (0)