Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
alexzerntev committed Mar 17, 2024
1 parent 41c6c47 commit bd29f6c
Show file tree
Hide file tree
Showing 32 changed files with 860 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,36 @@ import raw.compiler.rql2.tests.CompilerTestContext
trait BenchmarkTests extends CompilerTestContext {

property("raw.training-wheels", "false")
property("raw.runtime.external.disk-block-max-size", "500MB")

// testing if the code is running
// test(
// """let
// | lineitemsType = type collection(record(l_orderkey: int, l_payedammount: double)),
// | customerType = type record(customer: collection(record(c_custkey: string))),
// | ordersType = type collection(record(o_orderkey: int, o_custkey: int)),
// | lineitems = PostgreSQL.Query(
// | "postgres",
// | "select l_orderkey, (l_extendedprice * (1 - l_discount)) as l_payedammount from tpch1.lineitem",
// | lineitemsType,
// | host = "localhost:44444",
// | username = "postgres",
// | password = "1234"
// | ),
// | customers = Json.Read("file:///home/ld/workspace/TPCH/1GB/customer.json", customerType), // an object with an array inside
// | orders = Csv.Read("file:///home/ld/workspace/TPCH/1GB/orders.csv", ordersType, delimiter = "\t"),
// | customersOrders = Collection.EquiJoin(customers.customer, orders, (c) -> Int.From(c.c_custkey), (o) -> o.o_custkey),
// | customerOrdersItems = Collection.EquiJoin(
// | customersOrders,
// | lineitems,
// | (co) -> Int.From(co.o_orderkey),
// | (oi) -> oi.l_orderkey
// | ),
// | grouped = Collection.GroupBy(customerOrdersItems, (c) -> c.c_custkey),
// | result = Collection.Transform(grouped, (g) -> {id: g, total_payed: Collection.Sum(g.group.l_payedammount)}),
// | finalResult = Collection.Filter(result, (r) -> r.total_payed > 6000000)
// |in
// | Collection.Count(finalResult)""".stripMargin
// )(_ should evaluateTo("9L"))
// test(
// """let
// | lineitemsType = type collection(record(l_orderkey: int, l_payedammount: double)),
// | customerType = type record(customer: collection(record(c_custkey: string))),
// | ordersType = type collection(record(o_orderkey: int, o_custkey: int)),
// | lineitems = PostgreSQL.Query(
// | "postgres",
// | "select l_orderkey, (l_extendedprice * (1 - l_discount)) as l_payedammount from tpch1.lineitem",
// | lineitemsType,
// | host = "localhost:44444",
// | username = "postgres",
// | password = "1234"
// | ),
// | customers = Json.Read("file:///Users/alexzerntev/workspace/TPCH/1GB/customer.json", customerType), // an object with an array inside
// | orders = Csv.Read("file:///Users/alexzerntev/workspace/TPCH/1GB/orders.csv", ordersType, delimiter = "\t"),
// | customersOrders = Collection.EquiJoin(customers.customer, orders, (c) -> Int.From(c.c_custkey), (o) -> o.o_custkey),
// | customerOrdersItems = Collection.EquiJoin(
// | customersOrders,
// | lineitems,
// | (co) -> Int.From(co.o_orderkey),
// | (oi) -> oi.l_orderkey
// | ),
// | grouped = Collection.GroupBy(customerOrdersItems, (c) -> c.c_custkey),
// | result = Collection.Transform(grouped, (g) -> {id: g, total_payed: Collection.Sum(g.group.l_payedammount)}),
// | finalResult = Collection.Filter(result, (r) -> r.total_payed > 6000000)
// |in
// | Collection.Count(finalResult)""".stripMargin
// )(_ should evaluateTo("9L"))

test("Debugging") { _ =>
assume(false, "This test is disabled by default")
Expand Down Expand Up @@ -106,8 +105,8 @@ trait BenchmarkTests extends CompilerTestContext {
| username = "postgres",
| password = "1234"
| ),
| customers = Json.Read("file:///home/ld/workspace/TPCH/1GB/customer.json", customerType), // an object with an array inside
| orders = Csv.Read("file:///home/ld/workspace/TPCH/1GB/orders.csv", ordersType, delimiter = "\t"),
| customers = Json.Read("file:///Users/alexzerntev/workspace/TPCH/1GB/customer.json", customerType), // an object with an array inside
| orders = Csv.Read("file:///Users/alexzerntev/workspace/TPCH/1GB/orders.csv", ordersType, delimiter = "\t"),
| customersOrders = Collection.EquiJoin(customers.customer, orders, (c) -> Int.From(c.c_custkey), (o) -> o.o_custkey),
| customerOrdersItems = Collection.EquiJoin(
| customersOrders,
Expand Down Expand Up @@ -152,7 +151,7 @@ trait BenchmarkTests extends CompilerTestContext {
}

test("Range Join File with db test tpch10") { _ =>
assume(false, "This test is disabled by default")
assume(true, "This test is disabled by default")

val prog = """let
| lineitemsType = type collection(record(l_orderkey: int, l_payedammount: double)),
Expand All @@ -166,8 +165,8 @@ trait BenchmarkTests extends CompilerTestContext {
| username = "postgres",
| password = "1234"
| ),
| customers = Json.Read("file:///home/ld/workspace/TPCH/10GB/customer.json", customerType), // an object with an array inside
| orders = Csv.Read("file:///home/ld/workspace/TPCH/10GB/orders.csv", ordersType, delimiter = "\t"),
| customers = Json.Read("file:///Users/alexzerntev/workspace/TPCH/10GB/customer.json", customerType), // an object with an array inside
| orders = Csv.Read("file:///Users/alexzerntev/workspace/TPCH/10GB/orders.csv", ordersType, delimiter = "|"),
| customersOrders = Collection.EquiJoin(customers.customer, orders, (c) -> Int.From(c.c_custkey), (o) -> o.o_custkey),
| customerOrdersItems = Collection.EquiJoin(
| customersOrders,
Expand All @@ -186,7 +185,7 @@ trait BenchmarkTests extends CompilerTestContext {
val elapsedIn = System.currentTimeMillis()
logger.info("++++++++++ First run: " + (elapsedIn - startedIn))

val numberOfRuns = 2
val numberOfRuns = 0

val values = Array.fill(numberOfRuns)(0L)

Expand All @@ -200,7 +199,7 @@ trait BenchmarkTests extends CompilerTestContext {
logger.info("++++++++++ Next run: " + values(i))
}

val mean = (values.sum) / numberOfRuns
val mean = (values.sum) / 1

var standardDeviation = 0.0
for (num <- values) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,21 @@ trait Rql2TruffleCompilerServiceTestContext extends Rql2CompilerServiceTestConte
val options = new java.util.HashMap[String, String]()
options.put("rql.settings", settings.renderAsString)
// //diagnostics
// options.put("engine.CompilationStatistics", "true")
// // options.put("engine.CompilationStatisticDetails", "true")
// // options.put("compiler.InstrumentBoundaries", "true")
// options.put("engine.CompilationFailureAction", "Diagnose")
// options.put("engine.TracePerformanceWarnings", "all")
// options.put("engine.TraceCompilation", "true")
// options.put("engine.TraceSplitting", "true")
// options.put("engine.TraceDeoptimizeFrame", "true")
// options.put("engine.TraceTransferToInterpreter", "true")
// options.put("engine.TraceCompilationPolymorphism", "true")
// options.put("engine.TraceSplittingSummary", "true")
// // options.put("engine.TraceCompilationDetails", "true")
options.put("engine.CompilationStatistics", "true")
// options.put("engine.CompilationStatisticDetails", "true")
// options.put("compiler.InstrumentBoundaries", "true")
options.put("engine.CompilationFailureAction", "Diagnose")
options.put("engine.TracePerformanceWarnings", "all")
options.put("engine.TraceCompilation", "true")
options.put("engine.TraceSplitting", "true")
options.put("engine.TraceDeoptimizeFrame", "true")
options.put("engine.TraceTransferToInterpreter", "true")
options.put("engine.TraceCompilationPolymorphism", "true")
options.put("engine.TraceSplittingSummary", "true")
// options.put("engine.TraceCompilationDetails", "true")
// options.put("engine.CompileImmediately", "true")
// options.put("engine.BackgroundCompilation", "false")
// options.put("engine.SpecializationStatistics", "false")
options.put("engine.BackgroundCompilation", "false")
options.put("engine.SpecializationStatistics", "false")

// // optimizations
// options.put("compiler.InlineAcrossTruffleBoundary", "true")
Expand Down
1 change: 1 addition & 0 deletions snapi-truffle/src/main/java/module-info.java
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,5 @@
exports raw.runtime.truffle.runtime.exceptions.validation;
exports raw.compiler.snapi.truffle.compiler;
exports raw.compiler.rql2output.truffle.builtin;
exports raw.runtime.truffle.runtime.generator.collection.off_heap_generator;
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,19 +37,36 @@ protected int getShouldContinueSlot(VirtualFrame frame) {
return AuxiliarySlots.getShouldContinueSlot(frame.getFrameDescriptor());
}

protected int getOffHeapFlushSlot(VirtualFrame frame) {
return AuxiliarySlots.getOffHeapFlushSlot(frame.getFrameDescriptor());
}

protected int getKryoOutputSlot(VirtualFrame frame) {
return AuxiliarySlots.getKryoOutputSlot(frame.getFrameDescriptor());
}

protected int getIteratorSlot(VirtualFrame frame) {
return AuxiliarySlots.getIteratorSlot(frame.getFrameDescriptor());
}

@Specialization
protected Object doDistinct(
VirtualFrame frame,
Object iterable,
@Cached(value = "getComputeNextSlot(frame)", neverDefault = false) int generatorSlot,
@Cached(value = "getShouldContinueSlot(frame)", neverDefault = true)
int offHeapDistinctSlot) {
@Cached(value = "getShouldContinueSlot(frame)", neverDefault = true) int offHeapDistinctSlot,
@Cached(value = "getOffHeapFlushSlot(frame)", neverDefault = true) int offHeapFlushSlot,
@Cached(value = "getKryoOutputSlot(frame)", neverDefault = true) int kryoOutputSlot,
@Cached(value = "getIteratorSlot(frame)", neverDefault = true) int iteratorSlot) {
return new DistinctCollection(
iterable,
getValueType(),
RawContext.get(this).getSourceContext(),
frame.materialize(),
generatorSlot,
offHeapDistinctSlot);
offHeapDistinctSlot,
kryoOutputSlot,
iteratorSlot,
offHeapFlushSlot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@ public class CollectionEquiJoinNode extends ExpressionNode {
@CompilerDirectives.CompilationFinal private int generatorSlot = -1;
@CompilerDirectives.CompilationFinal private int keyFunctionSlot = -1;
@CompilerDirectives.CompilationFinal private int mapSlot = -1;
@CompilerDirectives.CompilationFinal private int kryoOutputSlot = -1;
@CompilerDirectives.CompilationFinal private int iteratorSlot = -1;
@CompilerDirectives.CompilationFinal private int offHeapFlushSlot = -1;

public CollectionEquiJoinNode(
ExpressionNode left,
Expand Down Expand Up @@ -66,6 +69,9 @@ private void setSlots(VirtualFrame frame) {
generatorSlot = getGeneratorSlot(frame.getFrameDescriptor());
keyFunctionSlot = getFunctionSlot(frame.getFrameDescriptor());
mapSlot = getMapSlot(frame.getFrameDescriptor());
kryoOutputSlot = getKryoOutputSlot(frame.getFrameDescriptor());
iteratorSlot = getIteratorSlot(frame.getFrameDescriptor());
offHeapFlushSlot = getOffHeapFlushSlot(frame.getFrameDescriptor());
}

@Override
Expand Down Expand Up @@ -94,6 +100,9 @@ public Object executeGeneric(VirtualFrame frame) {
shouldContinueSlot,
generatorSlot,
keyFunctionSlot,
mapSlot);
mapSlot,
kryoOutputSlot,
iteratorSlot,
offHeapFlushSlot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,29 @@ protected int getMapSlot(VirtualFrame frame) {
return AuxiliarySlots.getMapSlot(frame.getFrameDescriptor());
}

protected int getOffHeapFlushSlot(VirtualFrame frame) {
return AuxiliarySlots.getOffHeapFlushSlot(frame.getFrameDescriptor());
}

protected int getKryoOutputSlot(VirtualFrame frame) {
return AuxiliarySlots.getKryoOutputSlot(frame.getFrameDescriptor());
}

protected int getIteratorSlot(VirtualFrame frame) {
return AuxiliarySlots.getIteratorSlot(frame.getFrameDescriptor());
}

@Specialization
protected Object doGroup(
VirtualFrame frame,
Object iterable,
Object keyFun,
@Cached(value = "getGeneratorSlot(frame)", neverDefault = false) int generatorSlot,
@Cached(value = "getFunctionSlot(frame)", neverDefault = true) int keyFunctionSlot,
@Cached(value = "getMapSlot(frame)", neverDefault = true) int mapSlot) {
@Cached(value = "getMapSlot(frame)", neverDefault = true) int mapSlot,
@Cached(value = "getOffHeapFlushSlot(frame)", neverDefault = true) int offHeapFlushSlot,
@Cached(value = "getKryoOutputSlot(frame)", neverDefault = true) int kryoOutputSlot,
@Cached(value = "getIteratorSlot(frame)", neverDefault = true) int iteratorSlot) {
return new GroupByCollection(
iterable,
keyFun,
Expand All @@ -63,6 +78,9 @@ protected Object doGroup(
frame.materialize(),
generatorSlot,
keyFunctionSlot,
mapSlot);
mapSlot,
kryoOutputSlot,
iteratorSlot,
offHeapFlushSlot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@ public class CollectionOrderByNode extends ExpressionNode {
@CompilerDirectives.CompilationFinal private int collectionSlot = -1;
@CompilerDirectives.CompilationFinal private int offHeapGroupByKeysSlot = -1;

@CompilerDirectives.CompilationFinal private int kryoOutputSlot = -1;

@CompilerDirectives.CompilationFinal private int iteratorSlot = -1;

@CompilerDirectives.CompilationFinal private int offHeapFlushSlot = -1;

public CollectionOrderByNode(
ExpressionNode input,
ExpressionNode[] keyFuns,
Expand Down Expand Up @@ -74,6 +80,9 @@ public Object executeGeneric(VirtualFrame frame) {
generatorSlot = getGeneratorSlot(frame.getFrameDescriptor());
collectionSlot = getCollectionSlot(frame.getFrameDescriptor());
offHeapGroupByKeysSlot = getOffHeapGroupByKeysSlot(frame.getFrameDescriptor());
kryoOutputSlot = getKryoOutputSlot(frame.getFrameDescriptor());
iteratorSlot = getIteratorSlot(frame.getFrameDescriptor());
offHeapFlushSlot = getOffHeapFlushSlot(frame.getFrameDescriptor());
}

return new OrderByCollection(
Expand All @@ -86,6 +95,9 @@ public Object executeGeneric(VirtualFrame frame) {
frame.materialize(),
generatorSlot,
collectionSlot,
offHeapGroupByKeysSlot);
offHeapGroupByKeysSlot,
kryoOutputSlot,
iteratorSlot,
offHeapFlushSlot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

package raw.runtime.truffle.ast.expressions.iterable.list;

import static raw.runtime.truffle.ast.osr.AuxiliarySlots.*;

import com.oracle.truffle.api.Truffle;
import com.oracle.truffle.api.frame.VirtualFrame;
import com.oracle.truffle.api.nodes.LoopNode;
Expand Down Expand Up @@ -71,6 +73,11 @@ public class ListGroupByNode extends ExpressionNode {
private final int keyFunctionSlot;
private final int mapSlot;
private final int listSlot;
private int kryoOutputSlot = -1;

private int iteratorSlot = -1;

private int offHeapFlushSlot = -1;

public ListGroupByNode(
ExpressionNode inputNode,
Expand Down Expand Up @@ -109,13 +116,29 @@ public ListGroupByNode(

@Override
public Object executeGeneric(VirtualFrame frame) {

if (kryoOutputSlot == -1) {
kryoOutputSlot = getKryoOutputSlot(frame.getFrameDescriptor());
iteratorSlot = getIteratorSlot(frame.getFrameDescriptor());
offHeapFlushSlot = getOffHeapFlushSlot(frame.getFrameDescriptor());
}

Object input = inputNode.executeGeneric(frame);
Object keyFun = keyFunNode.executeGeneric(frame);
Object iterable = toIterableNode.execute(this, input);
SourceContext context = RawContext.get(this).getSourceContext();
OffHeapGroupByKey map =
new OffHeapGroupByKey(this.keyType, this.rowType, context, new RecordShaper(true));
new OffHeapGroupByKey(
this.keyType,
this.rowType,
context,
new RecordShaper(true),
frame.materialize(),
kryoOutputSlot,
iteratorSlot,
offHeapFlushSlot);
Object generator = getGeneratorNode.execute(this, iterable);

try {
generatorInitNode.execute(this, generator);
frame.setObject(generatorSlot, generator);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ public final class AuxiliarySlots {
private static final String OUTPUT_BUFFER_SLOT = "outputBufferSlot";
private static final String OFF_HEAP_GROUP_BY_KEYS_SLOT = "offHeapGroupByKeysSlot";
private static final String COLLECTION_SLOT = "collectionSlot";
private static final String KRYO_OUTPUT_SLOT = "kryoOutputSlot";
private static final String ITERATOR_SLOT = "iteratorSlot";
private static final String OFF_HEAP_FLUSH_SLOT = "offHeapFlushSlot";

@CompilerDirectives.TruffleBoundary
public static int getGeneratorSlot(FrameDescriptor frameDescriptor) {
Expand Down Expand Up @@ -69,4 +72,19 @@ public static int getOffHeapGroupByKeysSlot(FrameDescriptor frameDescriptor) {
public static int getCollectionSlot(FrameDescriptor frameDescriptor) {
return frameDescriptor.findOrAddAuxiliarySlot(COLLECTION_SLOT);
}

@CompilerDirectives.TruffleBoundary
public static int getKryoOutputSlot(FrameDescriptor frameDescriptor) {
return frameDescriptor.findOrAddAuxiliarySlot(KRYO_OUTPUT_SLOT);
}

@CompilerDirectives.TruffleBoundary
public static int getIteratorSlot(FrameDescriptor frameDescriptor) {
return frameDescriptor.findOrAddAuxiliarySlot(ITERATOR_SLOT);
}

@CompilerDirectives.TruffleBoundary
public static int getOffHeapFlushSlot(FrameDescriptor frameDescriptor) {
return frameDescriptor.findOrAddAuxiliarySlot(OFF_HEAP_FLUSH_SLOT);
}
}
Loading

0 comments on commit bd29f6c

Please sign in to comment.