Skip to content

Commit d418218

Browse files
limit the age gap between events to 1 hr
1 parent f0e7a17 commit d418218

File tree

2 files changed

+58
-10
lines changed

2 files changed

+58
-10
lines changed

src/cli.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,17 @@ pub struct Options {
475475
help = "OIDC scope to request (default: openid profile email)"
476476
)]
477477
pub scope: String,
478+
479+
// event's maximum chunk age in hours
480+
#[arg(
481+
long,
482+
env = "P_EVENT_MAX_CHUNK_AGE",
483+
// Accept 0 to disallow older-than-reference events; cap to one week by default.
484+
value_parser = clap::value_parser!(u64).range(0..=168),
485+
default_value = "1",
486+
help = "Max allowed age gap (in hours) between events within the same node, relative to the reference event"
487+
)]
488+
pub event_max_chunk_age: u64,
478489
}
479490

480491
#[derive(Parser, Debug)]

src/utils/json/flatten.rs

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
use std::collections::BTreeMap;
2020
use std::num::NonZeroU32;
21+
use std::sync::Mutex;
2122

2223
use chrono::{DateTime, Duration, Utc};
2324
use serde_json::map::Map;
@@ -27,6 +28,9 @@ use thiserror::Error;
2728

2829
use crate::parseable::PARSEABLE;
2930

31+
// Global variable to track the first timestamp encountered during validation
32+
static REFERENCE_TIMESTAMP: Mutex<Option<DateTime<Utc>>> = Mutex::new(None);
33+
3034
#[derive(Error, Debug)]
3135
pub enum JsonFlattenError {
3236
#[error("Cannot flatten this JSON")]
@@ -45,8 +49,12 @@ pub enum JsonFlattenError {
4549
FieldNotString(String),
4650
#[error("Field {0} is not in the correct datetime format")]
4751
InvalidDatetimeFormat(String),
48-
#[error("Field {0} value is more than {1} days old")]
49-
TimestampTooOld(String, i64),
52+
#[error("Field {0} value '{2}' is more than {1} days old")]
53+
TimestampTooOld(String, i64, DateTime<Utc>),
54+
#[error(
55+
"Field {0} timestamp '{2}' is more than {1} hours older than reference timestamp '{3}'"
56+
)]
57+
TimestampTooOldRelative(String, i64, DateTime<Utc>, DateTime<Utc>),
5058
#[error("Expected object in array of objects")]
5159
ExpectedObjectInArray,
5260
#[error("Found non-object element while flattening array of objects")]
@@ -169,14 +177,43 @@ pub fn validate_time_partition(
169177
partition_key.to_owned(),
170178
));
171179
};
172-
let cutoff_date = Utc::now().naive_utc() - Duration::days(limit_days);
173-
if parsed_timestamp.naive_utc() >= cutoff_date {
174-
Ok(())
175-
} else {
176-
Err(JsonFlattenError::TimestampTooOld(
177-
partition_key.to_owned(),
178-
limit_days,
179-
))
180+
181+
// Access the global reference timestamp and handle poisoning
182+
let mut reference_timestamp = REFERENCE_TIMESTAMP
183+
.lock()
184+
.unwrap_or_else(|p| p.into_inner());
185+
186+
match *reference_timestamp {
187+
None => {
188+
// First timestamp encountered - validate against cutoff date
189+
let cutoff_ts = Utc::now() - Duration::days(limit_days);
190+
if parsed_timestamp >= cutoff_ts {
191+
// Set the reference timestamp
192+
*reference_timestamp = Some(parsed_timestamp);
193+
Ok(())
194+
} else {
195+
Err(JsonFlattenError::TimestampTooOld(
196+
partition_key.to_owned(),
197+
limit_days,
198+
parsed_timestamp,
199+
))
200+
}
201+
}
202+
Some(ref_timestamp) => {
203+
// Subsequent timestamps - validate they're not more than configured hours older than reference
204+
let max_age_hours = PARSEABLE.options.event_max_chunk_age as i64;
205+
let max_age_before_ref = ref_timestamp - Duration::hours(max_age_hours);
206+
if parsed_timestamp >= max_age_before_ref {
207+
Ok(())
208+
} else {
209+
Err(JsonFlattenError::TimestampTooOldRelative(
210+
partition_key.to_owned(),
211+
max_age_hours,
212+
parsed_timestamp,
213+
ref_timestamp,
214+
))
215+
}
216+
}
180217
}
181218
}
182219

0 commit comments

Comments
 (0)