diff --git a/.github/workflows/MainDistributionPipeline.yml b/.github/workflows/MainDistributionPipeline.yml index d14857f2..ec318563 100644 --- a/.github/workflows/MainDistributionPipeline.yml +++ b/.github/workflows/MainDistributionPipeline.yml @@ -14,19 +14,19 @@ concurrency: jobs: duckdb-stable-build: name: Build extension binaries - uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@v1.2.1 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_distribution.yml@main with: extension_name: httpfs - duckdb_version: v1.2.1 - ci_tools_version: v1.2.1 + duckdb_version: v1.3.0 + ci_tools_version: main duckdb-stable-deploy: name: Deploy extension binaries needs: duckdb-stable-build - uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@v1.2.1 + uses: duckdb/extension-ci-tools/.github/workflows/_extension_deploy.yml@main secrets: inherit with: extension_name: httpfs - duckdb_version: v1.2.1 - ci_tools_version: v1.2.1 - deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }} \ No newline at end of file + duckdb_version: v1.3.0 + ci_tools_version: main + deploy_latest: ${{ startsWith(github.ref, 'refs/tags/v') || github.ref == 'refs/heads/main' }} diff --git a/CMakeLists.txt b/CMakeLists.txt index 9a8d39b9..92d45479 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,6 +14,7 @@ build_static_extension( extension/httpfs/hffs.cpp extension/httpfs/s3fs.cpp extension/httpfs/httpfs.cpp + extension/httpfs/httpfs_client.cpp extension/httpfs/http_state.cpp extension/httpfs/crypto.cpp extension/httpfs/create_secret_functions.cpp @@ -23,10 +24,10 @@ set(PARAMETERS "-warnings") build_loadable_extension( httpfs ${PARAMETERS} - extension/httpfs/httpfs extension/httpfs/hffs.cpp extension/httpfs/s3fs.cpp extension/httpfs/httpfs.cpp + extension/httpfs/httpfs_client.cpp extension/httpfs/http_state.cpp extension/httpfs/crypto.cpp extension/httpfs/create_secret_functions.cpp diff --git a/duckdb b/duckdb index 8e52ec43..71c5c07c 160000 --- a/duckdb +++ b/duckdb @@ -1 +1 @@ -Subproject commit 8e52ec43959ab363643d63cb78ee214577111da4 +Subproject commit 71c5c07cdd295e9409c0505885033ae9eb6b5ddd diff --git a/extension-ci-tools b/extension-ci-tools index 58970c53..555f1c85 160000 --- a/extension-ci-tools +++ b/extension-ci-tools @@ -1 +1 @@ -Subproject commit 58970c538d35919db875096460c05806056f4de0 +Subproject commit 555f1c857dbe27f87b3e9c7656354d37c239ffaf diff --git a/extension/httpfs/create_secret_functions.cpp b/extension/httpfs/create_secret_functions.cpp index 52fdeae0..b3984b33 100644 --- a/extension/httpfs/create_secret_functions.cpp +++ b/extension/httpfs/create_secret_functions.cpp @@ -12,7 +12,7 @@ void CreateS3SecretFunctions::Register(DatabaseInstance &instance) { RegisterCreateSecretFunction(instance, "gcs"); } -static Value MapToStruct(const Value &map){ +static Value MapToStruct(const Value &map) { auto children = MapValue::GetChildren(map); child_list_t struct_fields; @@ -109,17 +109,21 @@ unique_ptr CreateS3SecretFunctions::CreateSecretFunctionInternal(Cli refresh = true; secret->secret_map["refresh_info"] = MapToStruct(named_param.second); } else { - throw InvalidInputException("Unknown named parameter passed to CreateSecretFunctionInternal: " + lower_name); + throw InvalidInputException("Unknown named parameter passed to CreateSecretFunctionInternal: " + + lower_name); } } return std::move(secret); } -CreateSecretInfo CreateS3SecretFunctions::GenerateRefreshSecretInfo(const SecretEntry &secret_entry, Value 
&refresh_info) { - const auto &kv_secret = dynamic_cast(*secret_entry.secret); +CreateSecretInput CreateS3SecretFunctions::GenerateRefreshSecretInfo(const SecretEntry &secret_entry, + Value &refresh_info) { + const auto &kv_secret = dynamic_cast(*secret_entry.secret); - CreateSecretInfo result(OnCreateConflict::REPLACE_ON_CONFLICT, secret_entry.persist_type); + CreateSecretInput result; + result.on_conflict = OnCreateConflict::REPLACE_ON_CONFLICT; + result.persist_type = SecretPersistType::TEMPORARY; result.type = kv_secret.GetType(); result.name = kv_secret.GetName(); @@ -141,7 +145,7 @@ CreateSecretInfo CreateS3SecretFunctions::GenerateRefreshSecretInfo(const Secret //! Function that will automatically try to refresh a secret bool CreateS3SecretFunctions::TryRefreshS3Secret(ClientContext &context, const SecretEntry &secret_to_refresh) { - const auto &kv_secret = dynamic_cast(*secret_to_refresh.secret); + const auto &kv_secret = dynamic_cast(*secret_to_refresh.secret); Value refresh_info; if (!kv_secret.TryGetValue("refresh_info", refresh_info)) { @@ -153,12 +157,15 @@ bool CreateS3SecretFunctions::TryRefreshS3Secret(ClientContext &context, const S // TODO: change SecretManager API to avoid requiring catching this exception try { auto res = secret_manager.CreateSecret(context, refresh_input); - auto &new_secret = dynamic_cast(*res->secret); - DUCKDB_LOG_INFO(context, "httpfs.SecretRefresh", "Successfully refreshed secret: %s, new key_id: %s", secret_to_refresh.secret->GetName(), new_secret.TryGetValue("key_id").ToString()); + auto &new_secret = dynamic_cast(*res->secret); + DUCKDB_LOG_INFO(context, "Successfully refreshed secret: %s, new key_id: %s", + secret_to_refresh.secret->GetName(), new_secret.TryGetValue("key_id").ToString()); return true; } catch (std::exception &ex) { ErrorData error(ex); - string new_message = StringUtil::Format("Exception thrown while trying to refresh secret %s. To fix this, please recreate or remove the secret and try again. Error: '%s'", secret_to_refresh.secret->GetName(), error.Message()); + string new_message = StringUtil::Format("Exception thrown while trying to refresh secret %s. To fix this, " + "please recreate or remove the secret and try again. 
Error: '%s'", + secret_to_refresh.secret->GetName(), error.Message()); throw Exception(error.Type(), new_message); } } @@ -204,6 +211,7 @@ void CreateS3SecretFunctions::RegisterCreateSecretFunction(DatabaseInstance &ins secret_type.name = type; secret_type.deserializer = KeyValueSecret::Deserialize; secret_type.default_provider = "config"; + secret_type.extension = "httpfs"; ExtensionUtil::RegisterSecretType(instance, secret_type); @@ -218,6 +226,7 @@ void CreateBearerTokenFunctions::Register(DatabaseInstance &instance) { secret_type_hf.name = HUGGINGFACE_TYPE; secret_type_hf.deserializer = KeyValueSecret::Deserialize; secret_type_hf.default_provider = "config"; + secret_type_hf.extension = "httpfs"; ExtensionUtil::RegisterSecretType(instance, secret_type_hf); // Huggingface config provider diff --git a/extension/httpfs/crypto.cpp b/extension/httpfs/crypto.cpp index af56f11a..04bd795e 100644 --- a/extension/httpfs/crypto.cpp +++ b/extension/httpfs/crypto.cpp @@ -31,68 +31,81 @@ void hex256(hash_bytes &in, hash_str &out) { } } -const EVP_CIPHER *GetCipher(const string &key) { - // For now, we only support GCM ciphers - switch (key.size()) { - case 16: - return EVP_aes_128_gcm(); - case 24: - return EVP_aes_192_gcm(); - case 32: - return EVP_aes_256_gcm(); - default: - throw InternalException("Invalid AES key length"); - } -} - -AESGCMStateSSL::AESGCMStateSSL() : gcm_context(EVP_CIPHER_CTX_new()) { - if (!(gcm_context)) { +AESStateSSL::AESStateSSL(const std::string *key) : context(EVP_CIPHER_CTX_new()) { + if (!(context)) { throw InternalException("AES GCM failed with initializing context"); } } -AESGCMStateSSL::~AESGCMStateSSL() { +AESStateSSL::~AESStateSSL() { // Clean up - EVP_CIPHER_CTX_free(gcm_context); + EVP_CIPHER_CTX_free(context); } -bool AESGCMStateSSL::IsOpenSSL() { - return ssl; +const EVP_CIPHER *AESStateSSL::GetCipher(const string &key) { + + switch (cipher) { + case GCM: + switch (key.size()) { + case 16: + return EVP_aes_128_gcm(); + case 24: + return EVP_aes_192_gcm(); + case 32: + return EVP_aes_256_gcm(); + default: + throw InternalException("Invalid AES key length"); + } + case CTR: + switch (key.size()) { + case 16: + return EVP_aes_128_ctr(); + case 24: + return EVP_aes_192_ctr(); + case 32: + return EVP_aes_256_ctr(); + default: + throw InternalException("Invalid AES key length"); + } + + default: + throw duckdb::InternalException("Invalid Encryption/Decryption Cipher: %d", static_cast(cipher)); + } } -void AESGCMStateSSL::GenerateRandomData(data_ptr_t data, idx_t len) { +void AESStateSSL::GenerateRandomData(data_ptr_t data, idx_t len) { // generate random bytes for nonce RAND_bytes(data, len); } -void AESGCMStateSSL::InitializeEncryption(const_data_ptr_t iv, idx_t iv_len, const string *key) { +void AESStateSSL::InitializeEncryption(const_data_ptr_t iv, idx_t iv_len, const string *key) { mode = ENCRYPT; - if (1 != EVP_EncryptInit_ex(gcm_context, GetCipher(*key), NULL, const_data_ptr_cast(key->data()), iv)) { + if (1 != EVP_EncryptInit_ex(context, GetCipher(*key), NULL, const_data_ptr_cast(key->data()), iv)) { throw InternalException("EncryptInit failed"); } } -void AESGCMStateSSL::InitializeDecryption(const_data_ptr_t iv, idx_t iv_len, const string *key) { +void AESStateSSL::InitializeDecryption(const_data_ptr_t iv, idx_t iv_len, const string *key) { mode = DECRYPT; - if (1 != EVP_DecryptInit_ex(gcm_context, GetCipher(*key), NULL, const_data_ptr_cast(key->data()), iv)) { + if (1 != EVP_DecryptInit_ex(context, GetCipher(*key), NULL, const_data_ptr_cast(key->data()), 
iv)) { throw InternalException("DecryptInit failed"); } } -size_t AESGCMStateSSL::Process(const_data_ptr_t in, idx_t in_len, data_ptr_t out, idx_t out_len) { +size_t AESStateSSL::Process(const_data_ptr_t in, idx_t in_len, data_ptr_t out, idx_t out_len) { switch (mode) { case ENCRYPT: - if (1 != EVP_EncryptUpdate(gcm_context, data_ptr_cast(out), reinterpret_cast(&out_len), + if (1 != EVP_EncryptUpdate(context, data_ptr_cast(out), reinterpret_cast(&out_len), const_data_ptr_cast(in), (int)in_len)) { throw InternalException("EncryptUpdate failed"); } break; case DECRYPT: - if (1 != EVP_DecryptUpdate(gcm_context, data_ptr_cast(out), reinterpret_cast(&out_len), + if (1 != EVP_DecryptUpdate(context, data_ptr_cast(out), reinterpret_cast(&out_len), const_data_ptr_cast(in), (int)in_len)) { throw InternalException("DecryptUpdate failed"); @@ -107,30 +120,30 @@ size_t AESGCMStateSSL::Process(const_data_ptr_t in, idx_t in_len, data_ptr_t out return out_len; } -size_t AESGCMStateSSL::Finalize(data_ptr_t out, idx_t out_len, data_ptr_t tag, idx_t tag_len) { +size_t AESStateSSL::FinalizeGCM(data_ptr_t out, idx_t out_len, data_ptr_t tag, idx_t tag_len) { auto text_len = out_len; switch (mode) { - case ENCRYPT: - { - if (1 != EVP_EncryptFinal_ex(gcm_context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len))) { + case ENCRYPT: { + if (1 != EVP_EncryptFinal_ex(context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len))) { throw InternalException("EncryptFinal failed"); } text_len += out_len; + // The computed tag is written at the end of a chunk - if (1 != EVP_CIPHER_CTX_ctrl(gcm_context, EVP_CTRL_GCM_GET_TAG, tag_len, tag)) { + if (1 != EVP_CIPHER_CTX_ctrl(context, EVP_CTRL_GCM_GET_TAG, tag_len, tag)) { throw InternalException("Calculating the tag failed"); } return text_len; } - case DECRYPT: - { + case DECRYPT: { // Set expected tag value - if (!EVP_CIPHER_CTX_ctrl(gcm_context, EVP_CTRL_GCM_SET_TAG, tag_len, tag)) { + if (!EVP_CIPHER_CTX_ctrl(context, EVP_CTRL_GCM_SET_TAG, tag_len, tag)) { throw InternalException("Finalizing tag failed"); } + // EVP_DecryptFinal() will return an error code if final block is not correctly formatted. - int ret = EVP_DecryptFinal_ex(gcm_context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len)); + int ret = EVP_DecryptFinal_ex(context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len)); text_len += out_len; if (ret > 0) { @@ -144,12 +157,46 @@ size_t AESGCMStateSSL::Finalize(data_ptr_t out, idx_t out_len, data_ptr_t tag, i } } +size_t AESStateSSL::Finalize(data_ptr_t out, idx_t out_len, data_ptr_t tag, idx_t tag_len) { + + if (cipher == GCM) { + return FinalizeGCM(out, out_len, tag, tag_len); + } + + auto text_len = out_len; + switch (mode) { + + case ENCRYPT: { + if (1 != EVP_EncryptFinal_ex(context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len))) { + throw InternalException("EncryptFinal failed"); + } + + return text_len += out_len; + } + + case DECRYPT: { + // EVP_DecryptFinal() will return an error code if final block is not correctly formatted. 
+ int ret = EVP_DecryptFinal_ex(context, data_ptr_cast(out) + out_len, reinterpret_cast(&out_len)); + text_len += out_len; + + if (ret > 0) { + // success + return text_len; + } + + throw InvalidInputException("Computed AES tag differs from read AES tag, are you using the right key?"); + } + default: + throw InternalException("Unhandled encryption mode %d", static_cast(mode)); + } +} + } // namespace duckdb extern "C" { // Call the member function through the factory object -DUCKDB_EXTENSION_API AESGCMStateSSLFactory *CreateSSLFactory() { - return new AESGCMStateSSLFactory(); +DUCKDB_EXTENSION_API AESStateSSLFactory *CreateSSLFactory() { + return new AESStateSSLFactory(); }; } diff --git a/extension/httpfs/hffs.cpp b/extension/httpfs/hffs.cpp index 0273141d..d91e2cf2 100644 --- a/extension/httpfs/hffs.cpp +++ b/extension/httpfs/hffs.cpp @@ -12,9 +12,6 @@ #include #include -#define CPPHTTPLIB_OPENSSL_SUPPORT -#include "httplib.hpp" - #include namespace duckdb { @@ -47,49 +44,34 @@ static string ParseNextUrlFromLinkHeader(const string &link_header_content) { HFFileHandle::~HFFileHandle() {}; -unique_ptr HFFileHandle::CreateClient(optional_ptr client_context) { - return HTTPFileSystem::GetClient(this->http_params, parsed_url.endpoint.c_str(), this); +unique_ptr HFFileHandle::CreateClient() { + return http_params.http_util.InitializeClient(http_params, parsed_url.endpoint); } -string HuggingFaceFileSystem::ListHFRequest(ParsedHFUrl &url, HTTPParams &http_params, string &next_page_url, +string HuggingFaceFileSystem::ListHFRequest(ParsedHFUrl &url, HTTPFSParams &http_params, string &next_page_url, optional_ptr state) { - HeaderMap header_map; - auto headers = HTTPFileSystem::InitializeHeaders(header_map, http_params); + HTTPHeaders header_map; string link_header_result; - auto client = HTTPFileSystem::GetClient(http_params, url.endpoint.c_str(), nullptr); std::stringstream response; - - std::function request([&]() { - if (state) { - state->get_count++; - } - - return client->Get( - next_page_url.c_str(), *headers, - [&](const duckdb_httplib_openssl::Response &response) { - if (response.status >= 400) { - throw HTTPException(response, "HTTP GET error on '%s' (HTTP %d)", next_page_url, response.status); - } - auto link_res = response.headers.find("Link"); - if (link_res != response.headers.end()) { - link_header_result = link_res->second; - } - return true; - }, - [&](const char *data, size_t data_length) { - if (state) { - state->total_bytes_received += data_length; - } - response << string(data, data_length); - return true; - }); - }); - - auto res = RunRequestWithRetry(request, next_page_url, "GET", http_params, nullptr); - - if (res->code != 200) { - throw IOException(res->error + " error for HTTP GET to '" + next_page_url + "'"); + GetRequestInfo get_request( + url.endpoint, next_page_url, header_map, http_params, + [&](const HTTPResponse &response) { + if (static_cast(response.status) >= 400) { + throw HTTPException(response, "HTTP GET error on '%s' (HTTP %d)", next_page_url, response.status); + } + if (response.HasHeader("Link")) { + link_header_result = response.GetHeaderValue("Link"); + } + return true; + }, + [&](const_data_ptr_t data, idx_t data_length) { + response << string(const_char_ptr_cast(data), data_length); + return true; + }); + auto res = http_params.http_util.Request(get_request); + if (res->status != HTTPStatusCode::OK_200) { + throw IOException(res->GetError() + " error for HTTP GET to '" + next_page_url + "'"); } if (!link_header_result.empty()) { @@ -201,7 +183,7 @@ void 
ParseListResult(string &input, vector &files, vector &direc // - hf://datasets/lhoestq/demo1/default/train/*.parquet // - hf://datasets/lhoestq/demo1/*/train/file_[abc].parquet // - hf://datasets/lhoestq/demo1/**/train/*.parquet -vector HuggingFaceFileSystem::Glob(const string &path, FileOpener *opener) { +vector HuggingFaceFileSystem::Glob(const string &path, FileOpener *opener) { // Ensure the glob pattern is a valid HF url auto parsed_glob_url = HFUrlParse(path); auto first_wildcard_pos = parsed_glob_url.path.find_first_of("*[\\"); @@ -223,7 +205,9 @@ vector HuggingFaceFileSystem::Glob(const string &path, FileOpener *opene FileOpenerInfo info; info.file_path = path; - auto http_params = HTTPParams::ReadFrom(opener, info); + auto http_util = HTTPFSUtil::GetHTTPUtil(opener); + auto params = http_util->InitializeParameters(opener, info); + auto &http_params = params->Cast(); SetParams(http_params, path, opener); auto http_state = HTTPState::TryGetState(opener).get(); @@ -251,7 +235,7 @@ vector HuggingFaceFileSystem::Glob(const string &path, FileOpener *opene } vector pattern_splits = StringUtil::Split(parsed_glob_url.path, "/"); - vector result; + vector result; for (const auto &file : files) { vector file_splits = StringUtil::Split(file, "/"); @@ -267,43 +251,43 @@ vector HuggingFaceFileSystem::Glob(const string &path, FileOpener *opene return result; } -unique_ptr HuggingFaceFileSystem::HeadRequest(FileHandle &handle, string hf_url, - HeaderMap header_map) { +unique_ptr HuggingFaceFileSystem::HeadRequest(FileHandle &handle, string hf_url, HTTPHeaders header_map) { auto &hf_handle = handle.Cast(); auto http_url = HuggingFaceFileSystem::GetFileUrl(hf_handle.parsed_url); return HTTPFileSystem::HeadRequest(handle, http_url, header_map); } -unique_ptr HuggingFaceFileSystem::GetRequest(FileHandle &handle, string s3_url, HeaderMap header_map) { +unique_ptr HuggingFaceFileSystem::GetRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) { auto &hf_handle = handle.Cast(); auto http_url = HuggingFaceFileSystem::GetFileUrl(hf_handle.parsed_url); return HTTPFileSystem::GetRequest(handle, http_url, header_map); } -unique_ptr HuggingFaceFileSystem::GetRangeRequest(FileHandle &handle, string s3_url, - HeaderMap header_map, idx_t file_offset, - char *buffer_out, idx_t buffer_out_len) { +unique_ptr HuggingFaceFileSystem::GetRangeRequest(FileHandle &handle, string s3_url, + HTTPHeaders header_map, idx_t file_offset, + char *buffer_out, idx_t buffer_out_len) { auto &hf_handle = handle.Cast(); auto http_url = HuggingFaceFileSystem::GetFileUrl(hf_handle.parsed_url); return HTTPFileSystem::GetRangeRequest(handle, http_url, header_map, file_offset, buffer_out, buffer_out_len); } -unique_ptr HuggingFaceFileSystem::CreateHandle(const string &path, FileOpenFlags flags, +unique_ptr HuggingFaceFileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr opener) { D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED); - auto parsed_url = HFUrlParse(path); + auto parsed_url = HFUrlParse(file.path); FileOpenerInfo info; - info.file_path = path; + info.file_path = file.path; - auto params = HTTPParams::ReadFrom(opener, info); - SetParams(params, path, opener); + auto http_util = HTTPFSUtil::GetHTTPUtil(opener); + auto params = http_util->InitializeParameters(opener, info); + SetParams(params->Cast(), file.path, opener); - return duckdb::make_uniq(*this, std::move(parsed_url), path, flags, params); + return duckdb::make_uniq(*this, std::move(parsed_url), file, flags, 
std::move(params)); } -void HuggingFaceFileSystem::SetParams(HTTPParams &params, const string &path, optional_ptr opener) { +void HuggingFaceFileSystem::SetParams(HTTPFSParams &params, const string &path, optional_ptr opener) { auto secret_manager = FileOpener::TryGetSecretManager(opener); auto transaction = FileOpener::TryGetCatalogTransaction(opener); if (secret_manager && transaction) { diff --git a/extension/httpfs/httpfs.cpp b/extension/httpfs/httpfs.cpp index e18ca418..34bcc5d1 100644 --- a/extension/httpfs/httpfs.cpp +++ b/extension/httpfs/httpfs.cpp @@ -8,7 +8,7 @@ #include "duckdb/common/thread.hpp" #include "duckdb/common/types/hash.hpp" #include "duckdb/function/scalar/strftime_format.hpp" -#include "duckdb/logging/http_logger.hpp" +#include "duckdb/logging/file_system_logger.hpp" #include "duckdb/main/client_context.hpp" #include "duckdb/main/database.hpp" #include "duckdb/main/secret/secret_manager.hpp" @@ -19,64 +19,57 @@ #include #include -#define CPPHTTPLIB_OPENSSL_SUPPORT -#include "httplib.hpp" - namespace duckdb { -duckdb::unique_ptr HTTPFileSystem::InitializeHeaders(HeaderMap &header_map, - const HTTPParams &http_params) { - auto headers = make_uniq(); - for (auto &entry : header_map) { - headers->insert(entry); - } - - for (auto &entry : http_params.extra_headers) { - headers->insert(entry); +shared_ptr HTTPFSUtil::GetHTTPUtil(optional_ptr opener) { + if (opener) { + auto db = opener->TryGetDatabase(); + if (db) { + auto &config = DBConfig::GetConfig(*db); + return config.http_util; + } } - - return headers; + return make_shared_ptr(); } -HTTPParams HTTPParams::ReadFrom(optional_ptr opener, optional_ptr info) { - auto result = HTTPParams(); +unique_ptr HTTPFSUtil::InitializeParameters(optional_ptr opener, + optional_ptr info) { + auto result = make_uniq(*this); + result->Initialize(opener); // No point in continuing without an opener if (!opener) { - return result; + return std::move(result); } Value value; // Setting lookups - FileOpener::TryGetCurrentSetting(opener, "http_timeout", result.timeout, info); - FileOpener::TryGetCurrentSetting(opener, "force_download", result.force_download, info); - FileOpener::TryGetCurrentSetting(opener, "http_retries", result.retries, info); - FileOpener::TryGetCurrentSetting(opener, "http_retry_wait_ms", result.retry_wait_ms, info); - FileOpener::TryGetCurrentSetting(opener, "http_retry_backoff", result.retry_backoff, info); - FileOpener::TryGetCurrentSetting(opener, "http_keep_alive", result.keep_alive, info); - FileOpener::TryGetCurrentSetting(opener, "enable_server_cert_verification", result.enable_server_cert_verification, + FileOpener::TryGetCurrentSetting(opener, "http_timeout", result->timeout, info); + FileOpener::TryGetCurrentSetting(opener, "force_download", result->force_download, info); + FileOpener::TryGetCurrentSetting(opener, "http_retries", result->retries, info); + FileOpener::TryGetCurrentSetting(opener, "http_retry_wait_ms", result->retry_wait_ms, info); + FileOpener::TryGetCurrentSetting(opener, "http_retry_backoff", result->retry_backoff, info); + FileOpener::TryGetCurrentSetting(opener, "http_keep_alive", result->keep_alive, info); + FileOpener::TryGetCurrentSetting(opener, "enable_server_cert_verification", result->enable_server_cert_verification, info); - FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result.ca_cert_file, info); - FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result.hf_max_per_page, info); + FileOpener::TryGetCurrentSetting(opener, "ca_cert_file", result->ca_cert_file, info); 
+ FileOpener::TryGetCurrentSetting(opener, "hf_max_per_page", result->hf_max_per_page, info); // HTTP Secret lookups KeyValueSecretReader settings_reader(*opener, info, "http"); string proxy_setting; - if (settings_reader.TryGetSecretKeyOrSetting("http_proxy", "http_proxy", proxy_setting) && - !proxy_setting.empty()) { + if (settings_reader.TryGetSecretKey("http_proxy", proxy_setting) && !proxy_setting.empty()) { idx_t port; string host; HTTPUtil::ParseHTTPProxyHost(proxy_setting, host, port); - result.http_proxy = host; - result.http_proxy_port = port; + result->http_proxy = host; + result->http_proxy_port = port; } - settings_reader.TryGetSecretKeyOrSetting("http_proxy_username", "http_proxy_username", - result.http_proxy_username); - settings_reader.TryGetSecretKeyOrSetting("http_proxy_password", "http_proxy_password", - result.http_proxy_password); - settings_reader.TryGetSecretKey("bearer_token", result.bearer_token); + settings_reader.TryGetSecretKey("http_proxy_username", result->http_proxy_username); + settings_reader.TryGetSecretKey("http_proxy_password", result->http_proxy_password); + settings_reader.TryGetSecretKey("bearer_token", result->bearer_token); Value extra_headers; if (settings_reader.TryGetSecretKey("extra_http_headers", extra_headers)) { @@ -84,14 +77,14 @@ HTTPParams HTTPParams::ReadFrom(optional_ptr opener, optional_ptr()] = kv[1].GetValue(); + result->extra_headers[kv[0].GetValue()] = kv[1].GetValue(); } } - return result; + return std::move(result); } -unique_ptr HTTPClientCache::GetClient() { +unique_ptr HTTPClientCache::GetClient() { lock_guard lck(lock); if (clients.size() == 0) { return nullptr; @@ -102,407 +95,245 @@ unique_ptr HTTPClientCache::GetClient() { return client; } -void HTTPClientCache::StoreClient(unique_ptr client) { +void HTTPClientCache::StoreClient(unique_ptr client) { lock_guard lck(lock); clients.push_back(std::move(client)); } -void HTTPFileSystem::ParseUrl(string &url, string &path_out, string &proto_host_port_out) { - if (url.rfind("http://", 0) != 0 && url.rfind("https://", 0) != 0) { - throw IOException("URL needs to start with http:// or https://"); - } - auto slash_pos = url.find('/', 8); - if (slash_pos == string::npos) { - throw IOException("URL needs to contain a '/' after the host"); - } - proto_host_port_out = url.substr(0, slash_pos); - - path_out = url.substr(slash_pos); - - if (path_out.empty()) { - throw IOException("URL needs to contain a path"); - } -} - -// Retry the request performed by fun using the exponential backoff strategy defined in params. Before retry, the -// retry callback is called -duckdb::unique_ptr -HTTPFileSystem::RunRequestWithRetry(const std::function &request, string &url, - string method, const HTTPParams ¶ms, - const std::function &retry_cb) { - idx_t tries = 0; - while (true) { - std::exception_ptr caught_e = nullptr; - duckdb_httplib_openssl::Error err; - duckdb_httplib_openssl::Response response; - int status; - - try { - auto res = request(); - err = res.error(); - if (err == duckdb_httplib_openssl::Error::Success) { - status = res->status; - response = res.value(); - } - } catch (IOException &e) { - caught_e = std::current_exception(); - } catch (HTTPException &e) { - caught_e = std::current_exception(); - } - - // Note: all duckdb_httplib_openssl::Error types will be retried. 
- if (err == duckdb_httplib_openssl::Error::Success) { - switch (status) { - case 408: // Request Timeout - case 418: // Server is pretending to be a teapot - case 429: // Rate limiter hit - case 500: // Server has error - case 503: // Server has error - case 504: // Server has error - break; - default: - return make_uniq(response, url); - } - } - - tries += 1; - - if (tries <= params.retries) { - if (tries > 1) { - uint64_t sleep_amount = (uint64_t)((float)params.retry_wait_ms * pow(params.retry_backoff, tries - 2)); - std::this_thread::sleep_for(std::chrono::milliseconds(sleep_amount)); - } - if (retry_cb) { - retry_cb(); - } - } else { - if (caught_e) { - std::rethrow_exception(caught_e); - } else if (err == duckdb_httplib_openssl::Error::Success) { - throw HTTPException(response, "Request returned HTTP %d for HTTP %s to '%s'", status, method, url); - } else { - throw IOException("%s error for HTTP %s to '%s' with status %d", to_string(err), method, url, status); - } - } - } -} - -unique_ptr HTTPFileSystem::PostRequest(FileHandle &handle, string url, HeaderMap header_map, - duckdb::unique_ptr &buffer_out, idx_t &buffer_out_len, - char *buffer_in, idx_t buffer_in_len, string params) { +unique_ptr HTTPFileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, + string &buffer_out, char *buffer_in, idx_t buffer_in_len, + string params) { auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); - idx_t out_offset = 0; - - std::function request([&]() { - auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); - - if (hfh.state) { - hfh.state->post_count++; - hfh.state->total_bytes_sent += buffer_in_len; - } - - // We use a custom Request method here, because there is no Post call with a contentreceiver in httplib - duckdb_httplib_openssl::Request req; - req.method = "POST"; - req.path = path; - req.headers = *headers; - req.headers.emplace("Content-Type", "application/octet-stream"); - req.content_receiver = [&](const char *data, size_t data_length, uint64_t /*offset*/, - uint64_t /*total_length*/) { - if (hfh.state) { - hfh.state->total_bytes_received += data_length; - } - if (out_offset + data_length > buffer_out_len) { - // Buffer too small, increase its size by at least 2x to fit the new value - auto new_size = MaxValue(out_offset + data_length, buffer_out_len * 2); - auto tmp = duckdb::unique_ptr {new char[new_size]}; - memcpy(tmp.get(), buffer_out.get(), buffer_out_len); - buffer_out = std::move(tmp); - buffer_out_len = new_size; - } - memcpy(buffer_out.get() + out_offset, data, data_length); - out_offset += data_length; - return true; - }; - req.body.assign(buffer_in, buffer_in_len); - return client->send(req); - }); - - return RunRequestWithRetry(request, url, "POST", hfh.http_params); -} - -unique_ptr HTTPFileSystem::GetClient(const HTTPParams &http_params, - const char *proto_host_port, - optional_ptr hfh) { - auto client = make_uniq(proto_host_port); - client->set_follow_location(true); - client->set_keep_alive(http_params.keep_alive); - if (!http_params.ca_cert_file.empty()) { - client->set_ca_cert_path(http_params.ca_cert_file.c_str()); - } - client->enable_server_certificate_verification(http_params.enable_server_cert_verification); - client->set_write_timeout(http_params.timeout, http_params.timeout_usec); - client->set_read_timeout(http_params.timeout, http_params.timeout_usec); - client->set_connection_timeout(http_params.timeout, 
http_params.timeout_usec); - client->set_decompress(false); - if (hfh && hfh->http_logger) { - client->set_logger( - hfh->http_logger->GetLogger()); - } - if (!http_params.bearer_token.empty()) { - client->set_bearer_token_auth(http_params.bearer_token.c_str()); - } - - if (!http_params.http_proxy.empty()) { - client->set_proxy(http_params.http_proxy, http_params.http_proxy_port); - - if (!http_params.http_proxy_username.empty()) { - client->set_proxy_basic_auth(http_params.http_proxy_username, http_params.http_proxy_password); - } - } - - return client; + auto &http_util = hfh.http_params.http_util; + PostRequestInfo post_request(url, header_map, hfh.http_params, const_data_ptr_cast(buffer_in), buffer_in_len); + auto result = http_util.Request(post_request); + buffer_out = std::move(post_request.buffer_out); + return result; } -unique_ptr HTTPFileSystem::PutRequest(FileHandle &handle, string url, HeaderMap header_map, - char *buffer_in, idx_t buffer_in_len, string params) { +unique_ptr HTTPFileSystem::PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, + char *buffer_in, idx_t buffer_in_len, string params) { auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); - - std::function request([&]() { - auto client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); - if (hfh.state) { - hfh.state->put_count++; - hfh.state->total_bytes_sent += buffer_in_len; - } - return client->Put(path.c_str(), *headers, buffer_in, buffer_in_len, "application/octet-stream"); - }); - - return RunRequestWithRetry(request, url, "PUT", hfh.http_params); + auto &http_util = hfh.http_params.http_util; + string content_type = "application/octet-stream"; + PutRequestInfo put_request(url, header_map, hfh.http_params, (const_data_ptr_t)buffer_in, buffer_in_len, + content_type); + return http_util.Request(put_request); } -unique_ptr HTTPFileSystem::HeadRequest(FileHandle &handle, string url, HeaderMap header_map) { +unique_ptr HTTPFileSystem::HeadRequest(FileHandle &handle, string url, HTTPHeaders header_map) { auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); - auto http_client = hfh.GetClient(nullptr); - - std::function request([&]() { - if (hfh.state) { - hfh.state->head_count++; - } - return http_client->Head(path.c_str(), *headers); - }); + auto &http_util = hfh.http_params.http_util; + auto http_client = hfh.GetClient(); - // Refresh the client on retries - std::function on_retry( - [&]() { http_client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); }); + HeadRequestInfo head_request(url, header_map, hfh.http_params); + auto response = http_util.Request(head_request, http_client); - auto response = RunRequestWithRetry(request, url, "HEAD", hfh.http_params, on_retry); hfh.StoreClient(std::move(http_client)); return response; } -unique_ptr HTTPFileSystem::DeleteRequest(FileHandle &handle, string url, HeaderMap header_map) { - auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); - auto http_client = hfh.GetClient(nullptr); - - std::function request([&]() { - if (hfh.state) { - hfh.state->delete_count++; - } - return http_client->Delete(path.c_str(), *headers); - }); - // Refresh the client on retries - std::function on_retry( - [&]() { http_client = 
GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); }); +unique_ptr HTTPFileSystem::DeleteRequest(FileHandle &handle, string url, HTTPHeaders header_map) { + auto &hfh = handle.Cast(); + auto &http_util = hfh.http_params.http_util; + auto http_client = hfh.GetClient(); + DeleteRequestInfo delete_request(url, header_map, hfh.http_params); + auto response = http_util.Request(delete_request, http_client); - auto response = RunRequestWithRetry(request, url, "DELETE", hfh.http_params, on_retry); hfh.StoreClient(std::move(http_client)); return response; } -unique_ptr HTTPFileSystem::GetRequest(FileHandle &handle, string url, HeaderMap header_map) { +HTTPException HTTPFileSystem::GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) { + auto status_message = HTTPFSUtil::GetStatusMessage(response.status); + string error = "HTTP GET error on '" + url + "' (HTTP " + to_string(static_cast(response.status)) + " " + + status_message + ")"; + if (response.status == HTTPStatusCode::RangeNotSatisfiable_416) { + error += " This could mean the file was changed. Try disabling the duckdb http metadata cache " + "if enabled, and confirm the server supports range requests."; + } + return HTTPException(response, error); +} + +unique_ptr HTTPFileSystem::GetRequest(FileHandle &handle, string url, HTTPHeaders header_map) { auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); + auto &http_util = hfh.http_params.http_util; D_ASSERT(hfh.cached_file_handle); - auto http_client = hfh.GetClient(nullptr); - - std::function request([&]() { - D_ASSERT(hfh.state); - hfh.state->get_count++; - return http_client->Get( - path.c_str(), *headers, - [&](const duckdb_httplib_openssl::Response &response) { - if (response.status >= 400) { - string error = "HTTP GET error on '" + url + "' (HTTP " + to_string(response.status) + ")"; - if (response.status == 416) { - error += " This could mean the file was changed. Try disabling the duckdb http metadata cache " - "if enabled, and confirm the server supports range requests."; - } - throw IOException(error); + auto http_client = hfh.GetClient(); + GetRequestInfo get_request( + url, header_map, hfh.http_params, + [&](const HTTPResponse &response) { + if (static_cast(response.status) >= 400) { + string error = + "HTTP GET error on '" + url + "' (HTTP " + to_string(static_cast(response.status)) + ")"; + if (response.status == HTTPStatusCode::RangeNotSatisfiable_416) { + error += " This could mean the file was changed. 
Try disabling the duckdb http metadata cache " + "if enabled, and confirm the server supports range requests."; } - return true; - }, - [&](const char *data, size_t data_length) { - D_ASSERT(hfh.state); - if (hfh.state) { - hfh.state->total_bytes_received += data_length; + throw HTTPException(error); + } + return true; + }, + [&](const_data_ptr_t data, idx_t data_length) { + if (!hfh.cached_file_handle->GetCapacity()) { + hfh.cached_file_handle->AllocateBuffer(data_length); + hfh.length = data_length; + hfh.cached_file_handle->Write(const_char_ptr_cast(data), data_length); + } else { + auto new_capacity = hfh.cached_file_handle->GetCapacity(); + while (new_capacity < hfh.length + data_length) { + new_capacity *= 2; } - if (!hfh.cached_file_handle->GetCapacity()) { - hfh.cached_file_handle->AllocateBuffer(data_length); - hfh.length = data_length; - hfh.cached_file_handle->Write(data, data_length); - } else { - auto new_capacity = hfh.cached_file_handle->GetCapacity(); - while (new_capacity < hfh.length + data_length) { - new_capacity *= 2; - } - // Grow buffer when running out of space - if (new_capacity != hfh.cached_file_handle->GetCapacity()) { - hfh.cached_file_handle->GrowBuffer(new_capacity, hfh.length); - } - // We can just copy stuff - hfh.cached_file_handle->Write(data, data_length, hfh.length); - hfh.length += data_length; + // Grow buffer when running out of space + if (new_capacity != hfh.cached_file_handle->GetCapacity()) { + hfh.cached_file_handle->GrowBuffer(new_capacity, hfh.length); } - return true; - }); - }); + // We can just copy stuff + hfh.cached_file_handle->Write(const_char_ptr_cast(data), data_length, hfh.length); + hfh.length += data_length; + } + return true; + }); - std::function on_retry( - [&]() { http_client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); }); + auto response = http_util.Request(get_request, http_client); - auto response = RunRequestWithRetry(request, url, "GET", hfh.http_params, on_retry); hfh.StoreClient(std::move(http_client)); return response; } -unique_ptr HTTPFileSystem::GetRangeRequest(FileHandle &handle, string url, HeaderMap header_map, - idx_t file_offset, char *buffer_out, idx_t buffer_out_len) { +unique_ptr HTTPFileSystem::GetRangeRequest(FileHandle &handle, string url, HTTPHeaders header_map, + idx_t file_offset, char *buffer_out, idx_t buffer_out_len) { auto &hfh = handle.Cast(); - string path, proto_host_port; - ParseUrl(url, path, proto_host_port); - auto headers = InitializeHeaders(header_map, hfh.http_params); + auto &http_util = hfh.http_params.http_util; // send the Range header to read only subset of file string range_expr = "bytes=" + to_string(file_offset) + "-" + to_string(file_offset + buffer_out_len - 1); - headers->insert(pair("Range", range_expr)); + header_map.Insert("Range", range_expr); - auto http_client = hfh.GetClient(nullptr); + auto http_client = hfh.GetClient(); idx_t out_offset = 0; - std::function request([&]() { - if (hfh.state) { - hfh.state->get_count++; - } - return http_client->Get( - path.c_str(), *headers, - [&](const duckdb_httplib_openssl::Response &response) { - if (response.status >= 400) { - string error = "HTTP GET error on '" + url + "' (HTTP " + to_string(response.status) + ")"; - if (response.status == 416) { - error += " This could mean the file was changed. 
Try disabling the duckdb http metadata cache " - "if enabled, and confirm the server supports range requests."; - throw HTTPException(response, error); + GetRequestInfo get_request( + url, header_map, hfh.http_params, + [&](const HTTPResponse &response) { + if (static_cast(response.status) >= 400) { + string error = + "HTTP GET error on '" + url + "' (HTTP " + to_string(static_cast(response.status)) + ")"; + if (response.status == HTTPStatusCode::RangeNotSatisfiable_416) { + error += " This could mean the file was changed. Try disabling the duckdb http metadata cache " + "if enabled, and confirm the server supports range requests."; } - if (response.status < 300) { // done redirecting - out_offset = 0; - if (response.has_header("Content-Length")) { - auto content_length = stoll(response.get_header_value("Content-Length", 0)); - if ((idx_t)content_length != buffer_out_len) { - throw IOException("HTTP GET error: Content-Length from server mismatches requested " - "range, server may not support range requests."); - } + throw HTTPException(response, error); + } + if (static_cast(response.status) < 300) { // done redirecting + out_offset = 0; + if (response.HasHeader("Content-Length")) { + auto content_length = stoll(response.GetHeaderValue("Content-Length")); + if ((idx_t)content_length != buffer_out_len) { + throw HTTPException("HTTP GET error: Content-Length from server mismatches requested " + "range, server may not support range requests."); } } - return true; - }, - [&](const char *data, size_t data_length) { - if (hfh.state) { - hfh.state->total_bytes_received += data_length; - } - if (buffer_out != nullptr) { - if (data_length + out_offset > buffer_out_len) { - // As of v0.8.2-dev4424 we might end up here when very big files are served from servers - // that return more data than requested via range header. This is an uncommon but legal - // behaviour, so we have to improve logic elsewhere to properly handle this case. - - // To avoid corruption of memory, we bail out. - throw IOException("Server sent back more data than expected, `SET force_download=true` might " - "help in this case"); - } - memcpy(buffer_out + out_offset, data, data_length); - out_offset += data_length; + } + return true; + }, + [&](const_data_ptr_t data, idx_t data_length) { + if (buffer_out != nullptr) { + if (data_length + out_offset > buffer_out_len) { + // As of v0.8.2-dev4424 we might end up here when very big files are served from servers + // that return more data than requested via range header. This is an uncommon but legal + // behaviour, so we have to improve logic elsewhere to properly handle this case. + + // To avoid corruption of memory, we bail out. 
+ throw HTTPException("Server sent back more data than expected, `SET force_download=true` might " + "help in this case"); } - return true; - }); - }); + memcpy(buffer_out + out_offset, data, data_length); + out_offset += data_length; + } + return true; + }); - std::function on_retry( - [&]() { http_client = GetClient(hfh.http_params, proto_host_port.c_str(), &hfh); }); + auto response = http_util.Request(get_request, http_client); - auto response = RunRequestWithRetry(request, url, "GET Range", hfh.http_params, on_retry); hfh.StoreClient(std::move(http_client)); return response; } -HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const string &path, FileOpenFlags flags, const HTTPParams &http_params) - : FileHandle(fs, path, flags), http_params(http_params), flags(flags), length(0), buffer_available(0), - buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) { +void TimestampToTimeT(timestamp_t timestamp, time_t &result) { + auto components = Timestamp::GetComponents(timestamp); + struct tm tm {}; + tm.tm_year = components.year - 1900; + tm.tm_mon = components.month - 1; + tm.tm_mday = components.day; + tm.tm_hour = components.hour; + tm.tm_min = components.minute; + tm.tm_sec = components.second; + tm.tm_isdst = 0; + result = mktime(&tm); +} + +HTTPFileHandle::HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, + unique_ptr params_p) + : FileHandle(fs, file.path, flags), params(std::move(params_p)), http_params(params->Cast()), + flags(flags), length(0), buffer_available(0), buffer_idx(0), file_offset(0), buffer_start(0), buffer_end(0) { + // check if the handle has extended properties that can be set directly in the handle + // if we have these properties we don't need to do a head request to obtain them later + if (file.extended_info) { + auto &info = file.extended_info->options; + auto lm_entry = info.find("last_modified"); + if (lm_entry != info.end()) { + TimestampToTimeT(lm_entry->second.GetValue(), last_modified); + } + auto etag_entry = info.find("etag"); + if (etag_entry != info.end()) { + etag = StringValue::Get(etag_entry->second); + } + auto fs_entry = info.find("file_size"); + if (fs_entry != info.end()) { + length = fs_entry->second.GetValue(); + } + if (lm_entry != info.end() && etag_entry != info.end() && fs_entry != info.end()) { + // we found all relevant entries (last_modified, etag and file size) + // skip head request + initialized = true; + } + } } - -unique_ptr HTTPFileSystem::CreateHandle(const string &path, FileOpenFlags flags, +unique_ptr HTTPFileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr opener) { D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED); FileOpenerInfo info; - info.file_path = path; - auto params = HTTPParams::ReadFrom(opener, info); + info.file_path = file.path; + + auto http_util = HTTPFSUtil::GetHTTPUtil(opener); + auto params = http_util->InitializeParameters(opener, info); auto secret_manager = FileOpener::TryGetSecretManager(opener); auto transaction = FileOpener::TryGetCatalogTransaction(opener); if (secret_manager && transaction) { - auto secret_match = secret_manager->LookupSecret(*transaction, path, "bearer"); + auto secret_match = secret_manager->LookupSecret(*transaction, file.path, "bearer"); if (secret_match.HasMatch()) { const auto &kv_secret = dynamic_cast(*secret_match.secret_entry->secret); - params.bearer_token = kv_secret.TryGetValue("token", true).ToString(); + auto &httpfs_params = params->Cast(); + httpfs_params.bearer_token = 
kv_secret.TryGetValue("token", true).ToString(); } } - - auto result = duckdb::make_uniq(*this, path, flags, params); - - auto client_context = FileOpener::TryGetClientContext(opener); - if (client_context && ClientConfig::GetConfig(*client_context).enable_http_logging) { - result->http_logger = client_context->client_data->http_logger.get(); - } - - return result; + return duckdb::make_uniq(*this, file, flags, std::move(params)); } -unique_ptr HTTPFileSystem::OpenFile(const string &path, FileOpenFlags flags, - optional_ptr opener) { +unique_ptr HTTPFileSystem::OpenFileExtended(const OpenFileInfo &file, FileOpenFlags flags, + optional_ptr opener) { D_ASSERT(flags.Compression() == FileCompressionType::UNCOMPRESSED); if (flags.ReturnNullIfNotExists()) { try { - auto handle = CreateHandle(path, flags, opener); + auto handle = CreateHandle(file, flags, opener); handle->Initialize(opener); return std::move(handle); } catch (...) { @@ -510,8 +341,11 @@ unique_ptr HTTPFileSystem::OpenFile(const string &path, FileOpenFlag } } - auto handle = CreateHandle(path, flags, opener); + auto handle = CreateHandle(file, flags, opener); handle->Initialize(opener); + + DUCKDB_LOG_FILE_SYSTEM_OPEN((*handle)); + return std::move(handle); } @@ -520,12 +354,14 @@ unique_ptr HTTPFileSystem::OpenFile(const string &path, FileOpenFlag void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) { auto &hfh = handle.Cast(); - D_ASSERT(hfh.state); + D_ASSERT(hfh.http_params.state); if (hfh.cached_file_handle) { if (!hfh.cached_file_handle->Initialized()) { throw InternalException("Cached file not initialized properly"); } memcpy(buffer, hfh.cached_file_handle->GetData() + location, nr_bytes); + DUCKDB_LOG_FILE_SYSTEM_READ(handle, nr_bytes, location); + hfh.file_offset = location + nr_bytes; return; } @@ -536,17 +372,19 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id bool skip_buffer = hfh.flags.DirectIO() || hfh.flags.RequireParallelAccess(); if (skip_buffer && to_read > 0) { GetRangeRequest(hfh, hfh.path, {}, location, (char *)buffer, to_read); - + DUCKDB_LOG_FILE_SYSTEM_READ(handle, nr_bytes, location); // Update handle status within critical section for parallel access. if (hfh.flags.RequireParallelAccess()) { std::lock_guard lck(hfh.mu); hfh.buffer_available = 0; hfh.buffer_idx = 0; + hfh.file_offset = location + nr_bytes; return; } hfh.buffer_available = 0; hfh.buffer_idx = 0; + hfh.file_offset = location + nr_bytes; return; } @@ -559,7 +397,7 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id hfh.buffer_idx = 0; } - idx_t start_offset = location; // Start file offset to read from. + idx_t start_offset = location; // Start file offset to read from. 
while (to_read > 0) { auto buffer_read_len = MinValue(hfh.buffer_available, to_read); if (buffer_read_len > 0) { @@ -585,8 +423,7 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id start_offset += to_read; break; } else { - GetRangeRequest(hfh, hfh.path, {}, start_offset, (char *)hfh.read_buffer.get(), - new_buffer_available); + GetRangeRequest(hfh, hfh.path, {}, start_offset, (char *)hfh.read_buffer.get(), new_buffer_available); hfh.buffer_available = new_buffer_available; hfh.buffer_idx = 0; hfh.buffer_start = start_offset; @@ -594,6 +431,8 @@ void HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes, id } } } + hfh.file_offset = location + nr_bytes; + DUCKDB_LOG_FILE_SYSTEM_READ(handle, nr_bytes, location); } int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) { @@ -601,7 +440,6 @@ int64_t HTTPFileSystem::Read(FileHandle &handle, void *buffer, int64_t nr_bytes) idx_t max_read = hfh.length - hfh.file_offset; nr_bytes = MinValue(max_read, nr_bytes); Read(handle, buffer, nr_bytes, hfh.file_offset); - hfh.file_offset += nr_bytes; return nr_bytes; } @@ -629,6 +467,11 @@ time_t HTTPFileSystem::GetLastModifiedTime(FileHandle &handle) { return sfh.last_modified; } +string HTTPFileSystem::GetVersionTag(FileHandle &handle) { + auto &sfh = handle.Cast(); + return sfh.etag; +} + bool HTTPFileSystem::FileExists(const string &filename, optional_ptr opener) { try { auto handle = OpenFile(filename, FileFlags::FILE_FLAGS_READ, opener); @@ -683,15 +526,15 @@ static optional_ptr TryGetMetadataCache(optional_ptrGetCachedFile(path); + const auto &cache_entry = http_params.state->GetCachedFile(path); cached_file_handle = cache_entry->GetHandle(); if (!cached_file_handle->Initialized()) { // Try to fully download the file first const auto full_download_result = hfs.GetRequest(*this, path, {}); - if (full_download_result->code != 200) { + if (full_download_result->status != HTTPStatusCode::OK_200) { throw HTTPException(*full_download_result, "Full download failed to URL \"%s\": %s (%s)", - full_download_result->http_url, to_string(full_download_result->code), - full_download_result->error); + full_download_result->url, static_cast(full_download_result->status), + full_download_result->GetError()); } // Mark the file as initialized, set its final length, and unlock it to allow parallel reads cached_file_handle->SetInitialized(length); @@ -702,53 +545,64 @@ void HTTPFileHandle::FullDownload(HTTPFileSystem &hfs, bool &should_write_cache) } } -void HTTPFileHandle::Initialize(optional_ptr opener) { - auto &hfs = file_system.Cast(); - state = HTTPState::TryGetState(opener); - if (!state) { - state = make_shared_ptr(); +bool HTTPFileSystem::TryParseLastModifiedTime(const string &timestamp, time_t &result) { + StrpTimeFormat::ParseResult parse_result; + if (!StrpTimeFormat::TryParse("%a, %d %h %Y %T %Z", timestamp, parse_result)) { + return false; } + struct tm tm {}; + tm.tm_year = parse_result.data[0] - 1900; + tm.tm_mon = parse_result.data[1] - 1; + tm.tm_mday = parse_result.data[2]; + tm.tm_hour = parse_result.data[3]; + tm.tm_min = parse_result.data[4]; + tm.tm_sec = parse_result.data[5]; + tm.tm_isdst = 0; + result = mktime(&tm); + return true; +} - auto client_context = FileOpener::TryGetClientContext(opener); - if (client_context && ClientConfig::GetConfig(*client_context).enable_http_logging) { - http_logger = client_context->client_data->http_logger.get(); +optional_idx TryParseContentRange(const HTTPHeaders &headers) { + if 
(!headers.HasHeader("Content-Range")) { + return optional_idx(); } - - auto current_cache = TryGetMetadataCache(opener, hfs); - - bool should_write_cache = false; - if (http_params.force_download) { - FullDownload(hfs, should_write_cache); - return; + string content_range = headers.GetHeaderValue("Content-Range"); + auto range_find = content_range.find("/"); + if (range_find == std::string::npos || content_range.size() < range_find + 1) { + return optional_idx(); } - - if (current_cache && !flags.OpenForWriting()) { - HTTPMetadataCacheEntry value; - bool found = current_cache->Find(path, value); - - if (found) { - last_modified = value.last_modified; - length = value.length; - - if (flags.OpenForReading()) { - read_buffer = duckdb::unique_ptr(new data_t[READ_BUFFER_LEN]); - } - return; - } - - should_write_cache = true; + string range_length = content_range.substr(range_find + 1); + if (range_length == "*") { + return optional_idx(); + } + try { + return std::stoull(range_length); + } catch (...) { + return optional_idx(); } +} - // If we're writing to a file, we might as well remove it from the cache - if (current_cache && flags.OpenForWriting()) { - current_cache->Erase(path); +optional_idx TryParseContentLength(const HTTPHeaders &headers) { + if (!headers.HasHeader("Content-Length")) { + return optional_idx(); } + string content_length = headers.GetHeaderValue("Content-Length"); + try { + return std::stoull(content_length); + } catch (...) { + return optional_idx(); + } +} +void HTTPFileHandle::LoadFileInfo() { + if (initialized) { + // already initialized + return; + } + auto &hfs = file_system.Cast(); auto res = hfs.HeadRequest(*this, path, {}); - string range_length; - - if (res->code != 200) { - if (flags.OpenForWriting() && res->code == 404) { + if (res->status != HTTPStatusCode::OK_200) { + if (flags.OpenForWriting() && res->status == HTTPStatusCode::NotFound_404) { if (!flags.CreateFileIfNotExists() && !flags.OverwriteExistingFile()) { throw IOException("Unable to open URL \"" + path + "\" for writing: file does not exist and CREATE flag is not set"); @@ -757,71 +611,98 @@ void HTTPFileHandle::Initialize(optional_ptr opener) { return; } else { // HEAD request failed, use Range request for another try (read only one byte) - if (flags.OpenForReading() && res->code != 404) { + if (flags.OpenForReading() && res->status != HTTPStatusCode::NotFound_404) { auto range_res = hfs.GetRangeRequest(*this, path, {}, 0, nullptr, 2); - if (range_res->code != 206 && range_res->code != 202 && range_res->code != 200) { - throw IOException("Unable to connect to URL \"%s\": %d (%s).", path, res->code, res->error); - } - auto range_find = range_res->headers["Content-Range"].find("/"); - if (!(range_find == std::string::npos || range_res->headers["Content-Range"].size() < range_find + 1)) { - range_length = range_res->headers["Content-Range"].substr(range_find + 1); - if (range_length != "*") { - res = std::move(range_res); - } + if (range_res->status != HTTPStatusCode::PartialContent_206 && + range_res->status != HTTPStatusCode::Accepted_202 && range_res->status != HTTPStatusCode::OK_200) { + // It failed again + throw HTTPException(*range_res, "Unable to connect to URL \"%s\": %d (%s).", path, + static_cast(res->status), res->GetError()); } + res = std::move(range_res); } else { - // It failed again - throw HTTPException(*res, "Unable to connect to URL \"%s\": %s (%s).", res->http_url, - to_string(res->code), res->error); + throw HTTPException(*res, "Unable to connect to URL \"%s\": %d (%s).", 
res->url, + static_cast(res->status), res->GetError()); } } } + length = 0; + optional_idx content_size; + content_size = TryParseContentRange(res->headers); + if (!content_size.IsValid()) { + content_size = TryParseContentLength(res->headers); + } + if (content_size.IsValid()) { + length = content_size.GetIndex(); + } + if (res->headers.HasHeader("Last-Modified")) { + HTTPFileSystem::TryParseLastModifiedTime(res->headers.GetHeaderValue("Last-Modified"), last_modified); + } + if (res->headers.HasHeader("ETag")) { + etag = res->headers.GetHeaderValue("ETag"); + } + initialized = true; +} - // Initialize the read buffer now that we know the file exists - if (flags.OpenForReading()) { - read_buffer = duckdb::unique_ptr(new data_t[READ_BUFFER_LEN]); +void HTTPFileHandle::Initialize(optional_ptr opener) { + auto &hfs = file_system.Cast(); + http_params.state = HTTPState::TryGetState(opener); + if (!http_params.state) { + http_params.state = make_shared_ptr(); } - if (res->headers.find("Content-Length") == res->headers.end() || res->headers["Content-Length"].empty()) { - // There was no content-length header, we can not do range requests here, so we set the length to 0 - length = 0; - } else { - try { - if (res->headers.find("Content-Range") == res->headers.end() || res->headers["Content-Range"].empty()) { - length = std::stoll(res->headers["Content-Length"]); - } else { - length = std::stoll(range_length); + if (opener) { + TryAddLogger(*opener); + } + + auto current_cache = TryGetMetadataCache(opener, hfs); + + bool should_write_cache = false; + if (flags.OpenForReading()) { + if (http_params.force_download) { + FullDownload(hfs, should_write_cache); + return; + } + + if (current_cache) { + HTTPMetadataCacheEntry value; + bool found = current_cache->Find(path, value); + + if (found) { + last_modified = value.last_modified; + length = value.length; + etag = value.etag; + + if (flags.OpenForReading()) { + read_buffer = duckdb::unique_ptr(new data_t[READ_BUFFER_LEN]); + } + return; } - } catch (std::invalid_argument &e) { - throw IOException("Invalid Content-Length header received: %s", res->headers["Content-Length"]); - } catch (std::out_of_range &e) { - throw IOException("Invalid Content-Length header received: %s", res->headers["Content-Length"]); + + should_write_cache = true; } } - if (state && length == 0) { - FullDownload(hfs, should_write_cache); - } - if (!res->headers["Last-Modified"].empty()) { - StrpTimeFormat::ParseResult result; - if (StrpTimeFormat::TryParse("%a, %d %h %Y %T %Z", res->headers["Last-Modified"], result)) { - struct tm tm {}; - tm.tm_year = result.data[0] - 1900; - tm.tm_mon = result.data[1] - 1; - tm.tm_mday = result.data[2]; - tm.tm_hour = result.data[3]; - tm.tm_min = result.data[4]; - tm.tm_sec = result.data[5]; - tm.tm_isdst = 0; - last_modified = mktime(&tm); + LoadFileInfo(); + + if (flags.OpenForReading()) { + if (http_params.state && length == 0) { + FullDownload(hfs, should_write_cache); + } + if (should_write_cache) { + current_cache->Insert(path, {length, last_modified, etag}); } + + // Initialize the read buffer now that we know the file exists + read_buffer = duckdb::unique_ptr(new data_t[READ_BUFFER_LEN]); } - if (should_write_cache) { - current_cache->Insert(path, {length, last_modified}); + // If we're writing to a file, we might as well remove it from the cache + if (current_cache && flags.OpenForWriting()) { + current_cache->Erase(path); } } -unique_ptr HTTPFileHandle::GetClient(optional_ptr context) { +unique_ptr HTTPFileHandle::GetClient() { // 
-unique_ptr<duckdb_httplib_openssl::Client> HTTPFileHandle::GetClient(optional_ptr<ClientContext> context) { +unique_ptr<HTTPClient> HTTPFileHandle::GetClient() { // Try to fetch a cached client auto cached_client = client_cache.GetClient(); if (cached_client) { @@ -829,35 +710,21 @@ unique_ptr<duckdb_httplib_openssl::Client> HTTPFileHandle::GetClient(optional_pt } // Create a new client - return CreateClient(context); + return CreateClient(); } -unique_ptr<duckdb_httplib_openssl::Client> HTTPFileHandle::CreateClient(optional_ptr<ClientContext> context) { +unique_ptr<HTTPClient> HTTPFileHandle::CreateClient() { // Create a new client string path_out, proto_host_port; - HTTPFileSystem::ParseUrl(path, path_out, proto_host_port); - auto http_client = HTTPFileSystem::GetClient(this->http_params, proto_host_port.c_str(), this); - if (context && ClientConfig::GetConfig(*context).enable_http_logging) { - http_logger = context->client_data->http_logger.get(); - http_client->set_logger( - http_logger->GetLogger<duckdb_httplib_openssl::Request, duckdb_httplib_openssl::Response>()); - } - return http_client; + HTTPUtil::DecomposeURL(path, path_out, proto_host_port); + return http_params.http_util.InitializeClient(http_params, proto_host_port); } -void HTTPFileHandle::StoreClient(unique_ptr<duckdb_httplib_openssl::Client> client) { +void HTTPFileHandle::StoreClient(unique_ptr<HTTPClient> client) { client_cache.StoreClient(std::move(client)); } -ResponseWrapper::ResponseWrapper(duckdb_httplib_openssl::Response &res, string &original_url) { - code = res.status; - error = res.reason; - for (auto &h : res.headers) { - headers[h.first] = h.second; - } - http_url = original_url; - body = res.body; -} - -HTTPFileHandle::~HTTPFileHandle() = default; +HTTPFileHandle::~HTTPFileHandle() { + DUCKDB_LOG_FILE_SYSTEM_CLOSE((*this)); +}; } // namespace duckdb diff --git a/extension/httpfs/httpfs_client.cpp b/extension/httpfs/httpfs_client.cpp new file mode 100644 index 00000000..84eb457b --- /dev/null +++ b/extension/httpfs/httpfs_client.cpp @@ -0,0 +1,167 @@ +#include "httpfs_client.hpp" +#include "http_state.hpp" + +#define CPPHTTPLIB_OPENSSL_SUPPORT +#include "httplib.hpp" + +namespace duckdb { + +class HTTPFSClient : public HTTPClient { +public: + HTTPFSClient(HTTPFSParams &http_params, const string &proto_host_port) { + client = make_uniq<duckdb_httplib_openssl::Client>(proto_host_port); + client->set_follow_location(http_params.follow_location); + client->set_keep_alive(http_params.keep_alive); + if (!http_params.ca_cert_file.empty()) { + client->set_ca_cert_path(http_params.ca_cert_file.c_str()); + } + client->enable_server_certificate_verification(http_params.enable_server_cert_verification); + client->set_write_timeout(http_params.timeout, http_params.timeout_usec); + client->set_read_timeout(http_params.timeout, http_params.timeout_usec); + client->set_connection_timeout(http_params.timeout, http_params.timeout_usec); + client->set_decompress(false); + if (!http_params.bearer_token.empty()) { + client->set_bearer_token_auth(http_params.bearer_token.c_str()); + } + + if (!http_params.http_proxy.empty()) { + client->set_proxy(http_params.http_proxy, http_params.http_proxy_port); + + if (!http_params.http_proxy_username.empty()) { + client->set_proxy_basic_auth(http_params.http_proxy_username, http_params.http_proxy_password); + } + } + state = http_params.state; + } + + unique_ptr<HTTPResponse> Get(GetRequestInfo &info) override { + if (state) { + state->get_count++; + } + auto headers = TransformHeaders(info.headers, info.params); + if (!info.response_handler && !info.content_handler) { + return TransformResult(client->Get(info.path, headers)); + } else { + return TransformResult(client->Get( + info.path.c_str(), headers, + [&](const duckdb_httplib_openssl::Response &response) { + auto http_response = TransformResponse(response); + return info.response_handler(*http_response); + }, + [&](const char *data, size_t
data_length) { + if (state) { + state->total_bytes_received += data_length; + } + return info.content_handler(const_data_ptr_cast(data), data_length); + })); + } + } + unique_ptr<HTTPResponse> Put(PutRequestInfo &info) override { + if (state) { + state->put_count++; + state->total_bytes_sent += info.buffer_in_len; + } + auto headers = TransformHeaders(info.headers, info.params); + return TransformResult(client->Put(info.path, headers, const_char_ptr_cast(info.buffer_in), info.buffer_in_len, + info.content_type)); + } + + unique_ptr<HTTPResponse> Head(HeadRequestInfo &info) override { + if (state) { + state->head_count++; + } + auto headers = TransformHeaders(info.headers, info.params); + return TransformResult(client->Head(info.path, headers)); + } + + unique_ptr<HTTPResponse> Delete(DeleteRequestInfo &info) override { + if (state) { + state->delete_count++; + } + auto headers = TransformHeaders(info.headers, info.params); + return TransformResult(client->Delete(info.path, headers)); + } + + unique_ptr<HTTPResponse> Post(PostRequestInfo &info) override { + if (state) { + state->post_count++; + state->total_bytes_sent += info.buffer_in_len; + } + // We use a custom Request method here, because there is no Post call with a content receiver in httplib + duckdb_httplib_openssl::Request req; + req.method = "POST"; + req.path = info.path; + req.headers = TransformHeaders(info.headers, info.params); + req.headers.emplace("Content-Type", "application/octet-stream"); + req.content_receiver = [&](const char *data, size_t data_length, uint64_t /*offset*/, + uint64_t /*total_length*/) { + if (state) { + state->total_bytes_received += data_length; + } + info.buffer_out += string(data, data_length); + return true; + }; + req.body.assign(const_char_ptr_cast(info.buffer_in), info.buffer_in_len); + return TransformResult(client->send(req)); + } + +private: + duckdb_httplib_openssl::Headers TransformHeaders(const HTTPHeaders &header_map, const HTTPParams &params) { + duckdb_httplib_openssl::Headers headers; + for (auto &entry : header_map) { + headers.insert(entry); + } + for (auto &entry : params.extra_headers) { + headers.insert(entry); + } + return headers; + } + + unique_ptr<HTTPResponse> TransformResponse(const duckdb_httplib_openssl::Response &response) { + auto status_code = HTTPUtil::ToStatusCode(response.status); + auto result = make_uniq<HTTPResponse>(status_code); + result->body = response.body; + result->reason = response.reason; + for (auto &entry : response.headers) { + result->headers.Insert(entry.first, entry.second); + } + return result; + } + + unique_ptr<HTTPResponse> TransformResult(duckdb_httplib_openssl::Result &&res) { + if (res.error() == duckdb_httplib_openssl::Error::Success) { + auto &response = res.value(); + return TransformResponse(response); + } else { + auto result = make_uniq<HTTPResponse>(HTTPStatusCode::INVALID); + result->request_error = to_string(res.error()); + return result; + } + } + +private: + unique_ptr<duckdb_httplib_openssl::Client> client; + optional_ptr<HTTPState> state; +}; + +unique_ptr<HTTPClient> HTTPFSUtil::InitializeClient(HTTPParams &http_params, const string &proto_host_port) { + auto client = make_uniq<HTTPFSClient>(http_params.Cast<HTTPFSParams>(), proto_host_port); + return std::move(client); +} + +unordered_map<string, string> HTTPFSUtil::ParseGetParameters(const string &text) { + duckdb_httplib_openssl::Params query_params; + duckdb_httplib_openssl::detail::parse_query_text(text, query_params); + + unordered_map<string, string> result; + for (auto &entry : query_params) { + result.emplace(std::move(entry.first), std::move(entry.second)); + } + return result; +} + +string HTTPFSUtil::GetName() const { + return "HTTPFS"; +} + +} // namespace duckdb
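Note: httpfs_client.cpp is now the only translation unit that includes httplib directly; everything else programs against the HTTPClient/HTTPUtil interface. A rough usage sketch, mirroring the GetRequestInfo construction that AWSListObjectV2::Request uses further down; the host and path are placeholders, and passing empty handlers is assumed to select the buffered (non-streaming) Get path:

    HTTPFSUtil util;
    HTTPFSParams params(util); // HTTPFSParams(HTTPUtil &) from httpfs_client.hpp
    auto client = util.InitializeClient(params, "https://example.com");
    HTTPHeaders headers;
    GetRequestInfo info("https://example.com", "/some/file", headers, params, nullptr, nullptr);
    auto response = client->Get(info);
    // inspect response->status, response->headers and response->body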
diff --git a/extension/httpfs/httpfs_config.py b/extension/httpfs/httpfs_config.py index 99119a27..a9949cb9 100644 --- a/extension/httpfs/httpfs_config.py +++ b/extension/httpfs/httpfs_config.py @@ -17,6 +17,7 @@ 'http_state.cpp', 'httpfs.cpp', 'httpfs_extension.cpp', + 'httpfs_client.cpp', 's3fs.cpp', ] ] diff --git a/extension/httpfs/httpfs_extension.cpp b/extension/httpfs/httpfs_extension.cpp index 0ad8effb..c9bc9853 100644 --- a/extension/httpfs/httpfs_extension.cpp +++ b/extension/httpfs/httpfs_extension.cpp @@ -11,7 +11,6 @@ namespace duckdb { static void LoadInternal(DatabaseInstance &instance) { - S3FileSystem::Verify(); // run some tests to see if all the hashes work out auto &fs = instance.GetFileSystem(); fs.RegisterSubSystem(make_uniq<HTTPFileSystem>()); @@ -22,8 +21,8 @@ static void LoadInternal(DatabaseInstance &instance) { // Global HTTP config // Single timeout value is used for all 4 types of timeouts, we could split it into 4 if users need that - config.AddExtensionOption("http_timeout", "HTTP timeout read/write/connection/retry (in seconds)", LogicalType::UBIGINT, - Value::UBIGINT(HTTPParams::DEFAULT_TIMEOUT_SECONDS)); + config.AddExtensionOption("http_timeout", "HTTP timeout read/write/connection/retry (in seconds)", + LogicalType::UBIGINT, Value::UBIGINT(HTTPParams::DEFAULT_TIMEOUT_SECONDS)); config.AddExtensionOption("http_retries", "HTTP retries on I/O error", LogicalType::UBIGINT, Value(3)); config.AddExtensionOption("http_retry_wait_ms", "Time between retries", LogicalType::UBIGINT, Value(100)); config.AddExtensionOption("force_download", "Forces upfront download of file", LogicalType::BOOLEAN, Value(false)); @@ -62,6 +61,7 @@ static void LoadInternal(DatabaseInstance &instance) { // HuggingFace options config.AddExtensionOption("hf_max_per_page", "Debug option to limit number of items returned in list requests", LogicalType::UBIGINT, Value::UBIGINT(0)); + config.http_util = make_shared_ptr<HTTPFSUtil>(); auto provider = make_uniq<AWSEnvironmentCredentialsProvider>(config); provider->SetAll(); @@ -70,7 +70,7 @@ static void LoadInternal(DatabaseInstance &instance) { CreateBearerTokenFunctions::Register(instance); // set pointer to OpenSSL encryption state - config.encryption_util = make_shared_ptr<AESGCMStateSSLFactory>(); + config.encryption_util = make_shared_ptr<AESStateSSLFactory>(); } void HttpfsExtension::Load(DuckDB &db) { LoadInternal(*db.instance);
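Note: the options registered above surface as ordinary DuckDB settings once the extension is loaded; for illustration (the values shown are just the defaults from this diff):

    SET http_timeout = 30;
    SET http_retries = 3;
    SET http_retry_wait_ms = 100;
    SET force_download = false;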
diff --git a/extension/httpfs/include/create_secret_functions.hpp b/extension/httpfs/include/create_secret_functions.hpp index e0252f51..54b7566d 100644 --- a/extension/httpfs/include/create_secret_functions.hpp +++ b/extension/httpfs/include/create_secret_functions.hpp @@ -7,7 +7,6 @@ struct CreateSecretInput; struct S3AuthParams; class CreateSecretFunction; class BaseSecret; -struct CreateSecretInfo; struct SecretEntry; struct CreateS3SecretFunctions { @@ -16,7 +15,7 @@ struct CreateS3SecretFunctions { static void Register(DatabaseInstance &instance); //! Secret refreshing mechanisms - static CreateSecretInfo GenerateRefreshSecretInfo(const SecretEntry &secret_entry, Value &refresh_info); + static CreateSecretInput GenerateRefreshSecretInfo(const SecretEntry &secret_entry, Value &refresh_info); static bool TryRefreshS3Secret(ClientContext &context, const SecretEntry &secret_to_refresh); protected: diff --git a/extension/httpfs/include/crypto.hpp b/extension/httpfs/include/crypto.hpp index ff37f234..f819356f 100644 --- a/extension/httpfs/include/crypto.hpp +++ b/extension/httpfs/include/crypto.hpp @@ -7,6 +7,7 @@ #include typedef struct evp_cipher_ctx_st EVP_CIPHER_CTX; +typedef struct evp_cipher_st EVP_CIPHER; namespace duckdb { @@ -21,40 +22,42 @@ void hmac256(std::string message, hash_bytes secret, hash_bytes &out); void hex256(hash_bytes &in, hash_str &out); -class DUCKDB_EXTENSION_API AESGCMStateSSL : public duckdb::EncryptionState { +class DUCKDB_EXTENSION_API AESStateSSL : public duckdb::EncryptionState { public: - explicit AESGCMStateSSL(); - ~AESGCMStateSSL() override; + explicit AESStateSSL(const std::string *key = nullptr); + ~AESStateSSL() override; public: - bool IsOpenSSL() override; void InitializeEncryption(const_data_ptr_t iv, idx_t iv_len, const std::string *key) override; void InitializeDecryption(const_data_ptr_t iv, idx_t iv_len, const std::string *key) override; size_t Process(const_data_ptr_t in, idx_t in_len, data_ptr_t out, idx_t out_len) override; size_t Finalize(data_ptr_t out, idx_t out_len, data_ptr_t tag, idx_t tag_len) override; void GenerateRandomData(data_ptr_t data, idx_t len) override; + const EVP_CIPHER *GetCipher(const string &key); + size_t FinalizeGCM(data_ptr_t out, idx_t out_len, data_ptr_t tag, idx_t tag_len); + private: - bool ssl = true; - EVP_CIPHER_CTX *gcm_context; + EVP_CIPHER_CTX *context; Mode mode; + Cipher cipher = GCM; }; } // namespace duckdb extern "C" { -class DUCKDB_EXTENSION_API AESGCMStateSSLFactory : public duckdb::EncryptionUtil { +class DUCKDB_EXTENSION_API AESStateSSLFactory : public duckdb::EncryptionUtil { public: - explicit AESGCMStateSSLFactory() { + explicit AESStateSSLFactory() { } - duckdb::shared_ptr<duckdb::EncryptionState> CreateEncryptionState() const override { - return duckdb::make_shared_ptr<duckdb::AESGCMStateSSL>(); + duckdb::shared_ptr<duckdb::EncryptionState> CreateEncryptionState(const std::string *key = nullptr) const override { + return duckdb::make_shared_ptr<duckdb::AESStateSSL>(); } - ~AESGCMStateSSLFactory() override { + ~AESStateSSLFactory() override { } }; } diff --git a/extension/httpfs/include/hffs.hpp b/extension/httpfs/include/hffs.hpp index 27f44c33..7fe1af11 100644 --- a/extension/httpfs/include/hffs.hpp +++ b/extension/httpfs/include/hffs.hpp @@ -22,13 +22,13 @@ class HuggingFaceFileSystem : public HTTPFileSystem { public: ~HuggingFaceFileSystem() override; - vector<string> Glob(const string &path, FileOpener *opener = nullptr) override; + vector<OpenFileInfo> Glob(const string &path, FileOpener *opener = nullptr) override; - duckdb::unique_ptr<ResponseWrapper> HeadRequest(FileHandle &handle, string hf_url, HeaderMap header_map) override; - duckdb::unique_ptr<ResponseWrapper> GetRequest(FileHandle &handle, string hf_url, HeaderMap header_map) override; - duckdb::unique_ptr<ResponseWrapper> GetRangeRequest(FileHandle &handle, string hf_url, HeaderMap header_map, - idx_t file_offset, char *buffer_out, - idx_t buffer_out_len) override; + duckdb::unique_ptr<HTTPResponse> HeadRequest(FileHandle &handle, string hf_url, HTTPHeaders header_map) override; + duckdb::unique_ptr<HTTPResponse> GetRequest(FileHandle &handle, string hf_url, HTTPHeaders header_map) override; + duckdb::unique_ptr<HTTPResponse> GetRangeRequest(FileHandle &handle, string hf_url,
HTTPHeaders header_map, + idx_t file_offset, char *buffer_out, + idx_t buffer_out_len) override; bool CanHandleFile(const string &fpath) override { return fpath.rfind("hf://", 0) == 0; @@ -42,13 +42,13 @@ class HuggingFaceFileSystem : public HTTPFileSystem { string GetTreeUrl(const ParsedHFUrl &url, idx_t limit); string GetFileUrl(const ParsedHFUrl &url); - static void SetParams(HTTPParams &params, const string &path, optional_ptr<FileOpener> opener); + static void SetParams(HTTPFSParams &params, const string &path, optional_ptr<FileOpener> opener); protected: - duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const string &path, FileOpenFlags flags, + duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr<FileOpener> opener) override; - string ListHFRequest(ParsedHFUrl &url, HTTPParams &http_params, string &next_page_url, + string ListHFRequest(ParsedHFUrl &url, HTTPFSParams &http_params, string &next_page_url, optional_ptr<HTTPState> state); }; @@ -56,13 +56,13 @@ class HFFileHandle : public HTTPFileHandle { friend class HuggingFaceFileSystem; public: - HFFileHandle(FileSystem &fs, ParsedHFUrl hf_url, string http_url, FileOpenFlags flags, - const HTTPParams &http_params) - : HTTPFileHandle(fs, std::move(http_url), flags, http_params), parsed_url(std::move(hf_url)) { + HFFileHandle(FileSystem &fs, ParsedHFUrl hf_url, const OpenFileInfo &file, FileOpenFlags flags, + unique_ptr<HTTPParams> http_params) + : HTTPFileHandle(fs, file, flags, std::move(http_params)), parsed_url(std::move(hf_url)) { } ~HFFileHandle() override; - unique_ptr<duckdb_httplib_openssl::Client> CreateClient(optional_ptr<ClientContext> client_context) override; + unique_ptr<HTTPClient> CreateClient() override; protected: ParsedHFUrl parsed_url; diff --git a/extension/httpfs/include/http_metadata_cache.hpp b/extension/httpfs/include/http_metadata_cache.hpp index 73d032b0..8fc7909c 100644 --- a/extension/httpfs/include/http_metadata_cache.hpp +++ b/extension/httpfs/include/http_metadata_cache.hpp @@ -18,6 +18,7 @@ namespace duckdb { struct HTTPMetadataCacheEntry { idx_t length; time_t last_modified; + string etag; }; // Simple cache with a max age for an entry to be valid diff --git a/extension/httpfs/include/httpfs.hpp b/extension/httpfs/include/httpfs.hpp index 93258cec..a6b8570f 100644 --- a/extension/httpfs/include/httpfs.hpp +++ b/extension/httpfs/include/httpfs.hpp @@ -5,80 +5,25 @@ #include "http_state.hpp" #include "duckdb/common/pair.hpp" #include "duckdb/common/unordered_map.hpp" +#include "duckdb/common/exception/http_exception.hpp" #include "duckdb/main/client_data.hpp" #include "http_metadata_cache.hpp" +#include "httpfs_client.hpp" #include <mutex> -namespace duckdb_httplib_openssl { -struct Response; -class Result; -class Client; -namespace detail { -struct ci; -} -using Headers = std::multimap<std::string, std::string, detail::ci>; -} // namespace duckdb_httplib_openssl namespace duckdb { -class HTTPLogger; - -using HeaderMap = case_insensitive_map_t<string>; - -// avoid including httplib in header -struct ResponseWrapper { -public: - explicit ResponseWrapper(duckdb_httplib_openssl::Response &res, string &original_url); - int code; - string error; - HeaderMap headers; - string http_url; - string body; -}; - -struct HTTPParams { - - static constexpr uint64_t DEFAULT_TIMEOUT_SECONDS = 30; // 30 sec - static constexpr uint64_t DEFAULT_RETRIES = 3; - static constexpr uint64_t DEFAULT_RETRY_WAIT_MS = 100; - static constexpr float DEFAULT_RETRY_BACKOFF = 4; - static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; - static constexpr bool DEFAULT_KEEP_ALIVE = true; - static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; - static constexpr uint64_t
DEFAULT_HF_MAX_PER_PAGE = 0; - - uint64_t timeout = DEFAULT_TIMEOUT_SECONDS; // seconds component of a timeout - uint64_t timeout_usec = 0; // usec component of a timeout - uint64_t retries = DEFAULT_RETRIES; - uint64_t retry_wait_ms = DEFAULT_RETRY_WAIT_MS; - float retry_backoff = DEFAULT_RETRY_BACKOFF; - bool force_download = DEFAULT_FORCE_DOWNLOAD; - bool keep_alive = DEFAULT_KEEP_ALIVE; - bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; - idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; - - string ca_cert_file; - string http_proxy; - idx_t http_proxy_port; - string http_proxy_username; - string http_proxy_password; - string bearer_token; - unordered_map<string, string> extra_headers; - - static HTTPParams ReadFrom(optional_ptr<FileOpener> opener, optional_ptr<FileOpenerInfo> info); -}; - class HTTPClientCache { public: //! Get a client from the client cache - unique_ptr<duckdb_httplib_openssl::Client> GetClient(); + unique_ptr<HTTPClient> GetClient(); //! Store a client in the cache for reuse - void StoreClient(unique_ptr<duckdb_httplib_openssl::Client> client); + void StoreClient(unique_ptr<HTTPClient> client); protected: //! The cached clients - vector<unique_ptr<duckdb_httplib_openssl::Client>> clients; + vector<unique_ptr<HTTPClient>> clients; //! Lock to fetch a client mutex lock; }; @@ -87,7 +32,7 @@ class HTTPFileSystem; class HTTPFileHandle : public FileHandle { public: - HTTPFileHandle(FileSystem &fs, const string &path, FileOpenFlags flags, const HTTPParams &params); + HTTPFileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr<HTTPParams> params); ~HTTPFileHandle() override; // This two-phase construction allows subclasses more flexible setup. virtual void Initialize(optional_ptr<FileOpener> opener); @@ -95,14 +40,15 @@ class HTTPFileHandle : public FileHandle { // We keep an http client stored for connection reuse with keep-alive headers HTTPClientCache client_cache; - optional_ptr<HTTPLogger> http_logger; - - const HTTPParams http_params; + unique_ptr<HTTPParams> params; + HTTPFSParams &http_params; // File handle info FileOpenFlags flags; idx_t length; time_t last_modified; + string etag; + bool initialized = false; // When using full file download, the full file will be written to a cached file handle unique_ptr<CachedFileHandle> cached_file_handle; @@ -121,14 +67,12 @@ class HTTPFileHandle : public FileHandle { duckdb::unique_ptr<data_t[]> read_buffer; constexpr static idx_t READ_BUFFER_LEN = 1000000; - shared_ptr<HTTPState> state; - - void AddHeaders(HeaderMap &map); + void AddHeaders(HTTPHeaders &map); // Get a Client to run requests over - unique_ptr<duckdb_httplib_openssl::Client> GetClient(optional_ptr<ClientContext> client_context); + unique_ptr<HTTPClient> GetClient(); // Return the client for re-use - void StoreClient(unique_ptr<duckdb_httplib_openssl::Client> client); + void StoreClient(unique_ptr<HTTPClient> client); public: void Close() override { @@ -136,7 +80,9 @@ protected: //! Create a new Client - virtual unique_ptr<duckdb_httplib_openssl::Client> CreateClient(optional_ptr<ClientContext> client_context); + virtual unique_ptr<HTTPClient> CreateClient(); + //! Perform a HEAD request to get the file info (if not yet loaded) + void LoadFileInfo(); private: //!
Fully downloads a file @@ -145,35 +91,27 @@ class HTTPFileSystem : public FileSystem { public: - static duckdb::unique_ptr<duckdb_httplib_openssl::Client> - GetClient(const HTTPParams &http_params, const char *proto_host_port, optional_ptr<HTTPFileHandle> hfs); - static void ParseUrl(string &url, string &path_out, string &proto_host_port_out); - duckdb::unique_ptr<FileHandle> OpenFile(const string &path, FileOpenFlags flags, - optional_ptr<FileOpener> opener = nullptr) final; - static duckdb::unique_ptr<duckdb_httplib_openssl::Headers> InitializeHeaders(HeaderMap &header_map, - const HTTPParams &http_params); - - vector<string> Glob(const string &path, FileOpener *opener = nullptr) override { + static bool TryParseLastModifiedTime(const string &timestamp, time_t &result); + + vector<OpenFileInfo> Glob(const string &path, FileOpener *opener = nullptr) override { return {path}; // FIXME } // HTTP Requests - virtual duckdb::unique_ptr<ResponseWrapper> HeadRequest(FileHandle &handle, string url, HeaderMap header_map); + virtual duckdb::unique_ptr<HTTPResponse> HeadRequest(FileHandle &handle, string url, HTTPHeaders header_map); // Get Request with range parameter that GETs exactly buffer_out_len bytes from the url - virtual duckdb::unique_ptr<ResponseWrapper> GetRangeRequest(FileHandle &handle, string url, HeaderMap header_map, - idx_t file_offset, char *buffer_out, - idx_t buffer_out_len); + virtual duckdb::unique_ptr<HTTPResponse> GetRangeRequest(FileHandle &handle, string url, HTTPHeaders header_map, + idx_t file_offset, char *buffer_out, idx_t buffer_out_len); // Get Request without a range (i.e., downloads full file) - virtual duckdb::unique_ptr<ResponseWrapper> GetRequest(FileHandle &handle, string url, HeaderMap header_map); + virtual duckdb::unique_ptr<HTTPResponse> GetRequest(FileHandle &handle, string url, HTTPHeaders header_map); // Post Request that can handle variable sized responses without a content-length header (needed for s3 multipart) - virtual duckdb::unique_ptr<ResponseWrapper> PostRequest(FileHandle &handle, string url, HeaderMap header_map, - duckdb::unique_ptr<char[]> &buffer_out, - idx_t &buffer_out_len, char *buffer_in, idx_t buffer_in_len, - string params = ""); - virtual duckdb::unique_ptr<ResponseWrapper> PutRequest(FileHandle &handle, string url, HeaderMap header_map, - char *buffer_in, idx_t buffer_in_len, string params = ""); + virtual duckdb::unique_ptr<HTTPResponse> PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, + string &result, char *buffer_in, idx_t buffer_in_len, + string params = ""); + virtual duckdb::unique_ptr<HTTPResponse> PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, + char *buffer_in, idx_t buffer_in_len, string params = ""); - virtual duckdb::unique_ptr<ResponseWrapper> DeleteRequest(FileHandle &handle, string url, HeaderMap header_map); + virtual duckdb::unique_ptr<HTTPResponse> DeleteRequest(FileHandle &handle, string url, HTTPHeaders header_map); // FS methods void Read(FileHandle &handle, void *buffer, int64_t nr_bytes, idx_t location) override; @@ -183,6 +121,7 @@ class HTTPFileSystem : public FileSystem { void FileSync(FileHandle &handle) override; int64_t GetFileSize(FileHandle &handle) override; time_t GetLastModifiedTime(FileHandle &handle) override; + string GetVersionTag(FileHandle &handle) override; bool FileExists(const string &filename, optional_ptr<FileOpener> opener) override; void Seek(FileHandle &handle, idx_t location) override; idx_t SeekPosition(FileHandle &handle) override; @@ -207,12 +146,17 @@ class HTTPFileSystem : public FileSystem { optional_ptr<HTTPMetadataCache> GetGlobalCache(); protected: - virtual duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const string &path, FileOpenFlags flags, - optional_ptr<FileOpener> opener); + unique_ptr<FileHandle> OpenFileExtended(const OpenFileInfo &file, FileOpenFlags flags, + optional_ptr<FileOpener>
opener) override; + bool SupportsOpenFileExtended() const override { + return true; + } + + virtual HTTPException GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url); - static duckdb::unique_ptr<ResponseWrapper> - RunRequestWithRetry(const std::function<duckdb::unique_ptr<ResponseWrapper>(void)> &request, string &url, string method, - const HTTPParams &params, const std::function<void(void)> &retry_cb = {}); +protected: + virtual duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, + optional_ptr<FileOpener> opener); private: // Global cache diff --git a/extension/httpfs/include/httpfs_client.hpp b/extension/httpfs/include/httpfs_client.hpp new file mode 100644 index 00000000..1d7620cf --- /dev/null +++ b/extension/httpfs/include/httpfs_client.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include "duckdb/common/http_util.hpp" + +namespace duckdb { +class HTTPLogger; +class FileOpener; +struct FileOpenerInfo; +class HTTPState; + +struct HTTPFSParams : public HTTPParams { + HTTPFSParams(HTTPUtil &http_util) : HTTPParams(http_util) { + } + + static constexpr bool DEFAULT_ENABLE_SERVER_CERT_VERIFICATION = false; + static constexpr uint64_t DEFAULT_HF_MAX_PER_PAGE = 0; + static constexpr bool DEFAULT_FORCE_DOWNLOAD = false; + + bool force_download = DEFAULT_FORCE_DOWNLOAD; + bool enable_server_cert_verification = DEFAULT_ENABLE_SERVER_CERT_VERIFICATION; + idx_t hf_max_per_page = DEFAULT_HF_MAX_PER_PAGE; + string ca_cert_file; + string bearer_token; + shared_ptr<HTTPState> state; +}; + +class HTTPFSUtil : public HTTPUtil { +public: + unique_ptr<HTTPParams> InitializeParameters(optional_ptr<FileOpener> opener, + optional_ptr<FileOpenerInfo> info) override; + unique_ptr<HTTPClient> InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; + + static unordered_map<string, string> ParseGetParameters(const string &text); + static shared_ptr<HTTPUtil> GetHTTPUtil(optional_ptr<FileOpener> opener); + + string GetName() const override; +}; + +} // namespace duckdb diff --git a/extension/httpfs/include/s3fs.hpp b/extension/httpfs/include/s3fs.hpp index bc8b879c..83634297 100644 --- a/extension/httpfs/include/s3fs.hpp +++ b/extension/httpfs/include/s3fs.hpp @@ -12,9 +12,6 @@ #include "duckdb/common/case_insensitive_map.hpp" #include "httpfs.hpp" -#define CPPHTTPLIB_OPENSSL_SUPPORT -#include "httplib.hpp" - #include #include #include @@ -111,9 +108,9 @@ class S3FileHandle : public HTTPFileHandle { friend class S3FileSystem; public: - S3FileHandle(FileSystem &fs, string path_p, FileOpenFlags flags, const HTTPParams &http_params, + S3FileHandle(FileSystem &fs, const OpenFileInfo &file, FileOpenFlags flags, unique_ptr<HTTPParams> http_params_p, const S3AuthParams &auth_params_p, const S3ConfigParams &config_params_p) - : HTTPFileHandle(fs, std::move(path_p), flags, http_params), auth_params(auth_params_p), + : HTTPFileHandle(fs, file, flags, std::move(http_params_p)), auth_params(auth_params_p), config_params(config_params_p), uploads_in_progress(0), parts_uploaded(0), upload_finalized(false), uploader_has_error(false), upload_exception(nullptr) { if (flags.OpenForReading() && flags.OpenForWriting()) { @@ -159,7 +156,7 @@ class S3FileHandle : public HTTPFileHandle { atomic<bool> uploader_has_error {false}; std::exception_ptr upload_exception; - unique_ptr<duckdb_httplib_openssl::Client> CreateClient(optional_ptr<ClientContext> client_context) override; + unique_ptr<HTTPClient> CreateClient() override; //!
Rethrow IO Exception originating from an upload thread void RethrowIOError() { @@ -178,21 +175,17 @@ class S3FileSystem : public HTTPFileSystem { string GetName() const override; public: - duckdb::unique_ptr<ResponseWrapper> HeadRequest(FileHandle &handle, string s3_url, HeaderMap header_map) override; - duckdb::unique_ptr<ResponseWrapper> GetRequest(FileHandle &handle, string url, HeaderMap header_map) override; - duckdb::unique_ptr<ResponseWrapper> GetRangeRequest(FileHandle &handle, string s3_url, HeaderMap header_map, - idx_t file_offset, char *buffer_out, - idx_t buffer_out_len) override; - duckdb::unique_ptr<ResponseWrapper> PostRequest(FileHandle &handle, string s3_url, HeaderMap header_map, - duckdb::unique_ptr<char[]> &buffer_out, idx_t &buffer_out_len, - char *buffer_in, idx_t buffer_in_len, - string http_params = "") override; - duckdb::unique_ptr<ResponseWrapper> PutRequest(FileHandle &handle, string s3_url, HeaderMap header_map, - char *buffer_in, idx_t buffer_in_len, - string http_params = "") override; - duckdb::unique_ptr<ResponseWrapper> DeleteRequest(FileHandle &handle, string s3_url, HeaderMap header_map) override; - - static void Verify(); + duckdb::unique_ptr<HTTPResponse> HeadRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) override; + duckdb::unique_ptr<HTTPResponse> GetRequest(FileHandle &handle, string url, HTTPHeaders header_map) override; + duckdb::unique_ptr<HTTPResponse> GetRangeRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + idx_t file_offset, char *buffer_out, + idx_t buffer_out_len) override; + duckdb::unique_ptr<HTTPResponse> PostRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + string &buffer_out, char *buffer_in, idx_t buffer_in_len, + string http_params = "") override; + duckdb::unique_ptr<HTTPResponse> PutRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + char *buffer_in, idx_t buffer_in_len, string http_params = "") override; + duckdb::unique_ptr<HTTPResponse> DeleteRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) override; bool CanHandleFile(const string &fpath) override; bool OnDiskFile(FileHandle &handle) override { @@ -218,7 +211,7 @@ class S3FileSystem : public HTTPFileSystem { // Note: caller is responsible to not call this method twice on the same buffer static void UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer); - vector<string> Glob(const string &glob_pattern, FileOpener *opener = nullptr) override; + vector<OpenFileInfo> Glob(const string &glob_pattern, FileOpener *opener = nullptr) override; bool ListFiles(const string &directory, const std::function<void(const string &, bool)> &callback, FileOpener *opener = nullptr) override; @@ -230,23 +223,24 @@ class S3FileSystem : public HTTPFileSystem { return true; } + static HTTPException GetS3Error(S3AuthParams &s3_auth_params, const HTTPResponse &response, const string &url); + protected: static void NotifyUploadsInProgress(S3FileHandle &file_handle); - duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const string &path, FileOpenFlags flags, + duckdb::unique_ptr<HTTPFileHandle> CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr<FileOpener> opener) override; void FlushBuffer(S3FileHandle &handle, shared_ptr<S3WriteBuffer> write_buffer); string GetPayloadHash(char *buffer, idx_t buffer_len); - // helper for ReadQueryParams - void GetQueryParam(const string &key, string &param, CPPHTTPLIB_NAMESPACE::Params &query_params); + HTTPException GetHTTPError(FileHandle &, const HTTPResponse &response, const string &url) override; }; // Helper class to do s3 ListObjectV2 api call https://docs.aws.amazon.com/AmazonS3/latest/API/API_ListObjectsV2.html struct AWSListObjectV2 { static string Request(string &path, HTTPParams &http_params, S3AuthParams &s3_auth_params, string
&continuation_token, optional_ptr<HTTPState> state, bool use_delimiter = false); - static void ParseKey(string &aws_response, vector<string> &result); + static void ParseFileList(string &aws_response, vector<OpenFileInfo> &result); static vector<string> ParseCommonPrefix(string &aws_response); static string ParseContinuationToken(string &aws_response); }; diff --git a/extension/httpfs/s3fs.cpp b/extension/httpfs/s3fs.cpp index 3f78a4f2..55f396ad 100644 --- a/extension/httpfs/s3fs.cpp +++ b/extension/httpfs/s3fs.cpp @@ -4,6 +4,8 @@ #include "duckdb.hpp" #ifndef DUCKDB_AMALGAMATION #include "duckdb/common/exception/http_exception.hpp" +#include "duckdb/logging/log_type.hpp" +#include "duckdb/logging/file_system_logger.hpp" #include "duckdb/common/helper.hpp" #include "duckdb/common/thread.hpp" #include "duckdb/common/types/timestamp.hpp" @@ -23,11 +25,11 @@ namespace duckdb { -static HeaderMap create_s3_header(string url, string query, string host, string service, string method, - const S3AuthParams &auth_params, string date_now = "", string datetime_now = "", - string payload_hash = "", string content_type = "") { +static HTTPHeaders create_s3_header(string url, string query, string host, string service, string method, + const S3AuthParams &auth_params, string date_now = "", string datetime_now = "", + string payload_hash = "", string content_type = "") { - HeaderMap res; + HTTPHeaders res; res["Host"] = host; // If access key is not set, we don't set the headers at all to allow accessing public files through s3 urls if (auth_params.secret_access_key.empty() && auth_params.access_key_id.empty()) { @@ -110,14 +112,6 @@ static HeaderMap create_s3_header(string url, string query, string host, string return res; } -static duckdb::unique_ptr<duckdb_httplib_openssl::Headers> initialize_http_headers(HeaderMap &header_map) { - auto headers = make_uniq<duckdb_httplib_openssl::Headers>(); - for (auto &entry : header_map) { - headers->insert(entry); - } - return headers; -} - string S3FileSystem::UrlDecode(string input) { return StringUtil::URLDecode(input, true); } @@ -279,11 +273,11 @@ void S3FileHandle::Close() { } } -unique_ptr<duckdb_httplib_openssl::Client> S3FileHandle::CreateClient(optional_ptr<ClientContext> client_context) { +unique_ptr<HTTPClient> S3FileHandle::CreateClient() { auto parsed_url = S3FileSystem::S3UrlParse(path, this->auth_params); string proto_host_port = parsed_url.http_proto + parsed_url.host; - return HTTPFileSystem::GetClient(this->http_params, proto_host_port.c_str(), this); + return http_params.http_util.InitializeClient(http_params, proto_host_port); } // Opens the multipart upload and returns the ID @@ -291,20 +285,15 @@ string S3FileSystem::InitializeMultipartUpload(S3FileHandle &file_handle) { auto &s3fs = (S3FileSystem &)file_handle.file_system; // AWS response is around 300~ chars in docs so this should be enough to not need a resize - idx_t response_buffer_len = 1000; - auto response_buffer = duckdb::unique_ptr<char[]> {new char[response_buffer_len]}; - + string result; string query_param = "uploads="; - auto res = s3fs.PostRequest(file_handle, file_handle.path, {}, response_buffer, response_buffer_len, nullptr, 0, - query_param); + auto res = s3fs.PostRequest(file_handle, file_handle.path, {}, result, nullptr, 0, query_param); - if (res->code != 200) { - throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %s)", res->http_url, res->error, - to_string(res->code)); + if (res->status != HTTPStatusCode::OK_200) { + throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), + static_cast<int>(res->status)); } - string result(response_buffer.get(), response_buffer_len); - auto
open_tag_pos = result.find("<UploadId>", 0); auto close_tag_pos = result.find("</UploadId>", open_tag_pos); @@ -333,23 +322,22 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) { string query_param = "partNumber=" + to_string(write_buffer->part_no + 1) + "&" + "uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true); - unique_ptr<ResponseWrapper> res; - case_insensitive_map_t<string>::iterator etag_lookup; + unique_ptr<HTTPResponse> res; + string etag; try { res = s3fs.PutRequest(file_handle, file_handle.path, {}, (char *)write_buffer->Ptr(), write_buffer->idx, query_param); - if (res->code != 200) { - throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %s)", res->http_url, res->error, - to_string(res->code)); + if (res->status != HTTPStatusCode::OK_200) { + throw HTTPException(*res, "Unable to connect to URL %s: %s (HTTP code %d)", res->url, res->GetError(), + static_cast<int>(res->status)); } - etag_lookup = res->headers.find("ETag"); - if (etag_lookup == res->headers.end()) { + if (!res->headers.HasHeader("ETag")) { throw IOException("Unexpected response when uploading part to S3"); } - + etag = res->headers.GetHeaderValue("ETag"); } catch (std::exception &ex) { ErrorData error(ex); if (error.Type() != ExceptionType::IO && error.Type() != ExceptionType::HTTP) { @@ -369,7 +357,7 @@ void S3FileSystem::UploadBuffer(S3FileHandle &file_handle, shared_ptr<S3WriteBuffer> write_buffer) { { std::unique_lock<std::mutex> lck(file_handle.part_etags_lock); - file_handle.part_etags.insert(std::pair<uint16_t, string>(write_buffer->part_no, etag_lookup->second)); + file_handle.part_etags.insert(std::pair<uint16_t, string>(write_buffer->part_no, etag)); } file_handle.parts_uploaded++; @@ -460,18 +448,15 @@ void S3FileSystem::FinalizeMultipartUpload(S3FileHandle &file_handle) { string body = ss.str(); // Response is around ~400 in AWS docs so this should be enough to not need a resize - idx_t response_buffer_len = 1000; - auto response_buffer = duckdb::unique_ptr<char[]> {new char[response_buffer_len]}; + string result; string query_param = "uploadId=" + S3FileSystem::UrlEncode(file_handle.multipart_upload_id, true); - auto res = s3fs.PostRequest(file_handle, file_handle.path, {}, response_buffer, response_buffer_len, - (char *)body.c_str(), body.length(), query_param); - string result(response_buffer.get(), response_buffer_len); - + auto res = + s3fs.PostRequest(file_handle, file_handle.path, {}, result, (char *)body.c_str(), body.length(), query_param); auto open_tag_pos = result.find("<CompleteMultipartUploadResult", 0); if (open_tag_pos == string::npos) { - throw HTTPException(*res, "Unexpected response during S3 multipart upload finalization: %d\n\n%s", res->code, - result); + throw HTTPException(*res, "Unexpected response during S3 multipart upload finalization: %d\n\n%s", + static_cast<int>(res->status), result); } } @@ -512,7 +497,7 @@ shared_ptr<S3WriteBuffer> S3FileHandle::GetBuffer(uint16_t write_buffer_idx) { return new_write_buffer; } -void S3FileSystem::GetQueryParam(const string &key, string &param, duckdb_httplib_openssl::Params &query_params) { +void GetQueryParam(const string &key, string &param, unordered_map<string, string> &query_params) { auto found_param = query_params.find(key); if (found_param != query_params.end()) { param = found_param->second; @@ -525,8 +510,7 @@ void S3FileSystem::ReadQueryParams(const string &url_query_param, S3AuthParams & return; } - duckdb_httplib_openssl::Params query_params; - duckdb_httplib_openssl::detail::parse_query_text(url_query_param, query_params); + auto query_params = HTTPFSUtil::ParseGetParameters(url_query_param); GetQueryParam("s3_region", params.region, query_params); GetQueryParam("s3_access_key_id", params.access_key_id, query_params); @@ -652,9 +636,9 @@ string ParsedS3Url::GetHTTPUrl(S3AuthParams &auth_params, const string &http_que return full_url; }
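Note: FinalizeMultipartUpload above posts AWS's standard CompleteMultipartUpload document, assembled from the part ETags collected by UploadBuffer; schematically (part numbers and ETags are placeholders):

    <CompleteMultipartUpload>
      <Part><PartNumber>1</PartNumber><ETag>"etag-1"</ETag></Part>
      <Part><PartNumber>2</PartNumber><ETag>"etag-2"</ETag></Part>
    </CompleteMultipartUpload>

The <CompleteMultipartUploadResult> tag that result.find() checks for is the root element of the success response to that POST.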
-unique_ptr<ResponseWrapper> S3FileSystem::PostRequest(FileHandle &handle, string url, HeaderMap header_map, - duckdb::unique_ptr<char[]> &buffer_out, idx_t &buffer_out_len, - char *buffer_in, idx_t buffer_in_len, string http_params) { +unique_ptr<HTTPResponse> S3FileSystem::PostRequest(FileHandle &handle, string url, HTTPHeaders header_map, + string &result, char *buffer_in, idx_t buffer_in_len, + string http_params) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url = S3UrlParse(url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params, http_params); @@ -662,11 +646,11 @@ unique_ptr<ResponseWrapper> S3FileSystem::PostRequest(FileHandle &handle, string auto headers = create_s3_header(parsed_s3_url.path, http_params, parsed_s3_url.host, "s3", "POST", auth_params, "", "", payload_hash, "application/octet-stream"); - return HTTPFileSystem::PostRequest(handle, http_url, headers, buffer_out, buffer_out_len, buffer_in, buffer_in_len); + return HTTPFileSystem::PostRequest(handle, http_url, headers, result, buffer_in, buffer_in_len); } -unique_ptr<ResponseWrapper> S3FileSystem::PutRequest(FileHandle &handle, string url, HeaderMap header_map, - char *buffer_in, idx_t buffer_in_len, string http_params) { +unique_ptr<HTTPResponse> S3FileSystem::PutRequest(FileHandle &handle, string url, HTTPHeaders header_map, + char *buffer_in, idx_t buffer_in_len, string http_params) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url = S3UrlParse(url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params, http_params); @@ -678,7 +662,7 @@ unique_ptr<ResponseWrapper> S3FileSystem::PutRequest(FileHandle &handle, string return HTTPFileSystem::PutRequest(handle, http_url, headers, buffer_in, buffer_in_len); } -unique_ptr<ResponseWrapper> S3FileSystem::HeadRequest(FileHandle &handle, string s3_url, HeaderMap header_map) { +unique_ptr<HTTPResponse> S3FileSystem::HeadRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url = S3UrlParse(s3_url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params); @@ -687,7 +671,7 @@ unique_ptr<ResponseWrapper> S3FileSystem::HeadRequest(FileHandle &handle, string return HTTPFileSystem::HeadRequest(handle, http_url, headers); } -unique_ptr<ResponseWrapper> S3FileSystem::GetRequest(FileHandle &handle, string s3_url, HeaderMap header_map) { +unique_ptr<HTTPResponse> S3FileSystem::GetRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url = S3UrlParse(s3_url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params); @@ -696,8 +680,8 @@ unique_ptr<ResponseWrapper> S3FileSystem::GetRequest(FileHandle &handle, string return HTTPFileSystem::GetRequest(handle, http_url, headers); } -unique_ptr<ResponseWrapper> S3FileSystem::GetRangeRequest(FileHandle &handle, string s3_url, HeaderMap header_map, - idx_t file_offset, char *buffer_out, idx_t buffer_out_len) { +unique_ptr<HTTPResponse> S3FileSystem::GetRangeRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map, + idx_t file_offset, char *buffer_out, idx_t buffer_out_len) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url = S3UrlParse(s3_url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params); @@ -706,7 +690,7 @@ unique_ptr<ResponseWrapper> S3FileSystem::GetRangeRequest(FileHandle &handle, st return HTTPFileSystem::GetRangeRequest(handle, http_url, headers, file_offset, buffer_out, buffer_out_len); } -unique_ptr<ResponseWrapper> S3FileSystem::DeleteRequest(FileHandle &handle, string s3_url, HeaderMap header_map) { +unique_ptr<HTTPResponse> S3FileSystem::DeleteRequest(FileHandle &handle, string s3_url, HTTPHeaders header_map) { auto auth_params = handle.Cast<S3FileHandle>().auth_params; auto parsed_s3_url =
S3UrlParse(s3_url, auth_params); string http_url = parsed_s3_url.GetHTTPUrl(auth_params); @@ -715,113 +699,22 @@ unique_ptr<ResponseWrapper> S3FileSystem::DeleteRequest(FileHandle &handle, stri return HTTPFileSystem::DeleteRequest(handle, http_url, headers); } -unique_ptr<HTTPFileHandle> S3FileSystem::CreateHandle(const string &path, FileOpenFlags flags, +unique_ptr<HTTPFileHandle> S3FileSystem::CreateHandle(const OpenFileInfo &file, FileOpenFlags flags, optional_ptr<FileOpener> opener) { - FileOpenerInfo info = {path}; + FileOpenerInfo info = {file.path}; S3AuthParams auth_params = S3AuthParams::ReadFrom(opener, info); // Scan the query string for any s3 authentication parameters - auto parsed_s3_url = S3UrlParse(path, auth_params); + auto parsed_s3_url = S3UrlParse(file.path, auth_params); ReadQueryParams(parsed_s3_url.query_param, auth_params); - return duckdb::make_uniq<S3FileHandle>(*this, path, flags, HTTPParams::ReadFrom(opener, info), auth_params, - S3ConfigParams::ReadFrom(opener)); -} + auto http_util = HTTPFSUtil::GetHTTPUtil(opener); + auto params = http_util->InitializeParameters(opener, info); -// this computes the signature from https://czak.pl/2015/09/15/s3-rest-api-with-curl.html -void S3FileSystem::Verify() { - S3AuthParams auth_params; - auth_params.region = "us-east-1"; - auth_params.access_key_id = "AKIAIOSFODNN7EXAMPLE"; - auth_params.secret_access_key = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"; - - auto test_header = create_s3_header("/", "", "my-precious-bucket.s3.amazonaws.com", "s3", "GET", auth_params, - "20150915", "20150915T124500Z"); - if (test_header["Authorization"] != - "AWS4-HMAC-SHA256 Credential=AKIAIOSFODNN7EXAMPLE/20150915/us-east-1/s3/aws4_request, " - "SignedHeaders=host;x-amz-content-sha256;x-amz-date, " - "Signature=182072eb53d85c36b2d791a1fa46a12d23454ec1e921b02075c23aee40166d5a") { - throw std::runtime_error("test fail"); - } - - if (UrlEncode("/category=Books/") != "/category%3DBooks/") { - throw std::runtime_error("test fail"); - } - if (UrlEncode("/?category=Books&title=Ducks Retreat/") != "/%3Fcategory%3DBooks%26title%3DDucks%20Retreat/") { - throw std::runtime_error("test fail"); - } - if (UrlEncode("/?category=Books&title=Ducks Retreat/", true) != - "%2F%3Fcategory%3DBooks%26title%3DDucks%20Retreat%2F") { - throw std::runtime_error("test fail"); - } - // AWS_SECRET_ACCESS_KEY="vs1BZPxSL2qVARBSg5vCMKJsavCoEPlo/HSHRaVe" AWS_ACCESS_KEY_ID="ASIAYSPIOYDTHTBIITVC" - // AWS_SESSION_TOKEN="IQoJb3JpZ2luX2VjENX//////////wEaCWV1LXdlc3QtMSJHMEUCIQDfjzs9BYHrEXDMU/NR+PHV1uSTr7CSVSQdjKSfiPRLdgIgCCztF0VMbi9+uHHAfBVKhV4t9MlUrQg3VAOIsLxrWyoqlAIIHRAAGgw1ODk0MzQ4OTY2MTQiDOGl2DsYxENcKCbh+irxARe91faI+hwUhT60sMGRFg0GWefKnPclH4uRFzczrDOcJlAAaQRJ7KOsT8BrJlrY1jSgjkO7PkVjPp92vi6lJX77bg99MkUTJActiOKmd84XvAE5bFc/jFbqechtBjXzopAPkKsGuaqAhCenXnFt6cwq+LZikv/NJGVw7TRphLV+Aq9PSL9XwdzIgsW2qXwe1c3rxDNj53yStRZHVggdxJ0OgHx5v040c98gFphzSULHyg0OY6wmCMTYcswpb4kO2IIi6AiD9cY25TlwPKRKPi5CdBsTPnyTeW62u7PvwK0fTSy4ZuJUuGKQnH2cKmCXquEwoOHEiQY6nQH9fzY/EDGHMRxWWhxu0HiqIfsuFqC7GS0p0ToKQE+pzNsvVwMjZc+KILIDDQpdCWRIwu53I5PZy2Cvk+3y4XLvdZKQCsAKqeOc4c94UAS4NmUT7mCDOuRV0cLBVM8F0JYBGrUxyI+YoIvHhQWmnRLuKgTb5PkF7ZWrXBHFWG5/tZDOvBbbaCWTlRCL9b0Vpg5+BM/81xd8jChP4w83" - // aws --region eu-west-1 --debug s3 ls my-precious-bucket 2>&1 | less - string canonical_query_string = "delimiter=%2F&encoding-type=url&list-type=2&prefix="; // aws s3 ls - - S3AuthParams auth_params2; - auth_params2.region = "eu-west-1"; - auth_params2.access_key_id = "ASIAYSPIOYDTHTBIITVC"; - auth_params2.secret_access_key = "vs1BZPxSL2qVARBSg5vCMKJsavCoEPlo/HSHRaVe"; - auth_params2.session_token =
"IQoJb3JpZ2luX2VjENX//////////wEaCWV1LXdlc3QtMSJHMEUCIQDfjzs9BYHrEXDMU/" - "NR+PHV1uSTr7CSVSQdjKSfiPRLdgIgCCztF0VMbi9+" - "uHHAfBVKhV4t9MlUrQg3VAOIsLxrWyoqlAIIHRAAGgw1ODk0MzQ4OTY2MTQiDOGl2DsYxENcKCbh+irxARe91faI+" - "hwUhT60sMGRFg0GWefKnPclH4uRFzczrDOcJlAAaQRJ7KOsT8BrJlrY1jSgjkO7PkVjPp92vi6lJX77bg99MkUTJA" - "ctiOKmd84XvAE5bFc/jFbqechtBjXzopAPkKsGuaqAhCenXnFt6cwq+LZikv/" - "NJGVw7TRphLV+" - "Aq9PSL9XwdzIgsW2qXwe1c3rxDNj53yStRZHVggdxJ0OgHx5v040c98gFphzSULHyg0OY6wmCMTYcswpb4kO2IIi6" - "AiD9cY25TlwPKRKPi5CdBsTPnyTeW62u7PvwK0fTSy4ZuJUuGKQnH2cKmCXquEwoOHEiQY6nQH9fzY/" - "EDGHMRxWWhxu0HiqIfsuFqC7GS0p0ToKQE+pzNsvVwMjZc+KILIDDQpdCWRIwu53I5PZy2Cvk+" - "3y4XLvdZKQCsAKqeOc4c94UAS4NmUT7mCDOuRV0cLBVM8F0JYBGrUxyI+" - "YoIvHhQWmnRLuKgTb5PkF7ZWrXBHFWG5/tZDOvBbbaCWTlRCL9b0Vpg5+BM/81xd8jChP4w83"; - - auto test_header2 = create_s3_header("/", canonical_query_string, "my-precious-bucket.s3.eu-west-1.amazonaws.com", - "s3", "GET", auth_params2, "20210904", "20210904T121746Z"); - if (test_header2["Authorization"] != - "AWS4-HMAC-SHA256 Credential=ASIAYSPIOYDTHTBIITVC/20210904/eu-west-1/s3/aws4_request, " - "SignedHeaders=host;x-amz-content-sha256;x-amz-date;x-amz-security-token, " - "Signature=4d9d6b59d7836b6485f6ad822de97be40287da30347d83042ea7fbed530dc4c0") { - throw std::runtime_error("test fail"); - } - - S3AuthParams auth_params3; - auth_params3.region = "eu-west-1"; - auth_params3.access_key_id = "S3RVER"; - auth_params3.secret_access_key = "S3RVER"; - - auto test_header3 = - create_s3_header("/correct_auth_test.csv", "", "test-bucket-ceiveran.s3.amazonaws.com", "s3", "PUT", - auth_params3, "20220121", "20220121T141452Z", - "28a0cf6ac5c4cb73793091fe6ecc6a68bf90855ac9186158748158f50241bb0c", "text/data;charset=utf-8"); - if (test_header3["Authorization"] != "AWS4-HMAC-SHA256 Credential=S3RVER/20220121/eu-west-1/s3/aws4_request, " - "SignedHeaders=content-type;host;x-amz-content-sha256;x-amz-date, " - "Signature=5d9a6cbfaa78a6d0f2ab7df0445e2f1cc9c80cd3655ac7de9e7219c036f23f02") { - throw std::runtime_error("test3 fail"); - } - - // bug #4082 - S3AuthParams auth_params4; - auth_params4.region = "auto"; - auth_params4.access_key_id = "asdf"; - auth_params4.secret_access_key = "asdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdfasdf"; - create_s3_header("/", "", "exampple.com", "s3", "GET", auth_params4); - - if (UrlEncode("/category=Books/") != "/category%3DBooks/") { - throw std::runtime_error("test fail"); - } - if (UrlEncode("/?category=Books&title=Ducks Retreat/") != "/%3Fcategory%3DBooks%26title%3DDucks%20Retreat/") { - throw std::runtime_error("test fail"); - } - if (UrlEncode("/?category=Books&title=Ducks Retreat/", true) != - "%2F%3Fcategory%3DBooks%26title%3DDucks%20Retreat%2F") { - throw std::runtime_error("test fail"); - } - - // TODO add a test that checks the signing for path-style + return duckdb::make_uniq(*this, file, flags, std::move(params), auth_params, + S3ConfigParams::ReadFrom(opener)); } - void S3FileHandle::Initialize(optional_ptr opener) { try { HTTPFileHandle::Initialize(opener); @@ -883,8 +776,9 @@ void S3FileSystem::RemoveFile(const string &path, optional_ptr opene auto &s3fh = handle->Cast(); auto res = DeleteRequest(*handle, s3fh.path, {}); - if (res->code != 200 && res->code != 204) { - throw IOException("Could not remove file \"%s\": %s", {{"errno", to_string(res->code)}}, path, res->error); + if (res->status != HTTPStatusCode::OK_200 && res->status != HTTPStatusCode::NoContent_204) { + throw IOException("Could not remove file \"%s\": %s", {{"errno", 
to_string(static_cast<int>(res->status))}}, + path, res->GetError()); } } @@ -895,7 +789,7 @@ void S3FileSystem::RemoveDirectory(const string &path, optional_ptr<FileOpener> try { this->RemoveFile(file, opener); } catch (IOException &e) { - string errmsg(e.what()); + string errmsg(e.what()); if (errmsg.find("No such file or directory") != std::string::npos) { return; } @@ -946,6 +840,8 @@ void S3FileSystem::Write(FileHandle &handle, void *buffer, int64_t nr_bytes, idx s3fh.file_offset += bytes_to_write; bytes_written += bytes_to_write; } + + DUCKDB_LOG_FILE_SYSTEM_WRITE(handle, bytes_written, s3fh.file_offset - bytes_written); } static bool Match(vector<string>::const_iterator key, vector<string>::const_iterator key_end, @@ -973,7 +869,7 @@ static bool Match(vector<string>::const_iterator key, vector<string>::const_iter return key == key_end && pattern == pattern_end; } -vector<string> S3FileSystem::Glob(const string &glob_pattern, FileOpener *opener) { +vector<OpenFileInfo> S3FileSystem::Glob(const string &glob_pattern, FileOpener *opener) { if (opener == nullptr) { throw InternalException("Cannot S3 Glob without FileOpener"); } @@ -998,21 +894,22 @@ vector<string> S3FileSystem::Glob(const string &glob_pattern, FileOpener *opener } string shared_path = parsed_glob_url.substr(0, first_wildcard_pos); - auto http_params = HTTPParams::ReadFrom(opener, info); + auto http_util = HTTPFSUtil::GetHTTPUtil(opener); + auto http_params = http_util->InitializeParameters(opener, info); ReadQueryParams(parsed_s3_url.query_param, s3_auth_params); // Do main listobjectsv2 request - vector<string> s3_keys; + vector<OpenFileInfo> s3_keys; string main_continuation_token; // Main paging loop do { // main listobject call, may - string response_str = AWSListObjectV2::Request(shared_path, http_params, s3_auth_params, + string response_str = AWSListObjectV2::Request(shared_path, *http_params, s3_auth_params, main_continuation_token, HTTPState::TryGetState(opener).get()); main_continuation_token = AWSListObjectV2::ParseContinuationToken(response_str); - AWSListObjectV2::ParseKey(response_str, s3_keys); + AWSListObjectV2::ParseFileList(response_str, s3_keys); // Repeat requests until the keys of all common prefixes are parsed.
auto common_prefixes = AWSListObjectV2::ParseCommonPrefix(response_str); @@ -1025,9 +922,9 @@ vector<string> S3FileSystem::Glob(const string &glob_pattern, FileOpener *opener string common_prefix_continuation_token; do { auto prefix_res = - AWSListObjectV2::Request(prefix_path, http_params, s3_auth_params, common_prefix_continuation_token, - HTTPState::TryGetState(opener).get()); - AWSListObjectV2::ParseKey(prefix_res, s3_keys); + AWSListObjectV2::Request(prefix_path, *http_params, s3_auth_params, + common_prefix_continuation_token, HTTPState::TryGetState(opener).get()); + AWSListObjectV2::ParseFileList(prefix_res, s3_keys); auto more_prefixes = AWSListObjectV2::ParseCommonPrefix(prefix_res); common_prefixes.insert(common_prefixes.end(), more_prefixes.begin(), more_prefixes.end()); common_prefix_continuation_token = AWSListObjectV2::ParseContinuationToken(prefix_res); @@ -1036,19 +933,20 @@ } while (!main_continuation_token.empty()); vector<string> pattern_splits = StringUtil::Split(parsed_s3_url.key, "/"); - vector<string> result; - for (const auto &s3_key : s3_keys) { + vector<OpenFileInfo> result; + for (auto &s3_key : s3_keys) { - vector<string> key_splits = StringUtil::Split(s3_key, "/"); + vector<string> key_splits = StringUtil::Split(s3_key.path, "/"); bool is_match = Match(key_splits.begin(), key_splits.end(), pattern_splits.begin(), pattern_splits.end()); if (is_match) { - auto result_full_url = parsed_s3_url.prefix + parsed_s3_url.bucket + "/" + s3_key; + auto result_full_url = parsed_s3_url.prefix + parsed_s3_url.bucket + "/" + s3_key.path; // if a ? char was present, we re-add it here as the url parsing will have trimmed it. if (!parsed_s3_url.query_param.empty()) { result_full_url += '?' + parsed_s3_url.query_param; } - result.push_back(result_full_url); + s3_key.path = std::move(result_full_url); + result.push_back(std::move(s3_key)); } } return result; @@ -1069,12 +967,33 @@ bool S3FileSystem::ListFiles(const string &directory, const std::function<void(const string &, bool)> + auto &s3_handle = handle.Cast<S3FileHandle>(); + return GetS3Error(s3_handle.auth_params, response, url); + } + return HTTPFileSystem::GetHTTPError(handle, response, url); +} string AWSListObjectV2::Request(string &path, HTTPParams &http_params, S3AuthParams &s3_auth_params, string &continuation_token, optional_ptr<HTTPState> state, bool use_delimiter = false) { auto parsed_url = S3FileSystem::S3UrlParse(path, s3_auth_params); @@ -1098,54 +1017,102 @@ string AWSListObjectV2::Request(string &path, HTTPParams &http_params, S3AuthPar auto header_map = create_s3_header(req_path, req_params, parsed_url.host, "s3", "GET", s3_auth_params, "", "", "", ""); - auto headers = initialize_http_headers(header_map); - auto client = S3FileSystem::GetClient(http_params, (parsed_url.http_proto + parsed_url.host).c_str(), - nullptr); // Get requests use fresh connection + // Get requests use fresh connection + string full_host = parsed_url.http_proto + parsed_url.host; std::stringstream response; + GetRequestInfo get_request( + full_host, listobjectv2_url, header_map, http_params, + [&](const HTTPResponse &response) { + if (static_cast<int>(response.status) >= 400) { + string trimmed_path = path; + StringUtil::RTrim(trimmed_path, "/"); + trimmed_path += listobjectv2_url; + throw S3FileSystem::GetS3Error(s3_auth_params, response, trimmed_path); + }
return true; }, - [&](const char *data, size_t data_length) { - if (state) { - state->total_bytes_received += data_length; - } - response << string(data, data_length); + [&](const_data_ptr_t data, idx_t data_length) { + response << string(const_char_ptr_cast(data), data_length); return true; }); - if (state) { - state->get_count++; - } - if (res.error() != duckdb_httplib_openssl::Error::Success) { - throw IOException(to_string(res.error()) + " error for HTTP GET to '" + listobjectv2_url + "'"); + auto result = http_params.http_util.Request(get_request); + if (result->HasRequestError()) { + throw IOException("%s error for HTTP GET to '%s'", result->GetRequestError(), listobjectv2_url); } return response.str(); } -void AWSListObjectV2::ParseKey(string &aws_response, vector<string> &result) { +optional_idx FindTagContents(const string &response, const string &tag, idx_t cur_pos, string &result) { + string open_tag = "<" + tag + ">"; + string close_tag = "</" + tag + ">"; + auto open_tag_pos = response.find(open_tag, cur_pos); + if (open_tag_pos == string::npos) { + // tag not found + return optional_idx(); + } + auto close_tag_pos = response.find(close_tag, open_tag_pos + open_tag.size()); + if (close_tag_pos == string::npos) { + throw InternalException("Failed to parse S3 result: found open tag for %s but did not find matching close tag", + tag); + } + result = response.substr(open_tag_pos + open_tag.size(), close_tag_pos - open_tag_pos - open_tag.size()); + return close_tag_pos + close_tag.size(); +} + +void AWSListObjectV2::ParseFileList(string &aws_response, vector<OpenFileInfo> &result) { + // Example S3 response: + // <Contents> + // <Key>lineitem_sf10_partitioned_shipdate/l_shipdate%3D1997-03-28/data_0.parquet</Key> + // <LastModified>2024-11-09T11:38:08.000Z</LastModified> + // <ETag>&quot;bdf10f525f8355fb80d1ff2d8c62cc8b&quot;</ETag> + // <Size>1127863</Size> + // <StorageClass>STANDARD</StorageClass> + // </Contents> idx_t cur_pos = 0; while (true) { - auto next_open_tag_pos = aws_response.find("<Key>", cur_pos); - if (next_open_tag_pos == string::npos) { + string contents; + auto next_pos = FindTagContents(aws_response, "Contents", cur_pos, contents); + if (!next_pos.IsValid()) { + // exhausted all contents break; - } else { - auto next_close_tag_pos = aws_response.find("</Key>", next_open_tag_pos + 5); - if (next_close_tag_pos == string::npos) { - throw InternalException("Failed to parse S3 result"); - } - auto parsed_path = S3FileSystem::UrlDecode( - aws_response.substr(next_open_tag_pos + 5, next_close_tag_pos - next_open_tag_pos - 5)); - if (parsed_path.back() != '/') { - result.push_back(parsed_path); - } - cur_pos = next_close_tag_pos + 6; } + // move to the next position + cur_pos = next_pos.GetIndex(); + + // parse the contents + string key; + auto key_pos = FindTagContents(contents, "Key", 0, key); + if (!key_pos.IsValid()) { + throw InternalException("Key not found in S3 response: %s", contents); + } + auto parsed_path = S3FileSystem::UrlDecode(key); + if (parsed_path.back() == '/') { + // not a file but a directory + continue; + } + // construct the file + OpenFileInfo result_file(parsed_path); + + auto extra_info = make_shared_ptr<ExtendedOpenFileInfo>(); + // get file attributes + string last_modified, etag, size; + auto last_modified_pos = FindTagContents(contents, "LastModified", 0, last_modified); + if (last_modified_pos.IsValid()) { + extra_info->options["last_modified"] = Value(last_modified).DefaultCastAs(LogicalType::TIMESTAMP); + } + auto etag_pos = FindTagContents(contents, "ETag", 0, etag); + if (etag_pos.IsValid()) { + etag = StringUtil::Replace(etag, "&quot;", "\""); + extra_info->options["etag"] = Value(std::move(etag)); + } + auto size_pos =
FindTagContents(contents, "Size", 0, size); + if (size_pos.IsValid()) { + extra_info->options["file_size"] = Value(size).DefaultCastAs(LogicalType::UBIGINT); + } + result_file.extended_info = std::move(extra_info); + result.push_back(std::move(result_file)); } } diff --git a/extension_config.cmake b/extension_config.cmake index fa408204..58810439 100644 --- a/extension_config.cmake +++ b/extension_config.cmake @@ -8,6 +8,9 @@ else () set(LOAD_HTTPFS_TESTS "") endif() +duckdb_extension_load(json) +duckdb_extension_load(parquet) + duckdb_extension_load(httpfs SOURCE_DIR ${CMAKE_CURRENT_LIST_DIR} INCLUDE_DIR ${CMAKE_CURRENT_LIST_DIR}/extension/httpfs/include diff --git a/test/sql/secret/secret_refresh.test b/test/sql/secret/secret_refresh.test index e1cc7ed4..85c8738a 100644 --- a/test/sql/secret/secret_refresh.test +++ b/test/sql/secret/secret_refresh.test @@ -61,7 +61,7 @@ statement ok FROM "s3://test-bucket/test-file.parquet" query I -SELECT message[0:46] FROM duckdb_logs WHERE type='httpfs.SecretRefresh' +SELECT message[0:46] FROM duckdb_logs WHERE message like '%Successfully refreshed secret%' ---- Successfully refreshed secret: s1, new key_id: @@ -84,7 +84,7 @@ FROM "s3://test-bucket/test-file.parquet" HTTP 403 query I -SELECT message[0:46] FROM duckdb_logs WHERE type='httpfs.SecretRefresh' +SELECT message[0:46] FROM duckdb_logs WHERE message like '%Successfully refreshed secret%' ---- Successfully refreshed secret: s1, new key_id: @@ -125,5 +125,5 @@ HTTP 403 # -> log empty query II -SELECT log_level, message FROM duckdb_logs WHERE type='httpfs.SecretRefresh' +SELECT log_level, message FROM duckdb_logs WHERE message like '%Successfully refreshed secret%' ---- diff --git a/test/sql/secret/secret_refresh_attach.test b/test/sql/secret/secret_refresh_attach.test index a62ed157..c20881d9 100644 --- a/test/sql/secret/secret_refresh_attach.test +++ b/test/sql/secret/secret_refresh_attach.test @@ -45,6 +45,6 @@ ATTACH 's3://test-bucket/presigned/attach.db' AS db (READONLY 1); # Secret refresh has been triggered query II -SELECT log_level, message FROM duckdb_logs WHERE type='httpfs.SecretRefresh' +SELECT log_level, message FROM duckdb_logs WHERE message like '%Successfully refreshed secret%' ---- INFO Successfully refreshed secret: uhuh_this_mah_sh, new key_id: all the girls \ No newline at end of file diff --git a/test/sql/secret/test_secret_type.test b/test/sql/secret/test_secret_type.test new file mode 100644 index 00000000..2024b2f4 --- /dev/null +++ b/test/sql/secret/test_secret_type.test @@ -0,0 +1,20 @@ +# name: test/sql/secret/test_secret_type.test +# description: Test the secret types added by this extension +# group: [httpfs] + +require httpfs + +statement ok +LOAD httpfs + +statement ok +PRAGMA enable_verification + +query II +SELECT type, default_provider from duckdb_secret_types() where extension='httpfs' order by type; +---- +aws config +gcs config +huggingface config +r2 config +s3 config
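Note: each secret type listed above is created through the standard CREATE SECRET syntax once httpfs is loaded; for illustration (credentials are placeholders, and this statement is not part of the test suite):

    CREATE SECRET my_s3_secret (TYPE S3, PROVIDER config, KEY_ID 'my_key_id', SECRET 'my_secret', REGION 'us-east-1');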