Skip to content

Commit 1b11629

Browse files
authored
Merge branch 'main' into compose-distributed
2 parents c096bcf + e355858 commit 1b11629

File tree

4 files changed

+83
-88
lines changed

4 files changed

+83
-88
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,13 @@ You can download and run the Parseable binary on your laptop.
6767
- Linux or MacOS
6868

6969
```bash
70-
curl https://raw.githubusercontent.com/parseablehq/parseable/main/scripts/download.sh | bash
70+
curl -fsSL https://logg.ing/install | sh
7171
```
7272

7373
- Windows
7474

7575
```pwsh
76-
powershell -c "irm https://raw.githubusercontent.com/parseablehq/parseable/main/scripts/download.ps1 | iex"
76+
powershell -c "irm https://logg.ing/install-windows | iex"
7777
```
7878

7979
Once this runs successfully, you'll see dashboard at [http://localhost:8000 ↗︎](http://localhost:8000). You can login to the dashboard default credentials `admin`, `admin`.

scripts/download.sh

Lines changed: 13 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,13 @@
33
# supported CPU architectures and operating systems
44
SUPPORTED_ARCH=("x86_64" "arm64")
55
SUPPORTED_OS=("linux" "darwin")
6-
# Associate binaries with CPU architectures and operating systems
7-
declare -A BINARIES=(
8-
["x86_64-linux"]="Parseable_x86_64-unknown-linux-gnu"
9-
["arm64-linux"]="Parseable_aarch64-unknown-linux-gnu"
10-
["x86_64-darwin"]="Parseable_x86_64-apple-darwin"
11-
["arm64-darwin"]="Parseable_aarch64-apple-darwin"
12-
)
6+
DOWNLOAD_BASE_URL="parseable.gateway.scarf.sh/"
7+
138
# Get the system's CPU architecture and operating system
149
CPU_ARCH=$(uname -m)
1510
OS=$(uname -s | tr '[:upper:]' '[:lower:]')
1611

12+
printf "\n=========================\n"
1713
printf "Detected CPU architecture: %s\n" "$CPU_ARCH"
1814
printf "Detected operating system: %s\n" "$OS"
1915

@@ -34,38 +30,29 @@ if ! echo "${SUPPORTED_OS[@]}" | grep -q "\\b${OS}\\b"; then
3430
echo "Error: Unsupported operating system (${OS})."
3531
exit 1
3632
fi
33+
3734
# Get the latest release information using GitHub API
3835
release=$(curl -s "https://api.github.com/repos/parseablehq/parseable/releases/latest")
36+
# find the release tag
37+
release_tag=$(echo "$release" | grep -o "\"tag_name\":\s*\"[^\"]*\"" | cut -d '"' -f 4)
38+
printf "Found latest release version: $release_tag\n"
3939

40-
printf "Fetching release information for parseable...\n"
41-
42-
# Loop through binaries in the release and find the appropriate one
43-
for arch_os in "${CPU_ARCH}-${OS}"; do
44-
binary_name="${BINARIES[$arch_os]}"
45-
download_url=$(echo "$release" | grep -o "\"browser_download_url\":\s*\"[^\"]*${binary_name}\"" | cut -d '"' -f 4)
46-
if [ -n "$download_url" ]; then
47-
break
48-
fi
49-
done
40+
download_url=${DOWNLOAD_BASE_URL}${CPU_ARCH}-${OS}.${release_tag}
5041

51-
printf "Checking for existing installation...\n"
5242
if [[ -d ${INSTALL_DIR} ]]; then
5343
printf "A Previous version of parseable already exists. Run 'parseable --version' to check the version."
54-
printf "or consider removing that before Installing"
44+
printf "or consider removing that before new installation\n"
5545
exit 1
56-
5746
else
58-
printf "No Previous installation found\n"
59-
printf "Installing parseable...\n"
6047
mkdir -p ${BIN_DIR}
6148
fi
6249

