feat: DH-18664: Add support to read from uri's with scheme "s3a" (#6640)
malhotrashivam authored Mar 3, 2025
1 parent 0e5a012 commit 88e3c6e
Showing 15 changed files with 443 additions and 99 deletions.
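
This commit teaches Deephaven's Iceberg integration to read data files whose locations use the Hadoop-style "s3a" and "s3n" URI schemes, and to tolerate a single table mixing schemes: rather than assuming one scheme per table, a seekable-channels provider is now resolved and cached per URI scheme. Below is a minimal sketch of that idea using only JDK types; SchemeRouter and its loader function are illustrative names, not code from this commit.

import java.net.URI;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Routes each file URI to a provider keyed by its scheme, creating the
// provider on first use. Mirrors the caching added to IcebergBaseLayout below.
final class SchemeRouter<P> {
    private final Map<String, P> providersByScheme = new ConcurrentHashMap<>();
    private final Function<String, P> loader;

    SchemeRouter(final Function<String, P> loader) {
        this.loader = loader;
    }

    P providerFor(final URI fileUri) {
        return providersByScheme.computeIfAbsent(fileUri.getScheme(), loader);
    }
}
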
@@ -3,22 +3,42 @@
//
package io.deephaven.iceberg.util;

import io.deephaven.engine.table.Table;
import io.deephaven.engine.util.TableTools;
import io.deephaven.extensions.s3.S3Instructions;
import io.deephaven.iceberg.base.IcebergUtils;
import io.deephaven.iceberg.junit5.SqliteCatalogBase;
import io.deephaven.iceberg.sqlite.SqliteHelper;
import org.apache.iceberg.CatalogProperties;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DataFiles;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.Test;
import org.apache.iceberg.catalog.TableIdentifier;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;

import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
import static io.deephaven.engine.util.TableTools.doubleCol;
import static io.deephaven.engine.util.TableTools.intCol;
import static io.deephaven.extensions.s3.testlib.S3Helper.TIMEOUT_SECONDS;
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
import static org.assertj.core.api.Assertions.assertThat;

abstract class S3WarehouseSqliteCatalogBase extends SqliteCatalogBase {

@@ -32,17 +52,131 @@ public final Object dataInstructions() {
}

@Override
protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir, Map<String, String> properties)
protected IcebergCatalogAdapter catalogAdapter(
final TestInfo testInfo,
final Path rootDir,
final Map<String, String> properties)
throws ExecutionException, InterruptedException, TimeoutException {
return catalogAdapterForScheme(testInfo, properties, "s3");
}

private IcebergCatalogAdapter catalogAdapterForScheme(
final TestInfo testInfo,
final Map<String, String> properties,
final String scheme)
throws ExecutionException, InterruptedException, TimeoutException {
final String methodName = testInfo.getTestMethod().orElseThrow().getName();
final String catalogName = methodName + "-catalog";
final String bucket = methodName.toLowerCase(Locale.US) + "-bucket";
try (final S3AsyncClient client = s3AsyncClient()) {
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
if (!doesBucketExist(client, bucket)) {
client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
}
}
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + bucket + "/warehouse");
properties.put(CatalogProperties.WAREHOUSE_LOCATION, scheme + "://" + bucket + "/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
return IcebergToolsS3.createAdapter(catalogName, properties, Map.of(), s3Instructions());
}

private boolean doesBucketExist(final S3AsyncClient client, final String bucketName)
throws ExecutionException, InterruptedException, TimeoutException {
try {
client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build())
.get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
return true;
} catch (ExecutionException e) {
if (e.getCause() instanceof NoSuchBucketException) {
return false;
}
throw e;
}
}
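
Note the shape of the helper above: with S3AsyncClient, service errors arrive wrapped, so the probe inspects e.getCause() for NoSuchBucketException. For comparison, a hedged sketch of the same probe against the synchronous AWS SDK v2 client, which throws the service exception directly:

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
import software.amazon.awssdk.services.s3.model.NoSuchBucketException;

static boolean bucketExists(final S3Client client, final String bucket) {
    try {
        client.headBucket(HeadBucketRequest.builder().bucket(bucket).build());
        return true;
    } catch (NoSuchBucketException e) {
        // No ExecutionException wrapper here: synchronous calls surface the
        // service exception as-is.
        return false;
    }
}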

@Test
void testIcebergTablesWithS3AScheme(TestInfo testInfo, @TempDir Path rootDir)
throws ExecutionException, InterruptedException, TimeoutException {
testIcebergTablesWithCustomScheme("s3a", testInfo, rootDir);
}

@Test
void testIcebergTablesWithS3NScheme(TestInfo testInfo, @TempDir Path rootDir)
throws ExecutionException, InterruptedException, TimeoutException {
testIcebergTablesWithCustomScheme("s3n", testInfo, rootDir);
}
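
The "s3a" and "s3n" schemes come from Hadoop's S3 connectors; they address the same buckets and keys as "s3", differing only in the URI scheme string. A quick illustration with plain JDK URIs:

// Same bucket and key, three spellings; only getScheme() differs.
URI.create("s3://demo-bucket/warehouse/t/a.parquet").getScheme();  // "s3"
URI.create("s3a://demo-bucket/warehouse/t/a.parquet").getScheme(); // "s3a"
URI.create("s3n://demo-bucket/warehouse/t/a.parquet").getScheme(); // "s3n"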

private void testIcebergTablesWithCustomScheme(final String scheme, TestInfo testInfo, @TempDir Path rootDir)
throws ExecutionException, InterruptedException, TimeoutException {
final Map<String, String> properties = new HashMap<>();
SqliteHelper.setJdbcCatalogProperties(properties, rootDir);
final IcebergCatalogAdapter catalogAdapter = catalogAdapterForScheme(testInfo, properties, scheme);

final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");

final Table data = TableTools.newTable(
intCol("intCol", 2, 4, 6, 8, 10),
doubleCol("doubleCol", 2.5, 5.0, 7.5, 10.0, 12.5));

// Create a new iceberg table
final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, data.getDefinition());
final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();

// Verify that the table location has the right scheme
assertThat(locationUri(icebergTable).getScheme()).isEqualTo(scheme);

// Add data to the table
final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder()
.tableDefinition(data.getDefinition())
.build());
tableWriter.append(IcebergWriteInstructions.builder()
.addTables(data, data)
.build());

// Verify all data files have the right scheme
final List<DataFile> dataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
.collect(Collectors.toList());
assertThat(dataFiles).hasSize(2);
assertThat(dataFiles).allMatch(dataFile -> dataFileUri(icebergTable, dataFile).getScheme().equals(scheme));

// Verify the data is correct
Table fromIceberg = tableAdapter.table();
Table expected = TableTools.merge(data, data);
assertTableEquals(expected, fromIceberg);

// Create a new data file but with s3 scheme
final DataFile existingDataFile = dataFiles.get(0);
final String existingDataFileLocation = existingDataFile.location();
assertThat(existingDataFileLocation).startsWith(scheme);
final String newLocation = existingDataFileLocation.replace(scheme + "://", "s3://");
final DataFile newDataFile = DataFiles.builder(icebergTable.spec())
.withPath(newLocation)
.withFormat(existingDataFile.format())
.withRecordCount(existingDataFile.recordCount())
.withFileSizeInBytes(existingDataFile.fileSizeInBytes())
.build();

// Append the new data files to the table
icebergTable.newAppend().appendFile(newDataFile).commit();

// Verify the new data file has the right scheme
final List<DataFile> newDataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
.collect(Collectors.toList());
int s3DataFiles = 0;
int nonS3DataFiles = 0;
for (final DataFile dataFile : newDataFiles) {
if (dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)) {
nonS3DataFiles++;
} else {
assertThat(dataFileUri(icebergTable, dataFile).getScheme()).isEqualTo("s3");
s3DataFiles++;
}
}
assertThat(s3DataFiles).isEqualTo(1);
assertThat(nonS3DataFiles).isEqualTo(2);

// Verify the data is correct
fromIceberg = tableAdapter.table();
expected = TableTools.merge(expected, data);
assertTableEquals(expected, fromIceberg);
}
}
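
For orientation, a hedged end-to-end sketch of what this test exercises from the user's side (not part of the commit): once providers are resolved per scheme, a catalog whose warehouse location uses "s3a" reads like any "s3" catalog. Endpoint and credential setup are elided, and the loadTable call is assumed from the adapter API used elsewhere in this repository.

final Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://my-bucket/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
// Placeholder instructions; real use would configure credentials/endpoint.
final S3Instructions s3Instructions = S3Instructions.builder().regionName("us-east-1").build();
final IcebergCatalogAdapter adapter =
        IcebergToolsS3.createAdapter("my-catalog", properties, Map.of(), s3Instructions);
final Table result = adapter.loadTable(TableIdentifier.parse("MyNamespace.MyTable")).table();
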
@@ -3,9 +3,11 @@
//
package io.deephaven.iceberg.base;

import io.deephaven.base.FileUtils;
import io.deephaven.engine.table.ColumnDefinition;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestContent;
@@ -21,6 +23,7 @@
import org.apache.iceberg.catalog.SupportsNamespaces;
import org.apache.iceberg.exceptions.AlreadyExistsException;
import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.jetbrains.annotations.NotNull;
@@ -29,6 +32,7 @@
import java.io.IOException;
import java.io.UncheckedIOException;
import java.math.BigDecimal;
import java.net.URI;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
@@ -127,6 +131,18 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterab
});
}

private static String path(String path, FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
}

public static URI locationUri(Table table) {
return FileUtils.convertToURI(path(table.location(), table.io()), true);
}

public static URI dataFileUri(Table table, DataFile dataFile) {
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
}
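
These helpers were hoisted here from IcebergBaseLayout (see its diff below) so the test and layout code can share them: path() resolves relative locations through RelativeFileIO, and convertToURI normalizes the result, with the boolean marking a directory (true) versus a regular file (false). An illustration, assuming FileUtils.convertToURI's usual semantics:

// Scheme-less paths gain a "file" scheme; URI-like locations keep theirs, so
// callers can route on URI.getScheme().
final URI tableUri = FileUtils.convertToURI("/warehouse/ns/tbl", true);             // "file" scheme, directory
final URI dataUri = FileUtils.convertToURI("s3a://bucket/ns/tbl/a.parquet", false); // keeps scheme "s3a"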

/**
* Convert an Iceberg data type to a Deephaven type.
*
@@ -3,14 +3,12 @@
//
package io.deephaven.iceberg.layout;

import io.deephaven.base.FileUtils;
import io.deephaven.engine.table.TableDefinition;
import io.deephaven.engine.table.impl.locations.TableDataException;
import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
import io.deephaven.iceberg.base.IcebergUtils;
import io.deephaven.iceberg.location.IcebergTableLocationKey;
import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
import io.deephaven.iceberg.relative.RelativeFileIO;
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.parquet.table.ParquetInstructions;
@@ -20,26 +18,37 @@
import org.apache.iceberg.*;
import org.apache.iceberg.catalog.Catalog;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.io.FileIO;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.net.URI;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.function.Consumer;
import java.util.stream.Stream;

import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
import static io.deephaven.iceberg.base.IcebergUtils.locationUri;

public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
/**
* The {@link IcebergTableAdapter} that will be used to access the table.
*/
final IcebergTableAdapter tableAdapter;

/**
* The instructions for customizations while reading.
*/
final IcebergReadInstructions instructions;

/**
* The loader used to resolve data instructions for URI schemes discovered while reading.
*/
final DataInstructionsProviderLoader dataInstructionsProvider;

/**
* The UUID of the table, if available.
*/
@@ -81,7 +90,7 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<Iceber
* The {@link SeekableChannelsProvider} objects, keyed by URI scheme, that will be used for
* {@link IcebergTableParquetLocationKey} creation.
*/
private final SeekableChannelsProvider channelsProvider;
private final Map<String, SeekableChannelsProvider> uriSchemeTochannelsProviders;


/**
@@ -100,7 +109,8 @@ protected IcebergTableLocationKey locationKey(
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri,
@Nullable final Map<String, Comparable<?>> partitions) {
@Nullable final Map<String, Comparable<?>> partitions,
@NotNull final SeekableChannelsProvider channelsProvider) {
final org.apache.iceberg.FileFormat format = dataFile.format();
if (format == org.apache.iceberg.FileFormat.PARQUET) {
return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, manifestFile, dataFile,
@@ -119,6 +129,8 @@ public IcebergBaseLayout(
@NotNull final IcebergReadInstructions instructions,
@NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
this.tableAdapter = tableAdapter;
this.instructions = instructions;
this.dataInstructionsProvider = dataInstructionsProvider;
{
UUID uuid;
try {
@@ -158,22 +170,26 @@ public IcebergBaseLayout(
}
this.parquetInstructions = builder.build();
}
this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
}

abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);

private static String path(String path, FileIO io) {
return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
uriSchemeTochannelsProviders = new HashMap<>();
uriSchemeTochannelsProviders.put(uriScheme,
SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions));
}

private static URI locationUri(Table table) {
return FileUtils.convertToURI(path(table.location(), table.io()), true);
private SeekableChannelsProvider getChannelsProvider(final String scheme) {
return uriSchemeTochannelsProviders.computeIfAbsent(scheme,
scheme2 -> {
final Object specialInstructions = instructions.dataInstructions()
.orElseGet(() -> dataInstructionsProvider.load(scheme2));
return SeekableChannelsProviderLoader.getInstance().load(scheme2, specialInstructions);
});
}
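
The constructor now seeds the map with the table's own scheme, and getChannelsProvider lazily loads and caches a provider for any other scheme a data file turns out to use (for example, a file committed with "s3" into an "s3a" table, as in the test above). A behavior sketch, treating the private method as callable:

final SeekableChannelsProvider a = getChannelsProvider("s3a"); // seeded at construction
final SeekableChannelsProvider b = getChannelsProvider("s3");  // loaded on first use
// Repeat lookups return the cached instance:
assert b == getChannelsProvider("s3");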

private static URI dataFileUri(Table table, DataFile dataFile) {
return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
}
abstract IcebergTableLocationKey keyFromDataFile(
ManifestFile manifestFile,
DataFile dataFile,
URI fileUri,
SeekableChannelsProvider channelsProvider);

@Override
public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
@@ -187,13 +203,8 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKe
IcebergUtils.toStream(reader)
.map(dataFile -> {
final URI fileUri = dataFileUri(table, dataFile);
if (!uriScheme.equals(fileUri.getScheme())) {
throw new TableDataException(String.format(
"%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, " +
"fileUri=%s",
table, snapshot.snapshotId(), uriScheme, fileUri));
}
return keyFromDataFile(manifestFile, dataFile, fileUri);
return keyFromDataFile(manifestFile, dataFile, fileUri,
getChannelsProvider(fileUri.getScheme()));
})
.forEach(locationKeyObserver);
});
@@ -8,6 +8,7 @@
import io.deephaven.iceberg.util.IcebergReadInstructions;
import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
import io.deephaven.iceberg.util.IcebergTableAdapter;
import io.deephaven.util.channel.SeekableChannelsProvider;
import org.apache.iceberg.*;
import org.jetbrains.annotations.NotNull;

@@ -38,7 +39,8 @@ public String toString() {
IcebergTableLocationKey keyFromDataFile(
@NotNull final ManifestFile manifestFile,
@NotNull final DataFile dataFile,
@NotNull final URI fileUri) {
return locationKey(manifestFile, dataFile, fileUri, null);
@NotNull final URI fileUri,
@NotNull final SeekableChannelsProvider channelsProvider) {
return locationKey(manifestFile, dataFile, fileUri, null, channelsProvider);
}
}