Skip to content

Commit

Permalink
Removing withErrorHanding from inferrers
Browse files Browse the repository at this point in the history
  • Loading branch information
torcato committed Jan 15, 2024
1 parent be90dd6 commit bed6485
Show file tree
Hide file tree
Showing 5 changed files with 127 additions and 97 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,60 +41,57 @@ class HjsonInferrer(implicit protected val sourceContext: SourceContext)
maybeEncoding: Option[Encoding],
maybeSampleSize: Option[Int]
): TextInputStreamFormatDescriptor = {
withErrorHandling {
val r = getTextBuffer(is, maybeEncoding)
try {
TextInputStreamFormatDescriptor(r.encoding, r.confidence, infer(r.reader, maybeSampleSize))
} finally {
r.reader.close()
}
val r = getTextBuffer(is, maybeEncoding)
try {
TextInputStreamFormatDescriptor(r.encoding, r.confidence, infer(r.reader, maybeSampleSize))
} finally {
r.reader.close()
}

}

def infer(reader: Reader, maybeSampleSize: Option[Int]): TextInputFormatDescriptor = {
withErrorHandling {
try {
val sampleSize = maybeSampleSize.getOrElse(defaultSampleSize)
val nObjs = if (sampleSize <= 0) Int.MaxValue else sampleSize
val it = new TextLineIterator(reader)
try {
val sampleSize = maybeSampleSize.getOrElse(defaultSampleSize)
val nObjs = if (sampleSize <= 0) Int.MaxValue else sampleSize
val it = new TextLineIterator(reader)

val jsonMapper = new ObjectMapper()
parserFeatures.foreach(jsonMapper.enable(_))
var n = 0;
var innerType: SourceType = SourceNothingType()
while (it.hasNext && n < nObjs) {
val line = it.next()
val obj = jsonMapper.readValue(line, classOf[Any])
obj match {
case _: String =>
val split = line.split('"')
if (split.length > 3 || (split.length == 3 && split(2) != "")) {
throw new LocalInferrerException("extra value found after string definition in HJSON")
}
case _ =>
}
innerType = inferType(obj, innerType)
n += 1
val jsonMapper = new ObjectMapper()
parserFeatures.foreach(jsonMapper.enable(_))
var n = 0;
var innerType: SourceType = SourceNothingType()
while (it.hasNext && n < nObjs) {
val line = it.next()
val obj = jsonMapper.readValue(line, classOf[Any])
obj match {
case _: String =>
val split = line.split('"')
if (split.length > 3 || (split.length == 3 && split(2) != "")) {
throw new LocalInferrerException("extra value found after string definition in HJSON")
}
case _ =>
}
innerType = inferType(obj, innerType)
n += 1
}

if (innerType == SourceNothingType()) {
throw new LocalInferrerException("could not get items from HJSON file")
}
if (innerType == SourceNothingType()) {
throw new LocalInferrerException("could not get items from HJSON file")
}

val result = uniquifyTemporalFormats(innerType)
HjsonInputFormatDescriptor(
SourceCollectionType(result.cleanedType, false),
it.hasNext,
result.timeFormat,
result.dateFormat,
result.timestampFormat
)
val result = uniquifyTemporalFormats(innerType)
HjsonInputFormatDescriptor(
SourceCollectionType(result.cleanedType, false),
it.hasNext,
result.timeFormat,
result.dateFormat,
result.timestampFormat
)

} catch {
case ex: JsonProcessingException =>
logger.warn(s"Invalid HJSON.", ex)
throw new LocalInferrerException(s"invalid HJSON: ${ex.getMessage}")
}
} catch {
case ex: JsonProcessingException =>
logger.warn(s"Invalid HJSON.", ex)
throw new LocalInferrerException(s"invalid HJSON: ${ex.getMessage}")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,16 @@ class JsonInferrer(implicit protected val sourceContext: SourceContext)
maybeEncoding: Option[Encoding],
maybeSampleSize: Option[Int]
): TextInputStreamFormatDescriptor = {
withErrorHandling {
val buffer = getTextBuffer(is, maybeEncoding)
try {
val result = infer(buffer.reader, maybeSampleSize)
TextInputStreamFormatDescriptor(buffer.encoding, buffer.confidence, result)
} finally {
buffer.reader.close()
}
val buffer = getTextBuffer(is, maybeEncoding)
try {
val result = infer(buffer.reader, maybeSampleSize)
TextInputStreamFormatDescriptor(buffer.encoding, buffer.confidence, result)
} finally {
buffer.reader.close()
}
}

def infer(reader: Reader, maybeSampleSize: Option[Int]): TextInputFormatDescriptor = {
withErrorHandling {
try {
var nobjs = maybeSampleSize.getOrElse(defaultSampleSize)
// if you define a sample-size < 0 then it will read the full file
Expand Down Expand Up @@ -94,7 +91,7 @@ class JsonInferrer(implicit protected val sourceContext: SourceContext)
logger.warn("Invalid JSON.", ex)
throw new LocalInferrerException(s"invalid JSON: ${ex.getMessage}")
}
}

}

private def nextType(parser: JsonParser, currentType: SourceType): SourceType = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,53 +44,49 @@ class TextInferrer(implicit protected val sourceContext: SourceContext)
maybeEncoding: Option[Encoding],
maybeSampleSize: Option[Int]
): TextInputStreamFormatDescriptor = {
withErrorHandling {
val r = getTextBuffer(is, maybeEncoding)
try {
TextInputStreamFormatDescriptor(r.encoding, r.confidence, infer(r.reader, maybeSampleSize))
} finally {
r.reader.close()
}
}
}

def infer(reader: Reader, maybeSampleSize: Option[Int]): TextInputFormatDescriptor = {
withErrorHandling {
var count = 0
val sampleSize = maybeSampleSize.getOrElse(defaultSampleSize)
val nObjs = if (sampleSize <= 0) Int.MaxValue else sampleSize
val it = new TextLineIterator(reader)

val matchers = regexList.map(_.regex.pattern.matcher(""))
val stats = new Array[Double](regexList.length)
java.util.Arrays.fill(stats, 0.0d)

while (it.hasNext && count < nObjs) {
val line = it.next()
count += 1
// loops over each regex in the list and increases the stat if there is a match
matchers.zipWithIndex.foreach {
case (regex, i) =>
regex.reset(line)
if (regex.matches()) stats(i) += 1.0
}
var count = 0
val sampleSize = maybeSampleSize.getOrElse(defaultSampleSize)
val nObjs = if (sampleSize <= 0) Int.MaxValue else sampleSize
val it = new TextLineIterator(reader)

val matchers = regexList.map(_.regex.pattern.matcher(""))
val stats = new Array[Double](regexList.length)
java.util.Arrays.fill(stats, 0.0d)

while (it.hasNext && count < nObjs) {
val line = it.next()
count += 1
// loops over each regex in the list and increases the stat if there is a match
matchers.zipWithIndex.foreach {
case (regex, i) =>
regex.reset(line)
if (regex.matches()) stats(i) += 1.0
}
}

val matches = regexList.zip(stats)
val matches = regexList.zip(stats)

if (count == 0) throw new LocalInferrerException("could not read any line from file")
if (count == 0) throw new LocalInferrerException("could not read any line from file")

val (choice, v) = matches.maxBy(_._2)
val value = v / count
if (value < minMatch) {
val innerType = SourceStringType(false)
val tipe = SourceCollectionType(innerType, false)
LinesInputFormatDescriptor(tipe, None, false)
} else {
val innerType = SourceRecordType(choice.atts.toVector, true)
val tipe = SourceCollectionType(innerType, false)
LinesInputFormatDescriptor(tipe, Some(choice.regex.regex), it.hasNext)
}
val (choice, v) = matches.maxBy(_._2)
val value = v / count
if (value < minMatch) {
val innerType = SourceStringType(false)
val tipe = SourceCollectionType(innerType, false)
LinesInputFormatDescriptor(tipe, None, false)
} else {
val innerType = SourceRecordType(choice.atts.toVector, true)
val tipe = SourceCollectionType(innerType, false)
LinesInputFormatDescriptor(tipe, Some(choice.regex.regex), it.hasNext)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import raw.sources.api._
import raw.sources.bytestream.api.SeekableInputStream

import java.io.Reader
import javax.xml.stream.XMLStreamException

object XmlInferrer {
private val XML_SAMPLE_SIZE = "raw.inferrer.local.xml.sample-size"
Expand All @@ -39,18 +40,17 @@ class XmlInferrer(implicit protected val sourceContext: SourceContext)
maybeEncoding: Option[Encoding],
maybeSampleSize: Option[Int]
): TextInputStreamFormatDescriptor = {
withErrorHandling {
val r = getTextBuffer(is, maybeEncoding)
try {
TextInputStreamFormatDescriptor(r.encoding, r.confidence, infer(r.reader, maybeSampleSize))
} finally {
r.reader.close()
}
val r = getTextBuffer(is, maybeEncoding)
try {
TextInputStreamFormatDescriptor(r.encoding, r.confidence, infer(r.reader, maybeSampleSize))
} finally {
r.reader.close()
}

}

def infer(reader: Reader, maybeSampleSize: Option[Int]): TextInputFormatDescriptor = {
withErrorHandling {
try {
var nobjs = maybeSampleSize.getOrElse(defaultSampleSize)
if (nobjs < 0) {
nobjs = Int.MaxValue
Expand All @@ -68,6 +68,8 @@ class XmlInferrer(implicit protected val sourceContext: SourceContext)
// If multiple top-level XML documents, make it a collection.
val tipe = if (count > 1) SourceCollectionType(result.cleanedType, false) else result.cleanedType
XmlInputFormatDescriptor(tipe, xmlReader.hasNext(), result.timeFormat, result.dateFormat, result.timestampFormat)
} catch {
case ex: XMLStreamException => throw new LocalInferrerException("inference failed unexpectedly", ex)
}
}
}
38 changes: 38 additions & 0 deletions snapi-frontend/src/test/scala/raw/inferrer/local/RD3852.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package raw.inferrer.local

import com.typesafe.scalalogging.StrictLogging
import raw.client.api.LocationDescription
import raw.inferrer.api.{
AutoInferrerProperties,
CsvInputFormatDescriptor,
LinesInputFormatDescriptor,
TextInputStreamFormatDescriptor
}
import raw.sources.api.SourceContext
import raw.sources.filesystem.local.LocalLocationsTestContext
import raw.utils.{RawTestSuite, RawUtils, SettingsTestContext}
import raw.sources.filesystem.local.LocalPath

class RD3852 extends RawTestSuite with SettingsTestContext with StrictLogging with LocalLocationsTestContext {


// CSV is the last in the list in the auto inferrer.
// So inferring this a csv file means that the other inferrers threw correctly
// a LocalInferrerException while trying to parse the file
test("Auto inferring CSV") { _ =>
implicit val sourceContext = new SourceContext(null, null, settings, None)
val inferrer = new LocalInferrerService
val p = RawUtils.getResource("data/students/students.csv")
val l1 = new LocalPath(p)
try {

val TextInputStreamFormatDescriptor(_, _, format) =
inferrer.infer(AutoInferrerProperties(LocationDescription(l1.rawUri), None))
assert(format.isInstanceOf[CsvInputFormatDescriptor])

} finally {
RawUtils.withSuppressNonFatalException(inferrer.stop())
}
}

}

0 comments on commit bed6485

Please sign in to comment.