From 81f853acf6ebd920e7d37c7829556441710f5da1 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 24 Sep 2020 15:04:29 -0700 Subject: [PATCH 1/9] Convert AZFS to use C modular file system API This PR converts AZFS to use modular C file system API. This PR is part of the 1111. Signed-off-by: Yong Tang bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so //tensorflow_io/core:python/ops/libtensorflow_io_azfs.so Signed-off-by: Yong Tang Update absl Signed-off-by: Yong Tang Use "/Zc:preprocessor-" on Windows Signed-off-by: Yong Tang Try Signed-off-by: Yong Tang --- .github/workflows/build.yml | 2 +- tensorflow_io/core/BUILD | 10 +- tensorflow_io/core/kernels/azfs_kernels.cc | 463 ++++++++++++++++++--- tensorflow_io/core/python/ops/__init__.py | 3 + third_party/hdf5.BUILD | 14 +- tools/build/configure.py | 2 + 6 files changed, 440 insertions(+), 54 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 86ff60fb8..287774fc1 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -330,7 +330,7 @@ jobs: python3 setup.py --package-version | xargs python3 -m pip install python3 tools/build/configure.py cat .bazelrc - bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so + bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so //tensorflow_io/core:python/ops/libtensorflow_io_azfs.so - uses: actions/upload-artifact@v1 with: name: ${{ runner.os }}-bazel-bin diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 690c16962..7114975a7 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -562,7 +562,7 @@ cc_library( ) cc_library( - name = "azfs_ops", + name = "file_system_plugins", srcs = [ "kernels/azfs_kernels.cc", "kernels/azfs_kernels.h", @@ -571,6 +571,9 @@ cc_library( linkstatic = True, deps = [ "@com_github_azure_azure_storage_cpplite//:azure", + "@com_google_absl//absl/algorithm", + 
"@com_google_absl//absl/container:fixed_array", + "@com_google_absl//absl/container:flat_hash_map", "@local_config_tf//:libtensorflow_framework", "@local_config_tf//:tf_header_lib", ], @@ -674,7 +677,6 @@ cc_binary( "//tensorflow_io/bigquery:bigquery_ops", "//tensorflow_io/core:audio_video_ops", "//tensorflow_io/core:avro_ops", - "//tensorflow_io/core:azfs_ops", "//tensorflow_io/core:cpuinfo", "//tensorflow_io/core:file_ops", "//tensorflow_io/core:grpc_ops", @@ -705,7 +707,9 @@ cc_binary( "//tensorflow_io/core:oss_ops", "//tensorflow_io/core/kernels/gsmemcachedfs:gs_memcached_file_system", ], - }), + }) + [ + "//tensorflow_io/core:file_system_plugins", + ], ) cc_binary( diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index a2c6e0575..f3f98022c 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -25,6 +25,7 @@ limitations under the License. #include "storage_account.h" #include "storage_credential.h" #include "storage_errno.h" +#include "tensorflow/c/experimental/filesystem/filesystem_interface.h" #include "tensorflow/core/lib/io/path.h" #include "tensorflow/core/lib/strings/str_util.h" #include "tensorflow/core/platform/env.h" @@ -33,7 +34,7 @@ namespace tensorflow { namespace io { namespace { // TODO: DO NOT use a hardcoded path -Status GetTmpFilename(std::string *filename) { +Status GetTmpFilename(std::string* filename) { if (!filename) { return errors::Internal("'filename' cannot be nullptr."); } @@ -45,7 +46,7 @@ Status GetTmpFilename(std::string *filename) { } #else char buffer[] = "/tmp/az_blob_filesystem_XXXXXX"; - char *ret = _mktemp(buffer); + char* ret = _mktemp(buffer); if (ret == nullptr) { return errors::Internal("Failed to create a temporary file."); } @@ -62,8 +63,8 @@ constexpr char kAzBlobEndpoint[] = ".blob.core.windows.net"; /// "az://account-name.blob.core.windows.net/container/path/to/file.txt" gets /// split into "account-name", 
"container" and "path/to/file.txt". Status ParseAzBlobPath(StringPiece fname, bool empty_object_ok, - std::string *account, std::string *container, - std::string *object) { + std::string* account, std::string* container, + std::string* object) { if (!account || !object) { return errors::Internal("account and object cannot be null."); } @@ -144,7 +145,7 @@ std::string errno_to_string() { } std::shared_ptr get_credential( - const std::string &account) { + const std::string& account) { const auto key = std::getenv("TF_AZURE_STORAGE_KEY"); if (key != nullptr) { return std::make_shared(account, @@ -155,9 +156,9 @@ std::shared_ptr get_credential( } azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( - const std::string &account) { + const std::string& account) { azure::storage_lite::logger::set_logger( - [](azure::storage_lite::log_level level, const std::string &log_msg) { + [](azure::storage_lite::log_level level, const std::string& log_msg) { switch (level) { case azure::storage_lite::log_level::info: _TF_LOG_INFO << log_msg; @@ -204,13 +205,13 @@ azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( class AzBlobRandomAccessFile : public RandomAccessFile { public: - AzBlobRandomAccessFile(const std::string &account, - const std::string &container, - const std::string &object) + AzBlobRandomAccessFile(const std::string& account, + const std::string& container, + const std::string& object) : account_(account), container_(container), object_(object) {} ~AzBlobRandomAccessFile() {} - Status Read(uint64 offset, size_t n, StringPiece *result, - char *scratch) const override { + Status Read(uint64 offset, size_t n, StringPiece* result, + char* scratch) const override { // If n == 0, then return Status::OK() // otherwise, if bytes_read < n then return OutofRange if (n == 0) { @@ -274,8 +275,8 @@ class AzBlobRandomAccessFile : public RandomAccessFile { class AzBlobWritableFile : public WritableFile { public: - AzBlobWritableFile(const std::string 
&account, const std::string &container, - const std::string &object) + AzBlobWritableFile(const std::string& account, const std::string& container, + const std::string& object) : account_(account), container_(container), object_(object), @@ -360,7 +361,7 @@ class AzBlobReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { public: AzBlobReadOnlyMemoryRegion(std::unique_ptr data, uint64 length) : data_(std::move(data)), length_(length) {} - const void *data() override { return reinterpret_cast(data_.get()); } + const void* data() override { return reinterpret_cast(data_.get()); } uint64 length() override { return length_; } private: @@ -371,7 +372,7 @@ class AzBlobReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { } // namespace Status AzBlobFileSystem::NewRandomAccessFile( - const std::string &filename, std::unique_ptr *result) { + const std::string& filename, std::unique_ptr* result) { string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(filename, false, &account, &container, &object)); @@ -380,7 +381,7 @@ Status AzBlobFileSystem::NewRandomAccessFile( } Status AzBlobFileSystem::NewWritableFile( - const std::string &fname, std::unique_ptr *result) { + const std::string& fname, std::unique_ptr* result) { string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(fname, false, &account, &container, &object)); @@ -389,7 +390,7 @@ Status AzBlobFileSystem::NewWritableFile( } Status AzBlobFileSystem::NewAppendableFile( - const std::string &fname, std::unique_ptr *result) { + const std::string& fname, std::unique_ptr* result) { string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(fname, false, &account, &container, &object)); @@ -398,8 +399,8 @@ Status AzBlobFileSystem::NewAppendableFile( } Status AzBlobFileSystem::NewReadOnlyMemoryRegionFromFile( - const std::string &filename, - std::unique_ptr *result) { + const std::string& filename, + std::unique_ptr* result) { uint64 size; TF_RETURN_IF_ERROR(GetFileSize(filename, 
&size)); std::unique_ptr data(new char[size]); @@ -414,7 +415,7 @@ Status AzBlobFileSystem::NewReadOnlyMemoryRegionFromFile( return Status::OK(); } -Status AzBlobFileSystem::FileExists(const std::string &fname) { +Status AzBlobFileSystem::FileExists(const std::string& fname) { std::string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(fname, false, &account, &container, &object)); @@ -430,7 +431,7 @@ Status AzBlobFileSystem::FileExists(const std::string &fname) { return Status::OK(); } -Status AzBlobFileSystem::Stat(const std::string &fname, FileStatistics *stat) { +Status AzBlobFileSystem::Stat(const std::string& fname, FileStatistics* stat) { using namespace std::chrono; std::string account, container, object; @@ -463,8 +464,8 @@ Status AzBlobFileSystem::Stat(const std::string &fname, FileStatistics *stat) { return Status::OK(); } -Status AzBlobFileSystem::GetChildren(const std::string &dir, - std::vector *result) { +Status AzBlobFileSystem::GetChildren(const std::string& dir, + std::vector* result) { std::string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(dir, false, &account, &container, &object)); @@ -514,9 +515,9 @@ Status AzBlobFileSystem::GetChildren(const std::string &dir, return Status::OK(); } -Status AzBlobFileSystem::GetMatchingPaths(const std::string &pattern, - std::vector *results) { - const std::string &fixed_prefix = +Status AzBlobFileSystem::GetMatchingPaths(const std::string& pattern, + std::vector* results) { + const std::string& fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); std::string account, container, object; @@ -537,19 +538,19 @@ Status AzBlobFileSystem::GetMatchingPaths(const std::string &pattern, } std::transform(std::begin(blobs), std::end(blobs), std::begin(blobs), - [&container_path](const std::string &path) { + [&container_path](const std::string& path) { return io::JoinPath(container_path, path); }); std::copy_if(std::begin(blobs), std::end(blobs), 
std::back_inserter(*results), - [&pattern](const std::string &full_path) { + [&pattern](const std::string& full_path) { return Env::Default()->MatchPath(full_path, pattern); }); return Status::OK(); } -Status AzBlobFileSystem::DeleteFile(const std::string &fname) { +Status AzBlobFileSystem::DeleteFile(const std::string& fname) { std::string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(fname, false, &account, &container, &object)); @@ -564,7 +565,7 @@ Status AzBlobFileSystem::DeleteFile(const std::string &fname) { return Status::OK(); } -Status AzBlobFileSystem::CreateDir(const std::string &dirname) { +Status AzBlobFileSystem::CreateDir(const std::string& dirname) { std::string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(dirname, true, &account, &container, &object)); @@ -587,7 +588,7 @@ Status AzBlobFileSystem::CreateDir(const std::string &dirname) { return Status::OK(); } -Status AzBlobFileSystem::DeleteDir(const std::string &dirname) { +Status AzBlobFileSystem::DeleteDir(const std::string& dirname) { // Doesn't support file delete - call GetChildren (without delimiter) and then // loop and delete @@ -616,7 +617,7 @@ Status AzBlobFileSystem::DeleteDir(const std::string &dirname) { std::vector children; TF_RETURN_IF_ERROR(ListResources(dirname, "", blob_client, &children)); - for (const auto &child : children) { + for (const auto& child : children) { blob_client.delete_blob(container, child); if (errno != 0) { return errors::Internal("Failed to delete ", child, " (", @@ -628,8 +629,8 @@ Status AzBlobFileSystem::DeleteDir(const std::string &dirname) { return Status::OK(); } -Status AzBlobFileSystem::GetFileSize(const std::string &fname, - uint64 *file_size) { +Status AzBlobFileSystem::GetFileSize(const std::string& fname, + uint64* file_size) { std::string account, container, object; TF_RETURN_IF_ERROR( ParseAzBlobPath(fname, false, &account, &container, &object)); @@ -645,8 +646,8 @@ Status AzBlobFileSystem::GetFileSize(const 
std::string &fname, return Status::OK(); } -Status AzBlobFileSystem::RenameFile(const std::string &src, - const std::string &target) { +Status AzBlobFileSystem::RenameFile(const std::string& src, + const std::string& target) { std::string src_account, src_container, src_object; TF_RETURN_IF_ERROR( ParseAzBlobPath(src, false, &src_account, &src_container, &src_object)); @@ -693,11 +694,11 @@ Status AzBlobFileSystem::RenameFile(const std::string &src, return Status::OK(); } -Status AzBlobFileSystem::RecursivelyCreateDir(const string &dirname) { +Status AzBlobFileSystem::RecursivelyCreateDir(const string& dirname) { return CreateDir(dirname); } -Status AzBlobFileSystem::IsDirectory(const std::string &fname) { +Status AzBlobFileSystem::IsDirectory(const std::string& fname) { // Should check that account and container exist and that fname isn't a file // Azure storage file system is virtual and is created with path compenents in // blobs name so no need to check further @@ -738,9 +739,9 @@ Status AzBlobFileSystem::IsDirectory(const std::string &fname) { return Status::OK(); } -Status AzBlobFileSystem::DeleteRecursively(const std::string &dirname, - int64 *undeleted_files, - int64 *undeleted_dirs) { +Status AzBlobFileSystem::DeleteRecursively(const std::string& dirname, + int64* undeleted_files, + int64* undeleted_dirs) { TF_RETURN_IF_ERROR(DeleteDir(dirname)); *undeleted_dirs = 0; *undeleted_files = 0; @@ -751,9 +752,9 @@ Status AzBlobFileSystem::DeleteRecursively(const std::string &dirname, void AzBlobFileSystem::FlushCaches() {} Status AzBlobFileSystem::ListResources( - const std::string &dir, const std::string &delimiter, - azure::storage_lite::blob_client_wrapper &blob_client, - std::vector *results) const { + const std::string& dir, const std::string& delimiter, + azure::storage_lite::blob_client_wrapper& blob_client, + std::vector* results) const { if (!results) { return errors::Internal("results cannot be null"); } @@ -808,8 +809,374 @@ Status 
AzBlobFileSystem::ListResources( return Status::OK(); } -namespace { -REGISTER_FILE_SYSTEM("az", io::AzBlobFileSystem); -} // namespace } // namespace io } // namespace tensorflow + +namespace tensorflow { +namespace io { +namespace azfs { + +static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } +static void plugin_memory_free(void* ptr) { free(ptr); } + +// SECTION 1. Implementation for `TF_RandomAccessFile` +// ---------------------------------------------------------------------------- +namespace tf_random_access_file { + +static void Cleanup(TF_RandomAccessFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; +} + +static int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, + char* buffer, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + StringPiece result; + Status s = az_file->Read(offset, n, &result, buffer); + if (!(s.ok() || errors::IsOutOfRange(s))) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + } else if (errors::IsOutOfRange(s)) { + TF_SetStatus(status, TF_OUT_OF_RANGE, "Read fewer bytes than requested"); + } else { + TF_SetStatus(status, TF_OK, ""); + } + return result.size(); +} + +} // namespace tf_random_access_file + +// SECTION 2. 
Implementation for `TF_WritableFile` +// ---------------------------------------------------------------------------- +namespace tf_writable_file { + +static void Cleanup(TF_WritableFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; +} + +static void Append(const TF_WritableFile* file, const char* buffer, size_t n, + TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + StringPiece data(buffer, n); + Status s = az_file->Append(data); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static int64_t Tell(const TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, "Stat not implemented"); + return -1; +} + +static void Flush(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + Status s = az_file->Flush(); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void Sync(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + Status s = az_file->Flush(); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void Close(const TF_WritableFile* file, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + Status s = az_file->Flush(); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +} // namespace tf_writable_file + +// SECTION 3. 
Implementation for `TF_ReadOnlyMemoryRegion` +// ---------------------------------------------------------------------------- +namespace tf_read_only_memory_region { +void Cleanup(TF_ReadOnlyMemoryRegion* region) {} + +const void* Data(const TF_ReadOnlyMemoryRegion* region) { return nullptr; } + +uint64_t Length(const TF_ReadOnlyMemoryRegion* region) { return 0; } + +} // namespace tf_read_only_memory_region + +// SECTION 4. Implementation for `TF_Filesystem`, the actual filesystem +// ---------------------------------------------------------------------------- +namespace tf_azfs_filesystem { + +static AzBlobFileSystem azfs; + +static void Init(TF_Filesystem* filesystem, TF_Status* status) { + TF_SetStatus(status, TF_OK, ""); +} + +static void Cleanup(TF_Filesystem* filesystem) {} + +static void NewRandomAccessFile(const TF_Filesystem* filesystem, + const char* path, TF_RandomAccessFile* file, + TF_Status* status) { + std::unique_ptr result; + Status s = azfs.NewRandomAccessFile(path, &result); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + file->plugin_file = result.release(); + TF_SetStatus(status, TF_OK, ""); +} + +static void NewWritableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + std::unique_ptr result; + Status s = azfs.NewWritableFile(path, &result); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + file->plugin_file = result.release(); + TF_SetStatus(status, TF_OK, ""); +} + +static void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, + TF_WritableFile* file, TF_Status* status) { + std::unique_ptr result; + Status s = azfs.NewAppendableFile(path, &result); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + file->plugin_file = result.release(); + TF_SetStatus(status, TF_OK, ""); +} + +static void NewReadOnlyMemoryRegionFromFile(const 
TF_Filesystem* filesystem, + const char* path, + TF_ReadOnlyMemoryRegion* region, + TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, + "NewReadOnlyMemoryRegionFromFile not implemented"); +} + +static void CreateDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + Status s = azfs.CreateDir(path); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void RecursivelyCreateDir(const TF_Filesystem* filesystem, + const char* path, TF_Status* status) { + Status s = azfs.RecursivelyCreateDir(path); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void DeleteFile(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + Status s = azfs.DeleteFile(path); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, + uint64_t* undeleted_files, + uint64_t* undeleted_dirs, TF_Status* status) { + int64 undeleted_files_value, undeleted_dirs_value; + Status s = azfs.DeleteRecursively(path, &undeleted_files_value, + &undeleted_dirs_value); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + *undeleted_files = undeleted_files_value; + *undeleted_dirs = undeleted_dirs_value; + TF_SetStatus(status, TF_OK, ""); +} + +static void DeleteDir(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + Status s = azfs.DeleteDir(path); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void RenameFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + Status s = azfs.RenameFile(src, dst); + if (!s.ok()) { + 
TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void CopyFile(const TF_Filesystem* filesystem, const char* src, + const char* dst, TF_Status* status) { + Status s = azfs.CopyFile(src, dst); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void PathExists(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + Status s = azfs.FileExists(path); + if (!s.ok()) { + TF_SetStatus(status, TF_NOT_FOUND, s.error_message().c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); +} + +static void Stat(const TF_Filesystem* filesystem, const char* path, + TF_FileStatistics* stats, TF_Status* status) { + FileStatistics stats_value; + Status s = azfs.Stat(path, &stats_value); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return; + } + stats->length = stats_value.length; + stats->mtime_nsec = stats_value.mtime_nsec; + stats->is_directory = stats_value.is_directory; + TF_SetStatus(status, TF_OK, ""); +} + +static int GetChildren(const TF_Filesystem* filesystem, const char* path, + char*** entries, TF_Status* status) { + std::vector result; + Status s = azfs.GetChildren(path, &result); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return false; + } + int num_entries = result.size(); + *entries = static_cast( + plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); + for (int i = 0; i < num_entries; i++) { + (*entries)[i] = static_cast( + plugin_memory_allocate(strlen(result[i].c_str()) + 1)); + memcpy((*entries)[i], result[i].c_str(), strlen(result[i].c_str()) + 1); + } + TF_SetStatus(status, TF_OK, ""); + return num_entries; +} + +static int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, "GetFileSize not implemented"); + return 
-1; +} + +static bool IsDirectory(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + Status s = azfs.IsDirectory(path); + if (!s.ok()) { + TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + return false; + } + TF_SetStatus(status, TF_OK, ""); + return true; +} + +static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { + return strdup(uri); +} + +} // namespace tf_azfs_filesystem + +static void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, + const char* uri) { + TF_SetFilesystemVersionMetadata(ops); + ops->scheme = strdup(uri); + + ops->random_access_file_ops = static_cast( + plugin_memory_allocate(TF_RANDOM_ACCESS_FILE_OPS_SIZE)); + ops->random_access_file_ops->cleanup = tf_random_access_file::Cleanup; + ops->random_access_file_ops->read = tf_random_access_file::Read; + + ops->writable_file_ops = static_cast( + plugin_memory_allocate(TF_WRITABLE_FILE_OPS_SIZE)); + ops->writable_file_ops->cleanup = tf_writable_file::Cleanup; + ops->writable_file_ops->append = tf_writable_file::Append; + ops->writable_file_ops->tell = tf_writable_file::Tell; + ops->writable_file_ops->flush = tf_writable_file::Flush; + ops->writable_file_ops->sync = tf_writable_file::Sync; + ops->writable_file_ops->close = tf_writable_file::Close; + + ops->read_only_memory_region_ops = static_cast( + plugin_memory_allocate(TF_READ_ONLY_MEMORY_REGION_OPS_SIZE)); + ops->read_only_memory_region_ops->cleanup = + tf_read_only_memory_region::Cleanup; + ops->read_only_memory_region_ops->data = tf_read_only_memory_region::Data; + ops->read_only_memory_region_ops->length = tf_read_only_memory_region::Length; + + ops->filesystem_ops = static_cast( + plugin_memory_allocate(TF_FILESYSTEM_OPS_SIZE)); + ops->filesystem_ops->init = tf_azfs_filesystem::Init; + ops->filesystem_ops->cleanup = tf_azfs_filesystem::Cleanup; + ops->filesystem_ops->new_random_access_file = + tf_azfs_filesystem::NewRandomAccessFile; + 
ops->filesystem_ops->new_writable_file = tf_azfs_filesystem::NewWritableFile; + ops->filesystem_ops->new_appendable_file = + tf_azfs_filesystem::NewAppendableFile; + ops->filesystem_ops->new_read_only_memory_region_from_file = + tf_azfs_filesystem::NewReadOnlyMemoryRegionFromFile; + ops->filesystem_ops->create_dir = tf_azfs_filesystem::CreateDir; + ops->filesystem_ops->recursively_create_dir = + tf_azfs_filesystem::RecursivelyCreateDir; + ops->filesystem_ops->delete_file = tf_azfs_filesystem::DeleteFile; + ops->filesystem_ops->delete_recursively = + tf_azfs_filesystem::DeleteRecursively; + ops->filesystem_ops->delete_dir = tf_azfs_filesystem::DeleteDir; + ops->filesystem_ops->copy_file = tf_azfs_filesystem::CopyFile; + ops->filesystem_ops->rename_file = tf_azfs_filesystem::RenameFile; + ops->filesystem_ops->path_exists = tf_azfs_filesystem::PathExists; + ops->filesystem_ops->stat = tf_azfs_filesystem::Stat; + ops->filesystem_ops->is_directory = tf_azfs_filesystem::IsDirectory; + ops->filesystem_ops->get_file_size = tf_azfs_filesystem::GetFileSize; + ops->filesystem_ops->get_children = tf_azfs_filesystem::GetChildren; + ops->filesystem_ops->translate_name = tf_azfs_filesystem::TranslateName; +} + +} // namespace azfs +} // namespace io +} // namespace tensorflow + +void TF_InitPlugin(TF_FilesystemPluginInfo* info) { + info->plugin_memory_allocate = tensorflow::io::azfs::plugin_memory_allocate; + info->plugin_memory_free = tensorflow::io::azfs::plugin_memory_free; + info->num_schemes = 1; + info->ops = static_cast( + tensorflow::io::azfs::plugin_memory_allocate(info->num_schemes * + sizeof(info->ops[0]))); + tensorflow::io::azfs::ProvideFilesystemSupportFor(&info->ops[0], "az"); +} diff --git a/tensorflow_io/core/python/ops/__init__.py b/tensorflow_io/core/python/ops/__init__.py index 87f52782f..8ee607e9a 100644 --- a/tensorflow_io/core/python/ops/__init__.py +++ b/tensorflow_io/core/python/ops/__init__.py @@ -50,6 +50,8 @@ def _load_library(filename, lib="op"): 
load_fn = tf.load_op_library elif lib == "dependency": load_fn = lambda f: ctypes.CDLL(f, mode=ctypes.RTLD_GLOBAL) + elif lib == "fs": + load_fn = lambda f: tf.experimental.register_filesystem_plugin(f) is None else: load_fn = lambda f: tf.compat.v1.load_file_system_library(f) is None @@ -69,3 +71,4 @@ def _load_library(filename, lib="op"): core_ops = _load_library("libtensorflow_io.so") +azfs_ops = _load_library("libtensorflow_io.so", "fs") diff --git a/third_party/hdf5.BUILD b/third_party/hdf5.BUILD index fdb35152a..cf2283e6d 100644 --- a/third_party/hdf5.BUILD +++ b/third_party/hdf5.BUILD @@ -21,7 +21,12 @@ cc_library( "config/H5Tinit.c", "config/H5lib_settings.c", ], - copts = [], + copts = select({ + "@bazel_tools//src/conditions:windows": [ + "/Zc:preprocessor-", + ], + "//conditions:default": [], + }), includes = [ "c++/src", "config", @@ -45,7 +50,12 @@ cc_library( ]) + [ "config/H5pubconf.h", ], - copts = [], + copts = select({ + "@bazel_tools//src/conditions:windows": [ + "/Zc:preprocessor-", + ], + "//conditions:default": [], + }), includes = [ "c++/src", "config", diff --git a/tools/build/configure.py b/tools/build/configure.py index 0fc5b4a17..d959a67cb 100644 --- a/tools/build/configure.py +++ b/tools/build/configure.py @@ -121,6 +121,8 @@ def write_config(): # Stay with 10.13 for macOS bazel_rc.write('build:macos --copt="-mmacosx-version-min=10.13"\n') bazel_rc.write('build:macos --linkopt="-mmacosx-version-min=10.13"\n') + # MSVC (Windows): Standards-conformant preprocessor mode + bazel_rc.write('build:windows --copt="/Zc:preprocessor"\n') bazel_rc.close() except OSError: print("ERROR: Writing .bazelrc") From c7463173d26655bc89ca8e11b6f9e0b0eb9d6758 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Sun, 25 Oct 2020 13:23:23 -0700 Subject: [PATCH 2/9] Avoid C++ dependency to libtensorflow_framework.so Signed-off-by: Yong Tang --- tensorflow_io/core/BUILD | 20 +- tensorflow_io/core/kernels/azfs_kernels.cc | 1243 ++++++++--------- 
tensorflow_io/core/kernels/azfs_kernels.h | 82 -- .../core/kernels/azfs_kernels_test.cc | 281 ---- tests/test_azure.py | 2 + 5 files changed, 594 insertions(+), 1034 deletions(-) delete mode 100644 tensorflow_io/core/kernels/azfs_kernels.h delete mode 100644 tensorflow_io/core/kernels/azfs_kernels_test.cc diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index 7114975a7..c96c7483f 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -565,36 +565,18 @@ cc_library( name = "file_system_plugins", srcs = [ "kernels/azfs_kernels.cc", - "kernels/azfs_kernels.h", ], copts = tf_io_copts(), linkstatic = True, deps = [ "@com_github_azure_azure_storage_cpplite//:azure", - "@com_google_absl//absl/algorithm", - "@com_google_absl//absl/container:fixed_array", - "@com_google_absl//absl/container:flat_hash_map", + "@com_google_absl//absl/strings", "@local_config_tf//:libtensorflow_framework", "@local_config_tf//:tf_header_lib", ], alwayslink = 1, ) -# bazel test \ -# --action_env=TF_AZURE_USE_DEV_STORAGE=1 \ -# //tensorflow_io/azure:azfs_ops_test -cc_test( - name = "azfs_ops_test", - srcs = [ - "kernels/azfs_kernels_test.cc", - ], - deps = [ - ":azfs_ops", - "@com_google_googletest//:gtest", - "@com_google_googletest//:gtest_main", - ], -) - cc_library( name = "sql_ops", srcs = [ diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index f3f98022c..be9e14dde 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -12,7 +12,6 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. ==============================================================================*/ -#include "tensorflow_io/core/kernels/azfs_kernels.h" #include #include @@ -20,39 +19,59 @@ limitations under the License. 
#include #include +#include "absl/strings/str_cat.h" +#include "absl/strings/string_view.h" +#include "absl/strings/strip.h" #include "blob/blob_client.h" #include "logging.h" #include "storage_account.h" #include "storage_credential.h" #include "storage_errno.h" #include "tensorflow/c/experimental/filesystem/filesystem_interface.h" -#include "tensorflow/core/lib/io/path.h" -#include "tensorflow/core/lib/strings/str_util.h" -#include "tensorflow/core/platform/env.h" namespace tensorflow { namespace io { +namespace azfs { namespace { // TODO: DO NOT use a hardcoded path -Status GetTmpFilename(std::string* filename) { +bool GetTmpFilename(std::string* filename) { if (!filename) { - return errors::Internal("'filename' cannot be nullptr."); + // return errors::Internal("'filename' cannot be nullptr."); + return false; } #ifndef _WIN32 char buffer[] = "/tmp/az_blob_filesystem_XXXXXX"; int fd = mkstemp(buffer); if (fd < 0) { - return errors::Internal("Failed to create a temporary file."); + // return errors::Internal("Failed to create a temporary file."); + return false; } #else char buffer[] = "/tmp/az_blob_filesystem_XXXXXX"; char* ret = _mktemp(buffer); if (ret == nullptr) { - return errors::Internal("Failed to create a temporary file."); + // return errors::Internal("Failed to create a temporary file."); + return false; } #endif *filename = buffer; - return Status::OK(); + // return Status::OK(); + return true; +} + +void ParseURI(const absl::string_view& fname, absl::string_view* scheme, + absl::string_view* host, absl::string_view* path) { + size_t scheme_chunk = fname.find("://"); + if (scheme_chunk == absl::string_view::npos) { + return; + } + size_t host_chunk = fname.find("/", scheme_chunk + 3); + if (host_chunk == absl::string_view::npos) { + return; + } + *scheme = absl::string_view(fname).substr(0, scheme_chunk); + *host = fname.substr(scheme_chunk + 3, host_chunk); + *path = fname.substr(host_chunk, -1); } constexpr char kAzBlobEndpoint[] = 
".blob.core.windows.net"; @@ -62,25 +81,30 @@ constexpr char kAzBlobEndpoint[] = ".blob.core.windows.net"; /// For example, /// "az://account-name.blob.core.windows.net/container/path/to/file.txt" gets /// split into "account-name", "container" and "path/to/file.txt". -Status ParseAzBlobPath(StringPiece fname, bool empty_object_ok, - std::string* account, std::string* container, - std::string* object) { +void ParseAzBlobPath(const std::string& fname, bool empty_object_ok, + std::string* account, std::string* container, + std::string* object, TF_Status* status) { if (!account || !object) { - return errors::Internal("account and object cannot be null."); + TF_SetStatus(status, TF_INTERNAL, "account and object cannot be null"); + return; } - StringPiece scheme, accountp, objectp; - io::ParseURI(fname, &scheme, &accountp, &objectp); + absl::string_view scheme, accountp, objectp; + ParseURI(fname, &scheme, &accountp, &objectp); if (scheme != "az") { - return errors::InvalidArgument( + std::string error_message = absl::StrCat( "Azure Blob Storage path doesn't start with 'az://': ", fname); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } // Consume blob.core.windows.net if it exists absl::ConsumeSuffix(&accountp, kAzBlobEndpoint); if (accountp.empty() || accountp.compare(".") == 0) { - return errors::InvalidArgument( + std::string error_message = absl::StrCat( "Azure Blob Storage path doesn't contain a account name: ", fname); + TF_SetStatus(status, TF_INVALID_ARGUMENT, error_message.c_str()); + return; } *account = std::string(accountp); @@ -96,7 +120,8 @@ Status ParseAzBlobPath(StringPiece fname, bool empty_object_ok, *object = std::string(objectp.substr(pos + 1)); } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return; } std::string errno_to_string() { @@ -155,20 +180,21 @@ std::shared_ptr get_credential( } } +// TODO: Enable logging azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( const std::string& account) { 
azure::storage_lite::logger::set_logger( [](azure::storage_lite::log_level level, const std::string& log_msg) { switch (level) { case azure::storage_lite::log_level::info: - _TF_LOG_INFO << log_msg; + // _TF_LOG_INFO << log_msg; break; case azure::storage_lite::log_level::error: case azure::storage_lite::log_level::critical: - _TF_LOG_ERROR << log_msg; + // _TF_LOG_ERROR << log_msg; break; case azure::storage_lite::log_level::warn: - _TF_LOG_WARNING << log_msg; + // _TF_LOG_WARNING << log_msg; break; case azure::storage_lite::log_level::trace: case azure::storage_lite::log_level::debug: @@ -203,27 +229,95 @@ azure::storage_lite::blob_client_wrapper CreateAzBlobClientWrapper( return blob_client_wrapper; } -class AzBlobRandomAccessFile : public RandomAccessFile { +void ListResources(const std::string& dir, const std::string& delimiter, + azure::storage_lite::blob_client_wrapper& blob_client, + std::vector* results, TF_Status* status) { + if (!results) { + TF_SetStatus(status, TF_INTERNAL, "results cannot be null"); + return; + } + + std::string account, container, object; + ParseAzBlobPath(dir, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + std::string continuation_token; + + if (container.empty()) { + std::vector containers; + do { + auto list_containers_response = + blob_client.list_containers_segmented("", continuation_token); + if (errno != 0) { + std::string error_message = + absl::StrCat("Failed to get containers of account ", dir, " (", + errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + + containers.insert(containers.end(), list_containers_response.begin(), + list_containers_response.end()); + } while (!continuation_token.empty()); + + std::transform( + std::begin(containers), std::end(containers), + std::back_inserter(*results), + [](azure::storage_lite::list_containers_item list_container_item) + -> std::string { return list_container_item.name; }); + + 
} else { + std::vector blobs; + do { + auto list_blobs_response = blob_client.list_blobs_segmented( + container, delimiter, continuation_token, object); + if (errno != 0) { + std::string error_message = absl::StrCat("Failed to get blobs of ", dir, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + + blobs.insert(blobs.end(), list_blobs_response.blobs.begin(), + list_blobs_response.blobs.end()); + + continuation_token = list_blobs_response.next_marker; + } while (!continuation_token.empty()); + + results->reserve(blobs.size()); + std::transform( + blobs.begin(), blobs.end(), std::back_inserter(*results), + [](azure::storage_lite::list_blobs_segmented_item list_blob_item) + -> std::string { return list_blob_item.name; }); + } + + TF_SetStatus(status, TF_OK, ""); +} + +class AzBlobRandomAccessFile { public: AzBlobRandomAccessFile(const std::string& account, const std::string& container, const std::string& object) : account_(account), container_(container), object_(object) {} ~AzBlobRandomAccessFile() {} - Status Read(uint64 offset, size_t n, StringPiece* result, - char* scratch) const override { + int64_t Read(uint64_t offset, size_t n, char* buffer, + TF_Status* status) const { // If n == 0, then return Status::OK() // otherwise, if bytes_read < n then return OutofRange if (n == 0) { - *result = StringPiece("", 0); - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return 0; } auto blob_client = CreateAzBlobClientWrapper(account_); auto blob_property = blob_client.get_blob_property(container_, object_); if (errno != 0) { - return errors::Internal("Failed to get properties"); + TF_SetStatus(status, TF_INTERNAL, "Failed to get properties"); + return 0; } - int64 file_size = blob_property.size; + int64_t file_size = blob_property.size; size_t bytes_to_read = n; if (offset >= file_size) { @@ -232,39 +326,37 @@ class AzBlobRandomAccessFile : public RandomAccessFile { bytes_to_read = file_size - offset; } - 
if (bytes_to_read == 0) { - *result = StringPiece("", 0); - } else { + if (bytes_to_read > 0) { std::ostringstream oss; // https://stackoverflow.com/a/12481580 #ifndef __APPLE__ - oss.rdbuf()->pubsetbuf(scratch, bytes_to_read); + oss.rdbuf()->pubsetbuf(buffer, bytes_to_read); #endif blob_client.download_blob_to_stream(container_, object_, offset, bytes_to_read, oss); if (errno != 0) { - *result = StringPiece("", 0); - return errors::Internal("Failed to get contents of az://", account_, - kAzBlobEndpoint, "/", container_, "/", object_, - " (", errno_to_string(), ")"); + std::string error_message = absl::StrCat( + "Failed to get contents of az://", account_, kAzBlobEndpoint, "/", + container_, "/", object_, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; } -#ifndef __APPLE__ - *result = StringPiece(scratch, bytes_to_read); -#else +#ifdef __APPLE__ auto blob_string = oss.str(); - if (scratch != nullptr) { - std::copy(blob_string.begin(), blob_string.end(), scratch); + if (buffer != nullptr) { + std::copy(blob_string.begin(), blob_string.end(), buffer); } - *result = StringPiece(blob_string); + bytes_to_read = blob_string.size(); #endif } + if (bytes_to_read < n) { - return errors::OutOfRange("EOF reached"); + TF_SetStatus(status, TF_OUT_OF_RANGE, "EOF reached"); } - - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return bytes_to_read; } private: @@ -273,7 +365,7 @@ class AzBlobRandomAccessFile : public RandomAccessFile { std::string object_; }; -class AzBlobWritableFile : public WritableFile { +class AzBlobWritableFile { public: AzBlobWritableFile(const std::string& account, const std::string& container, const std::string& object) @@ -281,74 +373,78 @@ class AzBlobWritableFile : public WritableFile { container_(container), object_(object), sync_needed_(true) { - if (GetTmpFilename(&tmp_content_filename_).ok()) { + if (GetTmpFilename(&tmp_content_filename_)) { outfile_.open(tmp_content_filename_, 
std::ofstream::binary | std::ofstream::app); } } - ~AzBlobWritableFile() { Close().IgnoreError(); } + ~AzBlobWritableFile() { + TF_Status* status = TF_NewStatus(); + Close(status); + TF_DeleteStatus(status); + } + + void Append(const char* buffer, size_t n, TF_Status* status) { + if (!outfile_.is_open()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable"); + return; + } - Status Append(StringPiece data) override { - TF_RETURN_IF_ERROR(CheckWritable()); + std::string data(buffer, n); sync_needed_ = true; outfile_ << data; if (!outfile_.good()) { - return errors::Internal( - "Could not append to the internal temporary file."); + TF_SetStatus(status, TF_INTERNAL, + "Could not append to the internal temporary file"); + return; } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } - - Status Close() override { - if (outfile_.is_open()) { - TF_RETURN_IF_ERROR(Sync()); - outfile_.close(); - std::remove(tmp_content_filename_.c_str()); + void Sync(TF_Status* status) { + if (!outfile_.is_open()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "The internal temporary file is not writable"); + return; } - return Status::OK(); - } - - Status Flush() override { return Sync(); } - Status Sync() override { - TF_RETURN_IF_ERROR(CheckWritable()); if (!sync_needed_) { - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); + return; } - const auto status = SyncImpl(); - if (status.ok()) { - sync_needed_ = false; - } - return status; - } - - private: - Status SyncImpl() { outfile_.flush(); if (!outfile_.good()) { - return errors::Internal( - "Could not write to the internal temporary file."); + TF_SetStatus(status, TF_INTERNAL, + "Could not write to the internal temporary file"); + return; } auto blob_client = CreateAzBlobClientWrapper(account_); blob_client.upload_file_to_blob(tmp_content_filename_, container_, object_); if (errno != 0) { - return errors::Internal("Failed to upload to az://", account_, "/", - container_, "/", 
object_, " (", errno_to_string(), - ")"); + std::string error_message = + absl::StrCat("Failed to upload to az://", account_, "/", container_, + "/", object_, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; } - - return Status::OK(); + sync_needed_ = false; + TF_SetStatus(status, TF_OK, ""); } - Status CheckWritable() const { - if (!outfile_.is_open()) { - return errors::FailedPrecondition( - "The internal temporary file is not writable."); + void Close(TF_Status* status) { + if (outfile_.is_open()) { + Sync(status); + if (TF_GetCode(status) != TF_OK) { + return; + } + outfile_.close(); + std::remove(tmp_content_filename_.c_str()); } - return Status::OK(); + TF_SetStatus(status, TF_OK, ""); } + private: std::string account_; std::string container_; std::string object_; @@ -357,172 +453,14 @@ class AzBlobWritableFile : public WritableFile { bool sync_needed_; // whether there is buffered data that needs to be synced }; -class AzBlobReadOnlyMemoryRegion : public ReadOnlyMemoryRegion { - public: - AzBlobReadOnlyMemoryRegion(std::unique_ptr data, uint64 length) - : data_(std::move(data)), length_(length) {} - const void* data() override { return reinterpret_cast(data_.get()); } - uint64 length() override { return length_; } - - private: - std::unique_ptr data_; - uint64 length_; -}; - -} // namespace - -Status AzBlobFileSystem::NewRandomAccessFile( - const std::string& filename, std::unique_ptr* result) { - string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(filename, false, &account, &container, &object)); - result->reset(new AzBlobRandomAccessFile(account, container, object)); - return Status::OK(); -} - -Status AzBlobFileSystem::NewWritableFile( - const std::string& fname, std::unique_ptr* result) { - string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - result->reset(new AzBlobWritableFile(account, container, object)); - 
return Status::OK(); -} - -Status AzBlobFileSystem::NewAppendableFile( - const std::string& fname, std::unique_ptr* result) { - string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - result->reset(new AzBlobWritableFile(account, container, object)); - return Status::OK(); -} - -Status AzBlobFileSystem::NewReadOnlyMemoryRegionFromFile( - const std::string& filename, - std::unique_ptr* result) { - uint64 size; - TF_RETURN_IF_ERROR(GetFileSize(filename, &size)); - std::unique_ptr data(new char[size]); - - std::unique_ptr file; - TF_RETURN_IF_ERROR(NewRandomAccessFile(filename, &file)); - - StringPiece piece; - TF_RETURN_IF_ERROR(file->Read(0, size, &piece, data.get())); - - result->reset(new AzBlobReadOnlyMemoryRegion(std::move(data), size)); - return Status::OK(); -} - -Status AzBlobFileSystem::FileExists(const std::string& fname) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); - auto blob_exists = blob_client.blob_exists(container, object); - if (errno != 0) { - return errors::NotFound("Failed to check if ", fname, " exists (", - errno_to_string(), ")"); - } - if (!blob_exists) { - return errors::NotFound("The specified path ", fname, " was not found."); - } - return Status::OK(); -} - -Status AzBlobFileSystem::Stat(const std::string& fname, FileStatistics* stat) { - using namespace std::chrono; - - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); - - if (IsDirectory(fname).ok()) { - *stat = FileStatistics(0, 0, true); - return Status::OK(); - } - - if (!FileExists(fname).ok()) { - return errors::NotFound("The specified object ", fname, " was not found"); - } - - auto blob_property = blob_client.get_blob_property(container, 
object); - if (errno != 0) { - return errors::NotFound("Failed to get file stats for ", fname, " (", - errno_to_string(), ")"); - } - - FileStatistics fs; - fs.length = blob_property.size; - fs.mtime_nsec = - duration_cast(seconds(blob_property.last_modified)).count(); - - *stat = std::move(fs); - - return Status::OK(); -} - -Status AzBlobFileSystem::GetChildren(const std::string& dir, - std::vector* result) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dir, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); - - std::string continuation_token; - if (container.empty()) { - // TODO: iterate while continuation_token isn't empty - auto list_containers = - blob_client.list_containers_segmented("", continuation_token, INT_MAX); - std::transform( - begin(list_containers), end(list_containers), - std::back_inserter(*result), - [](azure::storage_lite::list_containers_item item) -> std::string { - return item.name; - }); - return Status::OK(); - } - - if (!object.empty() && object.back() != '/') { - object += "/"; - } - - auto list_blobs = blob_client.list_blobs_segmented( - container, "/", continuation_token, object); - if (errno != 0) { - return errors::Internal("Failed to get child of ", dir, " (", - errno_to_string(), ")"); - } - - auto blobs = list_blobs.blobs; - result->reserve(blobs.size()); - std::transform( - std::begin(blobs), std::end(blobs), std::back_inserter(*result), - [&object](azure::storage_lite::list_blobs_segmented_item list_blob_item) - -> std::string { - // Remove the prefix from the name - auto blob_name = list_blob_item.name; - blob_name.erase(0, object.size()); - // Remove the trailing slash from folders - if (blob_name.back() == '/') { - blob_name.pop_back(); - } - return blob_name; - }); - - return Status::OK(); -} - -Status AzBlobFileSystem::GetMatchingPaths(const std::string& pattern, - std::vector* results) { +#if 0 +Status GetMatchingPaths(const std::string& 
pattern, std::vector* results) { const std::string& fixed_prefix = pattern.substr(0, pattern.find_first_of("*?[\\")); std::string account, container, object; TF_RETURN_IF_ERROR( - ParseAzBlobPath(fixed_prefix, true, &account, &container, &object)); + ParseAzBlobPathClass(fixed_prefix, true, &account, &container, &object)); auto blob_client = CreateAzBlobClientWrapper(account); @@ -549,356 +487,61 @@ Status AzBlobFileSystem::GetMatchingPaths(const std::string& pattern, return Status::OK(); } +#endif -Status AzBlobFileSystem::DeleteFile(const std::string& fname) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - auto blob_client = CreateAzBlobClientWrapper(account); +static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } +static void plugin_memory_free(void* ptr) { free(ptr); } - blob_client.delete_blob(container, object); - if (errno != 0) { - return errors::Internal("Failed to delete ", fname, " (", errno_to_string(), - ")"); - } +// SECTION 1. Implementation for `TF_RandomAccessFile` +// ---------------------------------------------------------------------------- +namespace tf_random_access_file { - return Status::OK(); +static void Cleanup(TF_RandomAccessFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; } -Status AzBlobFileSystem::CreateDir(const std::string& dirname) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dirname, true, &account, &container, &object)); - if (container.empty()) { - return errors::FailedPrecondition("Cannot create storage accounts"); - } +static int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, + char* buffer, TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + return az_file->Read(offset, n, buffer, status); +} - // Blob storage has virtual folders. 
We can make sure the container exists - auto blob_client_wrapper = CreateAzBlobClientWrapper(account); +} // namespace tf_random_access_file - if (blob_client_wrapper.container_exists(container)) { - return Status::OK(); - } +// SECTION 2. Implementation for `TF_WritableFile` +// ---------------------------------------------------------------------------- +namespace tf_writable_file { - blob_client_wrapper.create_container(container); - if (errno != 0) { - return errors::Internal("Failed to create directory ", dirname, " (", - errno_to_string(), ")"); - } - return Status::OK(); +static void Cleanup(TF_WritableFile* file) { + auto az_file = static_cast(file->plugin_file); + delete az_file; } -Status AzBlobFileSystem::DeleteDir(const std::string& dirname) { - // Doesn't support file delete - call GetChildren (without delimiter) and then - // loop and delete - - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(dirname, false, &account, &container, &object)); - if (container.empty()) { - // Don't allow deleting entire storage accout as we can't create them from - // this file system - return errors::FailedPrecondition( - "Cannot delete storage account, limited to blobs or containers"); - } +static void Append(const TF_WritableFile* file, const char* buffer, size_t n, + TF_Status* status) { + auto az_file = static_cast(file->plugin_file); + az_file->Append(buffer, n, status); +} - auto blob_client = CreateAzBlobClientWrapper(account); - - // Check container exists - // Just pull out the first path component representing the container - if (object.empty()) { - blob_client.delete_container(container); - if (errno != 0) { - return errors::Internal("Error deleting ", dirname, " (", - errno_to_string(), ")"); - } - } else { - // Delete all blobs under dirname prefix - std::vector children; - TF_RETURN_IF_ERROR(ListResources(dirname, "", blob_client, &children)); - - for (const auto& child : children) { - blob_client.delete_blob(container, 
child); - if (errno != 0) { - return errors::Internal("Failed to delete ", child, " (", - errno_to_string(), ")"); - } - } - } - - return Status::OK(); -} - -Status AzBlobFileSystem::GetFileSize(const std::string& fname, - uint64* file_size) { - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, false, &account, &container, &object)); - - auto blob_client = CreateAzBlobClientWrapper(account); - auto blob_property = blob_client.get_blob_property(container, object); - if (errno != 0) { - return errors::Internal("Failed to get properties of ", fname, " (", - errno_to_string(), ")"); - } - *file_size = blob_property.size; - - return Status::OK(); -} - -Status AzBlobFileSystem::RenameFile(const std::string& src, - const std::string& target) { - std::string src_account, src_container, src_object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(src, false, &src_account, &src_container, &src_object)); - std::string target_account, target_container, target_object; - TF_RETURN_IF_ERROR(ParseAzBlobPath(target, false, &target_account, - &target_container, &target_object)); - - if (src_account != target_account) { - return errors::Unimplemented( - "Couldn't rename ", src, " to ", target, - ": moving files between accounts is not supported."); - } - - auto blob_client = CreateAzBlobClientWrapper(src_account); - - blob_client.start_copy(src_container, src_object, target_container, - target_object); - if (errno != 0) { - return errors::Internal("Failed to start rename from ", src, " to ", target, - " (", errno_to_string(), ")"); - } - - // Wait until copy completes - // Status can be success, pending, aborted or failed - std::string copy_status; - do { - const auto target_blob_property = - blob_client.get_blob_property(target_container, target_object); - copy_status = target_blob_property.copy_status; - } while (copy_status.find("pending") == 0 && !copy_status.empty()); - - if (copy_status.find("success") == std::string::npos) { - return 
errors::Internal("Process of renaming resulted in status of ", - copy_status, " when renaming ", src, " to ", - target); - } - - blob_client.delete_blob(src_container, src_object); - if (errno != 0) { - return errors::Internal("Failed to get delete after copy of ", src, " (", - errno_to_string(), ")"); - } - - return Status::OK(); -} - -Status AzBlobFileSystem::RecursivelyCreateDir(const string& dirname) { - return CreateDir(dirname); -} - -Status AzBlobFileSystem::IsDirectory(const std::string& fname) { - // Should check that account and container exist and that fname isn't a file - // Azure storage file system is virtual and is created with path compenents in - // blobs name so no need to check further - - std::string account, container, object; - TF_RETURN_IF_ERROR( - ParseAzBlobPath(fname, true, &account, &container, &object)); - - auto blob_client = CreateAzBlobClientWrapper(account); - - if (container.empty()) { - return errors::Unimplemented( - "Currently account exists check is not implemented"); - // bool is_account; - - // TF_RETURN_IF_ERROR(AccountExists(account, &is_account, blob_client)); - // return is_account ? 
Status::OK() - // : errors::NotFound("The specified account az://", - // account, " was not found."); - } - - auto container_exists = blob_client.container_exists(container); - if (!container_exists) { - return errors::NotFound("The specified folder ", fname, " was not found"); - } - - if (!object.empty()) { - // Lastly check fname doesn't point to a file - auto blob_exists = blob_client.blob_exists(container, object); - if (blob_exists) { - return errors::FailedPrecondition("The specified path ", fname, - " is not a directory."); - } - } - - // If account & container exist & fname isn't a file, with virtual directories - // we say that fname is a directory - return Status::OK(); -} - -Status AzBlobFileSystem::DeleteRecursively(const std::string& dirname, - int64* undeleted_files, - int64* undeleted_dirs) { - TF_RETURN_IF_ERROR(DeleteDir(dirname)); - *undeleted_dirs = 0; - *undeleted_files = 0; - - return Status::OK(); -} - -void AzBlobFileSystem::FlushCaches() {} - -Status AzBlobFileSystem::ListResources( - const std::string& dir, const std::string& delimiter, - azure::storage_lite::blob_client_wrapper& blob_client, - std::vector* results) const { - if (!results) { - return errors::Internal("results cannot be null"); - } - - std::string account, container, object; - TF_RETURN_IF_ERROR(ParseAzBlobPath(dir, true, &account, &container, &object)); - - std::string continuation_token; - - if (container.empty()) { - std::vector containers; - do { - auto list_containers_response = - blob_client.list_containers_segmented("", continuation_token); - if (errno != 0) { - return errors::Internal("Failed to get containers of account ", dir, - " (", errno_to_string(), ")"); - } - - containers.insert(containers.end(), list_containers_response.begin(), - list_containers_response.end()); - } while (!continuation_token.empty()); - - std::transform( - std::begin(containers), std::end(containers), - std::back_inserter(*results), - [](azure::storage_lite::list_containers_item 
list_container_item) - -> std::string { return list_container_item.name; }); - - } else { - std::vector blobs; - do { - auto list_blobs_response = blob_client.list_blobs_segmented( - container, delimiter, continuation_token, object); - if (errno != 0) { - return errors::Internal("Failed to get blobs of ", dir, " (", - errno_to_string(), ")"); - } - - blobs.insert(blobs.end(), list_blobs_response.blobs.begin(), - list_blobs_response.blobs.end()); - - continuation_token = list_blobs_response.next_marker; - } while (!continuation_token.empty()); - - results->reserve(blobs.size()); - std::transform( - blobs.begin(), blobs.end(), std::back_inserter(*results), - [](azure::storage_lite::list_blobs_segmented_item list_blob_item) - -> std::string { return list_blob_item.name; }); - } - - return Status::OK(); -} -} // namespace io -} // namespace tensorflow - -namespace tensorflow { -namespace io { -namespace azfs { - -static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } -static void plugin_memory_free(void* ptr) { free(ptr); } - -// SECTION 1. Implementation for `TF_RandomAccessFile` -// ---------------------------------------------------------------------------- -namespace tf_random_access_file { - -static void Cleanup(TF_RandomAccessFile* file) { - auto az_file = static_cast(file->plugin_file); - delete az_file; -} - -static int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n, - char* buffer, TF_Status* status) { - auto az_file = static_cast(file->plugin_file); - StringPiece result; - Status s = az_file->Read(offset, n, &result, buffer); - if (!(s.ok() || errors::IsOutOfRange(s))) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - } else if (errors::IsOutOfRange(s)) { - TF_SetStatus(status, TF_OUT_OF_RANGE, "Read fewer bytes than requested"); - } else { - TF_SetStatus(status, TF_OK, ""); - } - return result.size(); -} - -} // namespace tf_random_access_file - -// SECTION 2. 
Implementation for `TF_WritableFile` -// ---------------------------------------------------------------------------- -namespace tf_writable_file { - -static void Cleanup(TF_WritableFile* file) { - auto az_file = static_cast(file->plugin_file); - delete az_file; -} - -static void Append(const TF_WritableFile* file, const char* buffer, size_t n, - TF_Status* status) { - auto az_file = static_cast(file->plugin_file); - StringPiece data(buffer, n); - Status s = az_file->Append(data); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return; - } - TF_SetStatus(status, TF_OK, ""); -} - -static int64_t Tell(const TF_WritableFile* file, TF_Status* status) { - TF_SetStatus(status, TF_UNIMPLEMENTED, "Stat not implemented"); - return -1; -} +static int64_t Tell(const TF_WritableFile* file, TF_Status* status) { + TF_SetStatus(status, TF_UNIMPLEMENTED, "Stat not implemented"); + return -1; +} static void Flush(const TF_WritableFile* file, TF_Status* status) { auto az_file = static_cast(file->plugin_file); - Status s = az_file->Flush(); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return; - } - TF_SetStatus(status, TF_OK, ""); + az_file->Sync(status); } static void Sync(const TF_WritableFile* file, TF_Status* status) { auto az_file = static_cast(file->plugin_file); - Status s = az_file->Flush(); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return; - } - TF_SetStatus(status, TF_OK, ""); + az_file->Sync(status); } static void Close(const TF_WritableFile* file, TF_Status* status) { auto az_file = static_cast(file->plugin_file); - Status s = az_file->Flush(); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return; - } - TF_SetStatus(status, TF_OK, ""); + az_file->Close(status); } } // namespace tf_writable_file @@ -918,8 +561,6 @@ uint64_t Length(const TF_ReadOnlyMemoryRegion* region) { return 0; } // 
---------------------------------------------------------------------------- namespace tf_azfs_filesystem { -static AzBlobFileSystem azfs; - static void Init(TF_Filesystem* filesystem, TF_Status* status) { TF_SetStatus(status, TF_OK, ""); } @@ -929,37 +570,37 @@ static void Cleanup(TF_Filesystem* filesystem) {} static void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path, TF_RandomAccessFile* file, TF_Status* status) { - std::unique_ptr result; - Status s = azfs.NewRandomAccessFile(path, &result); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { return; } - file->plugin_file = result.release(); + file->plugin_file = new AzBlobRandomAccessFile(account, container, object); + TF_SetStatus(status, TF_OK, ""); } static void NewWritableFile(const TF_Filesystem* filesystem, const char* path, TF_WritableFile* file, TF_Status* status) { - std::unique_ptr result; - Status s = azfs.NewWritableFile(path, &result); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { return; } - file->plugin_file = result.release(); + file->plugin_file = new AzBlobWritableFile(account, container, object); + TF_SetStatus(status, TF_OK, ""); } static void NewAppendableFile(const TF_Filesystem* filesystem, const char* path, TF_WritableFile* file, TF_Status* status) { - std::unique_ptr result; - Status s = azfs.NewAppendableFile(path, &result); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { return; } - file->plugin_file = result.release(); + 
file->plugin_file = new AzBlobWritableFile(account, container, object); + TF_SetStatus(status, TF_OK, ""); } @@ -973,9 +614,30 @@ static void NewReadOnlyMemoryRegionFromFile(const TF_Filesystem* filesystem, static void CreateDir(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - Status s = azfs.CreateDir(path); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + if (container.empty()) { + TF_SetStatus(status, TF_FAILED_PRECONDITION, + "Cannot create storage accounts"); + return; + } + + // Blob storage has virtual folders. We can make sure the container exists + auto blob_client_wrapper = CreateAzBlobClientWrapper(account); + + if (blob_client_wrapper.container_exists(container)) { + TF_SetStatus(status, TF_OK, ""); + return; + } + + blob_client_wrapper.create_container(container); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to create directory ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } TF_SetStatus(status, TF_OK, ""); @@ -983,101 +645,372 @@ static void CreateDir(const TF_Filesystem* filesystem, const char* path, static void RecursivelyCreateDir(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - Status s = azfs.RecursivelyCreateDir(path); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return; - } - TF_SetStatus(status, TF_OK, ""); + CreateDir(filesystem, path, status); } static void DeleteFile(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - Status s = azfs.DeleteFile(path); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if 
(TF_GetCode(status) != TF_OK) { return; } - TF_SetStatus(status, TF_OK, ""); -} -static void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, - uint64_t* undeleted_files, - uint64_t* undeleted_dirs, TF_Status* status) { - int64 undeleted_files_value, undeleted_dirs_value; - Status s = azfs.DeleteRecursively(path, &undeleted_files_value, - &undeleted_dirs_value); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + auto blob_client = CreateAzBlobClientWrapper(account); + + blob_client.delete_blob(container, object); + if (errno != 0) { + std::string error_message = + absl::StrCat("Failed to delete ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } - *undeleted_files = undeleted_files_value; - *undeleted_dirs = undeleted_dirs_value; TF_SetStatus(status, TF_OK, ""); } static void DeleteDir(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - Status s = azfs.DeleteDir(path); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + // Doesn't support file delete - call GetChildren (without delimiter) and then + // loop and delete + + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { return; } + if (container.empty()) { + // Don't allow deleting entire storage accout as we can't create them from + // this file system + TF_SetStatus( + status, TF_FAILED_PRECONDITION, + "Cannot delete storage account, limited to blobs or containers"); + return; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + + // Check container exists + // Just pull out the first path component representing the container + if (object.empty()) { + blob_client.delete_container(container); + if (errno != 0) { + std::string error_message = + absl::StrCat("Error deleting ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, 
TF_INTERNAL, error_message.c_str()); + return; + } + } else { + // Delete all blobs under dirname prefix + std::vector children; + ListResources(path, "", blob_client, &children, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + for (const auto& child : children) { + blob_client.delete_blob(container, child); + if (errno != 0) { + std::string error_message = absl::StrCat("Failed to delete ", child, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + } + } + TF_SetStatus(status, TF_OK, ""); } +static void DeleteRecursively(const TF_Filesystem* filesystem, const char* path, + uint64_t* undeleted_files, + uint64_t* undeleted_dirs, TF_Status* status) { + *undeleted_files = 0; + *undeleted_dirs = 0; + DeleteDir(filesystem, path, status); +} + static void RenameFile(const TF_Filesystem* filesystem, const char* src, const char* dst, TF_Status* status) { - Status s = azfs.RenameFile(src, dst); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + std::string src_account, src_container, src_object; + ParseAzBlobPath(src, false, &src_account, &src_container, &src_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + std::string dst_account, dst_container, dst_object; + ParseAzBlobPath(dst, false, &dst_account, &dst_container, &dst_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + if (src_account != dst_account) { + std::string error_message = + absl::StrCat("Couldn't rename ", src, " to ", dst, + ": moving files between accounts is not supported"); + TF_SetStatus(status, TF_UNIMPLEMENTED, error_message.c_str()); + return; + } + + auto blob_client = CreateAzBlobClientWrapper(src_account); + + blob_client.start_copy(src_container, src_object, dst_container, dst_object); + if (errno != 0) { + std::string error_message = + absl::StrCat("Failed to start rename from ", src, " to ", dst, " (", + errno_to_string(), ")"); + 
TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + + // Wait until copy completes + // Status can be success, pending, aborted or failed + std::string copy_status; + do { + const auto dst_blob_property = + blob_client.get_blob_property(dst_container, dst_object); + copy_status = dst_blob_property.copy_status; + } while (copy_status.find("pending") == 0 && !copy_status.empty()); + + if (copy_status.find("success") == std::string::npos) { + std::string error_message = + absl::StrCat("Process of renaming resulted in status of ", copy_status, + " when renaming ", src, " to ", dst); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } + + blob_client.delete_blob(src_container, src_object); + if (errno != 0) { + std::string error_message = + absl::StrCat("Failed to get delete after copy of ", src, " (", + errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); } static void CopyFile(const TF_Filesystem* filesystem, const char* src, const char* dst, TF_Status* status) { - Status s = azfs.CopyFile(src, dst); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + // 128KB copy buffer + constexpr size_t kCopyFileBufferSize = 128 * 1024; + + std::string src_account, src_container, src_object; + ParseAzBlobPath(src, false, &src_account, &src_container, &src_object, + status); + if (TF_GetCode(status) != TF_OK) { return; } - TF_SetStatus(status, TF_OK, ""); + std::unique_ptr src_file( + new AzBlobRandomAccessFile(src_account, src_container, src_object)); + + std::string dst_account, dst_container, dst_object; + ParseAzBlobPath(dst, false, &dst_account, &dst_container, &dst_object, + status); + if (TF_GetCode(status) != TF_OK) { + return; + } + std::unique_ptr dst_file( + new AzBlobWritableFile(dst_account, dst_container, dst_object)); + + uint64_t offset = 0; + std::unique_ptr buffer(new char[kCopyFileBufferSize]); + while 
(TF_GetCode(status) == TF_OK) { + int64_t bytes_to_read = + src_file->Read(offset, kCopyFileBufferSize, buffer.get(), status); + if (!(TF_GetCode(status) == TF_OK || + TF_GetCode(status) == TF_OUT_OF_RANGE)) { + return; + } + dst_file->Append(buffer.get(), bytes_to_read, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + offset += bytes_to_read; + } + dst_file->Close(status); } static void PathExists(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - Status s = azfs.FileExists(path); - if (!s.ok()) { - TF_SetStatus(status, TF_NOT_FOUND, s.error_message().c_str()); + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + auto blob_exists = blob_client.blob_exists(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to check if ", path, " exists (", errno_to_string(), ")"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return; + } + if (!blob_exists) { + std::string error_message = + absl::StrCat("The specified path ", path, " was not found"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); return; } TF_SetStatus(status, TF_OK, ""); } +static bool IsDirectory(const TF_Filesystem* filesystem, const char* path, + TF_Status* status) { + // Should check that account and container exist and that fname isn't a file + // Azure storage file system is virtual and is created with path compenents in + // blobs name so no need to check further + + std::string account, container, object; + ParseAzBlobPath(path, true, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return false; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + + if (container.empty()) { + TF_SetStatus(status, TF_UNIMPLEMENTED, + "Currently account exists check is not implemented"); + return false; + 
// bool is_account; + + // TF_RETURN_IF_ERROR(AccountExists(account, &is_account, blob_client)); + // return is_account ? Status::OK() + // : errors::NotFound("The specified account az://", + // account, " was not found."); + } + + auto container_exists = blob_client.container_exists(container); + if (!container_exists) { + std::string error_message = + absl::StrCat("The specified folder ", path, " was not found"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return false; + } + + if (!object.empty()) { + // Lastly check fname doesn't point to a file + auto blob_exists = blob_client.blob_exists(container, object); + if (blob_exists) { + std::string error_message = + absl::StrCat("The specified folder ", path, " is not a directory"); + TF_SetStatus(status, TF_FAILED_PRECONDITION, error_message.c_str()); + return false; + } + } + TF_SetStatus(status, TF_OK, ""); + return true; +} + static void Stat(const TF_Filesystem* filesystem, const char* path, TF_FileStatistics* stats, TF_Status* status) { - FileStatistics stats_value; - Status s = azfs.Stat(path, &stats_value); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); + using namespace std::chrono; + + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_client = CreateAzBlobClientWrapper(account); + + if (IsDirectory(filesystem, path, status)) { + stats->length = 0; + stats->mtime_nsec = 0; + stats->is_directory = true; return; } - stats->length = stats_value.length; - stats->mtime_nsec = stats_value.mtime_nsec; - stats->is_directory = stats_value.is_directory; + + PathExists(filesystem, path, status); + if (TF_GetCode(status) != TF_OK) { + return; + } + + auto blob_property = blob_client.get_blob_property(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to get file stats for ", path, " (", 
errno_to_string(), ")"); + TF_SetStatus(status, TF_NOT_FOUND, error_message.c_str()); + return; + } + + stats->length = blob_property.size; + stats->mtime_nsec = + duration_cast(seconds(blob_property.last_modified)).count(); + stats->is_directory = false; TF_SetStatus(status, TF_OK, ""); } static int GetChildren(const TF_Filesystem* filesystem, const char* path, char*** entries, TF_Status* status) { - std::vector result; - Status s = azfs.GetChildren(path, &result); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return false; + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return 0; } + + auto blob_client = CreateAzBlobClientWrapper(account); + + std::string continuation_token; + if (container.empty()) { + std::vector result; + // TODO: iterate while continuation_token isn't empty + auto list_containers = + blob_client.list_containers_segmented("", continuation_token, INT_MAX); + std::transform( + begin(list_containers), end(list_containers), + std::back_inserter(result), + [](azure::storage_lite::list_containers_item item) -> std::string { + return item.name; + }); + + int num_entries = result.size(); + *entries = static_cast( + plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); + for (int i = 0; i < num_entries; i++) { + (*entries)[i] = static_cast( + plugin_memory_allocate(strlen(result[i].c_str()) + 1)); + memcpy((*entries)[i], result[i].c_str(), strlen(result[i].c_str()) + 1); + } + TF_SetStatus(status, TF_OK, ""); + return num_entries; + } + + if (!object.empty() && object.back() != '/') { + object += "/"; + } + + auto list_blobs = blob_client.list_blobs_segmented( + container, "/", continuation_token, object); + if (errno != 0) { + std::string error_message = absl::StrCat("Failed to get child of ", path, + " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; + 
} + + std::vector result; + auto blobs = list_blobs.blobs; + result.reserve(blobs.size()); + std::transform( + std::begin(blobs), std::end(blobs), std::back_inserter(result), + [&object](azure::storage_lite::list_blobs_segmented_item list_blob_item) + -> std::string { + // Remove the prefix from the name + auto blob_name = list_blob_item.name; + blob_name.erase(0, object.size()); + // Remove the trailing slash from folders + if (blob_name.back() == '/') { + blob_name.pop_back(); + } + return blob_name; + }); + int num_entries = result.size(); *entries = static_cast( plugin_memory_allocate(num_entries * sizeof((*entries)[0]))); @@ -1092,19 +1025,23 @@ static int GetChildren(const TF_Filesystem* filesystem, const char* path, static int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path, TF_Status* status) { - TF_SetStatus(status, TF_UNIMPLEMENTED, "GetFileSize not implemented"); - return -1; -} + std::string account, container, object; + ParseAzBlobPath(path, false, &account, &container, &object, status); + if (TF_GetCode(status) != TF_OK) { + return 0; + } -static bool IsDirectory(const TF_Filesystem* filesystem, const char* path, - TF_Status* status) { - Status s = azfs.IsDirectory(path); - if (!s.ok()) { - TF_SetStatus(status, TF_INTERNAL, s.error_message().c_str()); - return false; + auto blob_client = CreateAzBlobClientWrapper(account); + auto blob_property = blob_client.get_blob_property(container, object); + if (errno != 0) { + std::string error_message = absl::StrCat( + "Failed to get properties of ", path, " (", errno_to_string(), ")"); + TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); + return 0; } + TF_SetStatus(status, TF_OK, ""); - return true; + return blob_property.size; } static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { @@ -1113,6 +1050,8 @@ static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { } // namespace tf_azfs_filesystem +} // namespace + static void 
ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { TF_SetFilesystemVersionMetadata(ops); diff --git a/tensorflow_io/core/kernels/azfs_kernels.h b/tensorflow_io/core/kernels/azfs_kernels.h deleted file mode 100644 index 611e75a19..000000000 --- a/tensorflow_io/core/kernels/azfs_kernels.h +++ /dev/null @@ -1,82 +0,0 @@ -/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#ifndef TENSORFLOW_IO_AZURE_AZFS_OPS_H_ -#define TENSORFLOW_IO_AZURE_AZFS_OPS_H_ - -#include "blob/blob_client.h" -#include "tensorflow/core/platform/file_system.h" - -namespace tensorflow { -namespace io { - -// class AzBlobFileSystem; - -/// Azure Blob Storage implementation of a file system. 
-class AzBlobFileSystem : public FileSystem { - public: - Status NewRandomAccessFile( - const std::string& filename, - std::unique_ptr* result) override; - - Status NewWritableFile(const std::string& fname, - std::unique_ptr* result) override; - - Status NewAppendableFile(const std::string& fname, - std::unique_ptr* result) override; - - Status NewReadOnlyMemoryRegionFromFile( - const std::string& filename, - std::unique_ptr* result) override; - - Status FileExists(const std::string& fname) override; - - Status Stat(const std::string& fname, FileStatistics* stat) override; - - Status GetChildren(const std::string& dir, - std::vector* result) override; - - Status GetMatchingPaths(const std::string& pattern, - std::vector* results) override; - - Status DeleteFile(const std::string& fname) override; - - Status CreateDir(const std::string& dirname) override; - - Status DeleteDir(const std::string& dirname) override; - - Status GetFileSize(const std::string& fname, uint64* file_size) override; - - Status RenameFile(const std::string& src, const std::string& target) override; - - Status IsDirectory(const std::string& fname) override; - - Status RecursivelyCreateDir(const string& dirname) override; - - Status DeleteRecursively(const std::string& dirname, int64* undeleted_files, - int64* undeleted_dirs) override; - - void FlushCaches() override; - - private: - Status ListResources(const std::string& dir, const std::string& delimiter, - azure::storage_lite::blob_client_wrapper& blob_client, - std::vector* results) const; -}; - -} // namespace io -} // namespace tensorflow - -#endif // TENSORFLOW_IO_AZURE_AZFS_OPS_H_ diff --git a/tensorflow_io/core/kernels/azfs_kernels_test.cc b/tensorflow_io/core/kernels/azfs_kernels_test.cc deleted file mode 100644 index 97bb3cc02..000000000 --- a/tensorflow_io/core/kernels/azfs_kernels_test.cc +++ /dev/null @@ -1,281 +0,0 @@ -/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. 
- -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -==============================================================================*/ - -#include "tensorflow_io/core/kernels/azfs_kernels.h" - -#include "gtest/gtest.h" - -#define EXPECT_OK(val) EXPECT_EQ(val, Status::OK()) - -namespace tensorflow { -namespace io { -namespace { - -using namespace tensorflow::io; - -class AzBlobFileSystemTest : public ::testing::Test { - protected: - AzBlobFileSystemTest() {} - - std::string PathTo(const std::string& path) { - return "az://devstoreaccount1/aztest" + path; - } - - Status WriteString(const std::string& fname, const std::string& content) { - std::unique_ptr writer; - TF_RETURN_IF_ERROR(fs.NewWritableFile(fname, &writer)); - TF_RETURN_IF_ERROR(writer->Append(content)); - TF_RETURN_IF_ERROR(writer->Close()); - return Status::OK(); - } - - Status ReadAll(const std::string& fname, std::string* content) { - std::unique_ptr reader; - TF_RETURN_IF_ERROR(fs.NewRandomAccessFile(fname, &reader)); - - uint64 file_size = 0; - TF_RETURN_IF_ERROR(fs.GetFileSize(fname, &file_size)); - - StringPiece result; - char* strdata = &(*content)[0]; - TF_RETURN_IF_ERROR(reader->Read(0, file_size, &result, strdata)); - if (file_size != result.size()) { - return errors::DataLoss("expected ", file_size, " got ", result.size(), - " bytes"); - } - - *content = std::string(result); - - return Status::OK(); - } - - void SetUp() override { - // Create container - (void)fs.CreateDir(PathTo("")); - } - - void TearDown() override { - 
// Delete container - // fs.DeleteDir(PathTo("")); - } - - AzBlobFileSystem fs; -}; - -TEST_F(AzBlobFileSystemTest, ContainerShouldBeDirectory) { - auto container_path = PathTo(""); - auto is_dir = fs.IsDirectory(container_path); - EXPECT_EQ(is_dir, Status::OK()); -} - -TEST_F(AzBlobFileSystemTest, NewRandomAccessFile) { - const std::string fname = PathTo("/RandomAccessFile"); - const std::string content = "abcdefghijklmn"; - - size_t size = 4, offset = 2; - const auto content_substring = content.substr(offset, size); - - EXPECT_OK(WriteString(fname, content)); - - std::unique_ptr reader; - EXPECT_OK(fs.NewRandomAccessFile(fname, &reader)); - - StringPiece result; - EXPECT_OK(reader->Read(0, content.size(), &result, nullptr)); - EXPECT_EQ(content, result); - - EXPECT_OK(reader->Read(offset, size, &result, nullptr)); - EXPECT_EQ(content_substring, result); -} - -TEST_F(AzBlobFileSystemTest, NewWritableFile) { - std::unique_ptr writer; - const std::string fname = PathTo("/WritableFile"); - EXPECT_OK(fs.NewWritableFile(fname, &writer)); - EXPECT_OK(writer->Append("content1,")); - EXPECT_OK(writer->Append("content2")); - EXPECT_OK(writer->Flush()); - EXPECT_OK(writer->Sync()); - EXPECT_OK(writer->Close()); - - std::string content; - EXPECT_OK(ReadAll(fname, &content)); - EXPECT_EQ("content1,content2", content); -} - -TEST_F(AzBlobFileSystemTest, NewAppendableFile) { - std::unique_ptr writer; - - const std::string fname = PathTo("/AppendableFile"); - EXPECT_OK(WriteString(fname, "test")); - - EXPECT_OK(fs.NewAppendableFile(fname, &writer)); - EXPECT_OK(writer->Append("content")); - EXPECT_OK(writer->Close()); -} - -TEST_F(AzBlobFileSystemTest, NewReadOnlyMemoryRegionFromFile) { - const auto fname = PathTo("/MemoryFile"); - const auto content = "content"; - EXPECT_OK(WriteString(fname, content)); - std::unique_ptr region; - EXPECT_OK(fs.NewReadOnlyMemoryRegionFromFile(fname, ®ion)); - - const char* data = static_cast(region->data()); - auto length = region->length(); - 
std::string read_content(data, length); - EXPECT_EQ(content, read_content); -} - -TEST_F(AzBlobFileSystemTest, FileExists) { - const std::string fname = PathTo("/FileExists"); - EXPECT_EQ(error::Code::NOT_FOUND, fs.FileExists(fname).code()); - EXPECT_OK(WriteString(fname, "test")); - EXPECT_OK(fs.FileExists(fname)); - EXPECT_OK(fs.DeleteFile(fname)); -} - -TEST_F(AzBlobFileSystemTest, GetChildren) { - const auto base = PathTo("/GetChildren"); - EXPECT_OK(fs.CreateDir(base)); - - const auto file = base + "/TestFile.csv"; - EXPECT_OK(WriteString(file, "test")); - - const auto subdir = base + "/SubDir"; - EXPECT_OK(fs.CreateDir(subdir)); - const auto subfile = subdir + "/TestSubFile.csv"; - EXPECT_OK(WriteString(subfile, "test")); - - std::vector children; - EXPECT_OK(fs.GetChildren(base, &children)); - std::sort(children.begin(), children.end()); - EXPECT_EQ(std::vector({"SubDir", "TestFile.csv"}), children); -} - -TEST_F(AzBlobFileSystemTest, DeleteFile) { - const std::string fname = PathTo("/DeleteFile"); - EXPECT_OK(WriteString(fname, "test")); - EXPECT_OK(fs.DeleteFile(fname)); -} - -TEST_F(AzBlobFileSystemTest, DeleteRecursively) { - const std::string fname = PathTo("/recursive"); - - for (const auto& ext : {".txt", ".md"}) { - for (int i = 0; i < 3; ++i) { - const auto this_fname = fname + "/" + std::to_string(i) + ext; - (void)WriteString(this_fname, ""); - } - } - - std::vector txt_files; - (void)fs.GetMatchingPaths(fname + "/*.txt", &txt_files); - EXPECT_EQ(3, txt_files.size()); - - int64 undeleted_files, undeleted_dirs; - EXPECT_OK(fs.DeleteRecursively(fname, &undeleted_files, &undeleted_dirs)); -} - -TEST_F(AzBlobFileSystemTest, GetFileSize) { - const std::string fname = PathTo("/GetFileSize"); - EXPECT_OK(WriteString(fname, "test")); - uint64 file_size = 0; - EXPECT_OK(fs.GetFileSize(fname, &file_size)); - EXPECT_EQ(4, file_size); -} - -TEST_F(AzBlobFileSystemTest, CreateDir) { - const auto dir = PathTo("/CreateDir"); - EXPECT_OK(fs.CreateDir(dir)); - - 
const auto file = dir + "/CreateDirFile.csv"; - EXPECT_OK(WriteString(file, "test")); - FileStatistics stat; - EXPECT_OK(fs.Stat(dir, &stat)); - EXPECT_TRUE(stat.is_directory); -} - -TEST_F(AzBlobFileSystemTest, DeleteDir) { - const auto dir = PathTo("/DeleteDir"); - const auto file = dir + "/DeleteDirFile.csv"; - EXPECT_OK(WriteString(file, "test")); - EXPECT_OK(fs.DeleteDir(dir)); - - FileStatistics stat; - // Still OK here as virtual directories always exist - EXPECT_OK(fs.Stat(dir, &stat)); -} - -TEST_F(AzBlobFileSystemTest, RenameFile) { - const auto fname1 = PathTo("/RenameFile1"); - const auto fname2 = PathTo("/RenameFile2"); - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(fs.RenameFile(fname1, fname2)); - std::string content; - EXPECT_OK(ReadAll(fname2, &content)); - EXPECT_EQ("test", content); -} - -TEST_F(AzBlobFileSystemTest, RenameFile_Overwrite) { - const auto fname1 = PathTo("/RenameFile1"); - const auto fname2 = PathTo("/RenameFile2"); - - EXPECT_OK(WriteString(fname2, "test")); - EXPECT_OK(fs.FileExists(fname2)); - - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(fs.RenameFile(fname1, fname2)); - std::string content; - EXPECT_OK(ReadAll(fname2, &content)); - EXPECT_EQ("test", content); -} - -TEST_F(AzBlobFileSystemTest, StatFile) { - const auto fname = PathTo("/StatFile"); - EXPECT_OK(WriteString(fname, "test")); - FileStatistics stat; - EXPECT_OK(fs.Stat(fname, &stat)); - EXPECT_EQ(4, stat.length); - EXPECT_FALSE(stat.is_directory); -} - -TEST_F(AzBlobFileSystemTest, GetMatchingPaths_NoWildcard) { - const auto fname = PathTo("/path/subpath/file2.txt"); - - EXPECT_OK(WriteString(fname, "test")); - std::vector results; - EXPECT_OK(fs.GetMatchingPaths(fname, &results)); - EXPECT_EQ(std::vector({fname}), results); -} - -TEST_F(AzBlobFileSystemTest, GetMatchingPaths_FilenameWildcard) { - const auto fname1 = PathTo("/path/subpath/file1.txt"); - const auto fname2 = PathTo("/path/subpath/file2.txt"); - const auto fname3 = 
PathTo("/path/subpath/another.txt"); - - EXPECT_OK(WriteString(fname1, "test")); - EXPECT_OK(WriteString(fname2, "test")); - EXPECT_OK(WriteString(fname3, "test")); - - const auto pattern = PathTo("/path/subpath/file*.txt"); - std::vector results; - EXPECT_OK(fs.GetMatchingPaths(pattern, &results)); - EXPECT_EQ(std::vector({fname1, fname2}), results); -} - -} // namespace -} // namespace io -} // namespace tensorflow diff --git a/tests/test_azure.py b/tests/test_azure.py index 9c9741322..b9b09b423 100644 --- a/tests/test_azure.py +++ b/tests/test_azure.py @@ -20,6 +20,8 @@ from tensorflow.python.platform import gfile import tensorflow_io as tfio # pylint: disable=unused-import +# Note: export TF_AZURE_USE_DEV_STORAGE=1 to enable emulation + class AZFSTest(test.TestCase): """[summary] From a1e651e4f1390c178718c7a778b776aba90f03c3 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Mon, 2 Nov 2020 13:33:00 -0800 Subject: [PATCH 3/9] Update to address CentOS 7 issues Signed-off-by: Yong Tang --- .github/workflows/build.yml | 2 +- README.md | 8 +++- tensorflow_io/core/BUILD | 47 ++++++++++++++++++- tensorflow_io/core/kernels/azfs_kernels.cc | 22 ++------- .../core/kernels/file_system_plugins.cc | 26 ++++++++++ .../core/kernels/file_system_plugins.h | 38 +++++++++++++++ tensorflow_io/core/kernels/void_kernels.cc | 18 +++++++ tensorflow_io/core/python/ops/__init__.py | 2 +- 8 files changed, 139 insertions(+), 24 deletions(-) create mode 100644 tensorflow_io/core/kernels/file_system_plugins.cc create mode 100644 tensorflow_io/core/kernels/file_system_plugins.h create mode 100644 tensorflow_io/core/kernels/void_kernels.cc diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 287774fc1..2f9dac6b6 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -330,7 +330,7 @@ jobs: python3 setup.py --package-version | xargs python3 -m pip install python3 tools/build/configure.py cat .bazelrc - bazel build -s --verbose_failures 
//tensorflow_io/core:python/ops/libtensorflow_io.so //tensorflow_io/core:python/ops/libtensorflow_io_azfs.so + bazel build -s --verbose_failures //tensorflow_io/core:python/ops/libtensorflow_io.so //tensorflow_io/core:python/ops/libtensorflow_io_plugins.so - uses: actions/upload-artifact@v1 with: name: ${{ runner.os }}-bazel-bin diff --git a/README.md b/README.md index d55fe903c..305116f7e 100644 --- a/README.md +++ b/README.md @@ -311,6 +311,10 @@ libraries (.so). The gcc provided by Developer Toolset and rh-python36 should be Also, the libstdc++ has to be linked statically to avoid discrepancy of libstdc++ installed on CentOS vs. newer gcc version by devtoolset. +Furthermore, a special flag `--//tensorflow_io/core:static_build` has to be passed to Bazel +in order to avoid duplication of symbols in statically linked libraries for file system +plugins. + The following will install bazel, devtoolset-9, rh-python36, and build the shared libraries: ```sh #!/usr/bin/env bash @@ -331,10 +335,10 @@ scl enable rh-python36 devtoolset-9 \ scl enable rh-python36 devtoolset-9 \ './configure.sh' -# Build shared libraries +# Build shared libraries, notice the passing of --//tensorflow_io/core:static_build BAZEL_LINKOPTS="-static-libstdc++ -static-libgcc" BAZEL_LINKLIBS="-lm -l%:libstdc++.a" \ scl enable rh-python36 devtoolset-9 \ - 'bazel build -s --verbose_failures //tensorflow_io/...' + 'bazel build -s --verbose_failures --//tensorflow_io/core:static_build //tensorflow_io/...' 
# Once build is complete, shared libraries will be available in # `bazel-bin/tensorflow_io/core/python/ops/` and it is possible diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index c96c7483f..ebe32b033 100644 --- a/tensorflow_io/core/BUILD +++ b/tensorflow_io/core/BUILD @@ -7,6 +7,19 @@ load( "tf_io_copts", ) load("@io_bazel_rules_go//go:def.bzl", "go_binary") +load("@bazel_skylib//rules:common_settings.bzl", "bool_flag") + +bool_flag( + name = "static_build", + build_setting_default = False, +) + +config_setting( + name = "static_build_on", + flag_values = { + ":static_build": "True", + }, +) cc_library( name = "cpuinfo", @@ -565,6 +578,8 @@ cc_library( name = "file_system_plugins", srcs = [ "kernels/azfs_kernels.cc", + "kernels/file_system_plugins.cc", + "kernels/file_system_plugins.h", ], copts = tf_io_copts(), linkstatic = True, @@ -689,9 +704,37 @@ cc_binary( "//tensorflow_io/core:oss_ops", "//tensorflow_io/core/kernels/gsmemcachedfs:gs_memcached_file_system", ], - }) + [ - "//tensorflow_io/core:file_system_plugins", + }) + select({ + "//tensorflow_io/core:static_build_on": [ + "//tensorflow_io/core:file_system_plugins", + ], + "//conditions:default": [], + }), +) + +cc_library( + name = "void_plugins", + srcs = [ + "kernels/void_kernels.cc", ], + copts = tf_io_copts(), + linkstatic = True, + deps = [], + alwayslink = 1, +) + +cc_binary( + name = "python/ops/libtensorflow_io_plugins.so", + copts = tf_io_copts(), + linkshared = 1, + deps = select({ + "//tensorflow_io/core:static_build_on": [ + "//tensorflow_io/core:void_plugins", + ], + "//conditions:default": [ + "//tensorflow_io/core:file_system_plugins", + ], + }), ) cc_binary( diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index be9e14dde..9f7c78575 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -27,11 +27,11 @@ limitations under the License. 
#include "storage_account.h" #include "storage_credential.h" #include "storage_errno.h" -#include "tensorflow/c/experimental/filesystem/filesystem_interface.h" +#include "tensorflow_io/core/kernels/file_system_plugins.h" namespace tensorflow { namespace io { -namespace azfs { +namespace az { namespace { // TODO: DO NOT use a hardcoded path bool GetTmpFilename(std::string* filename) { @@ -489,9 +489,6 @@ Status GetMatchingPaths(const std::string& pattern, std::vector<std::string>* re } #endif -static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } -static void plugin_memory_free(void* ptr) { free(ptr); } - // SECTION 1. Implementation for `TF_RandomAccessFile` // ---------------------------------------------------------------------------- namespace tf_random_access_file { @@ -1052,8 +1049,7 @@ static char* TranslateName(const TF_Filesystem* filesystem, const char* uri) { } // namespace -static void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, - const char* uri) { +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri) { TF_SetFilesystemVersionMetadata(ops); ops->scheme = strdup(uri); @@ -1106,16 +1102,6 @@ static void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, ops->filesystem_ops->translate_name = tf_azfs_filesystem::TranslateName; } -} // namespace azfs +} // namespace az } // namespace io } // namespace tensorflow - -void TF_InitPlugin(TF_FilesystemPluginInfo* info) { - info->plugin_memory_allocate = tensorflow::io::azfs::plugin_memory_allocate; - info->plugin_memory_free = tensorflow::io::azfs::plugin_memory_free; - info->num_schemes = 1; - info->ops = static_cast<TF_FilesystemPluginOps*>( - tensorflow::io::azfs::plugin_memory_allocate(info->num_schemes * - sizeof(info->ops[0]))); - tensorflow::io::azfs::ProvideFilesystemSupportFor(&info->ops[0], "az"); -} diff --git a/tensorflow_io/core/kernels/file_system_plugins.cc b/tensorflow_io/core/kernels/file_system_plugins.cc new file mode 100644 index 000000000..994760094 --- /dev/null
+++ b/tensorflow_io/core/kernels/file_system_plugins.cc @@ -0,0 +1,26 @@ +/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#include "tensorflow_io/core/kernels/file_system_plugins.h" + +void TF_InitPlugin(TF_FilesystemPluginInfo* info) { + info->plugin_memory_allocate = tensorflow::io::plugin_memory_allocate; + info->plugin_memory_free = tensorflow::io::plugin_memory_free; + info->num_schemes = 1; + info->ops = static_cast<TF_FilesystemPluginOps*>( + tensorflow::io::plugin_memory_allocate(info->num_schemes * + sizeof(info->ops[0]))); + tensorflow::io::az::ProvideFilesystemSupportFor(&info->ops[0], "az"); +} diff --git a/tensorflow_io/core/kernels/file_system_plugins.h b/tensorflow_io/core/kernels/file_system_plugins.h new file mode 100644 index 000000000..a9cb7aa7c --- /dev/null +++ b/tensorflow_io/core/kernels/file_system_plugins.h @@ -0,0 +1,38 @@ +/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and +limitations under the License. +==============================================================================*/ + +#ifndef TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H +#define TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H + +#include <stdlib.h> + +#include "tensorflow/c/experimental/filesystem/filesystem_interface.h" + +namespace tensorflow { +namespace io { + +static void* plugin_memory_allocate(size_t size) { return calloc(1, size); } +static void plugin_memory_free(void* ptr) { free(ptr); } + +namespace az { + +void ProvideFilesystemSupportFor(TF_FilesystemPluginOps* ops, const char* uri); + +} // namespace az + +} // namespace io +} // namespace tensorflow + +#endif // TENSORFLOW_IO_CORE_KERNELS_FILE_SYSTEM_PLUGINS_H diff --git a/tensorflow_io/core/kernels/void_kernels.cc b/tensorflow_io/core/kernels/void_kernels.cc new file mode 100644 index 000000000..eca05f9d5 --- /dev/null +++ b/tensorflow_io/core/kernels/void_kernels.cc @@ -0,0 +1,18 @@ +/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+==============================================================================*/ + +extern "C" { +void TF_InitPlugin(void* info) {} +} diff --git a/tensorflow_io/core/python/ops/__init__.py b/tensorflow_io/core/python/ops/__init__.py index 8ee607e9a..d4604279d 100644 --- a/tensorflow_io/core/python/ops/__init__.py +++ b/tensorflow_io/core/python/ops/__init__.py @@ -71,4 +71,4 @@ def _load_library(filename, lib="op"): core_ops = _load_library("libtensorflow_io.so") -azfs_ops = _load_library("libtensorflow_io.so", "fs") +azfs_ops = _load_library("libtensorflow_io_plugins.so", "fs") From 07c8e9d6fb82005474c6e8646086c1a5af93eef5 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 24 Sep 2020 14:03:26 -0700 Subject: [PATCH 4/9] Bump version to TF 2.4.0rc0, and 0.17.0 Signed-off-by: Yong Tang --- tensorflow_io/core/python/ops/version_ops.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tensorflow_io/core/python/ops/version_ops.py b/tensorflow_io/core/python/ops/version_ops.py index f2fc3720a..40b9fb156 100644 --- a/tensorflow_io/core/python/ops/version_ops.py +++ b/tensorflow_io/core/python/ops/version_ops.py @@ -14,5 +14,5 @@ # ============================================================================== """version_ops""" -package = "tensorflow>=2.3.0,<2.4.0" -version = "0.16.0" +package = "tensorflow>=2.4.0rc0,<2.5.0" +version = "0.17.0" From e919dc8dc0cf55cf057ca6907a2f8a468bd7c73e Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Tue, 3 Nov 2020 00:00:41 +0000 Subject: [PATCH 5/9] Additional fix for CentOS 7 Signed-off-by: Yong Tang --- tensorflow_io/core/BUILD | 15 +-------------- tensorflow_io/core/kernels/void_kernels.cc | 18 ------------------ tensorflow_io/core/python/ops/__init__.py | 5 ++++- 3 files changed, 5 insertions(+), 33 deletions(-) delete mode 100644 tensorflow_io/core/kernels/void_kernels.cc diff --git a/tensorflow_io/core/BUILD b/tensorflow_io/core/BUILD index ebe32b033..0e5563153 100644 --- a/tensorflow_io/core/BUILD +++ 
b/tensorflow_io/core/BUILD @@ -712,25 +712,12 @@ cc_binary( }), ) -cc_library( - name = "void_plugins", - srcs = [ - "kernels/void_kernels.cc", - ], - copts = tf_io_copts(), - linkstatic = True, - deps = [], - alwayslink = 1, -) - cc_binary( name = "python/ops/libtensorflow_io_plugins.so", copts = tf_io_copts(), linkshared = 1, deps = select({ - "//tensorflow_io/core:static_build_on": [ - "//tensorflow_io/core:void_plugins", - ], + "//tensorflow_io/core:static_build_on": [], "//conditions:default": [ "//tensorflow_io/core:file_system_plugins", ], diff --git a/tensorflow_io/core/kernels/void_kernels.cc b/tensorflow_io/core/kernels/void_kernels.cc deleted file mode 100644 index eca05f9d5..000000000 --- a/tensorflow_io/core/kernels/void_kernels.cc +++ /dev/null @@ -1,18 +0,0 @@ -/* Copyright 2019 The TensorFlow Authors. All Rights Reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-==============================================================================*/ - -extern "C" { -void TF_InitPlugin(void* info) {} -} diff --git a/tensorflow_io/core/python/ops/__init__.py b/tensorflow_io/core/python/ops/__init__.py index d4604279d..7294b696e 100644 --- a/tensorflow_io/core/python/ops/__init__.py +++ b/tensorflow_io/core/python/ops/__init__.py @@ -71,4 +71,7 @@ def _load_library(filename, lib="op"): core_ops = _load_library("libtensorflow_io.so") -azfs_ops = _load_library("libtensorflow_io_plugins.so", "fs") +try: + plugin_ops = _load_library("libtensorflow_io_plugins.so", "fs") +except NotImplementedError as e: + plugin_ops = _load_library("libtensorflow_io.so", "fs") From 1d324149e5984f543a407c16c3548ee20025a929 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Tue, 3 Nov 2020 14:14:43 -0800 Subject: [PATCH 6/9] Fix a small bug in return with EOF Signed-off-by: Yong Tang --- tensorflow_io/core/kernels/azfs_kernels.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index 9f7c78575..9be07439a 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -354,6 +354,7 @@ class AzBlobRandomAccessFile { if (bytes_to_read < n) { TF_SetStatus(status, TF_OUT_OF_RANGE, "EOF reached"); + return bytes_to_read; } TF_SetStatus(status, TF_OK, ""); return bytes_to_read; From a5bf11508851cb250fb5121dee7ba0cdd5b30549 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 5 Nov 2020 09:36:55 -0800 Subject: [PATCH 7/9] Address review comments Signed-off-by: Yong Tang --- tensorflow_io/core/kernels/azfs_kernels.cc | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index 9be07439a..5c7a6e943 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -120,6 +120,13 @@ 
void ParseAzBlobPath(const std::string& fname, bool empty_object_ok, *object = std::string(objectp.substr(pos + 1)); } + if (!empty_object_ok && object.empty()) { + std::string error_message = absl::StrCat( + "Azure Blob Storage path doesn't contain a object name: ", fname); + TF_SetStatus(status, TF_INVALID_ARGUMENT, error_message.c_str()); + return; + } + TF_SetStatus(status, TF_OK, ""); return; } @@ -658,8 +665,8 @@ static void DeleteFile(const TF_Filesystem* filesystem, const char* path, blob_client.delete_blob(container, object); if (errno != 0) { - std::string error_message = - absl::StrCat("Failed to delete ", path, " (", errno_to_string(), ")"); + std::string error_message = absl::StrCat( + "Failed to delete ", path, ": ", errno, "(", errno_to_string(), ")"); TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } @@ -766,6 +773,9 @@ static void RenameFile(const TF_Filesystem* filesystem, const char* src, // Status can be success, pending, aborted or failed std::string copy_status; do { + if (!copy_status.empty()) { + sleep(1); + } const auto dst_blob_property = blob_client.get_blob_property(dst_container, dst_object); copy_status = dst_blob_property.copy_status; @@ -773,8 +783,7 @@ static void RenameFile(const TF_Filesystem* filesystem, const char* src, if (copy_status.find("success") == std::string::npos) { std::string error_message = - absl::StrCat("Process of renaming resulted in status of ", copy_status, - " when renaming ", src, " to ", dst); + absl::StrCat("Process of renaming from , src, " to ", dst, " resulted in status of ", copy_status); TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } From 6c203fc6bd60df6ee6cb4d5fe5b5705f6dbcc2a2 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 5 Nov 2020 11:17:20 -0800 Subject: [PATCH 8/9] Add missing `"` typo Signed-off-by: Yong Tang --- tensorflow_io/core/kernels/azfs_kernels.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git 
a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index 5c7a6e943..8c44b2f55 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -120,7 +120,7 @@ void ParseAzBlobPath(const std::string& fname, bool empty_object_ok, *object = std::string(objectp.substr(pos + 1)); } - if (!empty_object_ok && object.empty()) { + if (!empty_object_ok && object->empty()) { std::string error_message = absl::StrCat( "Azure Blob Storage path doesn't contain a object name: ", fname); TF_SetStatus(status, TF_INVALID_ARGUMENT, error_message.c_str()); @@ -783,7 +783,8 @@ static void RenameFile(const TF_Filesystem* filesystem, const char* src, if (copy_status.find("success") == std::string::npos) { std::string error_message = - absl::StrCat("Process of renaming from , src, " to ", dst, " resulted in status of ", copy_status); + absl::StrCat("Process of renaming from ", src, " to ", dst, + " resulted in status of ", copy_status); TF_SetStatus(status, TF_INTERNAL, error_message.c_str()); return; } From 37dfa10cc2c020864dacc3b1fdcdf33877534e7b Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Thu, 5 Nov 2020 12:25:07 -0800 Subject: [PATCH 9/9] Make Sleep portable on Windows C++ (Visual Studio) Signed-off-by: Yong Tang --- tensorflow_io/core/kernels/azfs_kernels.cc | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/tensorflow_io/core/kernels/azfs_kernels.cc b/tensorflow_io/core/kernels/azfs_kernels.cc index 8c44b2f55..d5de173d8 100644 --- a/tensorflow_io/core/kernels/azfs_kernels.cc +++ b/tensorflow_io/core/kernels/azfs_kernels.cc @@ -19,6 +19,10 @@ limitations under the License. 
#include #include +#if defined(_MSC_VER) +#include <windows.h> +#endif + #include "absl/strings/str_cat.h" #include "absl/strings/string_view.h" #include "absl/strings/strip.h" @@ -774,7 +778,11 @@ static void RenameFile(const TF_Filesystem* filesystem, const char* src, std::string copy_status; do { if (!copy_status.empty()) { +#if defined(_MSC_VER) + Sleep(1000); +#else sleep(1); +#endif } const auto dst_blob_property = blob_client.get_blob_property(dst_container, dst_object);