63-
6450
# Download the binary using curl or wget
51+
printf "Downloading Parseable version $release_tag, for OS: $OS, CPU architecture: $CPU_ARCH\n\n"
6552
if command -v curl &>/dev/null; then
66-
curl -L -o "${BIN_NAME}" "$download_url" 2&>> /dev/null
53+
curl -L -o "${BIN_NAME}" "$download_url"
6754
elif command -v wget &>/dev/null; then
68-
wget -O "${BIN_NAME}" "$download_url" 2&>> /dev/null
55+
wget -O "${BIN_NAME}" "$download_url"
6956
else
7057
echo "Error: Neither curl nor wget found. Please install either curl or wget."
7158
exit 1
@@ -79,4 +66,4 @@ printf "Adding parseable to the path\n"
7966
PATH_STR="export PATH=${BIN_DIR}"':$PATH'
8067
echo ${PATH_STR} >> ${RC_FILE_PATH}
8168

82-
echo "parseable was added to the path. Please refresh the environment by sourcing the ${RC_PATH}"
69+
echo "parseable was added to the path. Please refresh the environment by sourcing the ${RC_FILE_PATH}"

server/src/handlers/http/query.rs

Lines changed: 65 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use crate::event::commit_schema;
3737
use crate::metrics::QUERY_EXECUTE_TIME;
3838
use crate::option::{Mode, CONFIG};
3939
use crate::query::error::ExecuteError;
40+
use crate::query::Query as LogicalQuery;
4041
use crate::query::{TableScanVisitor, QUERY_SESSION};
4142
use crate::rbac::role::{Action, Permission};
4243
use crate::rbac::Users;
@@ -67,69 +68,37 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
6768
let raw_logical_plan = session_state
6869
.create_logical_plan(&query_request.query)
6970
.await?;
71+
7072
// create a visitor to extract the table name
7173
let mut visitor = TableScanVisitor::default();
7274
let _ = raw_logical_plan.visit(&mut visitor);
73-
let table_name = visitor
74-
.into_inner()
75-
.pop()
76-
.ok_or(QueryError::MalformedQuery(
77-
"No table found from sql".to_string(),
78-
))?;
75+
76+
let tables = visitor.into_inner();
7977

8078
if CONFIG.parseable.mode == Mode::Query {
81-
if let Ok(new_schema) = fetch_schema(&table_name).await {
82-
// commit schema merges the schema internally and updates the schema in storage.
83-
commit_schema_to_storage(&table_name, new_schema.clone())
84-
.await
85-
.map_err(QueryError::ObjectStorage)?;
86-
commit_schema(&table_name, Arc::new(new_schema)).map_err(QueryError::EventError)?;
79+
for table in tables {
80+
if let Ok(new_schema) = fetch_schema(&table).await {
81+
// commit schema merges the schema internally and updates the schema in storage.
82+
commit_schema_to_storage(&table, new_schema.clone())
83+
.await
84+
.map_err(QueryError::ObjectStorage)?;
85+
commit_schema(&table, Arc::new(new_schema)).map_err(QueryError::EventError)?;
86+
}
8787
}
8888
}
89-
90-
let mut query = into_query(&query_request, &session_state).await?;
89+
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
9190

9291
let creds = extract_session_key_from_req(&req).expect("expects basic auth");
93-
let permissions = Users.get_permissions(&creds);
92+
let permissions: Vec<Permission> = Users.get_permissions(&creds);
9493

95-
// check authorization of this query if it references physical table;
96-
let table_name = query.table_name();
97-
if let Some(ref table) = table_name {
98-
let mut authorized = false;
99-
let mut tags = Vec::new();
100-
101-
// in permission check if user can run query on the stream.
102-
// also while iterating add any filter tags for this stream
103-
for permission in permissions {
104-
match permission {
105-
Permission::Stream(Action::All, _) => {
106-
authorized = true;
107-
break;
108-
}
109-
Permission::StreamWithTag(Action::Query, ref stream, tag)
110-
if stream == table || stream == "*" =>
111-
{
112-
authorized = true;
113-
if let Some(tag) = tag {
114-
tags.push(tag)
115-
}
116-
}
117-
_ => (),
118-
}
119-
}
120-
121-
if !authorized {
122-
return Err(QueryError::Unauthorized);
123-
}
124-
125-
if !tags.is_empty() {
126-
query.filter_tag = Some(tags)
127-
}
128-
}
94+
let table_name = query
95+
.first_table_name()
96+
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query".to_string()))?;
97+
authorize_and_set_filter_tags(&mut query, permissions, &table_name)?;
12998

13099
let time = Instant::now();
131100

132-
let (records, fields) = query.execute(table_name.clone().unwrap()).await?;
101+
let (records, fields) = query.execute(table_name.clone()).await?;
133102
let response = QueryResponse {
134103
records,
135104
fields,
@@ -138,16 +107,55 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
138107
}
139108
.to_http();
140109

141-
if let Some(table) = table_name {
142-
let time = time.elapsed().as_secs_f64();
143-
QUERY_EXECUTE_TIME
144-
.with_label_values(&[&table])
145-
.observe(time);
146-
}
110+
let time = time.elapsed().as_secs_f64();
111+
112+
QUERY_EXECUTE_TIME
113+
.with_label_values(&[&table_name])
114+
.observe(time);
147115

148116
Ok(response)
149117
}
150118

