[VL] The POC of supporting Flink in Gluten #8839

Open
wants to merge 8 commits into
base: main

Conversation

@shuai-xu
Contributor

shuai-xu commented Feb 27, 2025

What changes were proposed in this pull request?

This PR is the Java side of the POC to support Flink. It generates a GlutenCalOperator that runs the filter using native code.
The draft design is here.


Thanks for opening a pull request!

Could you open an issue for this pull request on Github Issues?

https://github.com/apache/incubator-gluten/issues

Then could you also rename commit message and pull request title in the following format?

[GLUTEN-${ISSUES_ID}][COMPONENT]feat/fix: ${detailed message}


@weiting-chen added the enhancement (New feature or request) label Feb 27, 2025
@weiting-chen changed the title from "The POC of supporting Flink in Gluten" to "[VL] The POC of supporting Flink in Gluten" Feb 27, 2025
@PHILO-HE
Contributor

@shuai-xu, thanks for your great work! Could you draft a design doc? Google doc is preferred.

*/
package org.apache.gluten.backendsapi;

public class FlinkBackend {
Member

Worth emphasizing that Flink will not be a backend in Gluten. It's more considered a frontend or a framework or so.

Contributor Author

OK, we can discuss it.

@jinchengchenghh
Contributor

Can we use gluten-substrait module? Looks like there exists too much duplicated code.

@FelixYBW
Contributor

Thank you for the PR! It eventually starts

@majetideepak
Collaborator

majetideepak commented Feb 27, 2025

Is there a design for this support for people who are not very familiar with Flink?

@weiting-chen
Contributor

Is there a design for this support for people who are not very familiar with Flink?

This is an initial PR, and it requires more people to join us in reviewing the design.
For the background and current potential issues, please check the link: https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing

@shuai-xu
Contributor Author

Can we use gluten-substrait module? Looks like there exists too much duplicated code.

Yes, we need to discuss whether we need to translate to a Substrait plan or just call the Velox JNI interface. If we do need to translate to a Substrait plan, the gluten-substrait module needs to be restructured a little so that it can be shared between Gluten Spark and Flink.

@shuai-xu
Contributor Author

@PHILO-HE @majetideepak OK, I will write a design soon.

@zhztheplayer
Member

@PHILO-HE @majetideepak OK, I will write a design soon.

Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing?

I am opening a GitHub side discussion here: #8849

@weiting-chen
Contributor

@PHILO-HE @majetideepak OK, I will write a design soon.

Hi @weiting-chen, would you share who is the author of https://docs.google.com/document/d/1VNMs1oR0c02kuQLFLQXZeTyk3CoPjrtAfiKGoLGxGzE/edit?usp=sharing?

I am opening a GitHub side discussion here: #8849

Yes, done. The author has been added.

}
}

private native long nativeProcessElement(int executor, long data);

This comment was marked as resolved.

@zjuwangg
Contributor

zjuwangg commented Mar 3, 2025

Very glad to see such exciting progress on Flink support.
Looking forward to seeing how to run the POC code in a test...

@ParyshevSergey

System.load("/home/xushuai/gluten/cpp-flink/build/libgflink.so");
How do I get this library? Or which steps are needed to compile it?

new BoundSplit(
"5",
-1,
new ExternalStreamConnectorSplit("escs1", es.id())));
Member

@zhztheplayer Mar 4, 2025

Would use connector-external-stream as connector ID.

It is currently a fixed value in Velox4j; https://github.com/velox4j/velox4j/blob/434ae37dfc3d5fb79788fe5bce41e41dd17901b5/src/main/cpp/main/velox4j/init/Init.cc#L94-L97

ExternalStream es = session.externalStreamOps().bind(new DownIterator(inputIterator));
List<BoundSplit> splits = List.of(
new BoundSplit(
"5",
Member

The planNodeId is the same as the leaf scan node's node ID, so Velox knows we bind this split to the scan. Perhaps we can pass the scan node ID into GlutenCalOperator somehow?
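
To illustrate the constraint discussed here, the following is a minimal plain-Java sketch. It does not use the Velox4j API: `SplitBindingSketch`, `Node`, and `splitsAreBindable` are hypothetical names introduced only for this example. It models the rule that a split can only be bound when its plan node ID names a leaf scan node.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins for illustration only; these are not Velox4j classes.
final class SplitBindingSketch {
    /** A plan node; only leaf scan nodes accept splits. */
    static final class Node {
        final String id;
        final boolean leafScan;

        Node(String id, boolean leafScan) {
            this.id = id;
            this.leafScan = leafScan;
        }
    }

    /** Returns true only if every split's plan node ID names a leaf scan node. */
    static boolean splitsAreBindable(List<Node> plan, List<String> splitPlanNodeIds) {
        Set<String> scanIds = new HashSet<>();
        for (Node node : plan) {
            if (node.leafScan) {
                scanIds.add(node.id);
            }
        }
        return scanIds.containsAll(splitPlanNodeIds);
    }

    public static void main(String[] args) {
        List<Node> plan = List.of(new Node("id-1", true), new Node("id-2", false));
        // Reusing the scan node's own ID for the split keeps the two in sync.
        System.out.println(splitsAreBindable(plan, List.of("id-1"))); // true
        // A hard-coded ID that is not a scan node is rejected.
        System.out.println(splitsAreBindable(plan, List.of("5")));    // false
    }
}
```

Passing the scan node's ID into the operator, rather than hard-coding a value such as "5", would keep the split and the scan node from drifting apart.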

Comment on lines +51 to +54
return new ConstantTypedExpr(
toType(literal.getType()),
toVariant(literal),
null);
Member

Could use ConstantTypedExpr.create(toVariant(literal))

Contributor Author

The create method calls a native method. I think we'd better not call native code on the client side, so that we don't need to load native libraries on the Flink client side.

Comment on lines 109 to 114
// add a mock input as velox not allow the source is empty.
PlanNode mockInput = new ValuesNode(
String.valueOf(ExecNodeContext.newNodeId()),
"",
false,
1);
Member

Adding a scan node so we can bind the split to it. E.g.,

final TableScanNode scanNode = new TableScanNode(
        "id-1",
        ...(type),
        new ExternalStreamTableHandle("connector-external-stream"),
        List.of()
    );

case VARCHAR:
return new VarCharValue(literal.getValue().toString());
case BINARY:
return new VarBinaryValue(literal.getValue().toString());
Member

Should we use VarBinaryValue.create() to pass a byte array in?

Contributor Author

att
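
As background for the byte-array suggestion above, here is a small JDK-only sketch. How the PR's literal actually stores its value is not shown in this thread, so this assumes the value is arbitrary raw bytes that end up routed through a String; `BinaryRoundTripSketch` and `viaString` are hypothetical names. Under that assumption, the String round trip is not guaranteed to preserve the bytes.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Illustrative only; not code from this PR or from Velox4j.
final class BinaryRoundTripSketch {
    /** Round-trips bytes through a UTF-8 String, as a String-based path would. */
    static byte[] viaString(byte[] raw) {
        return new String(raw, StandardCharsets.UTF_8).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = {(byte) 0xFF, 0x00, 0x41};
        // 0xFF is not valid UTF-8, so decoding substitutes U+FFFD and the bytes change.
        System.out.println(Arrays.equals(raw, viaString(raw))); // false
        // Handing the array over untouched preserves every byte.
        System.out.println(Arrays.equals(raw, raw.clone()));    // true
    }
}
```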

@ParyshevSergey

After trying to run this POC, I got this error:
Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

@github-actions bot added the DOCS label Mar 6, 2025
@shuai-xu
Contributor Author

shuai-xu commented Mar 6, 2025

After try to run this POC, got error Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

You can try the latest code.

@ParyshevSergey

ParyshevSergey commented Mar 6, 2025

After try to run this POC, got error Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

You can try the latest code.

Thanks for the reply. I reran with the latest commit and got:

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "sources" (class io.github.zhztheplayer.velox4j.plan.TableScanNode), not marked as ignorable (4 known properties: "id", "tableHandle", "outputType", "assignments"])
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.github.zhztheplayer.velox4j.plan.TableScanNode["sources"])

I looked at TableScanNode and noticed that a JSON getter exists for "sources", but the field itself is absent. I tried rebuilding velox4j with a Serde change (disabling Jackson's DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) and got:

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: couldn't find key name in dynamic object
	at io.github.zhztheplayer.velox4j.jni.JniWrapper.executeQuery(Native Method)
	at io.github.zhztheplayer.velox4j.jni.JniApi.executeQuery(JniApi.java:52)
	at io.github.zhztheplayer.velox4j.query.Queries.execute(Queries.java:14)
	at org.apache.gluten.table.runtime.operators.GlutenCalOperator.processElement(GlutenCalOperator.java:97)

SQL query

CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10;

@shuai-xu
Contributor Author

shuai-xu commented Mar 7, 2025

@ParyshevSergey, your query runs well on my side. Could you provide more logs?

@zhztheplayer
Member

After try to run this POC, got error Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: Exception: VeloxUserError Error Source: USER Error Code: INVALID_ARGUMENT Reason: Splits can be associated only with leaf plan nodes which require splits. Plan node ID 5 doesn't refer to such plan node.

You can try the latest code.

Thanks for reply. I rerun with last commit and gotcha

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException: Unrecognized field "sources" (class io.github.zhztheplayer.velox4j.plan.TableScanNode), not marked as ignorable (4 known properties: "id", "tableHandle", "outputType", "assignments"])
 at [Source: UNKNOWN; byte offset: #UNKNOWN] (through reference chain: io.github.zhztheplayer.velox4j.plan.TableScanNode["sources"])

I looked at TableScanNode and notice that json getter exist on "sources" but this field is absence. I try rebuild velox4j with Serde change (add disable for jackson DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES) and got

Caused by: io.github.zhztheplayer.velox4j.exception.VeloxException: couldn't find key name in dynamic object
	at io.github.zhztheplayer.velox4j.jni.JniWrapper.executeQuery(Native Method)
	at io.github.zhztheplayer.velox4j.jni.JniApi.executeQuery(JniApi.java:52)
	at io.github.zhztheplayer.velox4j.query.Queries.execute(Queries.java:14)
	at org.apache.gluten.table.runtime.operators.GlutenCalOperator.processElement(GlutenCalOperator.java:97)

SQL query

CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10;

The error is likely because this velox4j change velox4j/velox4j@b1e66a2 is not included in the release yet. I will release a newer version of velox4j then let you know.

@zhztheplayer
Member

zhztheplayer commented Mar 7, 2025

@ParyshevSergey

@ParyshevSergey , your query runs well in my side, could you give more logs?

log.txt

I attached the log. Some additional information: in open-source Flink 1.20 I created a module, flink-examples-gluten, with all the needed dependencies, and I run the job in this module.

package org.apache.flink.examples.gluten;

import io.github.zhztheplayer.velox4j.Velox4j;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.util.concurrent.ExecutionException;

public class TestJob {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Velox4j.initialize();
        Configuration conf = new Configuration();
        StreamExecutionEnvironment see =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        see.setParallelism(1);
        StreamTableEnvironment ste = StreamTableEnvironment.create(see);

        ste.executeSql(
                "create table srcTbl (id int, price int, name string) WITH ('connector'='datagen')");
        ste.executeSql("create table snkTbl (id int, price int) WITH ('connector'='blackhole')");

        ste.executeSql("insert into snkTbl select id, price from srcTbl where price > 10").await();
    }
}

@ParyshevSergey

Or you can try build velox4j by your self:

https://github.com/velox4j/velox4j?tab=readme-ov-file#prerequisites https://github.com/velox4j/velox4j?tab=readme-ov-file#build-from-source

Thanks for the reply. I built velox4j locally and reran the test.

log.txt

Contributor

@PHILO-HE left a comment

Some superficial comments. Thank you!

Contributor

I'd prefer to first put this doc under the gluten-flink/docs folder, as it is an experimental and separate feature.

Contributor Author

OK

**For x86_64**

```bash
## make sure jdk8 is used
Contributor

The comment is not aligned with the following.

Contributor Author

done

**For aarch64**

```bash
## make sure jdk8 is used
Contributor

Ditto.

Contributor Author

done

}

private void startTaskManagerRunnerServices() throws Exception {
synchronized (lock) {
Contributor

Redundant synchronized, since it has already been added on the calling side (assuming it is only called from there).

Contributor Author

This is Flink code; I just copied it and added some code to load libvelox.so. I think we can ignore it for now.
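
For context on the point about the nested lock: Java intrinsic locks are reentrant, so a nested synchronized block on the same lock is harmless for the owning thread, only redundant. A minimal JDK-only sketch follows; the class and method names are illustrative and are not Flink's.

```java
// Illustrative only; this is not the Flink code under review.
final class ReentrantLockSketch {
    private final Object lock = new Object();
    private int started;

    /** Caller side: already holds the lock when delegating. */
    int start() {
        synchronized (lock) {
            return startServices();
        }
    }

    /** Callee re-acquires the same monitor; the owning thread enters immediately. */
    private int startServices() {
        synchronized (lock) {
            started++;
            return started;
        }
    }

    public static void main(String[] args) {
        System.out.println(new ReentrantLockSketch().start()); // prints 1
    }
}
```

The inner block only matters if the method can also be reached from a caller that does not hold the lock.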

}

private CompletableFuture<Void> shutDownServices() {
synchronized (lock) {
Contributor

ditto.

Contributor Author

as above

import java.util.List;

/** Converter between velox RowVector and Flink RowData. */
public class FlinkRowToVLRowConvertor {
Contributor

->FlinkRowToVeloxColumnarConvertor

VLRow is misleading.

<modules>
<module>planner</module>
<module>loader</module>
<module>runtime</module>
Contributor

Seems loader is just for loading classes? Could you explain why we need a dedicated module for it?

Contributor Author

As I said in the design doc, Flink uses a loader to load the modules that are only used on the client side. This avoids class conflicts when multiple jobs are submitted by one client. We just follow Flink's approach in Gluten.

@yangxiao0320

When will stateful operators be supported?

@zhztheplayer
Member

@ParyshevSergey

I've published a new SNAPSHOT jar that may relate to this. Would you like to try again (with mvn -U)? Thanks.

See the published SNAPSHOT jar list:

https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/

@zhztheplayer
Member

@shuai-xu

Non-blocking style execution is now working since the PR was finished. With this feature, Velox will not stop processing data until we explicitly notify the task. This could remove a major blocker against implementing filters with Velox.

Would you also help verify it from this PR's end? Thank you.
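
The behavior described above can be pictured with a toy model. This is only a sketch under stated assumptions: `NonBlockingTaskSketch`, its `State` enum, and its methods are hypothetical and do not mirror Velox4j's actual API or state machine. The idea it shows is that an empty input queue means "wait for more input" rather than "done", until the caller explicitly signals the end of the stream.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Toy model only: these names are hypothetical and do not mirror Velox4j's API.
final class NonBlockingTaskSketch {
    enum State { AVAILABLE, BLOCKED, FINISHED }

    private final Queue<Integer> pending = new ArrayDeque<>();
    private boolean noMoreInput;

    /** Feeds one input value (for example a price) into the task. */
    void addInput(int price) {
        pending.add(price);
    }

    /** Explicit end-of-stream notification from the caller. */
    void noMoreInput() {
        noMoreInput = true;
    }

    /** An empty queue means "wait for more input", not "done", until notified. */
    State advance() {
        if (!pending.isEmpty()) {
            return State.AVAILABLE;
        }
        return noMoreInput ? State.FINISHED : State.BLOCKED;
    }

    /** Takes the next value and applies the filter price > 10. */
    boolean nextPasses() {
        return pending.remove() > 10;
    }

    public static void main(String[] args) {
        NonBlockingTaskSketch task = new NonBlockingTaskSketch();
        System.out.println(task.advance());    // BLOCKED
        task.addInput(42);
        System.out.println(task.advance());    // AVAILABLE
        System.out.println(task.nextPasses()); // true
        System.out.println(task.advance());    // BLOCKED
        task.noMoreInput();
        System.out.println(task.advance());    // FINISHED
    }
}
```

For an unbounded Flink stream, the operator would stay in the waiting state between records instead of finishing as soon as the current input is drained.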

@ParyshevSergey

@ParyshevSergey

I've published a new SNAPSHOT jar that may relate to this. Would you like to try again (with mvn -U)? Thanks.

See the published SNAPSHOT jar list:

https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/

I built velox4j with the drivers check removed from QueryExecutor.cc (the drivers_empty() check), and then the POC started up normally.

@zhztheplayer
Member

I build velox4j and remove line with drivers check in QueryExecutor.cc(drivers_empty() check) and then POC started up normally.

Glad to hear that. Does the filter-project routine work from your end?

@ParyshevSergey

ParyshevSergey commented Mar 17, 2025

I build velox4j and remove line with drivers check in QueryExecutor.cc(drivers_empty() check) and then POC started up normally.

Glad to hear that. Does the filter-project routine work from your end?

Filter and projection are working well. How important is the drivers_empty() check in QueryExecutor.cc?

@shuai-xu
Contributor Author

When will stateful operators be supported?

This is long-term work, maybe in the latter half of this year.

@zhztheplayer
Member

I build velox4j and remove line with drivers check in QueryExecutor.cc(drivers_empty() check) and then POC started up normally.

Glad to hear that. Does the filter-project routine work from your end?

Filter and projection working well. How important check of "drivers_empty()" in QueryExecutor.cc ?

I think it's not really important and we can remove it anyway.

@oxwangfeng125

@ParyshevSergey

I've published a new SNAPSHOT jar that may relate to this. Would you like to try again (with mvn -U)? Thanks.

See the published SNAPSHOT jar list:

https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/

I found that some classes are missing from that jar. Please publish an updated jar, thanks.

@zhztheplayer
Member

zhztheplayer commented Mar 19, 2025

@ParyshevSergey
I've published a new SNAPSHOT jar that may relate to this. Would you like to try again (with mvn -U)? Thanks.
See the published SNAPSHOT jar list:
https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/

I find some class is not in that jar,please update the new jar ,Thanks

The latest snapshot https://central.sonatype.com/service/rest/repository/browse/maven-snapshots/io/github/zhztheplayer/velox4j/0.1.0-SNAPSHOT/0.1.0-20250316.095536-16/ should already contain the newest code.

Though this PR might still rely on an old version of Velox4J, so it may need to be updated. Could you share the error message you saw?

@lsyldliu

Very glad to see this great work being pushed forward.

@leonardBang

Cool, keep an eye on this PR
