@@ -17,12 +17,12 @@ package cmd
1717
1818import (
1919 "bytes"
20- "encoding/json"
21- "errors"
2220 "fmt"
2321 "io"
2422 "os"
23+ "pb/pkg/config"
2524 "pb/pkg/model"
25+ "strings"
2626 "time"
2727
2828 tea "github.com/charmbracelet/bubbletea"
@@ -38,13 +38,20 @@ var (
3838 endFlagShort = "t"
3939 defaultEnd = "now"
4040
41+ // save filter flags
42+ saveFilterFlag = "save-as"
43+ saveFilterShort = "s"
44+ //save filter with time flags
45+ saveFilterTimeFlag = "with-time"
46+ saveFilterTimeShort = "w"
47+
4148 interactiveFlag = "interactive"
4249 interactiveFlagShort = "i"
4350)
4451
4552var query = & cobra.Command {
46- Use : "query [query] [flags]" ,
47- Example : " pb query \" select * from frontend\" --from=10m --to=now" ,
53+ Use : "run [query] [flags]" ,
54+ Example : " pb query run \" select * from frontend\" --from=10m --to=now" ,
4855 Short : "Run SQL query on a log stream" ,
4956 Long : "\n Run SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view." ,
5057 Args : cobra .MaximumNArgs (1 ),
@@ -56,9 +63,10 @@ var query = &cobra.Command{
5663 // <steam-name> here is the first stream that server returns
5764 if len (args ) == 0 || args [0 ] == "" || args [0 ] == " " {
5865 fmt .Println ("please enter your query" )
59- fmt .Printf ("Example:\n pb query \" select * from frontend\" --from=10m --to=now" )
66+ fmt .Printf ("Example:\n pb query run \" select * from frontend\" --from=10m --to=now\n " )
6067 return nil
61-
68+ } else {
69+ query = args [0 ]
6270 }
6371
6472 start , err := command .Flags ().GetString (startFlag )
@@ -87,6 +95,17 @@ var query = &cobra.Command{
8795 return err
8896 }
8997
98+ keepTime , err := command .Flags ().GetBool (saveFilterTimeFlag )
99+ if err != nil {
100+ return err
101+ }
102+
103+ filterName , err := command .Flags ().GetString (saveFilterFlag )
104+ if err != nil {
105+ return err
106+ }
107+ filterNameTrimmed := strings .Trim (filterName , " " )
108+
90109 if interactive {
91110 p := tea .NewProgram (model .NewQueryModel (DefaultProfile , query , startTime , endTime ), tea .WithAltScreen ())
92111 if _ , err := p .Run (); err != nil {
@@ -96,15 +115,38 @@ var query = &cobra.Command{
96115 return nil
97116 }
98117
118+ // Checks if there is filter name which is not empty. Empty filter name wont be allowed
119+ if command .Flags ().Changed (saveFilterFlag ) {
120+ if filterName == "" || len (filterNameTrimmed ) == 0 || filterName == "=" {
121+ fmt .Println ("please provide a filter name" )
122+ command .Help ()
123+ return nil
124+ } else if filterName != "" {
125+ if keepTime {
126+ createFilterWithTime (query , filterNameTrimmed , start , end )
127+
128+ } else {
129+ // if there is no keep time filter pass empty values for startTime and endTime
130+ createFilter (query , filterNameTrimmed )
131+ }
132+ }
133+ } else if filterName == "" && keepTime {
134+ fmt .Println ("please provide a filter name" )
135+ command .Help ()
136+ return nil
137+ }
138+
99139 client := DefaultClient ()
100140 return fetchData (& client , query , start , end )
101141 },
102142}
103143
104144var QueryCmd = func () * cobra.Command {
145+ query .Flags ().BoolP (saveFilterTimeFlag , saveFilterTimeShort , false , "Save the time range associated in the query to the filter" ) // save time for a filter flag; default value = false (boolean type)
105146 query .Flags ().BoolP (interactiveFlag , interactiveFlagShort , false , "open the query result in interactive mode" )
106147 query .Flags ().StringP (startFlag , startFlagShort , defaultStart , "Start time for query. Takes date as '2024-10-12T07:20:50.52Z' or string like '10m', '1hr'" )
107148 query .Flags ().StringP (endFlag , endFlagShort , defaultEnd , "End time for query. Takes date as '2024-10-12T07:20:50.52Z' or 'now'" )
149+ query .Flags ().StringP (saveFilterFlag , saveFilterShort , "" , "Save a query filter" ) // save filter flag. Default value = FILTER_NAME (type string)
108150 return query
109151}()
110152
@@ -137,59 +179,180 @@ func fetchData(client *HTTPClient, query string, startTime string, endTime strin
137179 return
138180}
139181
140- func fetchFirstStream () (string , error ) {
182+ // Returns start and end time for query in RFC3339 format
183+ func parseTime (start , end string ) (time.Time , time.Time , error ) {
184+ if start == defaultStart && end == defaultEnd {
185+ return time .Now ().Add (- 1 * time .Minute ), time .Now (), nil
186+ }
187+
188+ startTime , err := time .Parse (time .RFC3339 , start )
189+ if err != nil {
190+ // try parsing as duration
191+ duration , err := time .ParseDuration (start )
192+ if err != nil {
193+ return time.Time {}, time.Time {}, err
194+ }
195+ startTime = time .Now ().Add (- 1 * duration )
196+ }
197+
198+ endTime , err := time .Parse (time .RFC3339 , end )
199+ if err != nil {
200+ if end == "now" {
201+ endTime = time .Now ()
202+ } else {
203+ return time.Time {}, time.Time {}, err
204+ }
205+ }
206+
207+ return startTime , endTime , nil
208+ }
209+
210+ // create a request body for saving filter without time_filter
211+ func createFilter (query string , filterName string ) (err error ) {
212+
213+ userConfig , err := config .ReadConfigFromFile ()
214+ if err != nil {
215+ return err
216+ }
217+
218+ var userName string
219+ if profile , ok := userConfig .Profiles [userConfig .DefaultProfile ]; ok {
220+ userName = profile .Username
221+ } else {
222+ fmt .Println ("Default profile not found." )
223+ return
224+ }
225+
226+ index := strings .Index (query , "from" )
227+ fromPart := strings .TrimSpace (query [index + len ("from" ):])
228+ streamName := strings .Fields (fromPart )[0 ]
229+
230+ queryTemplate := `{
231+ "filter_type":"sql",
232+ "filter_query": "%s"
233+ }`
234+
235+ saveFilterTemplate := `
236+ {
237+ "stream_name": "%s",
238+ "filter_name": "%s",
239+ "user_id": "%s",
240+ "query": %s,
241+ "time_filter": null
242+ }`
243+
244+ queryField := fmt .Sprintf (queryTemplate , query )
245+
246+ finalQuery := fmt .Sprintf (saveFilterTemplate , streamName , filterName , userName , queryField )
247+
248+ saveFilterToServer (finalQuery )
249+
250+ return err
251+
252+ }
253+
254+ // create a request body for saving filter with time_filter
255+ func createFilterWithTime (query string , filterName string , startTime string , endTime string ) (err error ) {
256+ userConfig , err := config .ReadConfigFromFile ()
257+ if err != nil {
258+ return err
259+ }
260+
261+ var userName string
262+ if profile , ok := userConfig .Profiles [userConfig .DefaultProfile ]; ok {
263+ userName = profile .Username
264+ } else {
265+ fmt .Println ("Default profile not found." )
266+ return
267+ }
268+
269+ index := strings .Index (query , "from" )
270+ fromPart := strings .TrimSpace (query [index + len ("from" ):])
271+ streamName := strings .Fields (fromPart )[0 ]
272+
273+ start , end , err := parseTimeToUTC (startTime , endTime )
274+ if err != nil {
275+ fmt .Println ("Oops something went wrong!!!!" )
276+ return err
277+ }
278+
279+ queryTemplate := `{
280+ "filter_type":"sql",
281+ "filter_query": "%s"
282+ }`
283+
284+ timeTemplate := `{
285+ "from": "%s",
286+ "to": "%s"
287+ }`
288+ timeField := fmt .Sprintf (timeTemplate , start , end )
289+
290+ saveFilterTemplate := `
291+ {
292+ "stream_name": "%s",
293+ "filter_name": "%s",
294+ "user_id": "%s",
295+ "query": %s,
296+ "time_filter": %s
297+ }`
298+
299+ queryField := fmt .Sprintf (queryTemplate , query )
300+
301+ finalQuery := fmt .Sprintf (saveFilterTemplate , streamName , filterName , userName , queryField , timeField )
302+
303+ saveFilterToServer (finalQuery )
304+
305+ return err
306+ }
307+
308+ // fires a request to the server to save the filter with the associated user and stream
309+ func saveFilterToServer (finalQuery string ) (err error ) {
141310 client := DefaultClient ()
142- req , err := client .NewRequest ("GET" , "logstream" , nil )
311+
312+ req , err := client .NewRequest ("POST" , "filters" , bytes .NewBuffer ([]byte (finalQuery )))
143313 if err != nil {
144- return "" , err
314+ return
145315 }
146316
147317 resp , err := client .client .Do (req )
148318 if err != nil {
149- return "" , err
319+ return
150320 }
151321
152- if resp .StatusCode == 200 {
153- items := []map [string ]string {}
154- if err := json .NewDecoder (resp .Body ).Decode (& items ); err != nil {
155- return "" , err
156- }
157- defer resp .Body .Close ()
158-
159- if len (items ) == 0 {
160- return "" , errors .New ("no stream found on the server, please create a stream to proceed" )
161- }
162- // return with the first stream that is present in the list
163- for _ , v := range items {
164- return v ["name" ], nil
165- }
322+ if resp .StatusCode != 200 {
323+ fmt .Printf ("\n Something went wrong" )
166324 }
167- return "" , fmt .Errorf ("received error status code %d from server" , resp .StatusCode )
325+
326+ return err
168327}
169328
170- // Returns start and end time for query in RFC3339 format
171- func parseTime (start , end string ) (time.Time , time.Time , error ) {
329+ // parses a time duration to supported utc format
330+ func parseTimeToUTC (start , end string ) (time.Time , time.Time , error ) {
172331 if start == defaultStart && end == defaultEnd {
173- return time .Now ().Add (- 1 * time .Minute ), time .Now (), nil
332+ now := time .Now ().UTC ()
333+ return now .Add (- 1 * time .Minute ), now , nil
174334 }
175335
176336 startTime , err := time .Parse (time .RFC3339 , start )
177337 if err != nil {
178- // try parsing as duration
179338 duration , err := time .ParseDuration (start )
180339 if err != nil {
181340 return time.Time {}, time.Time {}, err
182341 }
183- startTime = time .Now ().Add (- 1 * duration )
342+ startTime = time .Now ().Add (- 1 * duration ).UTC ()
343+ } else {
344+ startTime = startTime .UTC ()
184345 }
185346
186347 endTime , err := time .Parse (time .RFC3339 , end )
187348 if err != nil {
188349 if end == "now" {
189- endTime = time .Now ()
350+ endTime = time .Now (). UTC ()
190351 } else {
191352 return time.Time {}, time.Time {}, err
192353 }
354+ } else {
355+ endTime = endTime .UTC ()
193356 }
194357
195358 return startTime , endTime , nil
0 commit comments