Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 10 additions & 8 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion quickwit-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ quickwit-storage = { version = "0.2.1", path = "../quickwit-storage" }
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
rand = "0.8"
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="46d5de9", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="447811c", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
futures = "0.3"
futures-util = { version = "0.3.1", default-features = false }
uuid = "0.8"
Expand Down
4 changes: 2 additions & 2 deletions quickwit-directories/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ futures = "0.3"
serde = "1"
serde_cbor = "0.11"
serde_json = "1"
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="46d5de9", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="447811c", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
quickwit-storage = { version = "0.2.1", path = "../quickwit-storage" }
uuid = "0.8"
once_cell = "1"
Expand All @@ -25,7 +25,7 @@ tracing = "0.1.29"
thiserror = "1"
anyhow = "1"
async-trait = "0.1"
chrono = "0.4"
time = { version = "0.3.7", features = ["std"] }

[dev-dependencies]
tempfile = '3'
Expand Down
10 changes: 5 additions & 5 deletions quickwit-directories/src/debug_proxy_directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use std::time::{Duration, Instant};
use std::{fmt, io, mem};

use async_trait::async_trait;
use tantivy::chrono::{DateTime, Utc};
use tantivy::directory::error::OpenReadError;
use tantivy::directory::{FileHandle, OwnedBytes};
use tantivy::{Directory, HasLen};
use time::OffsetDateTime;

use crate::StorageDirectory;

Expand Down Expand Up @@ -63,14 +63,14 @@ pub struct ReadOperation {
pub offset: usize,
/// The number of bytes fetched
pub num_bytes: usize,
/// The date at which the operation was performed.
pub start_date: DateTime<Utc>,
/// The date at which the operation was performed (UTC timezone).
pub start_date: OffsetDateTime,
/// The elapsed time to run the read operation.
pub duration: Duration,
}