119+
fn authorize_and_set_filter_tags(
120+
query: &mut LogicalQuery,
121+
permissions: Vec<Permission>,
122+
table_name: &str,
123+
) -> Result<(), QueryError> {
124+
// check authorization of this query if it references physical table;
125+
let mut authorized = false;
126+
let mut tags = Vec::new();
127+
128+
// in permission check if user can run query on the stream.
129+
// also while iterating add any filter tags for this stream
130+
for permission in permissions {
131+
match permission {
132+
Permission::Stream(Action::All, _) => {
133+
authorized = true;
134+
break;
135+
}
136+
Permission::StreamWithTag(Action::Query, ref stream, tag)
137+
if stream == table_name || stream == "*" =>
138+
{
139+
authorized = true;
140+
if let Some(tag) = tag {
141+
tags.push(tag)
142+
}
143+
}
144+
_ => (),
145+
}
146+
}
147+
148+
if !authorized {
149+
return Err(QueryError::Unauthorized);
150+
}
151+
152+
if !tags.is_empty() {
153+
query.filter_tag = Some(tags)
154+
}
155+
156+
Ok(())
157+
}
158+
151159
impl FromRequest for Query {
152160
type Error = actix_web::Error;
153161
type Future = Pin<Box<dyn Future<Output = Result<Self, Self::Error>>>>;
@@ -178,7 +186,7 @@ impl FromRequest for Query {
178186
async fn into_query(
179187
query: &Query,
180188
session_state: &SessionState,
181-
) -> Result<crate::query::Query, QueryError> {
189+
) -> Result<LogicalQuery, QueryError> {
182190
if query.query.is_empty() {
183191
return Err(QueryError::EmptyQuery);
184192
}

server/src/query.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl Query {
167167
}
168168
}
169169

170-
pub fn table_name(&self) -> Option<String> {
170+
pub fn first_table_name(&self) -> Option<String> {
171171
let mut visitor = TableScanVisitor::default();
172172
let _ = self.raw_logical_plan.visit(&mut visitor);
173173
visitor.into_inner().pop()
@@ -192,7 +192,7 @@ impl TreeNodeVisitor for TableScanVisitor {
192192
match node {
193193
LogicalPlan::TableScan(table) => {
194194
self.tables.push(table.table_name.table().to_string());
195-
Ok(VisitRecursion::Stop)
195+
Ok(VisitRecursion::Skip)
196196
}
197197
_ => Ok(VisitRecursion::Continue),
198198
}
@@ -290,7 +290,7 @@ fn table_contains_any_time_filters(
290290
})
291291
.any(|expr| {
292292
matches!(&*expr.left, Expr::Column(Column { name, .. })
293-
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
293+
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
294294
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
295295
})
296296
}

0 commit comments

Comments
 (0)