Skip to content

Commit dcc9cec

Browse files
committed
source: preserve Position arg in ObjectUriBatchReader
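Before this fix, the reader collapsed `Position::Beginning` into offset 0, so the distinction between the two was lost and the `from` position recorded in the output checkpoint_delta was always an offset, even when the reader had started from `Position::Beginning`.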
1 parent 5bc658d commit dcc9cec

File tree

3 files changed

+72
-24
lines changed

3 files changed

+72
-24
lines changed

quickwit/quickwit-indexing/src/source/doc_file_reader.rs

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ use quickwit_common::Progress;
2222
use quickwit_common::uri::Uri;
2323
use quickwit_metastore::checkpoint::PartitionId;
2424
use quickwit_proto::metastore::SourceType;
25-
use quickwit_proto::types::Position;
25+
use quickwit_proto::types::{Offset, Position};
2626
use quickwit_storage::StorageResolver;
2727
use tokio::io::{AsyncBufReadExt, AsyncRead, AsyncReadExt, BufReader};
2828

@@ -146,8 +146,13 @@ impl DocFileReader {
146146
pub struct ObjectUriBatchReader {
147147
partition_id: PartitionId,
148148
reader: DocFileReader,
149-
current_offset: usize,
150-
is_eof: bool,
149+
current_position: Position,
150+
}
151+
152+
fn parse_offset(offset: &Offset) -> anyhow::Result<usize> {
153+
offset
154+
.as_usize()
155+
.context("file offset should be stored as usize")
151156
}
152157

153158
impl ObjectUriBatchReader {
@@ -157,26 +162,22 @@ impl ObjectUriBatchReader {
157162
uri: &Uri,
158163
position: Position,
159164
) -> anyhow::Result<Self> {
160-
let current_offset = match position {
161-
Position::Beginning => 0,
162-
Position::Offset(offset) => offset
163-
.as_usize()
164-
.context("file offset should be stored as usize")?,
165+
let current_offset = match &position {
165166
Position::Eof(_) => {
166167
return Ok(ObjectUriBatchReader {
167168
partition_id,
168169
reader: DocFileReader::empty(),
169-
current_offset: 0,
170-
is_eof: true,
170+
current_position: position,
171171
});
172172
}
173+
Position::Beginning => 0,
174+
Position::Offset(offset) => parse_offset(offset)?,
173175
};
174176
let reader = DocFileReader::from_uri(storage_resolver, uri, current_offset).await?;
175177
Ok(ObjectUriBatchReader {
176178
partition_id,
177179
reader,
178-
current_offset,
179-
is_eof: false,
180+
current_position: position,
180181
})
181182
}
182183

@@ -186,11 +187,14 @@ impl ObjectUriBatchReader {
186187
source_type: SourceType,
187188
) -> anyhow::Result<BatchBuilder> {
188189
let mut batch_builder = BatchBuilder::new(source_type);
189-
if self.is_eof {
190-
return Ok(batch_builder);
191-
}
192-
let limit_num_bytes = self.current_offset + BATCH_NUM_BYTES_LIMIT as usize;
193-
let mut new_offset = self.current_offset;
190+
let current_offset = match &self.current_position {
191+
Position::Eof(_) => return Ok(batch_builder),
192+
Position::Beginning => 0,
193+
Position::Offset(offset) => parse_offset(offset)?,
194+
};
195+
196+
let limit_num_bytes = current_offset + BATCH_NUM_BYTES_LIMIT as usize;
197+
let mut new_offset = current_offset;
194198
let mut eof_position: Option<Position> = None;
195199
while new_offset < limit_num_bytes {
196200
if let Some(record) = source_progress
@@ -200,28 +204,26 @@ impl ObjectUriBatchReader {
200204
new_offset = record.next_offset as usize;
201205
batch_builder.add_doc(record.doc);
202206
if record.is_last {
203-
self.is_eof = true;
204207
eof_position = Some(Position::eof(new_offset));
205208
break;
206209
}
207210
} else {
208-
self.is_eof = true;
209211
eof_position = Some(Position::eof(new_offset));
210212
break;
211213
}
212214
}
213215
let to_position = eof_position.unwrap_or(Position::offset(new_offset));
214216
batch_builder.checkpoint_delta.record_partition_delta(
215217
self.partition_id.clone(),
216-
Position::offset(self.current_offset),
217-
to_position,
218+
self.current_position.clone(),
219+
to_position.clone(),
218220
)?;
219-
self.current_offset = new_offset;
221+
self.current_position = to_position;
220222
Ok(batch_builder)
221223
}
222224

223225
pub fn is_eof(&self) -> bool {
224-
self.is_eof
226+
self.current_position.is_eof()
225227
}
226228
}
227229

quickwit/quickwit-indexing/src/source/queue_sources/coordinator.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,52 @@ mod tests {
564564
assert!(coordinator.local_state.is_awaiting_commit(&partition_id_2));
565565
}
566566

567+
#[tokio::test]
568+
async fn test_checkpoint_delta_of_existing_messages() {
569+
let (dummy_doc_file_1, _) = generate_dummy_doc_file(false, 10).await;
570+
let test_uri_1 = Uri::from_str(dummy_doc_file_1.path().to_str().unwrap()).unwrap();
571+
let partition_id_1 = PreProcessedPayload::ObjectUri(test_uri_1.clone()).partition_id();
572+
573+
let (dummy_doc_file_2, _) = generate_dummy_doc_file(false, 10).await;
574+
let test_uri_2 = Uri::from_str(dummy_doc_file_2.path().to_str().unwrap()).unwrap();
575+
let partition_id_2 = PreProcessedPayload::ObjectUri(test_uri_2.clone()).partition_id();
576+
577+
let queue = Arc::new(MemoryQueueForTests::new());
578+
let shared_state = init_state(
579+
"test-index",
580+
&[
581+
(
582+
partition_id_1.clone(),
583+
("existing_token_1".to_string(), Position::Beginning, true),
584+
),
585+
(
586+
partition_id_2.clone(),
587+
(
588+
"existing_token_2".to_string(),
589+
Position::offset((DUMMY_DOC.len() + 1) * 2),
590+
true,
591+
),
592+
),
593+
],
594+
);
595+
let mut coordinator = setup_coordinator(queue.clone(), shared_state.clone());
596+
let batches = process_messages(
597+
&mut coordinator,
598+
queue,
599+
&[(&test_uri_1, "ack-id-1"), (&test_uri_2, "ack-id-2")],
600+
)
601+
.await;
602+
assert_eq!(batches.len(), 2);
603+
let deltas = batches[0].checkpoint_delta.iter().collect::<Vec<_>>();
604+
assert_eq!(deltas.len(), 1);
605+
assert_eq!(deltas[0].1.from, Position::Beginning);
606+
assert_eq!(deltas[0].1.to, Position::eof(350u64));
607+
let deltas = batches[1].checkpoint_delta.iter().collect::<Vec<_>>();
608+
assert_eq!(deltas.len(), 1);
609+
assert_eq!(deltas[0].1.from, Position::Offset(70u64.into()));
610+
assert_eq!(deltas[0].1.to, Position::eof(350u64));
611+
}
612+
567613
#[tokio::test]
568614
async fn test_process_multiple_coordinator() {
569615
let queue = Arc::new(MemoryQueueForTests::new());

quickwit/quickwit-proto/src/types/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ pub use doc_mapping_uid::DocMappingUid;
3434
pub use doc_uid::{DocUid, DocUidGenerator};
3535
pub use index_uid::IndexUid;
3636
pub use pipeline_uid::PipelineUid;
37-
pub use position::Position;
37+
pub use position::{Offset, Position};
3838
pub use shard_id::ShardId;
3939

4040
/// The size of an ULID in bytes. Use `ULID_LEN` for the length of Base32 encoded ULID strings.

0 commit comments

Comments
 (0)