-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Add Ability to List Child Containers to BlobContainer #42653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f484e0f
8f64fae
4fa020a
3dcbf28
19354d9
4512cea
2eb7cd5
2bc2371
7736bf7
963c787
574896d
3666288
ab56c73
29ac16b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,15 +29,18 @@ | |
| import com.microsoft.azure.storage.blob.BlobInputStream; | ||
| import com.microsoft.azure.storage.blob.BlobListingDetails; | ||
| import com.microsoft.azure.storage.blob.BlobProperties; | ||
| import com.microsoft.azure.storage.blob.CloudBlob; | ||
| import com.microsoft.azure.storage.blob.CloudBlobClient; | ||
| import com.microsoft.azure.storage.blob.CloudBlobContainer; | ||
| import com.microsoft.azure.storage.blob.CloudBlobDirectory; | ||
| import com.microsoft.azure.storage.blob.CloudBlockBlob; | ||
| import com.microsoft.azure.storage.blob.DeleteSnapshotsOption; | ||
| import com.microsoft.azure.storage.blob.ListBlobItem; | ||
| import org.apache.logging.log4j.LogManager; | ||
| import org.apache.logging.log4j.Logger; | ||
| import org.apache.logging.log4j.message.ParameterizedMessage; | ||
| import org.elasticsearch.common.blobstore.BlobMetaData; | ||
| import org.elasticsearch.common.blobstore.BlobPath; | ||
| import org.elasticsearch.common.blobstore.support.PlainBlobMetaData; | ||
| import org.elasticsearch.common.collect.Tuple; | ||
| import org.elasticsearch.common.settings.Settings; | ||
|
|
@@ -54,7 +57,9 @@ | |
| import java.security.InvalidKeyException; | ||
| import java.util.EnumSet; | ||
| import java.util.HashMap; | ||
| import java.util.HashSet; | ||
| import java.util.Map; | ||
| import java.util.Set; | ||
| import java.util.function.Supplier; | ||
|
|
||
| import static java.util.Collections.emptyMap; | ||
|
|
@@ -214,15 +219,40 @@ public Map<String, BlobMetaData> listBlobsByPrefix(String account, String contai | |
| // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ | ||
| // this requires 1 + container.length() + 1, with each 1 corresponding to one of the / | ||
| final String blobPath = uri.getPath().substring(1 + container.length() + 1); | ||
| final BlobProperties properties = ((CloudBlockBlob) blobItem).getProperties(); | ||
| final String name = blobPath.substring(keyPath.length()); | ||
| logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength())); | ||
| blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength())); | ||
| if (blobItem instanceof CloudBlob) { | ||
| final BlobProperties properties = ((CloudBlob) blobItem).getProperties(); | ||
| final String name = blobPath.substring(keyPath.length()); | ||
| logger.trace(() -> new ParameterizedMessage("blob url [{}], name [{}], size [{}]", uri, name, properties.getLength())); | ||
| blobsBuilder.put(name, new PlainBlobMetaData(name, properties.getLength())); | ||
| } | ||
| } | ||
| }); | ||
| return Map.copyOf(blobsBuilder); | ||
| } | ||
|
|
||
| public Set<String> children(String account, String container, BlobPath path) throws URISyntaxException, StorageException { | ||
| final var blobsBuilder = new HashSet<String>(); | ||
| final Tuple<CloudBlobClient, Supplier<OperationContext>> client = client(account); | ||
| final CloudBlobContainer blobContainer = client.v1().getContainerReference(container); | ||
| final String keyPath = path.buildAsString(); | ||
| final EnumSet<BlobListingDetails> enumBlobListingDetails = EnumSet.of(BlobListingDetails.METADATA); | ||
|
|
||
| SocketAccess.doPrivilegedVoidException(() -> { | ||
| for (ListBlobItem blobItem : blobContainer.listBlobs(keyPath, false, enumBlobListingDetails, null, client.v2().get())) { | ||
| if (blobItem instanceof CloudBlobDirectory) { | ||
| final URI uri = blobItem.getUri(); | ||
| logger.trace(() -> new ParameterizedMessage("blob url [{}]", uri)); | ||
| // uri.getPath is of the form /container/keyPath.* and we want to strip off the /container/ | ||
| // this requires 1 + container.length() + 1, with each 1 corresponding to one of the /. | ||
| // Lastly, we add the length of keyPath to the offset to strip this container's path. | ||
| final String uriPath = uri.getPath(); | ||
| blobsBuilder.add(uriPath.substring(1 + container.length() + 1 + keyPath.length(), uriPath.length() - 1)); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The comment mentions
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to strip our own path (that of the blob container as well. I'll amend the comment. |
||
| } | ||
| } | ||
| }); | ||
| return Set.copyOf(blobsBuilder); | ||
| } | ||
|
|
||
| public void writeBlob(String account, String container, String blobName, InputStream inputStream, long blobSize, | ||
| boolean failIfAlreadyExists) | ||
| throws URISyntaxException, StorageException, FileAlreadyExistsException { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| /* | ||
| * Licensed to Elasticsearch under one or more contributor | ||
| * license agreements. See the NOTICE file distributed with | ||
| * this work for additional information regarding copyright | ||
| * ownership. Elasticsearch licenses this file to you under | ||
| * the Apache License, Version 2.0 (the "License"); you may | ||
| * not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, | ||
| * software distributed under the License is distributed on an | ||
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
| * KIND, either express or implied. See the License for the | ||
| * specific language governing permissions and limitations | ||
| * under the License. | ||
| */ | ||
| package org.elasticsearch.repositories.hdfs; | ||
|
|
||
| import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; | ||
| import org.elasticsearch.action.support.master.AcknowledgedResponse; | ||
| import org.elasticsearch.bootstrap.JavaVersion; | ||
| import org.elasticsearch.common.settings.MockSecureSettings; | ||
| import org.elasticsearch.common.settings.SecureSettings; | ||
| import org.elasticsearch.common.settings.Settings; | ||
| import org.elasticsearch.plugins.Plugin; | ||
| import org.elasticsearch.repositories.AbstractThirdPartyRepositoryTestCase; | ||
|
|
||
| import java.util.Collection; | ||
|
|
||
| import static org.hamcrest.Matchers.equalTo; | ||
|
|
||
| @ThreadLeakFilters(filters = HdfsClientThreadLeakFilter.class) | ||
| public class HdfsRepositoryTests extends AbstractThirdPartyRepositoryTestCase { | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Even though it's not technically third party, I figured it would be good to have this running for HDFS as well right? |
||
|
|
||
| @Override | ||
| protected Collection<Class<? extends Plugin>> getPlugins() { | ||
| return pluginList(HdfsPlugin.class); | ||
| } | ||
|
|
||
| @Override | ||
| protected SecureSettings credentials() { | ||
| return new MockSecureSettings(); | ||
| } | ||
|
|
||
| @Override | ||
| protected void createRepository(String repoName) { | ||
| assumeFalse("https://github.com/elastic/elasticsearch/issues/31498", JavaVersion.current().equals(JavaVersion.parse("11"))); | ||
| AcknowledgedResponse putRepositoryResponse = client().admin().cluster().preparePutRepository(repoName) | ||
| .setType("hdfs") | ||
| .setSettings(Settings.builder() | ||
| .put("uri", "hdfs:///") | ||
| .put("conf.fs.AbstractFileSystem.hdfs.impl", TestingFs.class.getName()) | ||
| .put("path", "foo") | ||
| .put("chunk_size", randomIntBetween(100, 1000) + "k") | ||
| .put("compress", randomBoolean()) | ||
| ).get(); | ||
| assertThat(putRepositoryResponse.isAcknowledged(), equalTo(true)); | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -164,7 +164,7 @@ if (useFixture) { | |
| def minioAddress = { | ||
| int minioPort = postProcessFixture.ext."test.fixtures.minio-fixture.tcp.9000" | ||
| assert minioPort > 0 | ||
| return 'http://127.0.0.1:' + minioPort | ||
| 'http://127.0.0.1:' + minioPort | ||
| } | ||
|
|
||
| File minioAddressFile = new File(project.buildDir, 'generated-resources/s3Fixture.address') | ||
|
|
@@ -173,15 +173,13 @@ if (useFixture) { | |
| // and pass its name instead. | ||
| task writeMinioAddress { | ||
| dependsOn tasks.bundlePlugin, tasks.postProcessFixture | ||
| outputs.file(minioAddressFile) | ||
|
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Adding these outputs and inputs was a mistake, repeated runs against minio fail because of them since we need to update the file when repeatedly starting Minio. |
||
| doLast { | ||
| file(minioAddressFile).text = "${ -> minioAddress.call() }" | ||
| } | ||
| } | ||
|
|
||
| thirdPartyTest { | ||
| dependsOn writeMinioAddress | ||
| inputs.file(minioAddressFile) | ||
| systemProperty 'test.s3.endpoint', minioAddressFile.name | ||
| } | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.