Skip to content

Commit 51142cb

Browse files
authored
Disable Standalone Mode after migration to the Distributed Mode (#707)
* disable standalone when in distributed mode impl standalone/distributed check add migration function - Eshan Chatterjee update StorageMetadata struct add StandaloneWithDistributed error in MetadataError impl ToString for Mode * fix: double print of error message * fix: logic to handle server mode if EnvChange None If the server was restarted with the same staging and in standalone mode the app was not blocking that case * Refactor determine_environment function
1 parent 0d11d84 commit 51142cb

File tree

6 files changed

+158
-28
lines changed

6 files changed

+158
-28
lines changed

server/src/handlers/http/logstream.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -476,6 +476,10 @@ pub mod error {
476476
fn from(value: MetadataError) -> Self {
477477
match value {
478478
MetadataError::StreamMetaNotFound(s) => StreamError::StreamNotFound(s),
479+
MetadataError::StandaloneWithDistributed(s) => StreamError::Custom {
480+
msg: s,
481+
status: StatusCode::INTERNAL_SERVER_ERROR,
482+
},
479483
}
480484
}
481485
}

server/src/metadata.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,8 @@ pub mod error {
245245
pub enum MetadataError {
246246
#[error("Metadata for stream {0} not found. Please create the stream and try again")]
247247
StreamMetaNotFound(String),
248+
#[error("Metadata Error: {0}")]
249+
StandaloneWithDistributed(String),
248250
}
249251

250252
#[derive(Debug, thiserror::Error)]

server/src/migration.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
6060
let metadata = metadata_migration::v2_v3(storage_metadata);
6161
put_remote_metadata(&*object_store, &metadata).await?;
6262
}
63+
Some("v3") => {
64+
let mdata = metadata_migration::update_v3(storage_metadata);
65+
put_remote_metadata(&*object_store, &mdata).await?;
66+
}
6367
_ => (),
6468
}
6569
}
@@ -75,13 +79,18 @@ pub async fn run_metadata_migration(config: &Config) -> anyhow::Result<()> {
7579
let metadata = metadata_migration::v2_v3(staging_metadata);
7680
put_staging_metadata(config, &metadata)?;
7781
}
82+
Some("v3") => {
83+
let mdata = metadata_migration::update_v3(staging_metadata);
84+
put_staging_metadata(config, &mdata)?;
85+
}
7886
_ => (),
7987
}
8088
}
8189

8290
Ok(())
8391
}
8492

