diff --git a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 16227c90f6..1e15aac091 100644
--- a/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.3/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -66,23 +66,14 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
-
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
-    val filePath = partitionedFile.filePath
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val filePath = partitionedFile.toPath.toString
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index 6d548deb43..e4ca35992b 100644
--- a/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.4/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -62,28 +63,18 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
-
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
     val filePath = partitionedFile.toPath.toString
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
diff --git a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
index f7e2f56f7e..e1234e79d8 100644
--- a/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
+++ b/spark/spark-3.5/src/main/scala/org/apache/spark/sql/execution/datasources/v2/geoparquet/metadata/GeoParquetMetadataPartitionReaderFactory.scala
@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -62,29 +63,19 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
-
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val filePath = partitionedFile.toPath.toString
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
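For context, the call adopted above, ParquetFileReader.readFooter(inputFile, options, inputStream), parses only the Parquet footer, so the GeoParquet key-value metadata can be extracted without opening a full ParquetFileReader over the row groups. Below is a minimal standalone Scala sketch of that footer-only pattern; the file path, the FooterOnlySketch wrapper, the explicit stream close, and the direct "geo" key lookup are illustrative assumptions, not part of this patch.

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.ParquetReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object FooterOnlySketch {
  def main(args: Array[String]): Unit = {
    // Illustrative path; any (Geo)Parquet file on a Hadoop-compatible filesystem works.
    val path = new Path("/tmp/example.parquet")
    val conf = new Configuration()

    val inputFile = HadoopInputFile.fromPath(path, conf)
    val inputStream = inputFile.newStream()
    try {
      // Parses only the footer (file and row-group metadata); row group data is never read.
      val footer = ParquetFileReader
        .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
      val keyValueMetaData = footer.getFileMetaData.getKeyValueMetaData
      // GeoParquet stores its metadata as a JSON string under the "geo" key.
      println(Option(keyValueMetaData.get("geo")).getOrElse("no GeoParquet metadata found"))
    } finally {
      inputStream.close()
    }
  }
}

In the patched files, GeoParquetMetaData.parseKeyValueMetaData consumes the same key-value map and handles the parsing of that "geo" entry.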