Skip to content

Commit

Permalink
Fix issue with Parquet files not being closed.
Browse files Browse the repository at this point in the history
  • Loading branch information
Imbruced committed Jan 7, 2025
1 parent 5dd127a commit 7439505
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,23 @@ object GeoParquetMetadataPartitionReaderFactory {
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType): Iterator[InternalRow] = {
val reader = ParquetFileReader
.open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))

try {
readFile(configuration, partitionedFile, readDataSchema, reader)
} finally {
reader.close()
}
}

private def readFile(
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType,
reader: ParquetFileReader): Iterator[InternalRow] = {
val filePath = partitionedFile.filePath
val metadata = ParquetFileReader
.open(HadoopInputFile.fromPath(new Path(filePath), configuration))
.getFooter
.getFileMetaData
.getKeyValueMetaData
val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,28 @@ case class GeoParquetMetadataPartitionReaderFactory(
}

object GeoParquetMetadataPartitionReaderFactory {

/**
 * Reads GeoParquet metadata rows for a single partitioned file.
 *
 * Opens the Parquet footer reader here and delegates to the reader-taking
 * overload, closing the reader in a `finally` block so the underlying file
 * handle is released even if metadata parsing throws (the resource leak this
 * commit fixes).
 *
 * @param configuration  Hadoop configuration used to open the file
 * @param partitionedFile the Spark partitioned file to inspect
 * @param readDataSchema  schema of the columns requested by the scan
 * @return an iterator over the resulting internal rows
 */
private def readFile(
    configuration: Configuration,
    partitionedFile: PartitionedFile,
    readDataSchema: StructType): Iterator[InternalRow] = {
  val reader = ParquetFileReader
    .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
  try {
    readFile(configuration, partitionedFile, readDataSchema, reader)
  } finally {
    // Always close, even on failure, to avoid leaking the Parquet file handle.
    reader.close()
  }
}

private def readFile(
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType,
reader: ParquetFileReader): Iterator[InternalRow] = {
val filePath = partitionedFile.toPath.toString
val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,29 @@ case class GeoParquetMetadataPartitionReaderFactory(
}

object GeoParquetMetadataPartitionReaderFactory {

/**
 * Reads GeoParquet metadata rows for a single partitioned file.
 *
 * Acquires the `ParquetFileReader` up front and hands it to the overload that
 * does the actual work; the `finally` guarantees the reader — and thus the
 * underlying input stream — is closed on every code path, fixing the
 * unclosed-file issue this commit addresses.
 *
 * @param configuration  Hadoop configuration used to open the file
 * @param partitionedFile the Spark partitioned file to inspect
 * @param readDataSchema  schema of the columns requested by the scan
 * @return an iterator over the resulting internal rows
 */
private def readFile(
    configuration: Configuration,
    partitionedFile: PartitionedFile,
    readDataSchema: StructType): Iterator[InternalRow] = {
  val reader = ParquetFileReader
    .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
  try {
    readFile(configuration, partitionedFile, readDataSchema, reader)
  } finally {
    // Close unconditionally so a parse failure cannot leak the file handle.
    reader.close()
  }
}

private def readFile(
configuration: Configuration,
partitionedFile: PartitionedFile,
readDataSchema: StructType,
reader: ParquetFileReader): Iterator[InternalRow] = {
val filePath = partitionedFile.toPath.toString

val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
case Some(geo) =>
val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
Expand Down

0 comments on commit 7439505

Please sign in to comment.