Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .goreleaser.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ builds:
- -trimpath
- -tags=kqueue
ldflags:
- -s -w -X main.Version={{ .Version }} -X main.Commit={{ .ShortCommit }}
- -s -w -X main.Version=v{{ .Version }} -X main.Commit={{ .ShortCommit }}

archives:
- format: tar.gz
Expand Down
6 changes: 3 additions & 3 deletions cmd/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ var query = &cobra.Command{
Use: "query [query] [flags]",
Example: " pb query \"select * from frontend\" --from=10m --to=now",
Short: "Run SQL query on a log stream",
Long: "\nqRun SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view.",
Long: "\nRun SQL query on a log stream. Default output format is json. Use -i flag to open interactive table view.",
Args: cobra.MaximumNArgs(1),
PreRunE: PreRunDefaultProfile,
RunE: func(command *cobra.Command, args []string) error {
Expand All @@ -72,15 +72,15 @@ var query = &cobra.Command{
start = defaultStart
}

end, _ := command.Flags().GetString(endFlag)
end, err := command.Flags().GetString(endFlag)
if err != nil {
return err
}
if end == "" {
end = defaultEnd
}

interactive, _ := command.Flags().GetBool(interactiveFlag)
interactive, err := command.Flags().GetBool(interactiveFlag)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions cmd/role.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ func (user *RoleData) Render() string {
}

var AddRoleCmd = &cobra.Command{
Use: "upsert role-name",
Example: " pb role upsert ingestors",
Use: "add role-name",
Example: " pb role add ingestors",
Short: "Add a new role",
Args: cobra.ExactArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down
2 changes: 1 addition & 1 deletion cmd/tail.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
var TailCmd = &cobra.Command{
Use: "tail stream-name",
Example: " pb tail backend_logs",
Short: "tail a log stream",
Short: "Stream live events from a log stream",
Args: cobra.ExactArgs(1),
PreRunE: PreRunDefaultProfile,
RunE: func(cmd *cobra.Command, args []string) error {
Expand Down
63 changes: 36 additions & 27 deletions pkg/model/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"os"
"pb/pkg/config"
"pb/pkg/iterator"
"regexp"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -167,33 +166,31 @@ func createIteratorFromModel(m *QueryModel) *iterator.QueryIterator[QueryData, F
startTime = startTime.Truncate(time.Minute)
endTime = endTime.Truncate(time.Minute).Add(time.Minute)

regex := regexp.MustCompile(`^select\s+(?:\*|\w+(?:,\s*\w+)*)\s+from\s+(\w+)(?:\s+;)?$`)
matches := regex.FindStringSubmatch(m.query.Value())
if matches == nil {
return nil
table := streamNameFromQuery(m.query.Value())
if table != "" {
iter := iterator.NewQueryIterator(
startTime, endTime,
false,
func(t1, t2 time.Time) (QueryData, FetchResult) {
client := &http.Client{
Timeout: time.Second * 50,
}
return fetchData(client, &m.profile, m.query.Value(), t1.UTC().Format(time.RFC3339), t2.UTC().Format(time.RFC3339))
},
func(t1, t2 time.Time) bool {
client := &http.Client{
Timeout: time.Second * 50,
}
res, err := fetchData(client, &m.profile, "select count(*) as count from "+table, m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc())
if err == fetchErr {
return false
}
count := res.Records[0]["count"].(float64)
return count > 0
})
return &iter
}
table := matches[1]
iter := iterator.NewQueryIterator(
startTime, endTime,
false,
func(t1, t2 time.Time) (QueryData, FetchResult) {
client := &http.Client{
Timeout: time.Second * 50,
}
return fetchData(client, &m.profile, m.query.Value(), t1.UTC().Format(time.RFC3339), t2.UTC().Format(time.RFC3339))
},
func(t1, t2 time.Time) bool {
client := &http.Client{
Timeout: time.Second * 50,
}
res, err := fetchData(client, &m.profile, "select count(*) as count from "+table, m.timeRange.StartValueUtc(), m.timeRange.EndValueUtc())
if err == fetchErr {
return false
}
count := res.Records[0]["count"].(float64)
return count > 0
})
return &iter
return nil
}

func NewQueryModel(profile config.Profile, queryStr string, startTime, endTime time.Time) QueryModel {
Expand Down Expand Up @@ -653,3 +650,15 @@ func countDigits(num int) int {
numDigits := int(math.Log10(math.Abs(float64(num)))) + 1
return numDigits
}

func streamNameFromQuery(query string) string {
stream := ""
tokens := strings.Split(query, " ")
for i, token := range tokens {
if token == "from" {
stream = tokens[i+1]
break
}
}
return stream
}