@@ -46,11 +46,11 @@ pub async fn create_update_stream(
4646 time_partition_limit,
4747 custom_partition,
4848 static_schema_flag,
49- update_stream ,
49+ update_stream_flag ,
5050 stream_type,
5151 ) = fetch_headers_from_put_stream_request ( req) ;
5252
53- if metadata:: STREAM_INFO . stream_exists ( stream_name) && update_stream != "true" {
53+ if metadata:: STREAM_INFO . stream_exists ( stream_name) && update_stream_flag != "true" {
5454 return Err ( StreamError :: Custom {
5555 msg : format ! (
5656 "Logstream {stream_name} already exists, please create a new log stream with unique name"
@@ -59,58 +59,36 @@ pub async fn create_update_stream(
5959 } ) ;
6060 }
6161
62- // if the stream not found in memory map,
63- //check if it exists in the storage
64- //create stream and schema from storage
6562 if !metadata:: STREAM_INFO . stream_exists ( stream_name)
6663 && CONFIG . parseable . mode == Mode :: Query
6764 && create_stream_and_schema_from_storage ( stream_name) . await ?
6865 {
6966 return Err ( StreamError :: Custom {
70- msg : format ! (
71- "Logstream {stream_name} already exists, please create a new log stream with unique name"
72- ) ,
73- status : StatusCode :: BAD_REQUEST ,
74- } ) ;
67+ msg : format ! (
68+ "Logstream {stream_name} already exists, please create a new log stream with unique name"
69+ ) ,
70+ status : StatusCode :: BAD_REQUEST ,
71+ } ) ;
7572 }
7673
77- if update_stream == "true" {
78- if !STREAM_INFO . stream_exists ( stream_name) {
79- return Err ( StreamError :: StreamNotFound ( stream_name. to_string ( ) ) ) ;
80- }
81- if !time_partition. is_empty ( ) {
82- return Err ( StreamError :: Custom {
83- msg : "Altering the time partition of an existing stream is restricted." . to_string ( ) ,
84- status : StatusCode :: BAD_REQUEST ,
85- } ) ;
86- }
87-
88- if !static_schema_flag. is_empty ( ) {
89- return Err ( StreamError :: Custom {
90- msg : "Altering the schema of an existing stream is restricted." . to_string ( ) ,
91- status : StatusCode :: BAD_REQUEST ,
92- } ) ;
93- }
74+ if update_stream_flag == "true" {
75+ return update_stream (
76+ req,
77+ stream_name,
78+ & time_partition,
79+ & static_schema_flag,
80+ & time_partition_limit,
81+ & custom_partition,
82+ )
83+ . await ;
84+ }
9485
95- if !time_partition_limit. is_empty ( ) {
96- let time_partition_days = validate_time_partition_limit ( & time_partition_limit) ?;
97- update_time_partition_limit_in_stream ( stream_name. to_string ( ) , time_partition_days)
98- . await ?;
99- return Ok ( req. headers ( ) . clone ( ) ) ;
100- }
86+ let time_partition_in_days = if !time_partition_limit. is_empty ( ) {
87+ validate_time_partition_limit ( & time_partition_limit) ?
88+ } else {
89+ ""
90+ } ;
10191
102- if !custom_partition. is_empty ( ) {
103- validate_custom_partition ( & custom_partition) ?;
104- update_custom_partition_in_stream ( stream_name. to_string ( ) , & custom_partition) . await ?;
105- } else {
106- update_custom_partition_in_stream ( stream_name. to_string ( ) , "" ) . await ?;
107- }
108- return Ok ( req. headers ( ) . clone ( ) ) ;
109- }
110- let mut time_partition_in_days = "" ;
111- if !time_partition_limit. is_empty ( ) {
112- time_partition_in_days = validate_time_partition_limit ( & time_partition_limit) ?;
113- }
11492 if !custom_partition. is_empty ( ) {
11593 validate_custom_partition ( & custom_partition) ?;
11694 }
@@ -141,6 +119,51 @@ pub async fn create_update_stream(
141119 Ok ( req. headers ( ) . clone ( ) )
142120}
143121
122+ async fn update_stream (
123+ req : & HttpRequest ,
124+ stream_name : & str ,
125+ time_partition : & str ,
126+ static_schema_flag : & str ,
127+ time_partition_limit : & str ,
128+ custom_partition : & str ,
129+ ) -> Result < HeaderMap , StreamError > {
130+ if !STREAM_INFO . stream_exists ( stream_name) {
131+ return Err ( StreamError :: StreamNotFound ( stream_name. to_string ( ) ) ) ;
132+ }
133+ if !time_partition. is_empty ( ) {
134+ return Err ( StreamError :: Custom {
135+ msg : "Altering the time partition of an existing stream is restricted." . to_string ( ) ,
136+ status : StatusCode :: BAD_REQUEST ,
137+ } ) ;
138+ }
139+ if !static_schema_flag. is_empty ( ) {
140+ return Err ( StreamError :: Custom {
141+ msg : "Altering the schema of an existing stream is restricted." . to_string ( ) ,
142+ status : StatusCode :: BAD_REQUEST ,
143+ } ) ;
144+ }
145+ if !time_partition_limit. is_empty ( ) {
146+ let time_partition_days = validate_time_partition_limit ( time_partition_limit) ?;
147+ update_time_partition_limit_in_stream ( stream_name. to_string ( ) , time_partition_days) . await ?;
148+ return Ok ( req. headers ( ) . clone ( ) ) ;
149+ }
150+ validate_and_update_custom_partition ( stream_name, custom_partition) . await ?;
151+ return Ok ( req. headers ( ) . clone ( ) ) ;
152+ }
153+
154+ async fn validate_and_update_custom_partition (
155+ stream_name : & str ,
156+ custom_partition : & str ,
157+ ) -> Result < ( ) , StreamError > {
158+ if !custom_partition. is_empty ( ) {
159+ validate_custom_partition ( custom_partition) ?;
160+ update_custom_partition_in_stream ( stream_name. to_string ( ) , custom_partition) . await ?;
161+ } else {
162+ update_custom_partition_in_stream ( stream_name. to_string ( ) , "" ) . await ?;
163+ }
164+ Ok ( ( ) )
165+ }
166+
144167pub fn fetch_headers_from_put_stream_request (
145168 req : & HttpRequest ,
146169) -> ( String , String , String , String , String , String ) {
0 commit comments