Skip to content

Commit

Permalink
Adding max rows.
Browse files Browse the repository at this point in the history
  • Loading branch information
miguelbranco80 committed Jul 3, 2024
1 parent 5f22150 commit 996fc43
Show file tree
Hide file tree
Showing 5 changed files with 40 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ final case class ProgramEnvironment(
scopes: Set[String],
options: Map[String, String],
maybeTraceId: Option[String] = None,
jdbcUrl: Option[String] = None
jdbcUrl: Option[String] = None,
maxRows: Option[Long] = None
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ import java.util.Base64
import scala.annotation.tailrec
import scala.util.control.NonFatal

final class Rql2CsvWriter(os: OutputStream, lineSeparator: String) extends Closeable {
final class Rql2CsvWriter(os: OutputStream, lineSeparator: String, maxRows: Option[Long]) extends Closeable {

final private val gen =
try {
Expand Down Expand Up @@ -88,6 +88,7 @@ final class Rql2CsvWriter(os: OutputStream, lineSeparator: String) extends Close
write(v, t.cloneAndRemoveProp(nullable).asInstanceOf[Rql2TypeWithProperties])
}
} else {
var rowsWritten = 0L
t match {
case Rql2IterableType(recordType: Rql2RecordType, _) =>
val columnNames = recordType.atts.map(_.idn)
Expand All @@ -100,6 +101,11 @@ final class Rql2CsvWriter(os: OutputStream, lineSeparator: String) extends Close
while (iterator.hasIteratorNextElement) {
val next = iterator.getIteratorNextElement
writeColumns(next, recordType)
rowsWritten += 1
// If maxRows is defined and we have written enough rows, stop writing.
if (maxRows.exists(rowsWritten >= _)) {
return
}
}
case Rql2ListType(recordType: Rql2RecordType, _) =>
val columnNames = recordType.atts.map(_.idn)
Expand All @@ -112,6 +118,11 @@ final class Rql2CsvWriter(os: OutputStream, lineSeparator: String) extends Close
for (i <- 0L until size) {
val next = v.getArrayElement(i)
writeColumns(next, recordType)
rowsWritten += 1
// If maxRows is defined and we have written enough rows, stop writing.
if (maxRows.exists(rowsWritten >= _)) {
return
}
}
case _ => throw new IOException("unsupported type")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.time.format.DateTimeFormatter
import java.util.Base64
import scala.util.control.NonFatal

final class Rql2JsonWriter(os: OutputStream) extends Closeable {
final class Rql2JsonWriter(os: OutputStream, maxRows: Option[Long]) extends Closeable {

final private val gen =
try {
Expand All @@ -45,26 +45,26 @@ final class Rql2JsonWriter(os: OutputStream) extends Closeable {
if (v.isException) {
v.throwException()
} else {
writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties])
writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties], maxRows)
}
} else {
writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties])
writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties], maxRows)
}
}

