Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import jakarta.inject.Inject;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.core.SecurityContext;
import org.apache.commons.codec.binary.Base64;
import java.util.Base64;
import org.apache.iceberg.rest.responses.OAuthTokenResponse;
import org.apache.polaris.core.context.CallContext;
import org.apache.polaris.core.context.RealmContext;
Expand Down Expand Up @@ -83,7 +83,8 @@ public Response getToken(
// has previously attempted to refresh an access token, but refreshing was not supported by the
// token broker. Accept the client id and secret and treat it as a new token request
if (authHeader != null && clientSecret == null && authHeader.startsWith("Basic ")) {
String credentials = new String(Base64.decodeBase64(authHeader.substring(6)), UTF_8);
String credentials =
new String(Base64.getDecoder().decode(authHeader.substring(6).getBytes(UTF_8)), UTF_8);
if (!credentials.contains(":")) {
return OAuthUtils.getResponseFromError(OAuthTokenErrorResponse.Error.invalid_request);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@
*/
package org.apache.polaris.service.task;

import java.io.IOException;
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.stream.StreamSupport;
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
Expand Down Expand Up @@ -64,7 +62,7 @@ public boolean handleTask(TaskEntity task, CallContext callContext) {
ManifestCleanupTask cleanupTask = task.readData(ManifestCleanupTask.class);
TableIdentifier tableId = cleanupTask.tableId();
try (FileIO authorizedFileIO = fileIOSupplier.apply(task, tableId, callContext)) {
ManifestFile manifestFile = decodeManifestData(cleanupTask.manifestFileData());
ManifestFile manifestFile = TaskUtils.decodeManifestFileData(cleanupTask.manifestFileData());
return cleanUpManifestFile(manifestFile, authorizedFileIO, tableId);
}
}
Expand Down Expand Up @@ -117,15 +115,12 @@ private boolean cleanUpManifestFile(
}
}

private static ManifestFile decodeManifestData(String manifestFileData) {
try {
return ManifestFiles.decode(Base64.decodeBase64(manifestFileData));
} catch (IOException e) {
throw new RuntimeException("Unable to decode base64 encoded manifest", e);
/** Serialized Task data sent from the {@link TableCleanupTaskHandler} */
public record ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {

static ManifestCleanupTask buildFrom(TableIdentifier tableId, ManifestFile manifestFile) {
String encodedManifestFileData = TaskUtils.encodeManifestFile(manifestFile);
return new ManifestCleanupTask(tableId, encodedManifestFileData);
}
}

/** Serialized Task data sent from the {@link TableCleanupTaskHandler} */
public record ManifestCleanupTask(TableIdentifier tableId, String manifestFileData) {}
;
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,8 @@ private Stream<TaskEntity> getManifestTaskStream(
.setCreateTimestamp(clock.millis())
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableEntity.getTableIdentifier(), TaskUtils.encodeManifestFile(mf)))
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableEntity.getTableIdentifier(), mf))
.setId(metaStoreManager.generateNewEntityId(polarisCallContext).getId())
// copy the internal properties, which will have storage info
.setInternalProperties(cleanupTask.getInternalPropertiesAsMap())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
package org.apache.polaris.service.task;

import java.io.IOException;
import org.apache.commons.codec.binary.Base64;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.exceptions.NotFoundException;
Expand All @@ -46,9 +47,20 @@ public static boolean exists(String path, FileIO fileIO) {
*/
public static String encodeManifestFile(ManifestFile mf) {
try {
return Base64.encodeBase64String(ManifestFiles.encode(mf));
byte[] encodedBytes = ManifestFiles.encode(mf);
return new String(Base64.getEncoder().encode(encodedBytes), StandardCharsets.UTF_8);
} catch (IOException e) {
throw new RuntimeException("Unable to encode binary data in memory", e);
}
}

public static ManifestFile decodeManifestFileData(String manifestFileData) {
try {
byte[] decodedBytes =
Base64.getDecoder().decode(manifestFileData.getBytes(StandardCharsets.UTF_8));
return ManifestFiles.decode(decodedBytes);
} catch (IOException e) {
throw new RuntimeException("Unable to decode base64 encoded manifest", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,7 @@
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.catalog.Namespace;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.inmemory.InMemoryFileIO;
Expand Down Expand Up @@ -83,8 +81,8 @@ public void testCleanupFileNotExists() throws IOException {
new TaskEntity.Builder()
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile))))
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile))
.setName(UUID.randomUUID().toString())
.build();
task = addTaskLocation(task);
Expand All @@ -107,8 +105,8 @@ public void testCleanupFileManifestExistsDataFilesDontExist() throws IOException
new TaskEntity.Builder()
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile))))
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile))
.setName(UUID.randomUUID().toString())
.build();
task = addTaskLocation(task);
Expand Down Expand Up @@ -146,8 +144,8 @@ public void close() {
new TaskEntity.Builder()
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile))))
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile))
.setName(UUID.randomUUID().toString())
.build();
task = addTaskLocation(task);
Expand Down Expand Up @@ -201,8 +199,8 @@ public void deleteFile(String location) {
new TaskEntity.Builder()
.withTaskType(AsyncTaskType.MANIFEST_FILE_CLEANUP)
.withData(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier, Base64.encodeBase64String(ManifestFiles.encode(manifestFile))))
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile))
.setName(UUID.randomUUID().toString())
.build();
task = addTaskLocation(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,7 @@
import java.time.Clock;
import java.util.List;
import java.util.UUID;
import org.apache.commons.codec.binary.Base64;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.PartitionStatisticsFile;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.StatisticsFile;
Expand Down Expand Up @@ -136,9 +134,8 @@ public void testTableCleanup() throws IOException {
.extracting(TaskEntity::of)
.returns(AsyncTaskType.MANIFEST_FILE_CLEANUP, TaskEntity::getTaskType)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand Down Expand Up @@ -290,9 +287,8 @@ public void close() {
.extracting(TaskEntity::of)
.returns(AsyncTaskType.MANIFEST_FILE_CLEANUP, TaskEntity::getTaskType)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand All @@ -302,9 +298,8 @@ public void close() {
.extracting(TaskEntity::of)
.returns(AsyncTaskType.MANIFEST_FILE_CLEANUP, TaskEntity::getTaskType)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
Expand Down Expand Up @@ -421,9 +416,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile1))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile1),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand All @@ -432,9 +426,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile2))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile2),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand All @@ -443,9 +436,8 @@ public void testTableCleanupMultipleSnapshots() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile3))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile3),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
Expand Down Expand Up @@ -592,9 +584,8 @@ public void testTableCleanupMultipleMetadata() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile1))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile1),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand All @@ -603,9 +594,8 @@ public void testTableCleanupMultipleMetadata() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile2))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile2),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)),
Expand All @@ -614,9 +604,8 @@ public void testTableCleanupMultipleMetadata() throws IOException {
.returns(PolarisEntityType.TASK.getCode(), PolarisBaseEntity::getTypeCode)
.extracting(TaskEntity::of)
.returns(
new ManifestFileCleanupTaskHandler.ManifestCleanupTask(
tableIdentifier,
Base64.encodeBase64String(ManifestFiles.encode(manifestFile3))),
ManifestFileCleanupTaskHandler.ManifestCleanupTask.buildFrom(
tableIdentifier, manifestFile3),
entity ->
entity.readData(
ManifestFileCleanupTaskHandler.ManifestCleanupTask.class)));
Expand Down