diff --git a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
index 821a52a0bce..fd9ad8bd848 100644
--- a/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
+++ b/extensions/iceberg/s3/src/test/java/io/deephaven/iceberg/util/S3WarehouseSqliteCatalogBase.java
@@ -3,22 +3,42 @@
 //
 package io.deephaven.iceberg.util;
 
+import io.deephaven.engine.table.Table;
+import io.deephaven.engine.util.TableTools;
 import io.deephaven.extensions.s3.S3Instructions;
+import io.deephaven.iceberg.base.IcebergUtils;
 import io.deephaven.iceberg.junit5.SqliteCatalogBase;
+import io.deephaven.iceberg.sqlite.SqliteHelper;
 import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
 import org.apache.iceberg.aws.s3.S3FileIO;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.api.Test;
+import org.apache.iceberg.catalog.TableIdentifier;
 import software.amazon.awssdk.services.s3.S3AsyncClient;
 import software.amazon.awssdk.services.s3.model.CreateBucketRequest;
+import software.amazon.awssdk.services.s3.model.HeadBucketRequest;
+import software.amazon.awssdk.services.s3.model.NoSuchBucketException;
 
 import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
+import static io.deephaven.engine.testutil.TstUtils.assertTableEquals;
+import static io.deephaven.engine.util.TableTools.doubleCol;
+import static io.deephaven.engine.util.TableTools.intCol;
 import static io.deephaven.extensions.s3.testlib.S3Helper.TIMEOUT_SECONDS;
+import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
+import static org.assertj.core.api.Assertions.assertThat;
 
 abstract class S3WarehouseSqliteCatalogBase extends SqliteCatalogBase {
 
@@ -32,17 +52,131 @@ public final Object dataInstructions() {
     }
 
     @Override
-    protected IcebergCatalogAdapter catalogAdapter(TestInfo testInfo, Path rootDir, Map<String, String> properties)
+    protected IcebergCatalogAdapter catalogAdapter(
+            final TestInfo testInfo,
+            final Path rootDir,
+            final Map<String, String> properties)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        return catalogAdapterForScheme(testInfo, properties, "s3");
+    }
+
+    private IcebergCatalogAdapter catalogAdapterForScheme(
+            final TestInfo testInfo,
+            final Map<String, String> properties,
+            final String scheme)
             throws ExecutionException, InterruptedException, TimeoutException {
         final String methodName = testInfo.getTestMethod().orElseThrow().getName();
         final String catalogName = methodName + "-catalog";
         final String bucket = methodName.toLowerCase(Locale.US) + "-bucket";
         try (final S3AsyncClient client = s3AsyncClient()) {
-            client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
-                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            if (!doesBucketExist(client, bucket)) {
+                client.createBucket(CreateBucketRequest.builder().bucket(bucket).build())
+                        .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            }
         }
-        properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3://" + bucket + "/warehouse");
+        properties.put(CatalogProperties.WAREHOUSE_LOCATION, scheme + "://" + bucket + "/warehouse");
         properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
         return IcebergToolsS3.createAdapter(catalogName, properties, Map.of(), s3Instructions());
     }
+
+    private boolean doesBucketExist(final S3AsyncClient client, final String bucketName)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        try {
+            client.headBucket(HeadBucketRequest.builder().bucket(bucketName).build())
+                    .get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
+            return true;
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof NoSuchBucketException) {
+                return false;
+            }
+            throw e;
+        }
+    }
+
+    @Test
+    void testIcebergTablesWithS3AScheme(TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        testIcebergTablesWithCustomScheme("s3a", testInfo, rootDir);
+    }
+
+    @Test
+    void testIcebergTablesWithS3NScheme(TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        testIcebergTablesWithCustomScheme("s3n", testInfo, rootDir);
+    }
+
+    private void testIcebergTablesWithCustomScheme(final String scheme, TestInfo testInfo, @TempDir Path rootDir)
+            throws ExecutionException, InterruptedException, TimeoutException {
+        final Map<String, String> properties = new HashMap<>();
+        SqliteHelper.setJdbcCatalogProperties(properties, rootDir);
+        final IcebergCatalogAdapter catalogAdapter = catalogAdapterForScheme(testInfo, properties, scheme);
+
+        final TableIdentifier tableIdentifier = TableIdentifier.parse("MyNamespace.MyTable");
+
+        final Table data = TableTools.newTable(
+                intCol("intCol", 2, 4, 6, 8, 10),
+                doubleCol("doubleCol", 2.5, 5.0, 7.5, 10.0, 12.5));
+
+        // Create a new iceberg table
+        final IcebergTableAdapter tableAdapter = catalogAdapter.createTable(tableIdentifier, data.getDefinition());
+        final org.apache.iceberg.Table icebergTable = tableAdapter.icebergTable();
+
+        // Verify that the table location has the right scheme
+        assertThat(locationUri(icebergTable).getScheme()).isEqualTo(scheme);
+
+        // Add data to the table
+        final IcebergTableWriter tableWriter = tableAdapter.tableWriter(writerOptionsBuilder()
+                .tableDefinition(data.getDefinition())
+                .build());
+        tableWriter.append(IcebergWriteInstructions.builder()
+                .addTables(data, data)
+                .build());
+
+        // Verify all data files have the right scheme
+        final List<DataFile> dataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
+                .collect(Collectors.toList());
+        assertThat(dataFiles).hasSize(2);
+        assertThat(dataFiles).allMatch(dataFile -> dataFileUri(icebergTable, dataFile).getScheme().equals(scheme));
+
+        // Verify the data is correct
+        Table fromIceberg = tableAdapter.table();
+        Table expected = TableTools.merge(data, data);
+        assertTableEquals(expected, fromIceberg);
+
+        // Create a new data file but with s3 scheme
+        final DataFile existingDataFile = dataFiles.get(0);
+        final String existingDataFileLocation = existingDataFile.location();
+        assertThat(existingDataFileLocation).startsWith(scheme);
+        final String newLocation = existingDataFileLocation.replace(scheme + "://", "s3://");
+        final DataFile newDataFile = DataFiles.builder(icebergTable.spec())
+                .withPath(newLocation)
+                .withFormat(existingDataFile.format())
+                .withRecordCount(existingDataFile.recordCount())
+                .withFileSizeInBytes(existingDataFile.fileSizeInBytes())
+                .build();
+
+        // Append the new data files to the table
+        icebergTable.newAppend().appendFile(newDataFile).commit();
+
+        // Verify the new data file has the right scheme
+        final List<DataFile> newDataFiles = IcebergUtils.allDataFiles(icebergTable, icebergTable.currentSnapshot())
+                .collect(Collectors.toList());
+        int s3DataFiles = 0;
+        int nonS3DataFiles = 0;
+        for (final DataFile dataFile : newDataFiles) {
+            if (dataFileUri(icebergTable, dataFile).getScheme().equals(scheme)) {
+                nonS3DataFiles++;
+            } else {
+                assertThat(dataFileUri(icebergTable, dataFile).getScheme()).isEqualTo("s3");
+                s3DataFiles++;
+            }
+        }
+        assertThat(s3DataFiles).isEqualTo(1);
+        assertThat(nonS3DataFiles).isEqualTo(2);
+
+        // Verify the data is correct
+        fromIceberg = tableAdapter.table();
+        expected = TableTools.merge(expected, data);
+        assertTableEquals(expected, fromIceberg);
+    }
 }
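
The test above exercises the new multi-scheme support end to end. For context, a minimal sketch of the equivalent user-facing setup, pointing a catalog at an "s3a" warehouse; the bucket, catalog name, and s3Instructions value are hypothetical, while the calls and property keys mirror catalogAdapterForScheme above:

```java
// Illustrative sketch (not part of the patch), reusing the imports from the test above.
// Property values and the s3Instructions variable are hypothetical.
final Map<String, String> properties = new HashMap<>();
properties.put(CatalogProperties.WAREHOUSE_LOCATION, "s3a://my-bucket/warehouse");
properties.put(CatalogProperties.FILE_IO_IMPL, S3FileIO.class.getName());
final IcebergCatalogAdapter adapter =
        IcebergToolsS3.createAdapter("my-catalog", properties, Map.of(), s3Instructions);
```
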
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
index 06ad3bbbfe2..fadd1a7062d 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/base/IcebergUtils.java
@@ -3,9 +3,11 @@
 //
 package io.deephaven.iceberg.base;
 
+import io.deephaven.base.FileUtils;
 import io.deephaven.engine.table.ColumnDefinition;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.locations.TableDataException;
+import io.deephaven.iceberg.relative.RelativeFileIO;
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.ManifestContent;
@@ -21,6 +23,7 @@
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
+import org.apache.iceberg.io.FileIO;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.jetbrains.annotations.NotNull;
@@ -29,6 +32,7 @@
 import java.io.IOException;
 import java.io.UncheckedIOException;
 import java.math.BigDecimal;
+import java.net.URI;
 import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -127,6 +131,18 @@ public static <T> Stream<T> toStream(final org.apache.iceberg.io.CloseableIterable<T> iterable) {
         });
     }
 
+    private static String path(String path, FileIO io) {
+        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
+    }
+
+    public static URI locationUri(Table table) {
+        return FileUtils.convertToURI(path(table.location(), table.io()), true);
+    }
+
+    public static URI dataFileUri(Table table, DataFile dataFile) {
+        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
+    }
+
     /**
      * Convert an Iceberg data type to a Deephaven type.
      *
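
The two helpers added above normalize a table's location and its data-file paths into URIs, first resolving relative paths through RelativeFileIO. A short sketch of how they compose with allDataFiles, which the test earlier in this diff uses the same way; the variable names are illustrative:

```java
// Sketch: bucket a table's data files by URI scheme using the new helpers.
final Map<String, List<DataFile>> filesByScheme =
        IcebergUtils.allDataFiles(table, table.currentSnapshot())
                .collect(Collectors.groupingBy(
                        dataFile -> IcebergUtils.dataFileUri(table, dataFile).getScheme()));
```
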
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
index b5feea6f80a..b1e6ce12c1f 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergBaseLayout.java
@@ -3,14 +3,12 @@
 //
 package io.deephaven.iceberg.layout;
 
-import io.deephaven.base.FileUtils;
 import io.deephaven.engine.table.TableDefinition;
 import io.deephaven.engine.table.impl.locations.TableDataException;
 import io.deephaven.engine.table.impl.locations.impl.TableLocationKeyFinder;
 import io.deephaven.iceberg.base.IcebergUtils;
 import io.deephaven.iceberg.location.IcebergTableLocationKey;
 import io.deephaven.iceberg.location.IcebergTableParquetLocationKey;
-import io.deephaven.iceberg.relative.RelativeFileIO;
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import io.deephaven.iceberg.util.IcebergTableAdapter;
 import io.deephaven.parquet.table.ParquetInstructions;
@@ -20,19 +18,20 @@
 import org.apache.iceberg.*;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.io.FileIO;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 
 import java.net.URI;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.UUID;
 import java.util.function.Consumer;
 import java.util.stream.Stream;
 
 import static io.deephaven.iceberg.base.IcebergUtils.allManifestFiles;
+import static io.deephaven.iceberg.base.IcebergUtils.dataFileUri;
+import static io.deephaven.iceberg.base.IcebergUtils.locationUri;
 
 public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
     /**
@@ -40,6 +39,16 @@ public abstract class IcebergBaseLayout implements TableLocationKeyFinder<IcebergTableLocationKey> {
      */
     final IcebergTableAdapter tableAdapter;
 
+    private final IcebergReadInstructions instructions;
+
+    private final DataInstructionsProviderLoader dataInstructionsProvider;
+
+    /**
+     * A map from URI scheme to the {@link SeekableChannelsProvider} used to read files with that scheme.
+     */
+    private final Map<String, SeekableChannelsProvider> uriSchemeTochannelsProviders;
 
     /**
@@ -100,7 +109,8 @@ protected IcebergTableLocationKey locationKey(
             @NotNull final ManifestFile manifestFile,
             @NotNull final DataFile dataFile,
             @NotNull final URI fileUri,
-            @Nullable final Map<String, Comparable<?>> partitions) {
+            @Nullable final Map<String, Comparable<?>> partitions,
+            @NotNull final SeekableChannelsProvider channelsProvider) {
         final org.apache.iceberg.FileFormat format = dataFile.format();
         if (format == org.apache.iceberg.FileFormat.PARQUET) {
             return new IcebergTableParquetLocationKey(catalogName, tableUuid, tableIdentifier, manifestFile, dataFile,
@@ -119,6 +129,8 @@ public IcebergBaseLayout(
             @NotNull final IcebergReadInstructions instructions,
             @NotNull final DataInstructionsProviderLoader dataInstructionsProvider) {
         this.tableAdapter = tableAdapter;
+        this.instructions = instructions;
+        this.dataInstructionsProvider = dataInstructionsProvider;
         {
             UUID uuid;
             try {
@@ -158,22 +170,26 @@ public IcebergBaseLayout(
             }
             this.parquetInstructions = builder.build();
         }
-        this.channelsProvider = SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions);
-    }
-
-    abstract IcebergTableLocationKey keyFromDataFile(ManifestFile manifestFile, DataFile dataFile, URI fileUri);
-
-    private static String path(String path, FileIO io) {
-        return io instanceof RelativeFileIO ? ((RelativeFileIO) io).absoluteLocation(path) : path;
+        uriSchemeTochannelsProviders = new HashMap<>();
+        uriSchemeTochannelsProviders.put(uriScheme,
+                SeekableChannelsProviderLoader.getInstance().load(uriScheme, specialInstructions));
     }
 
-    private static URI locationUri(Table table) {
-        return FileUtils.convertToURI(path(table.location(), table.io()), true);
+    private SeekableChannelsProvider getChannelsProvider(final String scheme) {
+        return uriSchemeTochannelsProviders.computeIfAbsent(scheme,
+                scheme2 -> {
+                    final Object specialInstructions = instructions.dataInstructions()
+                            .orElseGet(() -> dataInstructionsProvider.load(scheme2));
+                    return SeekableChannelsProviderLoader.getInstance().load(scheme2, specialInstructions);
+                });
     }
 
-    private static URI dataFileUri(Table table, DataFile dataFile) {
-        return FileUtils.convertToURI(path(dataFile.path().toString(), table.io()), false);
-    }
+    abstract IcebergTableLocationKey keyFromDataFile(
+            ManifestFile manifestFile,
+            DataFile dataFile,
+            URI fileUri,
+            SeekableChannelsProvider channelsProvider);
 
     @Override
     public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
@@ -187,13 +203,8 @@ public synchronized void findKeys(@NotNull final Consumer<IcebergTableLocationKey> locationKeyObserver) {
                     .map(dataFile -> {
                         final URI fileUri = dataFileUri(table, dataFile);
-                        if (!uriScheme.equals(fileUri.getScheme())) {
-                            throw new TableDataException(String.format(
-                                    "%s:%d - multiple URI schemes are not currently supported. uriScheme=%s, " +
-                                            "fileUri=%s",
-                                    table, snapshot.snapshotId(), uriScheme, fileUri));
-                        }
-                        return keyFromDataFile(manifestFile, dataFile, fileUri);
+                        return keyFromDataFile(manifestFile, dataFile, fileUri,
+                                getChannelsProvider(fileUri.getScheme()));
                     })
                     .forEach(locationKeyObserver);
         });
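
The central change is here: instead of binding one SeekableChannelsProvider to the table's scheme and throwing TableDataException on any other scheme, the layout now resolves a provider per data-file scheme and caches it lazily. In findKeys terms, a condensed restatement of the mapping above, not new behavior:

```java
// Sketch: each data file is resolved through a provider keyed by its own scheme,
// so a snapshot mixing s3:// and s3a:// files no longer fails.
final URI fileUri = dataFileUri(table, dataFile); // e.g. s3a://bucket/table/data.parquet
final SeekableChannelsProvider provider = getChannelsProvider(fileUri.getScheme());
final IcebergTableLocationKey key = keyFromDataFile(manifestFile, dataFile, fileUri, provider);
```

Since findKeys is synchronized, the plain HashMap behind getChannelsProvider is only ever touched serially.
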
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
index 6957de21b87..f4ebc8cc4db 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergFlatLayout.java
@@ -8,6 +8,7 @@
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
 import io.deephaven.iceberg.util.IcebergTableAdapter;
+import io.deephaven.util.channel.SeekableChannelsProvider;
 import org.apache.iceberg.*;
 import org.jetbrains.annotations.NotNull;
 
@@ -38,7 +39,8 @@ public String toString() {
     IcebergTableLocationKey keyFromDataFile(
             @NotNull final ManifestFile manifestFile,
             @NotNull final DataFile dataFile,
-            @NotNull final URI fileUri) {
-        return locationKey(manifestFile, dataFile, fileUri, null);
+            @NotNull final URI fileUri,
+            @NotNull final SeekableChannelsProvider channelsProvider) {
+        return locationKey(manifestFile, dataFile, fileUri, null, channelsProvider);
     }
 }
diff --git a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
index 9c60799a654..33a331ba65d 100644
--- a/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
+++ b/extensions/iceberg/src/main/java/io/deephaven/iceberg/layout/IcebergKeyValuePartitionedLayout.java
@@ -10,6 +10,7 @@
 import io.deephaven.iceberg.util.IcebergReadInstructions;
 import io.deephaven.iceberg.util.IcebergTableAdapter;
 import io.deephaven.iceberg.internal.DataInstructionsProviderLoader;
+import io.deephaven.util.channel.SeekableChannelsProvider;
 import io.deephaven.util.type.TypeUtils;
 import org.apache.iceberg.*;
 import org.apache.iceberg.data.IdentityPartitionConverters;
@@ -82,7 +83,8 @@ public String toString() {
     IcebergTableLocationKey keyFromDataFile(
             @NotNull final ManifestFile manifestFile,
             @NotNull final DataFile dataFile,
-            @NotNull final URI fileUri) {
+            @NotNull final URI fileUri,
+            @NotNull final SeekableChannelsProvider channelsProvider) {
         final Map<String, Comparable<?>> partitions = new LinkedHashMap<>();
 
         final PartitionData partitionData = (PartitionData) dataFile.partition();
@@ -103,6 +105,6 @@ IcebergTableLocationKey keyFromDataFile(
             }
             partitions.put(colName, (Comparable<?>) colValue);
         }
-        return locationKey(manifestFile, dataFile, fileUri, partitions);
+        return locationKey(manifestFile, dataFile, fileUri, partitions, channelsProvider);
     }
 }
diff --git a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
index 62c61d7aa5b..b1b1062ae05 100644
--- a/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
+++ b/extensions/iceberg/src/test/java/io/deephaven/iceberg/junit5/SqliteCatalogBase.java
@@ -35,7 +34,6 @@
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.types.Types;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
-import org.apache.parquet.schema.LogicalTypeAnnotation;
 import org.apache.parquet.schema.MessageType;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
@@ -88,7 +87,7 @@ void tearDown() throws Exception {
         engineCleanup.tearDown();
     }
 
-    private TableParquetWriterOptions.Builder writerOptionsBuilder() {
+    protected TableParquetWriterOptions.Builder writerOptionsBuilder() {
         final TableParquetWriterOptions.Builder builder = TableParquetWriterOptions.builder();
         final Object dataInstructions;
         if ((dataInstructions = dataInstructions()) != null) {
@@ -633,7 +632,7 @@ void writeDataFilesBasicTest() {
         verifyDataFiles(tableIdentifier, List.of(source, anotherSource, moreData));
 
         {
-            // Verify thaty we read the data files in the correct order
+            // Verify that we read the data files in the correct order
            final Table fromIceberg = tableAdapter.table();
             assertTableEquals(TableTools.merge(moreData, source, anotherSource), fromIceberg);
         }
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java
index ea8e75844c3..9c338d37eb3 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/GCSSeekableChannelProvider.java
@@ -1,72 +1,19 @@
 //
 // Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
 //
+// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY
+// ****** Edit S3ASeekableChannelProvider and run "./gradlew replicateChannelProviders" to regenerate
+//
+// @formatter:off
 package io.deephaven.extensions.s3;
 
-import io.deephaven.internal.log.LoggerFactory;
-import io.deephaven.io.logger.Logger;
-import io.deephaven.util.channel.CompletableOutputStream;
-import io.deephaven.util.channel.SeekableChannelContext;
 import org.jetbrains.annotations.NotNull;
 
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.channels.SeekableByteChannel;
-import java.util.stream.Stream;
-
 import static io.deephaven.extensions.s3.GCSSeekableChannelProviderPlugin.GCS_URI_SCHEME;
-import static io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin.S3_URI_SCHEME;
 
-final class GCSSeekableChannelProvider extends S3SeekableChannelProvider {
-
-    private static final Logger log = LoggerFactory.getLogger(GCSSeekableChannelProvider.class);
+final class GCSSeekableChannelProvider extends UriToS3SeekableChannelProvider {
 
     GCSSeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
-        super(s3Instructions);
-    }
-
-    @Override
-    public boolean exists(@NotNull final URI uri) {
-        return super.exists(gcsToS3Uri(uri));
-    }
-
-    @Override
-    public SeekableByteChannel getReadChannel(
-            @NotNull final SeekableChannelContext channelContext,
-            @NotNull final URI uri) {
-        return super.getReadChannel(channelContext, gcsToS3Uri(uri));
-    }
-
-    @Override
-    public CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) {
-        return super.getOutputStream(gcsToS3Uri(uri), bufferSizeHint);
-    }
-
-    @Override
-    public Stream<URI> list(@NotNull final URI directory) {
-        if (log.isDebugEnabled()) {
-            log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl();
-        }
-        return createStream(gcsToS3Uri(directory), false, GCS_URI_SCHEME);
-    }
-
-    @Override
-    public Stream<URI> walk(@NotNull final URI directory) {
-        if (log.isDebugEnabled()) {
-            log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl();
-        }
-        return createStream(gcsToS3Uri(directory), true, GCS_URI_SCHEME);
-    }
-
-    private static URI gcsToS3Uri(@NotNull final URI uri) {
-        try {
-            if (S3_URI_SCHEME.equals(uri.getScheme())) {
-                return uri;
-            }
-            return new URI(S3_URI_SCHEME, uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
-                    uri.getQuery(), uri.getFragment());
-        } catch (final URISyntaxException e) {
-            throw new IllegalArgumentException("Failed to convert GCS URI " + uri + " to s3 URI", e);
-        }
+        super(s3Instructions, GCS_URI_SCHEME);
     }
 }
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProvider.java
new file mode 100644
index 00000000000..4b0de5306a2
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProvider.java
@@ -0,0 +1,15 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.s3;
+
+import org.jetbrains.annotations.NotNull;
+
+import static io.deephaven.extensions.s3.S3ASeekableChannelProviderPlugin.S3A_URI_SCHEME;
+
+final class S3ASeekableChannelProvider extends UriToS3SeekableChannelProvider {
+
+    S3ASeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
+        super(s3Instructions, S3A_URI_SCHEME);
+    }
+}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProviderPlugin.java
new file mode 100644
index 00000000000..b8a03fe3454
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3ASeekableChannelProviderPlugin.java
@@ -0,0 +1,36 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.s3;
+
+import com.google.auto.service.AutoService;
+import io.deephaven.util.channel.SeekableChannelsProvider;
+import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SeekableChannelsProviderPlugin} implementation used for reading from and writing to URIs with scheme "s3a".
+ */
+@AutoService(SeekableChannelsProviderPlugin.class)
+public final class S3ASeekableChannelProviderPlugin implements SeekableChannelsProviderPlugin {
+
+    static final String S3A_URI_SCHEME = "s3a";
+
+    @Override
+    public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) {
+        return S3A_URI_SCHEME.equals(uriScheme);
+    }
+
+    @Override
+    public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) {
+        if (!isCompatible(uriScheme, config)) {
+            throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme);
+        }
+        if (config != null && !(config instanceof S3Instructions)) {
+            throw new IllegalArgumentException("Only S3Instructions are valid when reading files from S3, provided " +
+                    "config instance of class " + config.getClass().getName());
+        }
+        return new S3ASeekableChannelProvider(config == null ? S3Instructions.DEFAULT : (S3Instructions) config);
+    }
+}
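
Because the plugin is annotated with @AutoService, it is registered as a standard java.util.ServiceLoader service. Presumably SeekableChannelsProviderLoader resolves a scheme along these lines; this is a sketch, and the actual loader implementation may differ:

```java
// Sketch of ServiceLoader-based resolution for the "s3a" scheme.
for (final SeekableChannelsProviderPlugin plugin
        : ServiceLoader.load(SeekableChannelsProviderPlugin.class)) {
    if (plugin.isCompatible("s3a", config)) {
        return plugin.createProvider("s3a", config);
    }
}
throw new IllegalStateException("No SeekableChannelsProviderPlugin found for scheme s3a");
```
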
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProvider.java
new file mode 100644
index 00000000000..0b1a5e4c011
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProvider.java
@@ -0,0 +1,19 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY
+// ****** Edit S3ASeekableChannelProvider and run "./gradlew replicateChannelProviders" to regenerate
+//
+// @formatter:off
+package io.deephaven.extensions.s3;
+
+import org.jetbrains.annotations.NotNull;
+
+import static io.deephaven.extensions.s3.S3NSeekableChannelProviderPlugin.S3N_URI_SCHEME;
+
+final class S3NSeekableChannelProvider extends UriToS3SeekableChannelProvider {
+
+    S3NSeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
+        super(s3Instructions, S3N_URI_SCHEME);
+    }
+}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProviderPlugin.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProviderPlugin.java
new file mode 100644
index 00000000000..a31a8fd6a7b
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3NSeekableChannelProviderPlugin.java
@@ -0,0 +1,40 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+// ****** AUTO-GENERATED CLASS - DO NOT EDIT MANUALLY
+// ****** Edit S3ASeekableChannelProviderPlugin and run "./gradlew replicateChannelProviders" to regenerate
+//
+// @formatter:off
+package io.deephaven.extensions.s3;
+
+import com.google.auto.service.AutoService;
+import io.deephaven.util.channel.SeekableChannelsProvider;
+import io.deephaven.util.channel.SeekableChannelsProviderPlugin;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link SeekableChannelsProviderPlugin} implementation used for reading from and writing to URIs with scheme "s3n".
+ */
+@AutoService(SeekableChannelsProviderPlugin.class)
+public final class S3NSeekableChannelProviderPlugin implements SeekableChannelsProviderPlugin {
+
+    static final String S3N_URI_SCHEME = "s3n";
+
+    @Override
+    public boolean isCompatible(@NotNull final String uriScheme, @Nullable final Object config) {
+        return S3N_URI_SCHEME.equals(uriScheme);
+    }
+
+    @Override
+    public SeekableChannelsProvider createProvider(@NotNull final String uriScheme, @Nullable final Object config) {
+        if (!isCompatible(uriScheme, config)) {
+            throw new IllegalArgumentException("Arguments not compatible, provided uri scheme " + uriScheme);
+        }
+        if (config != null && !(config instanceof S3Instructions)) {
+            throw new IllegalArgumentException("Only S3Instructions are valid when reading files from S3, provided " +
+                    "config instance of class " + config.getClass().getName());
+        }
+        return new S3NSeekableChannelProvider(config == null ? S3Instructions.DEFAULT : (S3Instructions) config);
+    }
+}
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
index da25ba8f60a..f0a117cc8e4 100644
--- a/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/S3SeekableChannelProvider.java
@@ -73,11 +73,22 @@ class S3SeekableChannelProvider implements SeekableChannelsProvider {
 
     private volatile SoftReference<Map<URI, FileSizeInfo>> fileSizeCacheRef;
 
+    /**
+     * The scheme to apply to the children URIs in the returned stream.
+     */
+    private final String childScheme;
+
     S3SeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
+        this(s3Instructions, S3_URI_SCHEME);
+    }
+
+    S3SeekableChannelProvider(@NotNull final S3Instructions s3Instructions, @NotNull final String childScheme) {
         this.s3AsyncClient = S3ClientFactory.getAsyncClient(s3Instructions);
         this.s3Instructions = s3Instructions;
         this.sharedCache = new S3RequestCache(s3Instructions.fragmentSize());
         this.fileSizeCacheRef = new SoftReference<>(new KeyedObjectHashMap<>(FileSizeInfo.URI_MATCH_KEY));
+        this.childScheme = childScheme;
     }
 
     @Override
@@ -141,7 +152,7 @@ public Stream<URI> list(@NotNull final URI directory) {
         if (log.isDebugEnabled()) {
             log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl();
         }
-        return createStream(directory, false, S3_URI_SCHEME);
+        return createStream(directory, false);
     }
 
     @Override
@@ -149,7 +160,7 @@ public Stream<URI> walk(@NotNull final URI directory) {
         if (log.isDebugEnabled()) {
             log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl();
         }
-        return createStream(directory, true, S3_URI_SCHEME);
+        return createStream(directory, true);
     }
 
     /**
@@ -157,12 +168,10 @@ public Stream<URI> walk(@NotNull final URI directory) {
      *
     * @param directory The parent directory to list.
      * @param isRecursive Whether to list the entries recursively.
-     * @param childScheme The scheme to apply to the children URIs in the returned stream.
      */
     Stream<URI> createStream(
             @NotNull final URI directory,
-            final boolean isRecursive,
-            @NotNull final String childScheme) {
+            final boolean isRecursive) {
         // The following iterator fetches URIs from S3 in batches and creates a stream
         final Iterator<URI> iterator = new Iterator<>() {
             private final String bucketName;
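
Storing childScheme on the provider keeps list() and walk() on a single code path: the outgoing S3 requests are unchanged, and only the child URIs handed back to callers carry the derived scheme. The rewrite createStream presumably applies per child mirrors the toS3Uri helper later in this diff; a sketch, since the real internals may differ:

```java
// Sketch: rebuild a child URI under childScheme, preserving all other components.
private URI withChildScheme(final URI uri) {
    try {
        return new URI(childScheme, uri.getUserInfo(), uri.getHost(), uri.getPort(),
                uri.getPath(), uri.getQuery(), uri.getFragment());
    } catch (final URISyntaxException e) {
        throw new IllegalArgumentException("Failed to apply scheme " + childScheme + " to " + uri, e);
    }
}
```
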
diff --git a/extensions/s3/src/main/java/io/deephaven/extensions/s3/UriToS3SeekableChannelProvider.java b/extensions/s3/src/main/java/io/deephaven/extensions/s3/UriToS3SeekableChannelProvider.java
new file mode 100644
index 00000000000..204df27338b
--- /dev/null
+++ b/extensions/s3/src/main/java/io/deephaven/extensions/s3/UriToS3SeekableChannelProvider.java
@@ -0,0 +1,75 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.extensions.s3;
+
+import io.deephaven.internal.log.LoggerFactory;
+import io.deephaven.io.logger.Logger;
+import io.deephaven.util.channel.CompletableOutputStream;
+import io.deephaven.util.channel.SeekableChannelContext;
+import org.jetbrains.annotations.NotNull;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.SeekableByteChannel;
+import java.util.stream.Stream;
+
+import static io.deephaven.extensions.s3.S3SeekableChannelProviderPlugin.S3_URI_SCHEME;
+
+/**
+ * Base class for all seekable channel providers with URI schemes that can be derived from S3, like S3A, S3N, etc.
+ */
+class UriToS3SeekableChannelProvider extends S3SeekableChannelProvider {
+
+    private static final Logger log = LoggerFactory.getLogger(UriToS3SeekableChannelProvider.class);
+
+    UriToS3SeekableChannelProvider(@NotNull final S3Instructions s3Instructions,
+            @NotNull final String childScheme) {
+        super(s3Instructions, childScheme);
+    }
+
+    @Override
+    public boolean exists(@NotNull final URI uri) {
+        return super.exists(toS3Uri(uri));
+    }
+
+    @Override
+    public SeekableByteChannel getReadChannel(
+            @NotNull final SeekableChannelContext channelContext,
+            @NotNull final URI uri) {
+        return super.getReadChannel(channelContext, toS3Uri(uri));
+    }
+
+    @Override
+    public CompletableOutputStream getOutputStream(@NotNull final URI uri, final int bufferSizeHint) {
+        return super.getOutputStream(toS3Uri(uri), bufferSizeHint);
+    }
+
+    @Override
+    public Stream<URI> list(@NotNull final URI directory) {
+        if (log.isDebugEnabled()) {
+            log.debug().append("Fetching child URIs for directory: ").append(directory.toString()).endl();
+        }
+        return createStream(toS3Uri(directory), false);
+    }
+
+    @Override
+    public Stream<URI> walk(@NotNull final URI directory) {
+        if (log.isDebugEnabled()) {
+            log.debug().append("Performing recursive traversal from directory: ").append(directory.toString()).endl();
+        }
+        return createStream(toS3Uri(directory), true);
+    }
+
+    private static URI toS3Uri(@NotNull final URI uri) {
+        try {
+            if (S3_URI_SCHEME.equals(uri.getScheme())) {
+                return uri;
+            }
+            return new URI(S3_URI_SCHEME, uri.getUserInfo(), uri.getHost(), uri.getPort(), uri.getPath(),
+                    uri.getQuery(), uri.getFragment());
+        } catch (final URISyntaxException e) {
+            throw new IllegalArgumentException("Failed to convert URI " + uri + " to s3 URI", e);
+        }
+    }
+}
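
With all scheme-specific behavior reduced to a constructor argument, supporting another S3-compatible alias becomes a two-class job: a provider subclass plus an @AutoService plugin. A hypothetical example, where "s3x" is invented for illustration:

```java
// Hypothetical: an "s3x" alias, mirroring the S3A/S3N/GCS classes in this diff.
final class S3XSeekableChannelProvider extends UriToS3SeekableChannelProvider {
    S3XSeekableChannelProvider(@NotNull final S3Instructions s3Instructions) {
        super(s3Instructions, "s3x"); // children are listed back out as s3x:// URIs
    }
}
```
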
diff --git a/replication/static/build.gradle b/replication/static/build.gradle
index 57be19681e5..bf237bb9f83 100644
--- a/replication/static/build.gradle
+++ b/replication/static/build.gradle
@@ -90,6 +90,8 @@ task replicateAllSafe {
     dependsOn Tasks.registerMainExecTask(project, 'replicateColumnStats', 'io.deephaven.replicators.ReplicateColumnStats')
     dependsOn Tasks.registerMainExecTask(project, 'replicateCachingSupplier', 'io.deephaven.replicators.ReplicateCachingSupplier')
+
+    dependsOn Tasks.registerMainExecTask(project, 'replicateChannelsProviders', 'io.deephaven.replicators.ReplicateChannelProviders')
 }
 
 // These replicators need manual fix-up post replication and should not be run without supervision
diff --git a/replication/static/src/main/java/io/deephaven/replicators/ReplicateChannelProviders.java b/replication/static/src/main/java/io/deephaven/replicators/ReplicateChannelProviders.java
new file mode 100644
index 00000000000..ebc6adf3a34
--- /dev/null
+++ b/replication/static/src/main/java/io/deephaven/replicators/ReplicateChannelProviders.java
@@ -0,0 +1,37 @@
+//
+// Copyright (c) 2016-2025 Deephaven Data Labs and Patent Pending
+//
+package io.deephaven.replicators;
+
+import java.io.IOException;
+
+import static io.deephaven.replication.ReplicatePrimitiveCode.replaceAll;
+
+public class ReplicateChannelProviders {
+    private static final String TASK = "replicateChannelProviders";
+    private static final String[] NO_EXCEPTIONS = new String[0];
+
+    private static final String BASE_CHANNEL_PROVIDER_DIR =
+            "extensions/s3/src/main/java/io/deephaven/extensions/s3/";
+    private static final String S3A_CHANNEL_PROVIDER_PATH =
+            BASE_CHANNEL_PROVIDER_DIR + "S3ASeekableChannelProvider.java";
+    private static final String S3A_CHANNEL_PROVIDER_PLUGIN_PATH =
+            BASE_CHANNEL_PROVIDER_DIR + "S3ASeekableChannelProviderPlugin.java";
+
+    public static void main(String... args) throws IOException {
+        // S3A -> GCS
+        String[][] pairs = new String[][] {
+                {"S3A", "GCS"},
+                {"s3a", "gcs"},
+        };
+        replaceAll(TASK, S3A_CHANNEL_PROVIDER_PATH, null, NO_EXCEPTIONS, pairs);
+
+        // S3A -> S3N
+        pairs = new String[][] {
+                {"S3A", "S3N"},
+                {"s3a", "s3n"},
+        };
+        replaceAll(TASK, S3A_CHANNEL_PROVIDER_PATH, null, NO_EXCEPTIONS, pairs);
+        replaceAll(TASK, S3A_CHANNEL_PROVIDER_PLUGIN_PATH, null, NO_EXCEPTIONS, pairs);
+    }
+}
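
Here replaceAll comes from ReplicatePrimitiveCode (per the static import) and rewrites the named S3A source files using the ordered token pairs; the generated GCSSeekableChannelProvider, S3NSeekableChannelProvider, and S3NSeekableChannelProviderPlugin earlier in this diff are its output, and regeneration is wired into the replicateAllSafe task above. Rather than hand-writing the hypothetical "s3x" classes sketched earlier, a third pair block would follow the same pattern:

```java
// Hypothetical: replicating the S3A sources to an invented "s3x" scheme,
// using the same paths and replaceAll signature as in main above.
pairs = new String[][] {
        {"S3A", "S3X"},
        {"s3a", "s3x"},
};
replaceAll(TASK, S3A_CHANNEL_PROVIDER_PATH, null, NO_EXCEPTIONS, pairs);
replaceAll(TASK, S3A_CHANNEL_PROVIDER_PLUGIN_PATH, null, NO_EXCEPTIONS, pairs);
```
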