@throws[IOException]
private def writeValue(v: Value, t: Rql2TypeWithProperties): Unit = {
private def writeValue(v: Value, t: Rql2TypeWithProperties, maxRows: Option[Long]): Unit = {
if (t.props.contains(tryable)) {
if (v.isException) {
try {
v.throwException()
} catch {
case NonFatal(ex) => gen.writeString(ex.getMessage)
}
} else writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties])
} else writeValue(v, t.cloneAndRemoveProp(tryable).asInstanceOf[Rql2TypeWithProperties], maxRows = None)
} else if (t.props.contains(nullable)) {
if (v.isNull) gen.writeNull()
else writeValue(v, t.cloneAndRemoveProp(nullable).asInstanceOf[Rql2TypeWithProperties])
else writeValue(v, t.cloneAndRemoveProp(nullable).asInstanceOf[Rql2TypeWithProperties], maxRows = None)
} else t match {
case _: Rql2BinaryType =>
val bytes = (0L until v.getBufferSize).map(v.readBufferByte)
Expand Down Expand Up @@ -112,34 +112,46 @@ final class Rql2JsonWriter(os: OutputStream) extends Closeable {
val field = distincted.get(i)
gen.writeFieldName(field)
val a = v.getMember(field)
writeValue(a, atts(i).tipe.asInstanceOf[Rql2TypeWithProperties])
writeValue(a, atts(i).tipe.asInstanceOf[Rql2TypeWithProperties], maxRows = None)
}
gen.writeEndObject()
case Rql2IterableType(innerType, _) =>
var rowsWritten = 0L
val iterator = v.getIterator
gen.writeStartArray()
while (iterator.hasIteratorNextElement) {
val next = iterator.getIteratorNextElement
writeValue(next, innerType.asInstanceOf[Rql2TypeWithProperties])
writeValue(next, innerType.asInstanceOf[Rql2TypeWithProperties], maxRows = None)
rowsWritten += 1
// If maxRows is defined and we have written enough rows, stop writing.
if (maxRows.exists(rowsWritten >= _)) {
return
}
}
gen.writeEndArray()
case Rql2ListType(innerType, _) =>
var rowsWritten = 0L
val size = v.getArraySize
gen.writeStartArray()
for (i <- 0L until size) {
val next = v.getArrayElement(i)
writeValue(next, innerType.asInstanceOf[Rql2TypeWithProperties])
writeValue(next, innerType.asInstanceOf[Rql2TypeWithProperties], maxRows = None)
rowsWritten += 1
// If maxRows is defined and we have written enough rows, stop writing.
if (maxRows.exists(rowsWritten >= _)) {
return
}
}
gen.writeEndArray()
case Rql2OrType(tipes, _) if tipes.exists(Rql2TypeUtils.getProps(_).nonEmpty) =>
// A trick to make sur inner types do not have properties
val inners = tipes.map { case inner: Rql2TypeWithProperties => Rql2TypeUtils.resetProps(inner, Set.empty) }
val orProps = tipes.flatMap { case inner: Rql2TypeWithProperties => inner.props }.toSet
writeValue(v, Rql2OrType(inners, orProps))
writeValue(v, Rql2OrType(inners, orProps), maxRows = None)
case Rql2OrType(tipes, _) =>
val index = v.invokeMember("getIndex").asInt()
val actualValue = v.invokeMember("getValue")
writeValue(actualValue, tipes(index).asInstanceOf[Rql2TypeWithProperties])
writeValue(actualValue, tipes(index).asInstanceOf[Rql2TypeWithProperties], maxRows = None)

case _ => throw new RuntimeException("unsupported type")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ class Rql2TruffleCompilerService(engineDefinition: (Engine, Boolean))(implicit p
case _ => programContext.settings.config.getBoolean("raw.compiler.windows-line-ending")
}
val lineSeparator = if (windowsLineEnding) "\r\n" else "\n"
val w = new Rql2CsvWriter(outputStream, lineSeparator)
val w = new Rql2CsvWriter(outputStream, lineSeparator, environment.maxRows)
try {
w.write(v, tipe.asInstanceOf[Rql2TypeWithProperties])
w.flush()
Expand All @@ -293,7 +293,7 @@ class Rql2TruffleCompilerService(engineDefinition: (Engine, Boolean))(implicit p
if (!JsonPackage.outputWriteSupport(tipe)) {
return ExecutionRuntimeFailure("unsupported type")
}
val w = new Rql2JsonWriter(outputStream)
val w = new Rql2JsonWriter(outputStream, environment.maxRows)
try {
w.write(v, tipe.asInstanceOf[Rql2TypeWithProperties])
w.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ public RawContext(RawLanguage language, Env env) {
Option<Tuple2<String, RawValue>[]> maybeArguments = Option.empty();
this.programEnvironment =
new ProgramEnvironment(
this.user, maybeArguments, scalaScopes, scalaOptions, maybeTraceId, Option.empty());
this.user, maybeArguments, scalaScopes, scalaOptions, maybeTraceId, Option.empty(), Option.empty());

// The function registry holds snapi methods (top level functions). It is the data
// structure that is used to extract a ref to a function from a piece of execute snapi.
Expand Down

0 comments on commit 996fc43

Please sign in to comment.