diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 86ff60fb8..2f9dac6b6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -330,7 +330,7 @@ jobs: python3 setup.py --package-version | xargs python3 -m pip install python3 tools/build/configure.py cat .bazelrc - bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so + bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so //tensorflow_io/core:python/ops/libtensorflow_io_plugins.so - uses: actions/upload-artifact@v1 with: name: ${{ runner.os }}-bazel-bin diff --git a/README.md b/README.md index d55fe903c..305116f7e 100644 --- a/README.md +++ b/README.md @@ -311,6 +311,10 @@ libraries (.so). The gcc provided by Developer Toolset and rh-python36 should be Also, the libstdc++ has to be linked statically to avoid discrepancy of libstdc++ installed on CentOS vs. newer gcc version by devtoolset. +Furthermore, a special flag `--//tensorflow_io/core:static_build` has to be passed to Bazel +in order to avoid duplication of symbols in statically linked libraries for file system +plugins. + The following will install bazel, devtoolset-9, rh-python36, and build the shared libraries: ```sh #!/usr/bin/env bash @@ -331,10 +335,10 @@ scl enable rh-python36 devtoolset-9 \ scl enable rh-python36 devtoolset-9 \ './configure.sh' -# Build shared libraries +# Build shared libraries, notice the passing of --//tensorflow_io/core:static_build BAZEL_LINKOPTS="-static-libstdc++ -static-libgcc" BAZEL_LINKLIBS="-lm -l%:libstdc++.a" \ scl enable rh-python36 devtoolset-9 \ - 'bazel build -s --verbose_failures //tensorflow_io/...' + 'bazel build -s --verbose_failures --//tensorflow_io/core:static_build //tensorflow_io/...' # Once build is complete, shared libraries will be available in # `bazel-bin/tensorflow_io/core/python/ops/` and it is possible diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 690c16962..0e5563153 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -7,6 +7,19 @@ load( "tf_io_copts", ) load("@io_bazel_rules_go//go:def.bzl", "go_binary") +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") + +bool_flag( + name = "static_build", + build_setting_default = False, +) + +config_setting( + name = "static_build_on", + flag_values = { + ":static_build": "True", + }, +) cc_library( name = "cpuinfo", @@ -562,36 +575,23 @@ cc_library( ) cc_library( - name = "azfs_ops", + name = "file_system_plugins", srcs = [ "kernels/azfs_kernels.cc", - "kernels/azfs_kernels.h", + "kernels/file_system_plugins.cc", + "kernels/file_system_plugins.h", ], copts = tf_io_copts(), linkstatic = True, deps = [ "@com_github_azure_azure_storage_cpplite//:azure", + "@com_google_absl//absl/strings", "@local_config_tf//:libtensorflow_framework", "@local_config_tf//:tf_header_lib", ], alwayslink = 1, ) -# bazel test \ -# --action_env=TF_AZURE_USE_DEV_STORAGE=1 \ -# //tensorflow_io/azure:azfs_ops_test -cc_test( - name = "azfs_ops_test", - srcs = [ - "kernels/azfs_kernels_test.cc", - ], - deps = [ - ":azfs_ops", - "@com_google_googletest//:gtest", - "@com_google_googletest//:gtest_main", - ], -) - cc_library( name = "sql_ops", srcs = [ @@ -674,7 +674,6 @@ cc_binary( "//tensorflow_io/bigquery:bigquery_ops", "//tensorflow_io/core:audio_video_ops", "//tensorflow_io/core:avro_ops", - "//tensorflow_io/core:azfs_ops", "//tensorflow_io/core:cpuinfo", "//tensorflow_io/core:file_ops", "//tensorflow_io/core:grpc_ops", @@ -705,6 +704,23 @@ cc_binary( "//tensorflow_io/core:oss_ops", "//tensorflow_io/core/kernels/gsmemcachedfs:gs_memcached_file_system", ], + }) + select({ + "//tensorflow_io/core:static_build_on": [ + "//tensorflow_io/core:file_system_plugins", + ], + "//conditions:default": [], + }), +) + +cc_binary( + name = "python/ops/libtensorflow_io_plugins.so", + copts = tf_io_copts(), + linkshared = 1, + deps = select({ + "//tensorflow_io/core:static_build_on": [], + "//conditions:default": [ + "//tensorflow_io/core:file_system_plugins", + ], }), ) diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index a2c6e0575..d5de173d8 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -12,7 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow_io/core/kernels/azfs_kernels.h" #include #include @@ -20,38 +19,63 @@ limitations under the License. #include #include +#if defined(_MSC_VER) +#include +#endif + +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" #include "blob/blob_client.h" #include "logging.h" #include "storage_account.h" #include "storage_credential.h" #include "storage_errno.h" -#include "tensorflow/core/lib/io/path.h" -#include "tensorflow/core/lib/strings/str_util.h" -#include "tensorflow/core/platform/env.h" +#include "tensorflow_io/core/kernels/file_system_plugins.h" namespace tensorflow { namespace io { +namespace az { namespace { // TODO: DO NOT use a hardcoded path -Status GetTmpFilename(std::string *filename) { +bool GetTmpFilename(std::string* filename) { if (!filename) { - return errors::Internal("'filename' cannot be nullptr."); + // return errors::Internal("'filename' cannot be nullptr."); + return false; } #ifndef _WIN32 char buffer[] = "/tmp/az_blob_filesystem_XXXXXX"; int fd = mkstemp(buffer); if (fd < 0) { - return errors::Internal("Failed to create a temporary file."); + // return errors::Internal("Failed to create a temporary file."); + return false; } #else char buffer[] = "/tmp/az_blob_filesystem_XXXXXX"; - char *ret = _mktemp(buffer); + char* ret = _mktemp(buffer); if (ret == nullptr) { - return errors::Internal("Failed to create a temporary file."); + // return errors::Internal("Failed to create a temporary file."); + return false; } #endif *filename = buffer; - return Status::OK(); + // return Status::OK(); + return true; +} + +void ParseURI(const absl::string_view& fname, absl::string_view* scheme, + absl::string_view* host, absl::string_view* path) { + size_t scheme_chunk = fname.find("://"); + if (scheme_chunk == absl::string_view::npos) { + return; + } + size_t host_chunk = fname.find("/", scheme_chunk + 3); + if (host_chunk == absl::string_view::npos) { + return; + } + *scheme = absl::string_view(fname).substr(0, scheme_chunk); + *host = fname.substr(scheme_chunk + 3, host_chunk); + *path = fname.substr(host_chunk, -1); } constexpr char kAzBlobEndpoint[] = ".blob.core.windows.net"; @@ -61,25 +85,30 @@ constexpr char kAzBlobEndpoint[] = ".blob.core.windows.net"; /// For example, /// "az://account-name.blob.core.windows.net/container/path/to/file.txt" gets /// split into "account-name", "container" and "path/to/file.txt". -Status ParseAzBlobPath(StringPiece fname, bool empty_object_ok, - std::string *account, std::string *container, - std::string *object) { +void ParseAzBlobPath(const std::string& fname, bool empty_object_ok, + std::string* account, std::string* container, + std::string* object, TF_Status* status) { if (!account || !object) { - return errors::Internal("account and object cannot be null."); + TF_SetStatus(status, TF_INTERNAL, "account and object cannot be null"); + return; } - StringPiece scheme, accountp, objectp; - io::ParseURI(fname, &scheme, &accountp, &objectp); + absl::string_view scheme, accountp, objectp; + ParseURI(fname, &scheme, &accountp, &objectp); if (scheme != "az") { - return errors::InvalidArgument( + std::string error_message = absl::StrCat( "Azure Blob Storage path doesn't start with 'az://': ", fname); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } // Consume blob.core.windows.net if it exists absl::ConsumeSuffix(&accountp, kAzBlobEndpoint); if (accountp.empty() || accountp.compare(".") == 0) { - return errors::InvalidArgument( + std::string error_message = absl::StrCat( "Azure Blob Storage path doesn't contain a account name: ", fname); + TF_SetStatus(status, TF_INVALID_ARGUMENT, error_message.c_str()); + return; } *account = std::string(accountp); @@ -95,7 +124,15 @@ Status ParseAzBlobPath(StringPiece fname, bool empty_object_ok, *object = std::string(objectp.substr(pos + 1)); } - return Status::OK(); + if (!empty_object_ok && object->empty()) { + std::string error_message = absl::StrCat( + "Azure Blob Storage path doesn't contain a object name: ", fname); + TF_SetStatus(status, TF_INVALID_ARGUMENT, error_message.c_str()); + return; + } + + TF_SetStatus(status, TF_OK, ""); + return; } std::string errno_to_string() { @@ -144,7 +181,7 @@ std::string errno_to_string() { } std::shared_ptr get_credential( - const std::string &account) { + const std::string& account) { const auto key = std::getenv("TF_AZURE_STORAGE_KEY"); if (key != nullptr) { return std::make_shared(account, @@ -154,20 +191,21 @@ std::shared_ptr get_credential( } } +// TODO: Enable logging azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( - const std::string &account) { + const std::string& account) { azure::storage_lite::logger::set_logger( - [](azure::storage_lite::log_level level, const std::string &log_msg) { + [](azure::storage_lite::log_level level, const std::string& log_msg) { switch (level) { case azure::storage_lite::log_level::info: - _TF_LOG_INFO << log_msg; + // _TF_LOG_INFO << log_msg; break; case azure::storage_lite::log_level::error: case azure::storage_lite::log_level::critical: - _TF_LOG_ERROR << log_msg; + // _TF_LOG_ERROR << log_msg; break; case azure::storage_lite::log_level::warn: - _TF_LOG_WARNING << log_msg; + // _TF_LOG_WARNING << log_msg; break; case azure::storage_lite::log_level::trace: case azure::storage_lite::log_level::debug: @@ -202,27 +240,95 @@ azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( return blob_client_wrapper; } -class AzBlobRandomAccessFile : public RandomAccessFile { +void ListResources(const std::string& dir, const std::string& delimiter, + azure::storage_lite::blob_client_wrapper& blob_client, + std::vector* results, TF_Status* status) { + if (!results) { + TF_SetStatus(status, TF_INTERNAL, "results cannot be null"); + return; + } + + std::string account, container, object; + ParseAzBlobPath(dir, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + std::string continuation_token; + + if (container.empty()) { + std::vector containers; + do { + auto list_containers_response = + blob_client.list_containers_segmented("", continuation_token); + if (errno != 0) { + std::string error_message = + absl::StrCat("Failed to get containers of account ", dir, " (", + errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + + containers.insert(containers.end(), list_containers_response.begin(), + list_containers_response.end()); + } while (!continuation_token.empty()); + + std::transform( + std::begin(containers), std::end(containers), + std::back_inserter(*results), + [](azure::storage_lite::list_containers_item list_container_item) + -> std::string { return list_container_item.name; }); + + } else { + std::vector blobs; + do { + auto list_blobs_response = blob_client.list_blobs_segmented( + container, delimiter, continuation_token, object); + if (errno != 0) { + std::string error_message = absl::StrCat("Failed to get blobs of ", dir, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + + blobs.insert(blobs.end(), list_blobs_response.blobs.begin(), + list_blobs_response.blobs.end()); + + continuation_token = list_blobs_response.next_marker; + } while (!continuation_token.empty()); + + results->reserve(blobs.size()); + std::transform( + blobs.begin(), blobs.end(), std::back_inserter(*results), + [](azure::storage_lite::list_blobs_segmented_item list_blob_item) + -> std::string { return list_blob_item.name; }); + } + + TF_SetStatus(status, TF_OK, ""); +} + +class AzBlobRandomAccessFile { public: - AzBlobRandomAccessFile(const std::string &account, - const std::string &container, - const std::string &object) + AzBlobRandomAccessFile(const std::string& account, + const std::string& container, + const std::string& object) : account_(account), container_(container), object_(object) {} ~AzBlobRandomAccessFile() {} - Status Read(uint64 offset, size_t n, StringPiece *result, - char *scratch) const override { + int64_t Read(uint64_t offset, size_t n, char* buffer, + TF_Status* status) const { // If n == 0, then return Status::OK() // otherwise, if bytes_read < n then return OutofRange if (n == 0) { - *result = StringPiece("", 0); - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return 0; } auto blob_client = CreateAzBlobClientWrapper(account_); auto blob_property = blob_client.get_blob_property(container_, object_); if (errno != 0) { - return errors::Internal("Failed to get properties"); + TF_SetStatus(status, TF_INTERNAL, "Failed to get properties"); + return 0; } - int64 file_size = blob_property.size; + int64_t file_size = blob_property.size; size_t bytes_to_read = n; if (offset >= file_size) { @@ -231,39 +337,38 @@ class AzBlobRandomAccessFile : public RandomAccessFile { bytes_to_read = file_size - offset; } - if (bytes_to_read == 0) { - *result = StringPiece("", 0); - } else { + if (bytes_to_read > 0) { std::ostringstream oss; // https://stackoverflow.com/a/12481580 #ifndef __APPLE__ - oss.rdbuf()->pubsetbuf(scratch, bytes_to_read); + oss.rdbuf()->pubsetbuf(buffer, bytes_to_read); #endif blob_client.download_blob_to_stream(container_, object_, offset, bytes_to_read, oss); if (errno != 0) { - *result = StringPiece("", 0); - return errors::Internal("Failed to get contents of az://", account_, - kAzBlobEndpoint, "/", container_, "/", object_, - " (", errno_to_string(), ")"); + std::string error_message = absl::StrCat( + "Failed to get contents of az://", account_, kAzBlobEndpoint, "/", + container_, "/", object_, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; } -#ifndef __APPLE__ - *result = StringPiece(scratch, bytes_to_read); -#else +#ifdef __APPLE__ auto blob_string = oss.str(); - if (scratch != nullptr) { - std::copy(blob_string.begin(), blob_string.end(), scratch); + if (buffer != nullptr) { + std::copy(blob_string.begin(), blob_string.end(), buffer); } - *result = StringPiece(blob_string); + bytes_to_read = blob_string.size(); #endif } + if (bytes_to_read < n) { - return errors::OutOfRange("EOF reached"); + TF_SetStatus(status, TF_OUT_OF_RANGE, "EOF reached"); + return bytes_to_read; } - - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return bytes_to_read; } private: @@ -272,82 +377,86 @@ class AzBlobRandomAccessFile : public RandomAccessFile { std::string object_; }; -class AzBlobWritableFile : public WritableFile { +class AzBlobWritableFile { public: - AzBlobWritableFile(const std::string &account, const std::string &container, - const std::string &object) + AzBlobWritableFile(const std::string& account, const std::string& container, + const std::string& object) : account_(account), container_(container), object_(object), sync_needed_(true) { - if (GetTmpFilename(&tmp_content_filename_).ok()) { + if (GetTmpFilename(&tmp_content_filename_)) { outfile_.open(tmp_content_filename_, std::ofstream::binary | std::ofstream::app); } } - ~AzBlobWritableFile() { Close().IgnoreError(); } + ~AzBlobWritableFile() { + TF_Status* status = TF_NewStatus(); + Close(status); + TF_DeleteStatus(status); + } + + void Append(const char* buffer, size_t n, TF_Status* status) { + if (!outfile_.is_open()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable"); + return; + } - Status Append(StringPiece data) override { - TF_RETURN_IF_ERROR(CheckWritable()); + std::string data(buffer, n); sync_needed_ = true; outfile_ << data; if (!outfile_.good()) { - return errors::Internal( - "Could not append to the internal temporary file."); + TF_SetStatus(status, TF_INTERNAL, + "Could not append to the internal temporary file"); + return; } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } - - Status Close() override { - if (outfile_.is_open()) { - TF_RETURN_IF_ERROR(Sync()); - outfile_.close(); - std::remove(tmp_content_filename_.c_str()); + void Sync(TF_Status* status) { + if (!outfile_.is_open()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable"); + return; } - return Status::OK(); - } - - Status Flush() override { return Sync(); } - Status Sync() override { - TF_RETURN_IF_ERROR(CheckWritable()); if (!sync_needed_) { - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return; } - const auto status = SyncImpl(); - if (status.ok()) { - sync_needed_ = false; - } - return status; - } - - private: - Status SyncImpl() { outfile_.flush(); if (!outfile_.good()) { - return errors::Internal( - "Could not write to the internal temporary file."); + TF_SetStatus(status, TF_INTERNAL, + "Could not write to the internal temporary file"); + return; } auto blob_client = CreateAzBlobClientWrapper(account_); blob_client.upload_file_to_blob(tmp_content_filename_, container_, object_); if (errno != 0) { - return errors::Internal("Failed to upload to az://", account_, "/", - container_, "/", object_, " (", errno_to_string(), - ")"); + std::string error_message = + absl::StrCat("Failed to upload to az://", account_, "/", container_, + "/", object_, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } - - return Status::OK(); + sync_needed_ = false; + TF_SetStatus(status, TF_OK, ""); } - Status CheckWritable() const { - if (!outfile_.is_open()) { - return errors::FailedPrecondition( - "The internal temporary file is not writable."); + void Close(TF_Status* status) { + if (outfile_.is_open()) { + Sync(status); + if (TF_GetCode(status) != TF_OK) { + return; + } + outfile_.close(); + std::remove(tmp_content_filename_.c_str()); } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } + private: std::string account_; std::string container_; std::string object_; @@ -356,249 +465,235 @@ class AzBlobWritableFile : public WritableFile { bool sync_needed_; // whether there is buffered data that needs to be synced }; -class AzBlobReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { - public: - AzBlobReadOnlyMemoryRegion(std::unique_ptr data, uint64 length) - : data_(std::move(data)), length_(length) {} - const void *data() override { return reinterpret_cast(data_.get()); } - uint64 length() override { return length_; } - - private: - std::unique_ptr data_; - uint64 length_; -}; - -} // namespace +#if 0 +Status GetMatchingPaths(const std::string& pattern, std::vector* results) { + const std::string& fixed_prefix = + pattern.substr(0, pattern.find_first_of("*?[\\")); -Status AzBlobFileSystem::NewRandomAccessFile( - const std::string &filename, std::unique_ptr *result) { - string account, container, object; + std::string account, container, object; TF_RETURN_IF_ERROR( - ParseAzBlobPath(filename, false, &account, &container, &object)); - result->reset(new AzBlobRandomAccessFile(account, container, object)); - return Status::OK(); -} + ParseAzBlobPathClass(fixed_prefix, true, &account, &container, &object)); -Status AzBlobFileSystem::NewWritableFile( - const std::string &fname, std::unique_ptr *result) { - string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - result->reset(new AzBlobWritableFile(account, container, object)); - return Status::OK(); -} + auto blob_client = CreateAzBlobClientWrapper(account); -Status AzBlobFileSystem::NewAppendableFile( - const std::string &fname, std::unique_ptr *result) { - string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - result->reset(new AzBlobWritableFile(account, container, object)); - return Status::OK(); -} + std::vector blobs; + TF_RETURN_IF_ERROR(ListResources(fixed_prefix, "", blob_client, &blobs)); -Status AzBlobFileSystem::NewReadOnlyMemoryRegionFromFile( - const std::string &filename, - std::unique_ptr *result) { - uint64 size; - TF_RETURN_IF_ERROR(GetFileSize(filename, &size)); - std::unique_ptr data(new char[size]); + std::string container_path; + if (pattern.find(kAzBlobEndpoint) != std::string::npos) { + container_path = + io::JoinPath("az://", account + kAzBlobEndpoint, container); + } else { + container_path = io::JoinPath("az://", account, container); + } - std::unique_ptr file; - TF_RETURN_IF_ERROR(NewRandomAccessFile(filename, &file)); + std::transform(std::begin(blobs), std::end(blobs), std::begin(blobs), + [&container_path](const std::string& path) { + return io::JoinPath(container_path, path); + }); - StringPiece piece; - TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get())); + std::copy_if(std::begin(blobs), std::end(blobs), std::back_inserter(*results), + [&pattern](const std::string& full_path) { + return Env::Default()->MatchPath(full_path, pattern); + }); - result->reset(new AzBlobReadOnlyMemoryRegion(std::move(data), size)); return Status::OK(); } +#endif -Status AzBlobFileSystem::FileExists(const std::string &fname) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); - auto blob_exists = blob_client.blob_exists(container, object); - if (errno != 0) { - return errors::NotFound("Failed to check if ", fname, " exists (", - errno_to_string(), ")"); - } - if (!blob_exists) { - return errors::NotFound("The specified path ", fname, " was not found."); - } - return Status::OK(); +// SECTION 1. Implementation for `TF_RandomAccessFile` +// ---------------------------------------------------------------------------- +namespace tf_random_access_file { + +static void Cleanup(TF_RandomAccessFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; } -Status AzBlobFileSystem::Stat(const std::string &fname, FileStatistics *stat) { - using namespace std::chrono; +static int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, + char* buffer, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + return az_file->Read(offset, n, buffer, status); +} - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); +} // namespace tf_random_access_file - if (IsDirectory(fname).ok()) { - *stat = FileStatistics(0, 0, true); - return Status::OK(); - } +// SECTION 2. Implementation for `TF_WritableFile` +// ---------------------------------------------------------------------------- +namespace tf_writable_file { - if (!FileExists(fname).ok()) { - return errors::NotFound("The specified object ", fname, " was not found"); - } +static void Cleanup(TF_WritableFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; +} - auto blob_property = blob_client.get_blob_property(container, object); - if (errno != 0) { - return errors::NotFound("Failed to get file stats for ", fname, " (", - errno_to_string(), ")"); - } +static void Append(const TF_WritableFile* file, const char* buffer, size_t n, + TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + az_file->Append(buffer, n, status); +} - FileStatistics fs; - fs.length = blob_property.size; - fs.mtime_nsec = - duration_cast(seconds(blob_property.last_modified)).count(); +static int64_t Tell(const TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, "Stat not implemented"); + return -1; +} - *stat = std::move(fs); +static void Flush(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + az_file->Sync(status); +} - return Status::OK(); +static void Sync(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + az_file->Sync(status); } -Status AzBlobFileSystem::GetChildren(const std::string &dir, - std::vector *result) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dir, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); +static void Close(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + az_file->Close(status); +} - std::string continuation_token; - if (container.empty()) { - // TODO: iterate while continuation_token isn't empty - auto list_containers = - blob_client.list_containers_segmented("", continuation_token, INT_MAX); - std::transform( - begin(list_containers), end(list_containers), - std::back_inserter(*result), - [](azure::storage_lite::list_containers_item item) -> std::string { - return item.name; - }); - return Status::OK(); - } +} // namespace tf_writable_file - if (!object.empty() && object.back() != '/') { - object += "/"; - } +// SECTION 3. Implementation for `TF_ReadOnlyMemoryRegion` +// ---------------------------------------------------------------------------- +namespace tf_read_only_memory_region { +void Cleanup(TF_ReadOnlyMemoryRegion* region) {} - auto list_blobs = blob_client.list_blobs_segmented( - container, "/", continuation_token, object); - if (errno != 0) { - return errors::Internal("Failed to get child of ", dir, " (", - errno_to_string(), ")"); - } +const void* Data(const TF_ReadOnlyMemoryRegion* region) { return nullptr; } - auto blobs = list_blobs.blobs; - result->reserve(blobs.size()); - std::transform( - std::begin(blobs), std::end(blobs), std::back_inserter(*result), - [&object](azure::storage_lite::list_blobs_segmented_item list_blob_item) - -> std::string { - // Remove the prefix from the name - auto blob_name = list_blob_item.name; - blob_name.erase(0, object.size()); - // Remove the trailing slash from folders - if (blob_name.back() == '/') { - blob_name.pop_back(); - } - return blob_name; - }); +uint64_t Length(const TF_ReadOnlyMemoryRegion* region) { return 0; } - return Status::OK(); -} +} // namespace tf_read_only_memory_region -Status AzBlobFileSystem::GetMatchingPaths(const std::string &pattern, - std::vector *results) { - const std::string &fixed_prefix = - pattern.substr(0, pattern.find_first_of("*?[\\")); +// SECTION 4. Implementation for `TF_Filesystem`, the actual filesystem +// ---------------------------------------------------------------------------- +namespace tf_azfs_filesystem { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fixed_prefix, true, &account, &container, &object)); +static void Init(TF_Filesystem* filesystem, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); +} - auto blob_client = CreateAzBlobClientWrapper(account); +static void Cleanup(TF_Filesystem* filesystem) {} - std::vector blobs; - TF_RETURN_IF_ERROR(ListResources(fixed_prefix, "", blob_client, &blobs)); - - std::string container_path; - if (pattern.find(kAzBlobEndpoint) != std::string::npos) { - container_path = - io::JoinPath("az://", account + kAzBlobEndpoint, container); - } else { - container_path = io::JoinPath("az://", account, container); +static void NewRandomAccessFile(const TF_Filesystem* filesystem, + const char* path, TF_RandomAccessFile* file, + TF_Status* status) { + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; } + file->plugin_file = new AzBlobRandomAccessFile(account, container, object); - std::transform(std::begin(blobs), std::end(blobs), std::begin(blobs), - [&container_path](const std::string &path) { - return io::JoinPath(container_path, path); - }); + TF_SetStatus(status, TF_OK, ""); +} - std::copy_if(std::begin(blobs), std::end(blobs), std::back_inserter(*results), - [&pattern](const std::string &full_path) { - return Env::Default()->MatchPath(full_path, pattern); - }); +static void NewWritableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + file->plugin_file = new AzBlobWritableFile(account, container, object); - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } -Status AzBlobFileSystem::DeleteFile(const std::string &fname) { +static void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); - - blob_client.delete_blob(container, object); - if (errno != 0) { - return errors::Internal("Failed to delete ", fname, " (", errno_to_string(), - ")"); + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; } + file->plugin_file = new AzBlobWritableFile(account, container, object); - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); +} + +static void NewReadOnlyMemoryRegionFromFile(const TF_Filesystem* filesystem, + const char* path, + TF_ReadOnlyMemoryRegion* region, + TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, + "NewReadOnlyMemoryRegionFromFile not implemented"); } -Status AzBlobFileSystem::CreateDir(const std::string &dirname) { +static void CreateDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dirname, true, &account, &container, &object)); + ParseAzBlobPath(path, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } if (container.empty()) { - return errors::FailedPrecondition("Cannot create storage accounts"); + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "Cannot create storage accounts"); + return; } // Blob storage has virtual folders. We can make sure the container exists auto blob_client_wrapper = CreateAzBlobClientWrapper(account); if (blob_client_wrapper.container_exists(container)) { - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return; } blob_client_wrapper.create_container(container); if (errno != 0) { - return errors::Internal("Failed to create directory ", dirname, " (", - errno_to_string(), ")"); + std::string error_message = absl::StrCat( + "Failed to create directory ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } -Status AzBlobFileSystem::DeleteDir(const std::string &dirname) { +static void RecursivelyCreateDir(const TF_Filesystem* filesystem, + const char* path, TF_Status* status) { + CreateDir(filesystem, path, status); +} + +static void DeleteFile(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + + blob_client.delete_blob(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to delete ", path, ": ", errno, "(", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void DeleteDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { // Doesn't support file delete - call GetChildren (without delimiter) and then // loop and delete std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dirname, false, &account, &container, &object)); + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } if (container.empty()) { // Don't allow deleting entire storage accout as we can't create them from // this file system - return errors::FailedPrecondition( + TF_SetStatus( + status, TF_FAILED_PRECONDITION, "Cannot delete storage account, limited to blobs or containers"); + return; } auto blob_client = CreateAzBlobClientWrapper(account); @@ -608,109 +703,196 @@ Status AzBlobFileSystem::DeleteDir(const std::string &dirname) { if (object.empty()) { blob_client.delete_container(container); if (errno != 0) { - return errors::Internal("Error deleting ", dirname, " (", - errno_to_string(), ")"); + std::string error_message = + absl::StrCat("Error deleting ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } } else { // Delete all blobs under dirname prefix std::vector children; - TF_RETURN_IF_ERROR(ListResources(dirname, "", blob_client, &children)); + ListResources(path, "", blob_client, &children, status); + if (TF_GetCode(status) != TF_OK) { + return; + } - for (const auto &child : children) { + for (const auto& child : children) { blob_client.delete_blob(container, child); if (errno != 0) { - return errors::Internal("Failed to delete ", child, " (", - errno_to_string(), ")"); + std::string error_message = absl::StrCat("Failed to delete ", child, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } } } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } -Status AzBlobFileSystem::GetFileSize(const std::string &fname, - uint64 *file_size) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - - auto blob_client = CreateAzBlobClientWrapper(account); - auto blob_property = blob_client.get_blob_property(container, object); - if (errno != 0) { - return errors::Internal("Failed to get properties of ", fname, " (", - errno_to_string(), ")"); - } - *file_size = blob_property.size; - - return Status::OK(); +static void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, + uint64_t* undeleted_files, + uint64_t* undeleted_dirs, TF_Status* status) { + *undeleted_files = 0; + *undeleted_dirs = 0; + DeleteDir(filesystem, path, status); } -Status AzBlobFileSystem::RenameFile(const std::string &src, - const std::string &target) { +static void RenameFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { std::string src_account, src_container, src_object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(src, false, &src_account, &src_container, &src_object)); - std::string target_account, target_container, target_object; - TF_RETURN_IF_ERROR(ParseAzBlobPath(target, false, &target_account, - &target_container, &target_object)); + ParseAzBlobPath(src, false, &src_account, &src_container, &src_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } - if (src_account != target_account) { - return errors::Unimplemented( - "Couldn't rename ", src, " to ", target, - ": moving files between accounts is not supported."); + std::string dst_account, dst_container, dst_object; + ParseAzBlobPath(dst, false, &dst_account, &dst_container, &dst_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + if (src_account != dst_account) { + std::string error_message = + absl::StrCat("Couldn't rename ", src, " to ", dst, + ": moving files between accounts is not supported"); + TF_SetStatus(status, TF_UNIMPLEMENTED, error_message.c_str()); + return; } auto blob_client = CreateAzBlobClientWrapper(src_account); - blob_client.start_copy(src_container, src_object, target_container, - target_object); + blob_client.start_copy(src_container, src_object, dst_container, dst_object); if (errno != 0) { - return errors::Internal("Failed to start rename from ", src, " to ", target, - " (", errno_to_string(), ")"); + std::string error_message = + absl::StrCat("Failed to start rename from ", src, " to ", dst, " (", + errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } // Wait until copy completes // Status can be success, pending, aborted or failed std::string copy_status; do { - const auto target_blob_property = - blob_client.get_blob_property(target_container, target_object); - copy_status = target_blob_property.copy_status; + if (!copy_status.empty()) { +#if defined(_MSC_VER) + Sleep(1000); +#else + sleep(1); +#endif + } + const auto dst_blob_property = + blob_client.get_blob_property(dst_container, dst_object); + copy_status = dst_blob_property.copy_status; } while (copy_status.find("pending") == 0 && !copy_status.empty()); if (copy_status.find("success") == std::string::npos) { - return errors::Internal("Process of renaming resulted in status of ", - copy_status, " when renaming ", src, " to ", - target); + std::string error_message = + absl::StrCat("Process of renaming from ", src, " to ", dst, + " resulted in status of ", copy_status); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } blob_client.delete_blob(src_container, src_object); if (errno != 0) { - return errors::Internal("Failed to get delete after copy of ", src, " (", - errno_to_string(), ")"); + std::string error_message = + absl::StrCat("Failed to get delete after copy of ", src, " (", + errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); +} + +static void CopyFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + // 128KB copy buffer + constexpr size_t kCopyFileBufferSize = 128 * 1024; + + std::string src_account, src_container, src_object; + ParseAzBlobPath(src, false, &src_account, &src_container, &src_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + std::unique_ptr src_file( + new AzBlobRandomAccessFile(src_account, src_container, src_object)); + + std::string dst_account, dst_container, dst_object; + ParseAzBlobPath(dst, false, &dst_account, &dst_container, &dst_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + std::unique_ptr dst_file( + new AzBlobWritableFile(dst_account, dst_container, dst_object)); + + uint64_t offset = 0; + std::unique_ptr buffer(new char[kCopyFileBufferSize]); + while (TF_GetCode(status) == TF_OK) { + int64_t bytes_to_read = + src_file->Read(offset, kCopyFileBufferSize, buffer.get(), status); + if (!(TF_GetCode(status) == TF_OK || + TF_GetCode(status) == TF_OUT_OF_RANGE)) { + return; + } + dst_file->Append(buffer.get(), bytes_to_read, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + offset += bytes_to_read; + } + dst_file->Close(status); } -Status AzBlobFileSystem::RecursivelyCreateDir(const string &dirname) { - return CreateDir(dirname); +static void PathExists(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + auto blob_exists = blob_client.blob_exists(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to check if ", path, " exists (", errno_to_string(), ")"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return; + } + if (!blob_exists) { + std::string error_message = + absl::StrCat("The specified path ", path, " was not found"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); } -Status AzBlobFileSystem::IsDirectory(const std::string &fname) { +static bool IsDirectory(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { // Should check that account and container exist and that fname isn't a file // Azure storage file system is virtual and is created with path compenents in // blobs name so no need to check further std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, true, &account, &container, &object)); + ParseAzBlobPath(path, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return false; + } auto blob_client = CreateAzBlobClientWrapper(account); if (container.empty()) { - return errors::Unimplemented( - "Currently account exists check is not implemented"); + TF_SetStatus(status, TF_UNIMPLEMENTED, + "Currently account exists check is not implemented"); + return false; // bool is_account; // TF_RETURN_IF_ERROR(AccountExists(account, &is_account, blob_client)); @@ -721,95 +903,224 @@ Status AzBlobFileSystem::IsDirectory(const std::string &fname) { auto container_exists = blob_client.container_exists(container); if (!container_exists) { - return errors::NotFound("The specified folder ", fname, " was not found"); + std::string error_message = + absl::StrCat("The specified folder ", path, " was not found"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return false; } if (!object.empty()) { // Lastly check fname doesn't point to a file auto blob_exists = blob_client.blob_exists(container, object); if (blob_exists) { - return errors::FailedPrecondition("The specified path ", fname, - " is not a directory."); + std::string error_message = + absl::StrCat("The specified folder ", path, " is not a directory"); + TF_SetStatus(status, TF_FAILED_PRECONDITION, error_message.c_str()); + return false; } } - - // If account & container exist & fname isn't a file, with virtual directories - // we say that fname is a directory - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return true; } -Status AzBlobFileSystem::DeleteRecursively(const std::string &dirname, - int64 *undeleted_files, - int64 *undeleted_dirs) { - TF_RETURN_IF_ERROR(DeleteDir(dirname)); - *undeleted_dirs = 0; - *undeleted_files = 0; +static void Stat(const TF_Filesystem* filesystem, const char* path, + TF_FileStatistics* stats, TF_Status* status) { + using namespace std::chrono; - return Status::OK(); -} + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } -void AzBlobFileSystem::FlushCaches() {} + auto blob_client = CreateAzBlobClientWrapper(account); -Status AzBlobFileSystem::ListResources( - const std::string &dir, const std::string &delimiter, - azure::storage_lite::blob_client_wrapper &blob_client, - std::vector *results) const { - if (!results) { - return errors::Internal("results cannot be null"); + if (IsDirectory(filesystem, path, status)) { + stats->length = 0; + stats->mtime_nsec = 0; + stats->is_directory = true; + return; + } + + PathExists(filesystem, path, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_property = blob_client.get_blob_property(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to get file stats for ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return; } + stats->length = blob_property.size; + stats->mtime_nsec = + duration_cast(seconds(blob_property.last_modified)).count(); + stats->is_directory = false; + TF_SetStatus(status, TF_OK, ""); +} + +static int GetChildren(const TF_Filesystem* filesystem, const char* path, + char*** entries, TF_Status* status) { std::string account, container, object; - TF_RETURN_IF_ERROR(ParseAzBlobPath(dir, true, &account, &container, &object)); + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return 0; + } - std::string continuation_token; + auto blob_client = CreateAzBlobClientWrapper(account); + std::string continuation_token; if (container.empty()) { - std::vector containers; - do { - auto list_containers_response = - blob_client.list_containers_segmented("", continuation_token); - if (errno != 0) { - return errors::Internal("Failed to get containers of account ", dir, - " (", errno_to_string(), ")"); - } + std::vector result; + // TODO: iterate while continuation_token isn't empty + auto list_containers = + blob_client.list_containers_segmented("", continuation_token, INT_MAX); + std::transform( + begin(list_containers), end(list_containers), + std::back_inserter(result), + [](azure::storage_lite::list_containers_item item) -> std::string { + return item.name; + }); - containers.insert(containers.end(), list_containers_response.begin(), - list_containers_response.end()); - } while (!continuation_token.empty()); + int num_entries = result.size(); + *entries = static_cast( + plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); + for (int i = 0; i < num_entries; i++) { + (*entries)[i] = static_cast( + plugin_memory_allocate(strlen(result[i].c_str()) + 1)); + memcpy((*entries)[i], result[i].c_str(), strlen(result[i].c_str()) + 1); + } + TF_SetStatus(status, TF_OK, ""); + return num_entries; + } - std::transform( - std::begin(containers), std::end(containers), - std::back_inserter(*results), - [](azure::storage_lite::list_containers_item list_container_item) - -> std::string { return list_container_item.name; }); + if (!object.empty() && object.back() != '/') { + object += "/"; + } - } else { - std::vector blobs; - do { - auto list_blobs_response = blob_client.list_blobs_segmented( - container, delimiter, continuation_token, object); - if (errno != 0) { - return errors::Internal("Failed to get blobs of ", dir, " (", - errno_to_string(), ")"); - } + auto list_blobs = blob_client.list_blobs_segmented( + container, "/", continuation_token, object); + if (errno != 0) { + std::string error_message = absl::StrCat("Failed to get child of ", path, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; + } - blobs.insert(blobs.end(), list_blobs_response.blobs.begin(), - list_blobs_response.blobs.end()); + std::vector result; + auto blobs = list_blobs.blobs; + result.reserve(blobs.size()); + std::transform( + std::begin(blobs), std::end(blobs), std::back_inserter(result), + [&object](azure::storage_lite::list_blobs_segmented_item list_blob_item) + -> std::string { + // Remove the prefix from the name + auto blob_name = list_blob_item.name; + blob_name.erase(0, object.size()); + // Remove the trailing slash from folders + if (blob_name.back() == '/') { + blob_name.pop_back(); + } + return blob_name; + }); - continuation_token = list_blobs_response.next_marker; - } while (!continuation_token.empty()); + int num_entries = result.size(); + *entries = static_cast( + plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); + for (int i = 0; i < num_entries; i++) { + (*entries)[i] = static_cast( + plugin_memory_allocate(strlen(result[i].c_str()) + 1)); + memcpy((*entries)[i], result[i].c_str(), strlen(result[i].c_str()) + 1); + } + TF_SetStatus(status, TF_OK, ""); + return num_entries; +} - results->reserve(blobs.size()); - std::transform( - blobs.begin(), blobs.end(), std::back_inserter(*results), - [](azure::storage_lite::list_blobs_segmented_item list_blob_item) - -> std::string { return list_blob_item.name; }); +static int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return 0; } - return Status::OK(); + auto blob_client = CreateAzBlobClientWrapper(account); + auto blob_property = blob_client.get_blob_property(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to get properties of ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; + } + + TF_SetStatus(status, TF_OK, ""); + return blob_property.size; } -namespace { -REGISTER_FILE_SYSTEM("az", io::AzBlobFileSystem); + +static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { + return strdup(uri); +} + +} // namespace tf_azfs_filesystem + } // namespace + +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { + TF_SetFilesystemVersionMetadata(ops); + ops->scheme = strdup(uri); + + ops->random_access_file_ops = static_cast( + plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE)); + ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup; + ops->random_access_file_ops->read = tf_random_access_file::Read; + + ops->writable_file_ops = static_cast( + plugin_memory_allocate(TF_WRITABLE_FILE_OPS_SIZE)); + ops->writable_file_ops->cleanup = tf_writable_file::Cleanup; + ops->writable_file_ops->append = tf_writable_file::Append; + ops->writable_file_ops->tell = tf_writable_file::Tell; + ops->writable_file_ops->flush = tf_writable_file::Flush; + ops->writable_file_ops->sync = tf_writable_file::Sync; + ops->writable_file_ops->close = tf_writable_file::Close; + + ops->read_only_memory_region_ops = static_cast( + plugin_memory_allocate(TF_READ_ONLY_MEMORY_REGION_OPS_SIZE)); + ops->read_only_memory_region_ops->cleanup = + tf_read_only_memory_region::Cleanup; + ops->read_only_memory_region_ops->data = tf_read_only_memory_region::Data; + ops->read_only_memory_region_ops->length = tf_read_only_memory_region::Length; + + ops->filesystem_ops = static_cast( + plugin_memory_allocate(TF_FILESYSTEM_OPS_SIZE)); + ops->filesystem_ops->init = tf_azfs_filesystem::Init; + ops->filesystem_ops->cleanup = tf_azfs_filesystem::Cleanup; + ops->filesystem_ops->new_random_access_file = + tf_azfs_filesystem::NewRandomAccessFile; + ops->filesystem_ops->new_writable_file = tf_azfs_filesystem::NewWritableFile; + ops->filesystem_ops->new_appendable_file = + tf_azfs_filesystem::NewAppendableFile; + ops->filesystem_ops->new_read_only_memory_region_from_file = + tf_azfs_filesystem::NewReadOnlyMemoryRegionFromFile; + ops->filesystem_ops->create_dir = tf_azfs_filesystem::CreateDir; + ops->filesystem_ops->recursively_create_dir = + tf_azfs_filesystem::RecursivelyCreateDir; + ops->filesystem_ops->delete_file = tf_azfs_filesystem::DeleteFile; + ops->filesystem_ops->delete_recursively = + tf_azfs_filesystem::DeleteRecursively; + ops->filesystem_ops->delete_dir = tf_azfs_filesystem::DeleteDir; + ops->filesystem_ops->copy_file = tf_azfs_filesystem::CopyFile; + ops->filesystem_ops->rename_file = tf_azfs_filesystem::RenameFile; + ops->filesystem_ops->path_exists = tf_azfs_filesystem::PathExists; + ops->filesystem_ops->stat = tf_azfs_filesystem::Stat; + ops->filesystem_ops->is_directory = tf_azfs_filesystem::IsDirectory; + ops->filesystem_ops->get_file_size = tf_azfs_filesystem::GetFileSize; + ops->filesystem_ops->get_children = tf_azfs_filesystem::GetChildren; + ops->filesystem_ops->translate_name = tf_azfs_filesystem::TranslateName; +} + +} // namespace az } // namespace io } // namespace tensorflow diff --git a/tensorflow_io/core/kernels/azfs_kernels.h b/tensorflow_io/core/kernels/azfs_kernels.h deleted file mode 100644 index 611e75a19..000000000 --- a/tensorflow_io/core/kernels/azfs_kernels.h +++ /dev/null @@ -1,82 +0,0 @@ -/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_IO_AZURE_AZFS_OPS_H_ -#define TENSORFLOW_IO_AZURE_AZFS_OPS_H_ - -#include "blob/blob_client.h" -#include "tensorflow/core/platform/file_system.h" - -namespace tensorflow { -namespace io { - -// class AzBlobFileSystem; - -/// Azure Blob Storage implementation of a file system. -class AzBlobFileSystem : public FileSystem { - public: - Status NewRandomAccessFile( - const std::string& filename, - std::unique_ptr* result) override; - - Status NewWritableFile(const std::string& fname, - std::unique_ptr* result) override; - - Status NewAppendableFile(const std::string& fname, - std::unique_ptr* result) override; - - Status NewReadOnlyMemoryRegionFromFile( - const std::string& filename, - std::unique_ptr* result) override; - - Status FileExists(const std::string& fname) override; - - Status Stat(const std::string& fname, FileStatistics* stat) override; - - Status GetChildren(const std::string& dir, - std::vector* result) override; - - Status GetMatchingPaths(const std::string& pattern, - std::vector* results) override; - - Status DeleteFile(const std::string& fname) override; - - Status CreateDir(const std::string& dirname) override; - - Status DeleteDir(const std::string& dirname) override; - - Status GetFileSize(const std::string& fname, uint64* file_size) override; - - Status RenameFile(const std::string& src, const std::string& target) override; - - Status IsDirectory(const std::string& fname) override; - - Status RecursivelyCreateDir(const string& dirname) override; - - Status DeleteRecursively(const std::string& dirname, int64* undeleted_files, - int64* undeleted_dirs) override; - - void FlushCaches() override; - - private: - Status ListResources(const std::string& dir, const std::string& delimiter, - azure::storage_lite::blob_client_wrapper& blob_client, - std::vector* results) const; -}; - -} // namespace io -} // namespace tensorflow - -#endif // TENSORFLOW_IO_AZURE_AZFS_OPS_H_ diff --git a/tensorflow_io/core/kernels/azfs_kernels_test.cc b/tensorflow_io/core/kernels/azfs_kernels_test.cc deleted file mode 100644 index 97bb3cc02..000000000 --- a/tensorflow_io/core/kernels/azfs_kernels_test.cc +++ /dev/null @@ -1,281 +0,0 @@ -/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#include "tensorflow_io/core/kernels/azfs_kernels.h" - -#include "gtest/gtest.h" - -#define EXPECT_OK(val) EXPECT_EQ(val, Status::OK()) - -namespace tensorflow { -namespace io { -namespace { - -using namespace tensorflow::io; - -class AzBlobFileSystemTest : public ::testing::Test { - protected: - AzBlobFileSystemTest() {} - - std::string PathTo(const std::string& path) { - return "az://devstoreaccount1/aztest" + path; - } - - Status WriteString(const std::string& fname, const std::string& content) { - std::unique_ptr writer; - TF_RETURN_IF_ERROR(fs.NewWritableFile(fname, &writer)); - TF_RETURN_IF_ERROR(writer->Append(content)); - TF_RETURN_IF_ERROR(writer->Close()); - return Status::OK(); - } - - Status ReadAll(const std::string& fname, std::string* content) { - std::unique_ptr reader; - TF_RETURN_IF_ERROR(fs.NewRandomAccessFile(fname, &reader)); - - uint64 file_size = 0; - TF_RETURN_IF_ERROR(fs.GetFileSize(fname, &file_size)); - - StringPiece result; - char* strdata = &(*content)[0]; - TF_RETURN_IF_ERROR(reader->Read(0, file_size, &result, strdata)); - if (file_size != result.size()) { - return errors::DataLoss("expected ", file_size, " got ", result.size(), - " bytes"); - } - - *content = std::string(result); - - return Status::OK(); - } - - void SetUp() override { - // Create container - (void)fs.CreateDir(PathTo("")); - } - - void TearDown() override { - // Delete container - // fs.DeleteDir(PathTo("")); - } - - AzBlobFileSystem fs; -}; - -TEST_F(AzBlobFileSystemTest, ContainerShouldBeDirectory) { - auto container_path = PathTo(""); - auto is_dir = fs.IsDirectory(container_path); - EXPECT_EQ(is_dir, Status::OK()); -} - -TEST_F(AzBlobFileSystemTest, NewRandomAccessFile) { - const std::string fname = PathTo("/RandomAccessFile"); - const std::string content = "abcdefghijklmn"; - - size_t size = 4, offset = 2; - const auto content_substring = content.substr(offset, size); - - EXPECT_OK(WriteString(fname, content)); - - std::unique_ptr reader; - EXPECT_OK(fs.NewRandomAccessFile(fname, &reader)); - - StringPiece result; - EXPECT_OK(reader->Read(0, content.size(), &result, nullptr)); - EXPECT_EQ(content, result); - - EXPECT_OK(reader->Read(offset, size, &result, nullptr)); - EXPECT_EQ(content_substring, result); -} - -TEST_F(AzBlobFileSystemTest, NewWritableFile) { - std::unique_ptr writer; - const std::string fname = PathTo("/WritableFile"); - EXPECT_OK(fs.NewWritableFile(fname, &writer)); - EXPECT_OK(writer->Append("content1,")); - EXPECT_OK(writer->Append("content2")); - EXPECT_OK(writer->Flush()); - EXPECT_OK(writer->Sync()); - EXPECT_OK(writer->Close()); - - std::string content; - EXPECT_OK(ReadAll(fname, &content)); - EXPECT_EQ("content1,content2", content); -} - -TEST_F(AzBlobFileSystemTest, NewAppendableFile) { - std::unique_ptr writer; - - const std::string fname = PathTo("/AppendableFile"); - EXPECT_OK(WriteString(fname, "test")); - - EXPECT_OK(fs.NewAppendableFile(fname, &writer)); - EXPECT_OK(writer->Append("content")); - EXPECT_OK(writer->Close()); -} - -TEST_F(AzBlobFileSystemTest, NewReadOnlyMemoryRegionFromFile) { - const auto fname = PathTo("/MemoryFile"); - const auto content = "content"; - EXPECT_OK(WriteString(fname, content)); - std::unique_ptr region; - EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile(fname, ®ion)); - - const char* data = static_cast(region->data()); - auto length = region->length(); - std::string read_content(data, length); - EXPECT_EQ(content, read_content); -} - -TEST_F(AzBlobFileSystemTest, FileExists) { - const std::string fname = PathTo("/FileExists"); - EXPECT_EQ(error::Code::NOT_FOUND, fs.FileExists(fname).code()); - EXPECT_OK(WriteString(fname, "test")); - EXPECT_OK(fs.FileExists(fname)); - EXPECT_OK(fs.DeleteFile(fname)); -} - -TEST_F(AzBlobFileSystemTest, GetChildren) { - const auto base = PathTo("/GetChildren"); - EXPECT_OK(fs.CreateDir(base)); - - const auto file = base + "/TestFile.csv"; - EXPECT_OK(WriteString(file, "test")); - - const auto subdir = base + "/SubDir"; - EXPECT_OK(fs.CreateDir(subdir)); - const auto subfile = subdir + "/TestSubFile.csv"; - EXPECT_OK(WriteString(subfile, "test")); - - std::vector children; - EXPECT_OK(fs.GetChildren(base, &children)); - std::sort(children.begin(), children.end()); - EXPECT_EQ(std::vector({"SubDir", "TestFile.csv"}), children); -} - -TEST_F(AzBlobFileSystemTest, DeleteFile) { - const std::string fname = PathTo("/DeleteFile"); - EXPECT_OK(WriteString(fname, "test")); - EXPECT_OK(fs.DeleteFile(fname)); -} - -TEST_F(AzBlobFileSystemTest, DeleteRecursively) { - const std::string fname = PathTo("/recursive"); - - for (const auto& ext : {".txt", ".md"}) { - for (int i = 0; i < 3; ++i) { - const auto this_fname = fname + "/" + std::to_string(i) + ext; - (void)WriteString(this_fname, ""); - } - } - - std::vector txt_files; - (void)fs.GetMatchingPaths(fname + "/*.txt", &txt_files); - EXPECT_EQ(3, txt_files.size()); - - int64 undeleted_files, undeleted_dirs; - EXPECT_OK(fs.DeleteRecursively(fname, &undeleted_files, &undeleted_dirs)); -} - -TEST_F(AzBlobFileSystemTest, GetFileSize) { - const std::string fname = PathTo("/GetFileSize"); - EXPECT_OK(WriteString(fname, "test")); - uint64 file_size = 0; - EXPECT_OK(fs.GetFileSize(fname, &file_size)); - EXPECT_EQ(4, file_size); -} - -TEST_F(AzBlobFileSystemTest, CreateDir) { - const auto dir = PathTo("/CreateDir"); - EXPECT_OK(fs.CreateDir(dir)); - - const auto file = dir + "/CreateDirFile.csv"; - EXPECT_OK(WriteString(file, "test")); - FileStatistics stat; - EXPECT_OK(fs.Stat(dir, &stat)); - EXPECT_TRUE(stat.is_directory); -} - -TEST_F(AzBlobFileSystemTest, DeleteDir) { - const auto dir = PathTo("/DeleteDir"); - const auto file = dir + "/DeleteDirFile.csv"; - EXPECT_OK(WriteString(file, "test")); - EXPECT_OK(fs.DeleteDir(dir)); - - FileStatistics stat; - // Still OK here as virtual directories always exist - EXPECT_OK(fs.Stat(dir, &stat)); -} - -TEST_F(AzBlobFileSystemTest, RenameFile) { - const auto fname1 = PathTo("/RenameFile1"); - const auto fname2 = PathTo("/RenameFile2"); - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(fs.RenameFile(fname1, fname2)); - std::string content; - EXPECT_OK(ReadAll(fname2, &content)); - EXPECT_EQ("test", content); -} - -TEST_F(AzBlobFileSystemTest, RenameFile_Overwrite) { - const auto fname1 = PathTo("/RenameFile1"); - const auto fname2 = PathTo("/RenameFile2"); - - EXPECT_OK(WriteString(fname2, "test")); - EXPECT_OK(fs.FileExists(fname2)); - - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(fs.RenameFile(fname1, fname2)); - std::string content; - EXPECT_OK(ReadAll(fname2, &content)); - EXPECT_EQ("test", content); -} - -TEST_F(AzBlobFileSystemTest, StatFile) { - const auto fname = PathTo("/StatFile"); - EXPECT_OK(WriteString(fname, "test")); - FileStatistics stat; - EXPECT_OK(fs.Stat(fname, &stat)); - EXPECT_EQ(4, stat.length); - EXPECT_FALSE(stat.is_directory); -} - -TEST_F(AzBlobFileSystemTest, GetMatchingPaths_NoWildcard) { - const auto fname = PathTo("/path/subpath/file2.txt"); - - EXPECT_OK(WriteString(fname, "test")); - std::vector results; - EXPECT_OK(fs.GetMatchingPaths(fname, &results)); - EXPECT_EQ(std::vector({fname}), results); -} - -TEST_F(AzBlobFileSystemTest, GetMatchingPaths_FilenameWildcard) { - const auto fname1 = PathTo("/path/subpath/file1.txt"); - const auto fname2 = PathTo("/path/subpath/file2.txt"); - const auto fname3 = PathTo("/path/subpath/another.txt"); - - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(WriteString(fname2, "test")); - EXPECT_OK(WriteString(fname3, "test")); - - const auto pattern = PathTo("/path/subpath/file*.txt"); - std::vector results; - EXPECT_OK(fs.GetMatchingPaths(pattern, &results)); - EXPECT_EQ(std::vector({fname1, fname2}), results); -} - -} // namespace -} // namespace io -} // namespace tensorflow diff --git a/tensorflow_io/core/kernels/file_system_plugins.cc b/tensorflow_io/core/kernels/file_system_plugins.cc new file mode 100644 index 000000000..994760094 --- /dev/null +++ b/tensorflow_io/core/kernels/file_system_plugins.cc @@ -0,0 +1,26 @@ +/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow_io/core/kernels/file_system_plugins.h" + +void TF_InitPlugin(TF_FilesystemPluginInfo* info) { + info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate; + info->plugin_memory_free = tensorflow::io::plugin_memory_free; + info->num_schemes = 1; + info->ops = static_cast( + tensorflow::io::plugin_memory_allocate(info->num_schemes * + sizeof(info->ops[0]))); + tensorflow::io::az::ProvideFilesystemSupportFor(&info->ops[0], "az"); +} diff --git a/tensorflow_io/core/kernels/file_system_plugins.h b/tensorflow_io/core/kernels/file_system_plugins.h new file mode 100644 index 000000000..a9cb7aa7c --- /dev/null +++ b/tensorflow_io/core/kernels/file_system_plugins.h @@ -0,0 +1,38 @@ +/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H +#define TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H + +#include + +#include "tensorflow/c/experimental/filesystem/filesystem_interface.h" + +namespace tensorflow { +namespace io { + +static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } +static void plugin_memory_free(void* ptr) { free(ptr); } + +namespace az { + +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri); + +} // namespace az + +} // namespace io +} // namespace tensorflow + +#endif // TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H diff --git a/tensorflow_io/core/python/ops/__init__.py b/tensorflow_io/core/python/ops/__init__.py index 87f52782f..7294b696e 100644 --- a/tensorflow_io/core/python/ops/__init__.py +++ b/tensorflow_io/core/python/ops/__init__.py @@ -50,6 +50,8 @@ def _load_library(filename, lib="op"): load_fn = tf.load_op_library elif lib == "dependency": load_fn = lambda f: ctypes.CDLL(f, mode=ctypes.RTLD_GLOBAL) + elif lib == "fs": + load_fn = lambda f: tf.experimental.register_filesystem_plugin(f) is None else: load_fn = lambda f: tf.compat.v1.load_file_system_library(f) is None @@ -69,3 +71,7 @@ def _load_library(filename, lib="op"): core_ops = _load_library("libtensorflow_io.so") +try: + plugin_ops = _load_library("libtensorflow_io_plugins.so", "fs") +except NotImplementedError as e: + plugin_ops = _load_library("libtensorflow_io.so", "fs") diff --git a/tensorflow_io/core/python/ops/version_ops.py b/tensorflow_io/core/python/ops/version_ops.py index f2fc3720a..40b9fb156 100644 --- a/tensorflow_io/core/python/ops/version_ops.py +++ b/tensorflow_io/core/python/ops/version_ops.py @@ -14,5 +14,5 @@ # ============================================================================== """version_ops""" -package = "tensorflow>=2.3.0,<2.4.0" -version = "0.16.0" +package = "tensorflow>=2.4.0rc0,<2.5.0" +version = "0.17.0" diff --git a/tests/test_azure.py b/tests/test_azure.py index 9c9741322..b9b09b423 100644 --- a/tests/test_azure.py +++ b/tests/test_azure.py @@ -20,6 +20,8 @@ from tensorflow.python.platform import gfile import tensorflow_io as tfio # pylint: disable=unused-import +# Note: export TF_AZURE_USE_DEV_STORAGE=1 to enable emulation + class AZFSTest(test.TestCase): """[summary] diff --git a/third_party/hdf5.BUILD b/third_party/hdf5.BUILD index fdb35152a..cf2283e6d 100644 --- a/third_party/hdf5.BUILD +++ b/third_party/hdf5.BUILD @@ -21,7 +21,12 @@ cc_library( "config/H5Tinit.c", "config/H5lib_settings.c", ], - copts = [], + copts = select({ + "@bazel_tools//src/conditions:windows": [ + "/Zc:preprocessor-", + ], + "//conditions:default": [], + }), includes = [ "c++/src", "config", @@ -45,7 +50,12 @@ cc_library( ]) + [ "config/H5pubconf.h", ], - copts = [], + copts = select({ + "@bazel_tools//src/conditions:windows": [ + "/Zc:preprocessor-", + ], + "//conditions:default": [], + }), includes = [ "c++/src", "config", diff --git a/tools/build/configure.py b/tools/build/configure.py index 0fc5b4a17..d959a67cb 100644 --- a/tools/build/configure.py +++ b/tools/build/configure.py @@ -121,6 +121,8 @@ def write_config(): # Stay with 10.13 for macOS bazel_rc.write('build:macos --copt="-mmacosx-version-min=10.13"\n') bazel_rc.write('build:macos --linkopt="-mmacosx-version-min=10.13"\n') + # MSVC (Windows): Standards-conformant preprocessor mode + bazel_rc.write('build:windows --copt="/Zc:preprocessor"\n') bazel_rc.close() except OSError: print("ERROR: Writing .bazelrc")