Skip to content

Commit d5053e7

Browse files
committed
Merge branch 'main' into ct/remove-bound-partition-spec
2 parents 75756c3 + 54926a2 commit d5053e7

32 files changed

+1470
-216
lines changed

.asf.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ github:
4040

4141
required_pull_request_reviews:
4242
required_approving_review_count: 1
43+
dismiss_stale_reviews: true
4344

4445
required_linear_history: true
4546
del_branch_on_merge: true

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ members = [
2323
"crates/iceberg",
2424
"crates/integration_tests",
2525
"crates/integrations/*",
26-
"crates/puffin",
2726
"crates/test_utils",
2827
]
2928
exclude = ["bindings/python"]
@@ -72,6 +71,7 @@ itertools = "0.13"
7271
log = "0.4"
7372
mockito = "1"
7473
murmur3 = "0.5.2"
74+
num-bigint = "0.4.6"
7575
once_cell = "1"
7676
opendal = "0.50.1"
7777
ordered-float = "4"

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ The Apache Iceberg community is built on the principles described in the [Apache
101101

102102
- [Databend](https://github.com/datafuselabs/databend/): An open-source cloud data warehouse that serves as a cost-effective alternative to Snowflake.
103103
- [iceberg-catalog](https://github.com/hansetag/iceberg-catalog): A Rust implementation of the Iceberg REST Catalog specification.
104+
- [RisingWave](https://github.com/risingwavelabs/risingwave): A Postgres-compatible SQL database designed for real-time event streaming data processing, analysis, and management.
104105

105106
## License
106107

crates/catalog/rest/src/catalog.rs

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -256,9 +256,10 @@ impl RestCatalog {
256256
async fn context(&self) -> Result<&RestContext> {
257257
self.ctx
258258
.get_or_try_init(|| async {
259-
let catalog_config = RestCatalog::load_config(&self.user_config).await?;
259+
let client = HttpClient::new(&self.user_config)?;
260+
let catalog_config = RestCatalog::load_config(&client, &self.user_config).await?;
260261
let config = self.user_config.clone().merge_with_config(catalog_config);
261-
let client = HttpClient::new(&config)?;
262+
let client = client.update_with(&config)?;
262263

263264
Ok(RestContext { config, client })
264265
})
@@ -268,9 +269,10 @@ impl RestCatalog {
268269
/// Load the runtime config from the server by user_config.
269270
///
270271
/// It's required for a rest catalog to update it's config after creation.
271-
async fn load_config(user_config: &RestCatalogConfig) -> Result<CatalogConfig> {
272-
let client = HttpClient::new(user_config)?;
273-
272+
async fn load_config(
273+
client: &HttpClient,
274+
user_config: &RestCatalogConfig,
275+
) -> Result<CatalogConfig> {
274276
let mut request = client.request(Method::GET, user_config.config_endpoint());
275277

276278
if let Some(warehouse_location) = &user_config.warehouse {
@@ -280,6 +282,7 @@ impl RestCatalog {
280282
let config = client
281283
.query::<CatalogConfig, ErrorResponse, OK>(request.build()?)
282284
.await?;
285+
283286
Ok(config)
284287
}
285288

@@ -777,7 +780,7 @@ mod tests {
777780
"expires_in": 86400
778781
}"#,
779782
)
780-
.expect(2)
783+
.expect(1)
781784
.create_async()
782785
.await
783786
}
@@ -831,7 +834,7 @@ mod tests {
831834
"expires_in": 86400
832835
}"#,
833836
)
834-
.expect(2)
837+
.expect(1)
835838
.create_async()
836839
.await;
837840

crates/catalog/rest/src/client.rs

Lines changed: 76 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ impl Debug for HttpClient {
5454
}
5555

5656
impl HttpClient {
57+
/// Create a new http client.
5758
pub fn new(cfg: &RestCatalogConfig) -> Result<Self> {
5859
Ok(HttpClient {
5960
client: Client::new(),
@@ -66,6 +67,32 @@ impl HttpClient {
6667
})
6768
}
6869

70+
/// Update the http client with new configuration.
71+
///
72+
/// If cfg carries new value, we will use cfg instead.
73+
/// Otherwise, we will keep the old value.
74+
pub fn update_with(self, cfg: &RestCatalogConfig) -> Result<Self> {
75+
Ok(HttpClient {
76+
client: self.client,
77+
78+
token: Mutex::new(
79+
cfg.token()
80+
.or_else(|| self.token.into_inner().ok().flatten()),
81+
),
82+
token_endpoint: (!cfg.get_token_endpoint().is_empty())
83+
.then(|| cfg.get_token_endpoint())
84+
.unwrap_or(self.token_endpoint),
85+
credential: cfg.credential().or(self.credential),
86+
extra_headers: (!cfg.extra_headers()?.is_empty())
87+
.then(|| cfg.extra_headers())
88+
.transpose()?
89+
.unwrap_or(self.extra_headers),
90+
extra_oauth_params: (!cfg.extra_oauth_params().is_empty())
91+
.then(|| cfg.extra_oauth_params())
92+
.unwrap_or(self.extra_oauth_params),
93+
})
94+
}
95+
6996
/// This API is testing only to assert the token.
7097
#[cfg(test)]
7198
pub(crate) async fn token(&self) -> Option<String> {
@@ -134,28 +161,39 @@ impl HttpClient {
134161
.request(Method::POST, &self.token_endpoint)
135162
.form(&params)
136163
.build()?;
164+
let auth_url = auth_req.url().clone();
137165
let auth_resp = self.client.execute(auth_req).await?;
138166

139167
let auth_res: TokenResponse = if auth_resp.status().as_u16() == OK {
140-
let text = auth_resp.bytes().await?;
168+
let text = auth_resp
169+
.bytes()
170+
.await
171+
.map_err(|err| err.with_url(auth_url.clone()))?;
141172
Ok(serde_json::from_slice(&text).map_err(|e| {
142173
Error::new(
143174
ErrorKind::Unexpected,
144175
"Failed to parse response from rest catalog server!",
145176
)
177+
.with_context("operation", "auth")
178+
.with_context("url", auth_url.to_string())
146179
.with_context("json", String::from_utf8_lossy(&text))
147180
.with_source(e)
148181
})?)
149182
} else {
150183
let code = auth_resp.status();
151-
let text = auth_resp.bytes().await?;
184+
let text = auth_resp
185+
.bytes()
186+
.await
187+
.map_err(|err| err.with_url(auth_url.clone()))?;
152188
let e: ErrorResponse = serde_json::from_slice(&text).map_err(|e| {
153189
Error::new(
154190
ErrorKind::Unexpected,
155191
"Failed to parse response from rest catalog server!",
156192
)
157-
.with_context("json", String::from_utf8_lossy(&text))
158193
.with_context("code", code.to_string())
194+
.with_context("operation", "auth")
195+
.with_context("url", auth_url.to_string())
196+
.with_context("json", String::from_utf8_lossy(&text))
159197
.with_source(e)
160198
})?;
161199
Err(Error::from(e))
@@ -193,28 +231,41 @@ impl HttpClient {
193231
) -> Result<R> {
194232
self.authenticate(&mut request).await?;
195233

234+
let method = request.method().clone();
235+
let url = request.url().clone();
236+
196237
let resp = self.client.execute(request).await?;
197238

198239
if resp.status().as_u16() == SUCCESS_CODE {
199-
let text = resp.bytes().await?;
240+
let text = resp
241+
.bytes()
242+
.await
243+
.map_err(|err| err.with_url(url.clone()))?;
200244
Ok(serde_json::from_slice::<R>(&text).map_err(|e| {
201245
Error::new(
202246
ErrorKind::Unexpected,
203247
"Failed to parse response from rest catalog server!",
204248
)
249+
.with_context("method", method.to_string())
250+
.with_context("url", url.to_string())
205251
.with_context("json", String::from_utf8_lossy(&text))
206252
.with_source(e)
207253
})?)
208254
} else {
209255
let code = resp.status();
210-
let text = resp.bytes().await?;
256+
let text = resp
257+
.bytes()
258+
.await
259+
.map_err(|err| err.with_url(url.clone()))?;
211260
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
212261
Error::new(
213262
ErrorKind::Unexpected,
214263
"Failed to parse response from rest catalog server!",
215264
)
216-
.with_context("json", String::from_utf8_lossy(&text))
217265
.with_context("code", code.to_string())
266+
.with_context("method", method.to_string())
267+
.with_context("url", url.to_string())
268+
.with_context("json", String::from_utf8_lossy(&text))
218269
.with_source(e)
219270
})?;
220271
Err(e.into())
@@ -227,20 +278,28 @@ impl HttpClient {
227278
) -> Result<()> {
228279
self.authenticate(&mut request).await?;
229280

281+
let method = request.method().clone();
282+
let url = request.url().clone();
283+
230284
let resp = self.client.execute(request).await?;
231285

232286
if resp.status().as_u16() == SUCCESS_CODE {
233287
Ok(())
234288
} else {
235289
let code = resp.status();
236-
let text = resp.bytes().await?;
290+
let text = resp
291+
.bytes()
292+
.await
293+
.map_err(|err| err.with_url(url.clone()))?;
237294
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
238295
Error::new(
239296
ErrorKind::Unexpected,
240297
"Failed to parse response from rest catalog server!",
241298
)
242-
.with_context("json", String::from_utf8_lossy(&text))
243299
.with_context("code", code.to_string())
300+
.with_context("method", method.to_string())
301+
.with_context("url", url.to_string())
302+
.with_context("json", String::from_utf8_lossy(&text))
244303
.with_source(e)
245304
})?;
246305
Err(e.into())
@@ -255,19 +314,27 @@ impl HttpClient {
255314
) -> Result<R> {
256315
self.authenticate(&mut request).await?;
257316

317+
let method = request.method().clone();
318+
let url = request.url().clone();
319+
258320
let resp = self.client.execute(request).await?;
259321

260322
if let Some(ret) = handler(&resp) {
261323
Ok(ret)
262324
} else {
263325
let code = resp.status();
264-
let text = resp.bytes().await?;
326+
let text = resp
327+
.bytes()
328+
.await
329+
.map_err(|err| err.with_url(url.clone()))?;
265330
let e = serde_json::from_slice::<E>(&text).map_err(|e| {
266331
Error::new(
267332
ErrorKind::Unexpected,
268333
"Failed to parse response from rest catalog server!",
269334
)
270335
.with_context("code", code.to_string())
336+
.with_context("method", method.to_string())
337+
.with_context("url", url.to_string())
271338
.with_context("json", String::from_utf8_lossy(&text))
272339
.with_source(e)
273340
})?;

crates/iceberg/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ futures = { workspace = true }
6363
itertools = { workspace = true }
6464
moka = { version = "0.12.8", features = ["future"] }
6565
murmur3 = { workspace = true }
66+
num-bigint = { workspace = true }
6667
once_cell = { workspace = true }
6768
opendal = { workspace = true }
6869
ordered-float = { workspace = true }
@@ -81,6 +82,7 @@ tokio = { workspace = true, optional = true }
8182
typed-builder = { workspace = true }
8283
url = { workspace = true }
8384
uuid = { workspace = true }
85+
zstd = { workspace = true }
8486

8587
[dev-dependencies]
8688
ctor = { workspace = true }

0 commit comments

Comments
 (0)