-
Notifications
You must be signed in to change notification settings - Fork 468
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[VL] The POC of supporting Flink in Gluten #8839
base: main
Are you sure you want to change the base?
Conversation
Thanks for opening a pull request! Could you open an issue for this pull request on Github Issues? https://github.com/apache/incubator-gluten/issues Then could you also rename commit message and pull request title in the following format?
See also: |
@shuai-xu, thanks for your great work! Could you draft a design doc? Google doc is preferred. |
*/ | ||
package org.apache.gluten.backendsapi; | ||
|
||
public class FlinkBackend { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Worth emphasizing that Flink will not be a backend in Gluten. It's more considered a frontend or a framework or so.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK, We can discuss it.
Can we use gluten-substrait module? Looks like there exists too much duplicated code. |
Thank you for the PR! It eventually starts |
Is there a design for this support for people who are not very familiar with Flink? |
This is an initial PR and require more people to join with us for reviewing the design. |
Yes, we need to discuss whether it need to translate to substrait plan or just call velox jni interface, if do need to translate to substrait plan, It need to reconstruct the gluten-substraint module a little to share it between gluten spark and flink. |
@PHILO-HE @majetideepak OK, I will write a design soon. |
Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing? I am opening a GitHub side discussion here: #8849 |
Yes, done to add the author. |
} | ||
} | ||
|
||
private native long nativeProcessElement(int executor, long data); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
Very glad to see such exciting progress on Flink support. |
|
new BoundSplit( | ||
"5", | ||
-1, | ||
new ExternalStreamConnectorSplit("escs1", es.id()))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would use connector-external-stream
as connector ID.
It is currently a fixed value in Velox4j; https://github.com/velox4j/velox4j/blob/434ae37dfc3d5fb79788fe5bce41e41dd17901b5/src/main/cpp/main/velox4j/init/Init.cc#L94-L97
ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator)); | ||
List<BoundSplit> splits = List.of( | ||
new BoundSplit( | ||
"5", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The planNodeId
is the same with leaf scan node's node ID so Velox
knows we bind this split to the scan. Perhaps we can pass the scan node ID into GlutenCalOperator
somehow?
return new ConstantTypedExpr( | ||
toType(literal.getType()), | ||
toVariant(literal), | ||
null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could use ConstantTypedExpr.create(toVariant(literal))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The create will call native method, I think we'd better not call native in client side, so we need not load native libraries in Flink client side.
// add a mock input as velox not allow the source is empty. | ||
PlanNode mockInput = new ValuesNode( | ||
String.valueOf(ExecNodeContext.newNodeId()), | ||
"", | ||
false, | ||
1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a scan node so we can bind the split to it. E.g.,
final TableScanNode scanNode = new TableScanNode(
"id-1",
...(type),
new ExternalStreamTableHandle("connector-external-stream"),
List.of()
);
case VARCHAR: | ||
return new VarCharValue(literal.getValue().toString()); | ||
case BINARY: | ||
return new VarBinaryValue(literal.getValue().toString()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we use VarBinaryValue.create()
to pass a byte array in?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
att
After try to run this POC, got error |
You can try the latest code. |
Thanks for reply. I rerun with last commit and gotcha Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "sources" (class io.github.zhztheplayer.velox4j.plan.TableScanNode), not marked as ignorable (4 known properties: "id", "tableHandle", "outputType", "assignments"])
at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.github.zhztheplayer.velox4j.plan.TableScanNode["sources"]) I looked at TableScanNode and notice that json getter exist on "sources" but this field is absence. I try rebuild velox4j with Serde change (add disable for jackson DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) and got Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: couldn't find key name in dynamic object
at io.github.zhztheplayer.velox4j.jni.JniWrapper.executeQuery(Native Method)
at io.github.zhztheplayer.velox4j.jni.JniApi.executeQuery(JniApi.java:52)
at io.github.zhztheplayer.velox4j.query.Queries.execute(Queries.java:14)
at org.apache.gluten.table.runtime.operators.GlutenCalOperator.processElement(GlutenCalOperator.java:97) SQL query CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10; |
@ParyshevSergey , your query runs well in my side, could you give more logs? |
The error is likely because this velox4j change velox4j/velox4j@b1e66a2 is not included in the release yet. I will release a newer version of velox4j then let you know. |
Or you can try build velox4j by your self: https://github.com/velox4j/velox4j?tab=readme-ov-file#prerequisites |
I attache the log. Also add some information, in OpenSource Flink 1.20 i create module flink-examples-gluten with all needed dependencies, in this module i run the job. package org.apache.flink.examples.gluten;
import io.github.zhztheplayer.velox4j.Velox4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import java.util.concurrent.ExecutionException;
public class TestJob {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Velox4j.initialize();
Configuration conf = new Configuration();
StreamExecutionEnvironment see =
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
see.setParallelism(1);
StreamTableEnvironment ste = StreamTableEnvironment.create(see);
ste.executeSql(
"create table srcTbl (id int, price int, name string) WITH ('connector'='datagen')");
ste.executeSql("create table snkTbl (id int, price int) WITH ('connector'='blackhole')");
ste.executeSql("insert into snkTbl select id, price from srcTbl where price > 10").await();
}
} |
Thanks for reply, I build locally velox4j and rerun test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some superficial comments. Thank you!
docs/get-started/Flink.md
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I prefer firstly putting this doc under gluten-flink/docs folder as it is an experimental and separate feature.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
docs/get-started/Flink.md
Outdated
**For x86_64** | ||
|
||
```bash | ||
## make sure jdk8 is used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment is not aligned with the following.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
docs/get-started/Flink.md
Outdated
**For aarch64** | ||
|
||
```bash | ||
## make sure jdk8 is used |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
} | ||
|
||
private void startTaskManagerRunnerServices() throws Exception { | ||
synchronized (lock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Redundant synchronized, since it has been added on calling side. Assume it is only called there.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is flink code, I just copy it and add some code to load libvelox.so. I think we can ignore it first.
} | ||
|
||
private CompletableFuture<Void> shutDownServices() { | ||
synchronized (lock) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as above
import java.util.List; | ||
|
||
/** Converter between velox RowVector and Flink RowData. */ | ||
public class FlinkRowToVLRowConvertor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
->FlinkRowToVeloxColumnarConvertor
VLRow is misleading.
<modules> | ||
<module>planner</module> | ||
<module>loader</module> | ||
<module>runtime</module> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems loader is just for loading classes? Could you explain why we need a dedicated module for it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As I say in the design doc. Flink use a loader to load the modules only used in client side. This is to avoid class confilicting when submit multi jobs by one client. We just follow flink in gluten.
When will stateful operators be supported? |
I've published a new SNAPSHOT jar that may relate to this. Would you like to try again (with See the published SNAPSHOT jar list: |
Non-blocking style execution is now working since the PR was finished. With the feature, Velox will not stop processing data until we explicitly notified the task. This could remove a major blocker against implementing filters with Velox. Would you also help verify it from this PR's end? Thank you. |
I build velox4j and remove line with drivers check in QueryExecutor.cc(drivers_empty() check) and then POC started up normally. |
Glad to hear that. Does the filter-project routine work from your end? |
Filter and projection working well. How important check of "drivers_empty()" in QueryExecutor.cc ? |
This is a long term work, may be in latter half of this year. |
I think it's not really important and we can remove it anyway. |
I find some class is not in that jar,please update the new jar ,Thanks |
The latest snapshot https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/0.1.0-20250316.095536-16/ should already contain the newest code. Thought this PR might still rely on old versions of Velox4J so need to be updated. Could you share the error message you saw? |
Very glad to see this great work has been pushing forward. |
Cool, keep an eye on this PR |
What changes were proposed in this pull request?
This pr is the Java side of POC to support Flink. It generates a GlutenCalOperator to run filter using native.
The draft design is here.