93+
/// run the migration for all streams
8594
pub async fn run_migration(config: &Config) -> anyhow::Result<()> {
8695
let storage = config.storage().get_object_store();
8796
let streams = storage.list_streams().await?;

server/src/migration/metadata_migration.rs

Lines changed: 63 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,22 +17,54 @@
1717
*/
1818

1919
use rand::distributions::DistString;
20-
use serde_json::{Map, Value};
20+
use serde_json::{Map, Value as JsonValue};
2121

22-
pub fn v1_v3(mut storage_metadata: serde_json::Value) -> Value {
22+
use crate::option::CONFIG;
23+
24+
/*
25+
v1
26+
{
27+
"version": "v1",
28+
"mode": "drive"
29+
"user": string,
30+
"staging": "string",
31+
"storage": "string",
32+
"deployment_id": "string"
33+
"stream": string,
34+
"default_role": null
35+
}
36+
*/
37+
pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue {
2338
let metadata = storage_metadata.as_object_mut().unwrap();
24-
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
39+
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
2540
metadata.remove("user");
2641
metadata.remove("stream");
27-
metadata.insert("users".to_string(), Value::Array(vec![]));
28-
metadata.insert("streams".to_string(), Value::Array(vec![]));
29-
metadata.insert("roles".to_string(), Value::Array(vec![]));
42+
metadata.insert("users".to_string(), JsonValue::Array(vec![]));
43+
metadata.insert("streams".to_string(), JsonValue::Array(vec![]));
44+
metadata.insert("roles".to_string(), JsonValue::Array(vec![]));
45+
metadata.insert(
46+
"server_mode".to_string(),
47+
JsonValue::String(CONFIG.parseable.mode.to_string()),
48+
);
3049
storage_metadata
3150
}
3251

33-
pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
52+
/*
53+
v2
54+
{
55+
"version": "v2",
56+
"users": [
57+
{
58+
"role": ["privilege1", "privilege2", ...]
59+
},
60+
...
61+
]
62+
...
63+
}
64+
*/
65+
pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue {
3466
let metadata = storage_metadata.as_object_mut().unwrap();
35-
*metadata.get_mut("version").unwrap() = Value::String("v3".to_string());
67+
*metadata.get_mut("version").unwrap() = JsonValue::String("v3".to_string());
3668
let users = metadata
3769
.get_mut("users")
3870
.expect("users field is present")
@@ -46,7 +78,7 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
4678
// user is an object
4779
let user = user.as_object_mut().unwrap();
4880
// take out privileges
49-
let Value::Array(privileges) = user.remove("role").expect("role exists for v2") else {
81+
let JsonValue::Array(privileges) = user.remove("role").expect("role exists for v2") else {
5082
panic!("privileges is an arrray")
5183
};
5284

@@ -55,15 +87,34 @@ pub fn v2_v3(mut storage_metadata: serde_json::Value) -> Value {
5587
if !privileges.is_empty() {
5688
let role_name =
5789
rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8);
58-
privileges_map.push((role_name.clone(), Value::Array(privileges)));
59-
roles.push(Value::from(role_name));
90+
privileges_map.push((role_name.clone(), JsonValue::Array(privileges)));
91+
roles.push(JsonValue::from(role_name));
6092
}
6193
user.insert("roles".to_string(), roles.into());
6294
}
6395

6496
metadata.insert(
6597
"roles".to_string(),
66-
Value::Object(Map::from_iter(privileges_map)),
98+
JsonValue::Object(Map::from_iter(privileges_map)),
99+
);
100+
metadata.insert(
101+
"server_mode".to_string(),
102+
JsonValue::String(CONFIG.parseable.mode.to_string()),
67103
);
68104
storage_metadata
69105
}
106+
107+
// maybe rename
108+
pub fn update_v3(mut storage_metadata: JsonValue) -> JsonValue {
109+
let metadata = storage_metadata.as_object_mut().unwrap();
110+
let sm = metadata.get("server_mode");
111+
112+
if sm.is_none() {
113+
metadata.insert(
114+
"server_mode".to_string(),
115+
JsonValue::String(CONFIG.parseable.mode.to_string()),
116+
);
117+
}
118+
119+
storage_metadata
120+
}

server/src/option.rs

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,11 +201,26 @@ pub enum Mode {
201201
impl Mode {
202202
pub fn to_str(&self) -> &str {
203203
match self {
204-
Mode::Query => "Query Server",
205-
Mode::Ingest => "Ingest Server",
204+
Mode::Query => "Query",
205+
Mode::Ingest => "Ingest",
206206
Mode::All => "All",
207207
}
208208
}
209+
210+
pub fn from_string(mode: &str) -> Result<Self, String> {
211+
match mode {
212+
"Query" => Ok(Mode::Query),
213+
"Ingest" => Ok(Mode::Ingest),
214+
"All" => Ok(Mode::All),
215+
x => Err(format!("Invalid mode: {}", x)),
216+
}
217+
}
218+
}
219+
220+
impl ToString for Mode {
221+
fn to_string(&self) -> String {
222+
self.to_str().to_string()
223+
}
209224
}
210225

211226
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]

server/src/storage/store_metadata.rs

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use once_cell::sync::OnceCell;
2626
use std::io;
2727

2828
use crate::{
29+
metadata::error::stream_info::MetadataError,
2930
option::{Mode, CONFIG, JOIN_COMMUNITY},
3031
rbac::{role::model::DefaultPrivilege, user::User},
3132
storage::ObjectStorageError,
@@ -55,6 +56,7 @@ pub struct StorageMetadata {
5556
pub deployment_id: uid::Uid,
5657
pub users: Vec<User>,
5758
pub streams: Vec<String>,
59+
pub server_mode: String,
5860
#[serde(default)]
5961
pub roles: HashMap<String, Vec<DefaultPrivilege>>,
6062
#[serde(default)]
@@ -69,6 +71,7 @@ impl StorageMetadata {
6971
staging: CONFIG.staging_dir().to_path_buf(),
7072
storage: CONFIG.storage().get_endpoint(),
7173
deployment_id: uid::gen(),
74+
server_mode: CONFIG.parseable.mode.to_string(),
7275
users: Vec::new(),
7376
streams: Vec::new(),
7477
roles: HashMap::default(),
@@ -100,18 +103,8 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
100103
let storage = CONFIG.storage().get_object_store();
101104
let remote_metadata = storage.get_metadata().await?;
102105

103-
let check = match (staging_metadata, remote_metadata) {
104-
(Some(staging), Some(remote)) => {
105-
if staging.deployment_id == remote.deployment_id {
106-
EnvChange::None(remote)
107-
} else {
108-
EnvChange::NewRemote
109-
}
110-
}
111-
(None, Some(remote)) => EnvChange::NewStaging(remote),
112-
(Some(_), None) => EnvChange::NewRemote,
113-
(None, None) => EnvChange::CreateBoth,
114-
};
106+
// Env Change needs to be updated
107+
let check = determine_environment(staging_metadata, remote_metadata);
115108

116109
// flags for if metadata needs to be synced
117110
let mut overwrite_staging = false;
@@ -121,6 +114,12 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
121114
EnvChange::None(metadata) => {
122115
// overwrite staging anyways so that it matches remote in case of any divergence
123116
overwrite_staging = true;
117+
if CONFIG.parseable.mode == Mode::All {
118+
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
119+
.map_err(|err| {
120+
ObjectStorageError::Custom(err.to_string())
121+
})?;
122+
}
124123
Ok(metadata)
125124
},
126125
EnvChange::NewRemote => {
@@ -134,8 +133,22 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
134133
// overwrite remote in all and query mode
135134
// because staging dir has changed.
136135
match CONFIG.parseable.mode {
137-
Mode::All | Mode::Query => overwrite_remote = true,
138-
_ => {
136+
Mode::All => {
137+
standalone_when_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here"))
138+
.map_err(|err| {
139+
ObjectStorageError::Custom(err.to_string())
140+
})?;
141+
overwrite_remote = true;
142+
},
143+
Mode::Query => {
144+
overwrite_remote = true;
145+
metadata.server_mode = CONFIG.parseable.mode.to_string();
146+
metadata.staging = CONFIG.staging_dir().to_path_buf();
147+
},
148+
Mode::Ingest => {
149+
// if ingest server is started fetch the metadata from remote
150+
// update the server mode for local metadata
151+
metadata.server_mode = CONFIG.parseable.mode.to_string();
139152
metadata.staging = CONFIG.staging_dir().to_path_buf();
140153
},
141154
}
@@ -173,6 +186,32 @@ pub async fn resolve_parseable_metadata() -> Result<StorageMetadata, ObjectStora
173186
Ok(metadata)
174187
}
175188

189+
fn determine_environment(
190+
staging_metadata: Option<StorageMetadata>,
191+
remote_metadata: Option<StorageMetadata>,
192+
) -> EnvChange {
193+
match (staging_metadata, remote_metadata) {
194+
(Some(staging), Some(remote)) => {
195+
// if both staging and remote have same deployment id
196+
if staging.deployment_id == remote.deployment_id {
197+
EnvChange::None(remote)
198+
} else if Mode::from_string(&remote.server_mode).unwrap() == Mode::All
199+
&& (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest)
200+
{
201+
// if you are switching to distributed mode from standalone mode
202+
// it will create a new staging rather than a new remote
203+
EnvChange::NewStaging(remote)
204+
} else {
205+
// it is a new remote
206+
EnvChange::NewRemote
207+
}
208+
}
209+
(None, Some(remote)) => EnvChange::NewStaging(remote),
210+
(Some(_), None) => EnvChange::NewRemote,
211+
(None, None) => EnvChange::CreateBoth,
212+
}
213+
}
214+
176215
// variant contain remote metadata
177216
#[derive(Debug, Clone, PartialEq, Eq)]
178217
pub enum EnvChange {
@@ -187,6 +226,16 @@ pub enum EnvChange {
187226
CreateBoth,
188227
}
189228

229+
fn standalone_when_distributed(remote_server_mode: Mode) -> Result<(), MetadataError> {
230+
// mode::all -> mode::query | mode::ingest allowed
231+
// but mode::query | mode::ingest -> mode::all not allowed
232+
if remote_server_mode == Mode::Query {
233+
return Err(MetadataError::StandaloneWithDistributed("Starting Standalone Mode is not permitted when Distributed Mode is enabled. Please restart the server with Distributed Mode enabled.".to_string()));
234+
}
235+
236+
Ok(())
237+
}
238+
190239
pub fn get_staging_metadata() -> io::Result<Option<StorageMetadata>> {
191240
let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
192241
let bytes = match fs::read(path) {

0 commit comments

Comments
 (0)