Skip to content
Merged
225 changes: 194 additions & 31 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@ package cmd

import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"os"
"pb/pkg/config"
"pb/pkg/model"
"strings"
"time"

tea "github.com/charmbracelet/bubbletea"
Expand All @@ -38,13 +38,20 @@ var (
endFlagShort = "t"
defaultEnd = "now"

// save filter flags
saveFilterFlag = "save-as"
saveFilterShort = "s"
//save filter with time flags
saveFilterTimeFlag = "with-time"
saveFilterTimeShort = "w"

interactiveFlag = "interactive"
interactiveFlagShort = "i"
)

var query = &cobra.Command{
Use: "query [query] [flags]",
Example: " pb query \"select * from frontend\" --from=10m --to=now",
Use: "run [query] [flags]",
Example: " pb query run \"select * from frontend\" --from=10m --to=now",
Short: "Run SQL query on a log stream",
Long: "\nRun SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view.",
Args: cobra.MaximumNArgs(1),
Expand All @@ -56,9 +63,10 @@ var query = &cobra.Command{
// <steam-name> here is the first stream that server returns
if len(args) == 0 || args[0] == "" || args[0] == " " {
fmt.Println("please enter your query")
fmt.Printf("Example:\n pb query \"select * from frontend\" --from=10m --to=now")
fmt.Printf("Example:\n pb query run \"select * from frontend\" --from=10m --to=now\n")
return nil

} else {
query = args[0]
}

start, err := command.Flags().GetString(startFlag)
Expand Down Expand Up @@ -87,6 +95,17 @@ var query = &cobra.Command{
return err
}

keepTime, err := command.Flags().GetBool(saveFilterTimeFlag)
if err != nil {
return err
}

filterName, err := command.Flags().GetString(saveFilterFlag)
if err != nil {
return err
}
filterNameTrimmed := strings.Trim(filterName, " ")

if interactive {
p := tea.NewProgram(model.NewQueryModel(DefaultProfile, query, startTime, endTime), tea.WithAltScreen())
if _, err := p.Run(); err != nil {
Expand All @@ -96,15 +115,38 @@ var query = &cobra.Command{
return nil
}

// Checks if there is filter name which is not empty. Empty filter name wont be allowed
if command.Flags().Changed(saveFilterFlag) {
if filterName == "" || len(filterNameTrimmed) == 0 || filterName == "=" {
fmt.Println("please provide a filter name")
command.Help()
return nil
} else if filterName != "" {
if keepTime {
createFilterWithTime(query, filterNameTrimmed, start, end)

} else {
// if there is no keep time filter pass empty values for startTime and endTime
createFilter(query, filterNameTrimmed)
}
}
} else if filterName == "" && keepTime {
fmt.Println("please provide a filter name")
command.Help()
return nil
}

client := DefaultClient()
return fetchData(&client, query, start, end)
},
}

var QueryCmd = func() *cobra.Command {
query.Flags().BoolP(saveFilterTimeFlag, saveFilterTimeShort, false, "Save the time range associated in the query to the filter") // save time for a filter flag; default value = false (boolean type)
query.Flags().BoolP(interactiveFlag, interactiveFlagShort, false, "open the query result in interactive mode")
query.Flags().StringP(startFlag, startFlagShort, defaultStart, "Start time for query. Takes date as '2024-10-12T07:20:50.52Z' or string like '10m', '1hr'")
query.Flags().StringP(endFlag, endFlagShort, defaultEnd, "End time for query. Takes date as '2024-10-12T07:20:50.52Z' or 'now'")
query.Flags().StringP(saveFilterFlag, saveFilterShort, "", "Save a query filter") // save filter flag. Default value = FILTER_NAME (type string)
return query
}()

Expand Down Expand Up @@ -137,59 +179,180 @@ func fetchData(client *HTTPClient, query string, startTime string, endTime strin
return
}

func fetchFirstStream() (string, error) {
// Returns start and end time for query in RFC3339 format
func parseTime(start, end string) (time.Time, time.Time, error) {
if start == defaultStart && end == defaultEnd {
return time.Now().Add(-1 * time.Minute), time.Now(), nil
}

startTime, err := time.Parse(time.RFC3339, start)
if err != nil {
// try parsing as duration
duration, err := time.ParseDuration(start)
if err != nil {
return time.Time{}, time.Time{}, err
}
startTime = time.Now().Add(-1 * duration)
}

endTime, err := time.Parse(time.RFC3339, end)
if err != nil {
if end == "now" {
endTime = time.Now()
} else {
return time.Time{}, time.Time{}, err
}
}

return startTime, endTime, nil
}

// create a request body for saving filter without time_filter
func createFilter(query string, filterName string) (err error) {

userConfig, err := config.ReadConfigFromFile()
if err != nil {
return err
}

var userName string
if profile, ok := userConfig.Profiles[userConfig.DefaultProfile]; ok {
userName = profile.Username
} else {
fmt.Println("Default profile not found.")
return
}

index := strings.Index(query, "from")
fromPart := strings.TrimSpace(query[index+len("from"):])
streamName := strings.Fields(fromPart)[0]

queryTemplate := `{
"filter_type":"sql",
"filter_query": "%s"
}`

saveFilterTemplate := `
{
"stream_name": "%s",
"filter_name": "%s",
"user_id": "%s",
"query": %s,
"time_filter": null
}`

queryField := fmt.Sprintf(queryTemplate, query)

finalQuery := fmt.Sprintf(saveFilterTemplate, streamName, filterName, userName, queryField)

saveFilterToServer(finalQuery)

return err

}

// create a request body for saving filter with time_filter
func createFilterWithTime(query string, filterName string, startTime string, endTime string) (err error) {
userConfig, err := config.ReadConfigFromFile()
if err != nil {
return err
}

var userName string
if profile, ok := userConfig.Profiles[userConfig.DefaultProfile]; ok {
userName = profile.Username
} else {
fmt.Println("Default profile not found.")
return
}

index := strings.Index(query, "from")
fromPart := strings.TrimSpace(query[index+len("from"):])
streamName := strings.Fields(fromPart)[0]

start, end, err := parseTimeToUTC(startTime, endTime)
if err != nil {
fmt.Println("Oops something went wrong!!!!")
return err
}

queryTemplate := `{
"filter_type":"sql",
"filter_query": "%s"
}`

timeTemplate := `{
"from": "%s",
"to": "%s"
}`
timeField := fmt.Sprintf(timeTemplate, start, end)

saveFilterTemplate := `
{
"stream_name": "%s",
"filter_name": "%s",
"user_id": "%s",
"query": %s,
"time_filter": %s
}`

queryField := fmt.Sprintf(queryTemplate, query)

finalQuery := fmt.Sprintf(saveFilterTemplate, streamName, filterName, userName, queryField, timeField)

saveFilterToServer(finalQuery)

return err
}

// fires a request to the server to save the filter with the associated user and stream
func saveFilterToServer(finalQuery string) (err error) {
client := DefaultClient()
req, err := client.NewRequest("GET", "logstream", nil)

req, err := client.NewRequest("POST", "filters", bytes.NewBuffer([]byte(finalQuery)))
if err != nil {
return "", err
return
}

resp, err := client.client.Do(req)
if err != nil {
return "", err
return
}

if resp.StatusCode == 200 {
items := []map[string]string{}
if err := json.NewDecoder(resp.Body).Decode(&items); err != nil {
return "", err
}
defer resp.Body.Close()

if len(items) == 0 {
return "", errors.New("no stream found on the server, please create a stream to proceed")
}
// return with the first stream that is present in the list
for _, v := range items {
return v["name"], nil
}
if resp.StatusCode != 200 {
fmt.Printf("\nSomething went wrong")
}
return "", fmt.Errorf("received error status code %d from server", resp.StatusCode)

return err
}

// Returns start and end time for query in RFC3339 format
func parseTime(start, end string) (time.Time, time.Time, error) {
// parses a time duration to supported utc format
func parseTimeToUTC(start, end string) (time.Time, time.Time, error) {
if start == defaultStart && end == defaultEnd {
return time.Now().Add(-1 * time.Minute), time.Now(), nil
now := time.Now().UTC()
return now.Add(-1 * time.Minute), now, nil
}

startTime, err := time.Parse(time.RFC3339, start)
if err != nil {
// try parsing as duration
duration, err := time.ParseDuration(start)
if err != nil {
return time.Time{}, time.Time{}, err
}
startTime = time.Now().Add(-1 * duration)
startTime = time.Now().Add(-1 * duration).UTC()
} else {
startTime = startTime.UTC()
}

endTime, err := time.Parse(time.RFC3339, end)
if err != nil {
if end == "now" {
endTime = time.Now()
endTime = time.Now().UTC()
} else {
return time.Time{}, time.Time{}, err
}
} else {
endTime = endTime.UTC()
}

return startTime, endTime, nil
Expand Down
23 changes: 21 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,13 @@ var stream = &cobra.Command{
PersistentPreRunE: cmd.PreRunDefaultProfile,
}

var query = &cobra.Command{
Use: "query",
Short: "Run SQL query on a log stream",
Long: "\nRun SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view.",
PersistentPreRunE: cmd.PreRunDefaultProfile,
}

func main() {
profile.AddCommand(cmd.AddProfileCmd)
profile.AddCommand(cmd.RemoveProfileCmd)
Expand All @@ -105,8 +112,10 @@ func main() {
stream.AddCommand(cmd.ListStreamCmd)
stream.AddCommand(cmd.StatStreamCmd)

query.AddCommand(cmd.QueryCmd)

cli.AddCommand(profile)
cli.AddCommand(cmd.QueryCmd)
cli.AddCommand(query)
cli.AddCommand(stream)
cli.AddCommand(user)
cli.AddCommand(role)
Expand All @@ -125,12 +134,22 @@ func main() {
cli.CompletionOptions.HiddenDefaultCmd = true

// create a default profile if file does not exist
if _, err := config.ReadConfigFromFile(); os.IsNotExist(err) {
if previousConfig, err := config.ReadConfigFromFile(); os.IsNotExist(err) {
conf := config.Config{
Profiles: map[string]config.Profile{"demo": defaultInitialProfile()},
DefaultProfile: "demo",
}
config.WriteConfigToFile(&conf)
} else {
//updates the demo profile for existing users
_, exists := previousConfig.Profiles["demo"]
if exists {
conf := config.Config{
Profiles: map[string]config.Profile{"demo": defaultInitialProfile()},
DefaultProfile: "demo",
}
config.WriteConfigToFile(&conf)
}
}

err := cli.Execute()
Expand Down