struct ReadOperationBuilder {
start_date: DateTime<Utc>,
start_date: OffsetDateTime,
start_instant: Instant,
path: PathBuf,
offset: usize,
Expand All @@ -79,7 +79,7 @@ struct ReadOperationBuilder {
impl ReadOperationBuilder {
pub fn new(path: &Path) -> Self {
let start_instant = Instant::now();
let start_date = Utc::now();
let start_date = OffsetDateTime::now_utc();
ReadOperationBuilder {
start_date,
start_instant,
Expand Down
4 changes: 2 additions & 2 deletions quickwit-doc-mapper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ once_cell = "1.10"
regex = "1"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tantivy = { git = "https://github.com/quickwit-oss/tantivy", rev = "46d5de9", default-features = false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy-query-grammar = { git = "https://github.com/quickwit-oss/tantivy/", rev = "46d5de9" }
tantivy = { git = "https://github.com/quickwit-oss/tantivy", rev = "447811c", default-features = false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy-query-grammar = { git = "https://github.com/quickwit-oss/tantivy/", rev = "447811c" }
thiserror = "1.0"
tracing = "0.1.29"
typetag = "0.1"
Expand Down
9 changes: 8 additions & 1 deletion quickwit-doc-mapper/src/default_doc_mapper/default_mapper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ mod tests {
{
"timestamp": 1586960586000,
"body": "20200415T072306-0700 INFO This is a great log",
"response_date2": "2021-12-19T16:39:57+00:00",
"response_date": "2021-12-19T16:39:57Z",
"response_time": 2.3,
"response_payload": "YWJj",
Expand All @@ -490,7 +491,7 @@ mod tests {
const EXPECTED_JSON_PATHS_AND_VALUES: &str = r#"{
"timestamp": [1586960586000],
"body": ["20200415T072306-0700 INFO This is a great log"],
"response_date": ["2021-12-19T16:39:57+00:00"],
"response_date": ["2021-12-19T16:39:57Z"],
"response_time": [2.3],
"response_payload": [[97,98,99]],
"owner": ["foo"],
Expand Down Expand Up @@ -567,6 +568,12 @@ mod tests {
.iter()
.map(|expected_value| format!("{}", expected_value))
.any(|expected_value| expected_value == value);
if !is_value_in_expected_values {
panic!(
"Could not find: {:?} in {:?}",
value, expected_json_paths_and_values
);
}
assert!(is_value_in_expected_values);
}
});
Expand Down
31 changes: 18 additions & 13 deletions quickwit-doc-mapper/src/default_doc_mapper/field_mapping_entry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,16 @@
use std::convert::TryFrom;

use anyhow::bail;
use chrono::{FixedOffset, Utc};
use itertools::{process_results, Itertools};
use serde::{Deserialize, Serialize};
use serde_json::{self, Value as JsonValue};
use tantivy::schema::{
BytesOptions, Cardinality, DocParsingError as TantivyDocParser, FieldType, IndexRecordOption,
NumericOptions, TextFieldIndexing, TextOptions, Value,
};
use tantivy::time::format_description::well_known::Rfc3339;
use tantivy::time::OffsetDateTime;
use tantivy::DateTime;
use thiserror::Error;

use super::{default_as_true, FieldMappingType};
Expand Down Expand Up @@ -335,17 +337,16 @@ impl FieldMappingEntry {
)?
}
JsonValue::String(value_as_str) => {
let dt_with_fixed_tz: chrono::DateTime<FixedOffset> =
chrono::DateTime::parse_from_rfc3339(&value_as_str).map_err(|err| {
let date_time_utc = DateTime::new_utc(
OffsetDateTime::parse(&value_as_str, &Rfc3339).map_err(|err| {
DocParsingError::ValueError(
self.name.clone(),
format!("Expected RFC 3339 date, got '{}'. {:?}", value_as_str, err),
)
})?;
vec![(
FieldPath::new(&self.name),
Value::Date(dt_with_fixed_tz.with_timezone(&Utc)),
)]
})?,
);

vec![(FieldPath::new(&self.name), Value::Date(date_time_utc))]
}
JsonValue::Null => {
vec![]
Expand Down Expand Up @@ -747,10 +748,11 @@ impl From<TantivyDocParser> for DocParsingError {
#[cfg(test)]
mod tests {
use anyhow::bail;
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, TimeZone, Utc};
use matches::matches;
use serde_json::json;
use tantivy::schema::{Cardinality, Value};
use tantivy::time::{Date, Month, PrimitiveDateTime, Time};
use tantivy::DateTime;

use super::FieldMappingEntry;
use crate::default_doc_mapper::FieldMappingType;
Expand Down Expand Up @@ -1332,11 +1334,14 @@ mod tests {

// Successful parsing
let parsed_value = entry.parse(json!("2021-12-19T16:39:57-01:00"))?;
let datetime = NaiveDateTime::new(
NaiveDate::from_ymd(2021, 12, 19),
NaiveTime::from_hms(17, 39, 57),

let datetime = PrimitiveDateTime::new(
Date::from_calendar_date(2021, Month::December, 19).unwrap(),
Time::from_hms(17, 39, 57).unwrap(),
);
let datetime_utc = Utc.from_utc_datetime(&datetime);
// let datetime = datetime!(2021-12-19 17:39:57);

let datetime_utc = DateTime::new_primitive(datetime); // Utc.from_utc_datetime(&datetime);
assert_eq!(parsed_value.len(), 1);
assert_eq!(parsed_value[0].1, Value::Date(datetime_utc));

Expand Down
4 changes: 3 additions & 1 deletion quickwit-indexing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@ rusoto_kinesis = { version = "0.47", default-features = false, features = ["rust
serde = "1"
serde_json = "1"
serde_yaml = "0.8"
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="46d5de9", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="447811c", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tempfile = "3.3"
thiserror = "1"
tokio = { version = "1", features = ["sync"] }
tracing = "0.1.29"
ulid = "0.5"
tokio-stream = "0.1"
arc-swap = "1.4"
time = { version = "0.3.7", features = ["std"] }


[features]
kafka = ["rdkafka"]
Expand Down
4 changes: 2 additions & 2 deletions quickwit-indexing/src/actors/uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_metastore::{Metastore, SplitMetadata};
use quickwit_storage::SplitPayloadBuilder;
use tantivy::chrono::Utc;
use time::OffsetDateTime;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{info, info_span, warn, Instrument, Span};

Expand Down Expand Up @@ -194,7 +194,7 @@ fn create_split_metadata(split: &PackagedSplit, footer_offsets: Range<u64>) -> S
num_docs: split.num_docs as usize,
time_range: split.time_range.clone(),
original_size_in_bytes: split.size_in_bytes,
create_timestamp: Utc::now().timestamp(),
create_timestamp: OffsetDateTime::now_utc().unix_timestamp(),
tags: split.tags.clone(),
demux_num_ops: split.demux_num_ops,
footer_offsets,
Expand Down
8 changes: 5 additions & 3 deletions quickwit-indexing/src/garbage_collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use futures::StreamExt;
use quickwit_actors::ActorContext;
use quickwit_metastore::{Metastore, MetastoreError, SplitMetadata, SplitState};
use quickwit_storage::StorageError;
use tantivy::chrono::Utc;
use thiserror::Error;
use time::OffsetDateTime;
use tracing::error;

use crate::actors::GarbageCollector;
Expand Down Expand Up @@ -83,7 +83,8 @@ pub async fn run_garbage_collect(
ctx_opt: Option<&ActorContext<GarbageCollector>>,
) -> anyhow::Result<Vec<FileEntry>> {
// Select staged splits with staging timestamp older than grace period timestamp.
let grace_period_timestamp = Utc::now().timestamp() - staged_grace_period.as_secs() as i64;
let grace_period_timestamp =
OffsetDateTime::now_utc().unix_timestamp() - staged_grace_period.as_secs() as i64;

let deletable_staged_splits: Vec<SplitMetadata> = metastore
.list_splits(index_id, SplitState::Staged, None, None)
Expand Down Expand Up @@ -123,7 +124,8 @@ pub async fn run_garbage_collect(
.await?;

// We wait another 2 minutes until the split is actually deleted.
let grace_period_deletion = Utc::now().timestamp() - deletion_grace_period.as_secs() as i64;
let grace_period_deletion =
OffsetDateTime::now_utc().unix_timestamp() - deletion_grace_period.as_secs() as i64;
let splits_to_delete = metastore
.list_splits(index_id, SplitState::MarkedForDeletion, None, None)
.await?
Expand Down
2 changes: 1 addition & 1 deletion quickwit-search/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ once_cell = "1"
opentelemetry = "0.17"
tracing-opentelemetry = "0.17"
rayon = "1"
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="46d5de9", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="447811c", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }

[dependencies.quickwit-cluster]
path = '../quickwit-cluster'
Expand Down
2 changes: 1 addition & 1 deletion quickwit-storage/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ futures = '0.3'
serde_json = "1"
base64 = '0.13'
tracing = "0.1.29"
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="46d5de9", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
tantivy = { git= "https://github.com/quickwit-oss/tantivy", rev="447811c", default-features=false, features = ["mmap", "lz4-compression", "quickwit"] }
once_cell = '1'
regex = '1'
thiserror = '1'
Expand Down