
Commit 1e00ba8

[query] unify backend rpc
1 parent 174743e commit 1e00ba8

78 files changed: +1830 −1973 lines


hail/hail/src/is/hail/HailContext.scala

+4 −5

@@ -19,6 +19,7 @@ import org.apache.spark.executor.InputMetrics
 import org.apache.spark.rdd.RDD
 import org.json4s.Extraction
 import org.json4s.jackson.JsonMethods
+import sourcecode.Enclosing
 
 case class FilePartition(index: Int, file: String) extends Partition
 
@@ -41,7 +42,7 @@ object HailContext {
 
   def backend: Backend = get.backend
 
-  def sparkBackend(op: String): SparkBackend = get.sparkBackend(op)
+  def sparkBackend(implicit E: Enclosing): SparkBackend = get.backend.asSpark
 
   def configureLogging(logFile: String, quiet: Boolean, append: Boolean): Unit = {
     org.apache.log4j.helpers.LogLog.setInternalDebugging(true)
@@ -152,7 +153,7 @@ object HailContext {
 
     val fsBc = fs.broadcast
 
-    new RDD[T](SparkBackend.sparkContext("readPartition"), Nil) {
+    new RDD[T](SparkBackend.sparkContext, Nil) {
       def getPartitions: Array[Partition] =
         Array.tabulate(nPartitions)(i => FilePartition(i, partFiles(i)))
 
@@ -175,8 +176,6 @@ class HailContext private (
 ) {
   def stop(): Unit = HailContext.stop()
 
-  def sparkBackend(op: String): SparkBackend = backend.asSpark(op)
-
   var checkRVDKeys: Boolean = false
 
   def version: String = is.hail.HAIL_PRETTY_VERSION
@@ -188,7 +187,7 @@ class HailContext private (
     maxLines: Int,
   ): Map[String, Array[WithContext[String]]] = {
     val regexp = regex.r
-    SparkBackend.sparkContext("fileAndLineCounts").textFilesLines(fs.globAll(files).map(_.getPath))
+    SparkBackend.sparkContext.textFilesLines(fs.globAll(files).map(_.getPath))
       .filter(line => regexp.findFirstIn(line.value).isDefined)
       .take(maxLines)
       .groupBy(_.source.file)
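
The pattern here replaces explicit op: String arguments (as in HailContext.sparkBackend and SparkBackend.sparkContext("readPartition")) with an implicit sourcecode.Enclosing, which the Scala compiler materializes at each call site with the fully qualified name of the enclosing definition, so callers no longer spell out their own names. A minimal sketch of that mechanism, with illustrative names that are not part of this commit:

import sourcecode.Enclosing

object EnclosingDemo {
  // The compiler fills in E at each call site with the caller's enclosing
  // definition name, e.g. "EnclosingDemo.fileAndLineCounts", replacing the
  // old explicitly threaded op: String.
  def requireSpark(implicit E: Enclosing): Nothing =
    throw new IllegalStateException(s"${E.value} requires SparkBackend")

  def fileAndLineCounts(): Nothing =
    requireSpark // the error message names this method automatically
}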
hail/hail/src/is/hail/backend/Backend.scala

+12 −87

@@ -1,28 +1,20 @@
 package is.hail.backend
 
-import is.hail.asm4s._
-import is.hail.backend.Backend.jsonToBytes
+import is.hail.asm4s.HailClassLoader
 import is.hail.backend.spark.SparkBackend
-import is.hail.expr.ir.{IR, IRParser, LoweringAnalyses, SortField, TableIR, TableReader}
+import is.hail.expr.ir.{IR, LoweringAnalyses, SortField, TableIR, TableReader}
 import is.hail.expr.ir.lowering.{TableStage, TableStageDependency}
 import is.hail.io.{BufferSpec, TypedCodecSpec}
-import is.hail.io.fs._
-import is.hail.io.plink.LoadPlink
-import is.hail.io.vcf.LoadVCF
-import is.hail.types._
+import is.hail.io.fs.FS
+import is.hail.types.RTable
 import is.hail.types.encoded.EType
 import is.hail.types.physical.PTuple
-import is.hail.types.virtual.TFloat64
-import is.hail.utils._
-import is.hail.variant.ReferenceGenome
+import is.hail.utils.fatal
 
 import scala.reflect.ClassTag
 
-import java.io._
-import java.nio.charset.StandardCharsets
+import java.io.{Closeable, OutputStream}
 
-import org.json4s._
-import org.json4s.jackson.JsonMethods
 import sourcecode.Enclosing
 
 object Backend {
@@ -38,23 +30,19 @@ object Backend {
     ctx: ExecuteContext,
     t: PTuple,
     off: Long,
-    bufferSpecString: String,
+    bufferSpec: BufferSpec,
     os: OutputStream,
   ): Unit = {
-    val bs = BufferSpec.parseOrDefault(bufferSpecString)
     assert(t.size == 1)
     val elementType = t.fields(0).typ
     val codec = TypedCodecSpec(
       EType.fromPythonTypeEncoding(elementType.virtualType),
       elementType.virtualType,
-      bs,
+      bufferSpec,
     )
     assert(t.isFieldDefined(off, 0))
     codec.encode(ctx, elementType, t.loadField(off, 0), os)
   }
-
-  def jsonToBytes(f: => JValue): Array[Byte] =
-    JsonMethods.compact(f).getBytes(StandardCharsets.UTF_8)
 }
 
 abstract class BroadcastValue[T] { def value: T }
@@ -82,8 +70,8 @@ abstract class Backend extends Closeable {
     f: (Array[Byte], HailTaskContext, HailClassLoader, FS) => Array[Byte]
   ): (Option[Throwable], IndexedSeq[(Array[Byte], Int)])
 
-  def asSpark(op: String): SparkBackend =
-    fatal(s"${getClass.getSimpleName}: $op requires SparkBackend")
+  def asSpark(implicit E: Enclosing): SparkBackend =
+    fatal(s"${getClass.getSimpleName}: ${E.value} requires SparkBackend")
 
   def lowerDistributedSort(
     ctx: ExecuteContext,
@@ -116,70 +104,7 @@ abstract class Backend extends Closeable {
   def tableToTableStage(ctx: ExecuteContext, inputIR: TableIR, analyses: LoweringAnalyses)
     : TableStage
 
-  def withExecuteContext[T](f: ExecuteContext => T)(implicit E: Enclosing): T
-
-  final def valueType(s: String): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        IRParser.parse_value_ir(ctx, s).typ.toJSON
-      }
-    }
-
-  final def tableType(s: String): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        IRParser.parse_table_ir(ctx, s).typ.toJSON
-      }
-    }
-
-  final def matrixTableType(s: String): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        IRParser.parse_matrix_ir(ctx, s).typ.toJSON
-      }
-    }
-
-  final def blockMatrixType(s: String): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        IRParser.parse_blockmatrix_ir(ctx, s).typ.toJSON
-      }
-    }
-
-  def loadReferencesFromDataset(path: String): Array[Byte]
-
-  def fromFASTAFile(
-    name: String,
-    fastaFile: String,
-    indexFile: String,
-    xContigs: Array[String],
-    yContigs: Array[String],
-    mtContigs: Array[String],
-    parInput: Array[String],
-  ): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        ReferenceGenome.fromFASTAFile(ctx, name, fastaFile, indexFile,
-          xContigs, yContigs, mtContigs, parInput).toJSON
-      }
-    }
-
-  def parseVCFMetadata(path: String): Array[Byte] =
-    withExecuteContext { ctx =>
-      jsonToBytes {
-        Extraction.decompose {
-          LoadVCF.parseHeaderMetadata(ctx.fs, Set.empty, TFloat64, path)
-        }(defaultJSONFormats)
-      }
-    }
-
-  def importFam(path: String, isQuantPheno: Boolean, delimiter: String, missingValue: String)
-    : Array[Byte] =
-    withExecuteContext { ctx =>
-      LoadPlink.importFamJSON(ctx.fs, path, isQuantPheno, delimiter, missingValue).getBytes(
-        StandardCharsets.UTF_8
-      )
-    }
-
   def execute(ctx: ExecuteContext, ir: IR): Either[Unit, (PTuple, Long)]
+
+  def backendContext(ctx: ExecuteContext): BackendContext
 }
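
One consequence of the Backend changes above: the encode-to-stream helper in object Backend (the hunk header elides its name) now takes a parsed BufferSpec rather than a raw spec string, so the BufferSpec.parseOrDefault call presumably moves out to the RPC boundary. A hedged sketch of what a caller might look like after this change; encodeToOutputStream and encodeResult are stand-in names, not taken from this diff:

import java.io.ByteArrayOutputStream

import is.hail.backend.{Backend, ExecuteContext}
import is.hail.io.BufferSpec
import is.hail.types.physical.PTuple

object RpcSketch {
  // Hypothetical caller: parse the spec string once at the RPC edge and pass
  // the typed BufferSpec down; Backend itself no longer parses strings.
  def encodeResult(ctx: ExecuteContext, t: PTuple, off: Long, specJson: String): Array[Byte] = {
    val spec = BufferSpec.parseOrDefault(specJson) // parsing formerly inside Backend
    val os = new ByteArrayOutputStream()
    Backend.encodeToOutputStream(ctx, t, off, spec, os) // stand-in for the elided helper
    os.toByteArray
  }
}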
