Skip to content

Commit 12a9ad5

Browse files
author
Devdutt Shenoi
committed
refactor: move set_metadata
1 parent bca8f4a commit 12a9ad5

File tree

3 files changed

+7
-28
lines changed

3 files changed

+7
-28
lines changed

src/migration/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ pub async fn run_migration(config: &Parseable) -> anyhow::Result<()> {
138138
let Some(metadata) = migration_stream(&stream_name, &*storage).await? else {
139139
continue;
140140
};
141-
PARSEABLE.set_stream_meta(&stream_name, metadata).await;
141+
config.get_or_create_stream(&stream_name).set_metadata(metadata).await;
142142
}
143143

144144
Ok(())

src/parseable/mod.rs

Lines changed: 1 addition & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::{
4848
object_storage::parseable_json_path, ObjectStorageError, ObjectStorageProvider,
4949
ObjectStoreFormat, Owner, Permisssion, StreamType,
5050
},
51-
validator, LOCK_EXPECT,
51+
validator,
5252
};
5353

5454
mod staging;
@@ -175,32 +175,6 @@ impl Parseable {
175175
}
176176
}
177177

178-
/// Stores the provided stream metadata in memory mapping
179-
pub async fn set_stream_meta(&self, stream_name: &str, updated_metadata: LogStreamMetadata) {
180-
let map = self.streams.read().expect(LOCK_EXPECT);
181-
182-
match map.get(stream_name) {
183-
Some(stream) => {
184-
let mut metadata = stream.metadata.write().expect(LOCK_EXPECT);
185-
*metadata = updated_metadata;
186-
}
187-
None => {
188-
drop(map);
189-
self.streams.write().expect(LOCK_EXPECT).insert(
190-
stream_name.to_owned(),
191-
Stream::new(
192-
self.options.clone(),
193-
stream_name,
194-
updated_metadata,
195-
self.ingestor_metadata
196-
.as_ref()
197-
.map(|meta| meta.get_ingestor_id()),
198-
),
199-
);
200-
}
201-
}
202-
}
203-
204178
// validate the storage, if the proper path for staging directory is provided
205179
// if the proper data directory is provided, or s3 bucket is provided etc
206180
pub async fn validate_storage(&self) -> Result<Option<Bytes>, ObjectStorageError> {

src/parseable/streams.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,11 @@ impl Stream {
458458
Schema::try_merge(vec![schema, current_schema]).unwrap()
459459
}
460460

461+
/// Stores the provided stream metadata in memory mapping
462+
pub async fn set_metadata(&self, updated_metadata: LogStreamMetadata) {
463+
*self.metadata.write().expect(LOCK_EXPECT) = updated_metadata;
464+
}
465+
461466
pub fn get_first_event(&self) -> Option<String> {
462467
self.metadata
463468
.read()

0 commit comments

Comments
 (0)