[VL] The POC of supporting Flink in Gluten #8839

2 changes: 2 additions & 0 deletions gluten-flink/README.md
@@ -0,0 +1,2 @@
# Gluten Flink Project
Gluten for Flink is under active development. Refer to the [user guide](docs/Flink.md) for a quick start.
117 changes: 117 additions & 0 deletions gluten-flink/docs/Flink.md
@@ -0,0 +1,117 @@
---
layout: page
title: Gluten For Flink with Velox Backend
nav_order: 1
---

# Supported Version

| Type  | Version                        |
|-------|--------------------------------|
| Flink | 1.19.2                         |
| OS    | Ubuntu 20.04/22.04, CentOS 7/8 |
| JDK   | OpenJDK 11 / JDK 17            |
| Scala | 2.12                           |

# Prerequisite

With a static build, the Gluten + Flink + Velox backend supports all Linux distributions, but it has only been tested on **Ubuntu 20.04**. With a dynamic build, the Gluten + Velox backend supports **Ubuntu 20.04/22.04, CentOS 7/8** and their variants.

Currently, the officially supported Flink version is 1.19.2.

Set the `JAVA_HOME` environment variable before building. Gluten currently supports **Java 11** and **Java 17**.

**For x86_64**

```bash
## make sure jdk11 is used
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
export PATH=$JAVA_HOME/bin:$PATH
```

**For aarch64**

```bash
## make sure jdk11 is used
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-arm64
export PATH=$JAVA_HOME/bin:$PATH
```
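Before building, it can help to confirm that the `java` on your `PATH` really is one of the supported versions. The following is a minimal sketch and is not part of Gluten: the function names `java_major` and `is_supported_java` are made up for illustration, and it assumes the usual `java -version` output where the version is the first quoted string on the first line.

```bash
#!/usr/bin/env bash
# Hypothetical helper, not shipped with Gluten.

# Extract the major version from a Java version string,
# e.g. "11.0.22" -> 11, "17" -> 17, legacy "1.8.0_392" -> 8.
java_major() {
  local v="$1"
  case "$v" in
    1.*) v="${v#1.}" ;;   # legacy scheme: 1.8.0_392 -> 8.0_392
  esac
  echo "${v%%[!0-9]*}"    # keep only the leading digits
}

# Return 0 if the given version string is Java 11 or Java 17.
is_supported_java() {
  case "$(java_major "$1")" in
    11|17) return 0 ;;
    *)     return 1 ;;
  esac
}

# Only probe the real JVM when one is available on PATH.
if command -v java >/dev/null 2>&1; then
  # `java -version` prints to stderr; the version is the first quoted token.
  detected="$(java -version 2>&1 | awk -F'"' 'NR==1 {print $2}')"
  if is_supported_java "$detected"; then
    echo "Java $detected is supported"
  else
    echo "Java $detected is not supported; use Java 11 or 17" >&2
  fi
fi
```

The parsing is kept separate from the JVM probe so that the version logic can be checked without a JDK installed.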

**Get Velox4j**

Gluten for Flink depends on [Velox4j](https://github.com/velox4j/velox4j) to call Velox. This is an experimental feature.
You need to fetch the Velox4j code and build it first.

```bash
## fetch velox4j code
git clone https://github.com/shuai-xu/velox4j.git
cd velox4j
git checkout gluten
mvn clean install
```

**Get Gluten**

```bash
## configure Maven if needed, e.g. a proxy in ~/.m2/settings.xml

## fetch gluten code
git clone https://github.com/apache/incubator-gluten.git
```

# Build Gluten Flink with Velox Backend

```bash
cd /path/to/gluten/gluten-flink
mvn clean package
```

## Dependency library deployment

You need the Velox4j packages built above in order to use them with Gluten.
The Velox4j jar currently available is `velox4j-0.1.0-SNAPSHOT.jar`.
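One way to gather the built jars into a Flink library directory is sketched below. This is an illustration only: the function name `stage_jars` is hypothetical, and it assumes the jars sit under Maven `target/` directories, which is the Maven default but is not confirmed by this guide. It does not deal with class loading order.

```bash
#!/usr/bin/env bash
# Hypothetical helper, not shipped with Gluten.

# Copy the Gluten runtime/loader jars and the Velox4j jar found under
# any target/ directory of a build tree into the given library directory.
# Usage: stage_jars <build-root> <flink-lib-dir>
stage_jars() {
  local src="$1" lib="$2"
  find "$src" -type f -path '*/target/*' \
    \( -name 'gluten-flink-runtime-*.jar' \
       -o -name 'gluten-flink-loader-*.jar' \
       -o -name 'velox4j-*.jar' \) \
    -exec cp {} "$lib/" \;
}
```

For example, `stage_jars /path/to/gluten/gluten-flink /path/to/flink/lib` followed by `stage_jars /path/to/velox4j /path/to/flink/lib`, where both paths are placeholders for your own checkout and Flink installation.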

## Submit the Flink SQL job

Submit the test job with `flink run`. You can use `StreamSQLExample` as an example.

### Flink local cluster

After deploying the Flink binaries, add the Gluten Flink jars to the Flink library path,
including gluten-flink-runtime-1.4.0.jar, gluten-flink-loader-1.4.0.jar and the Velox4j jars above,
and make sure they are loaded before the Flink libraries.
Then go to the Flink binary path and use the script below to
submit the example job.

```bash
## adjust the examples jar name and version below to match your Flink distribution
bin/start-cluster.sh
bin/flink run -d -m 0.0.0.0:8080 \
-c org.apache.flink.table.examples.java.basics.StreamSQLExample \
lib/flink-examples-table_2.12-1.20.1.jar
```

The result is written to `log/flink-*-taskexecutor-*.out`.
You should also see an operator named `gluten-cal` in the web UI of your Flink job.

#### All operators executed natively
Another example runs with every operator executed natively.
You can use the `data-generator.sql` script in the `docs` directory.

```bash
bin/sql-client.sh -f data-generator.sql
```

### Flink Yarn per job mode

TODO

## Performance
Using the data-generator example, native execution generates 10,000 records in about 60 ms,
while Flink's own generator takes about 600 ms for 10,000 records, so the native path is roughly 10 times faster.
More performance cases will be added.
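The arithmetic behind the speedup can be written out as follows. This is only a worked calculation on the figures quoted above, assuming both runs produce 10,000 records. It is not a benchmark, and the variable names are illustrative.

```bash
#!/usr/bin/env bash
# Figures quoted in the text; the shared record count is an assumption.
records=10000
native_ms=60
flink_ms=600

# Speedup is the ratio of elapsed times for the same amount of work.
speedup=$(( flink_ms / native_ms ))

# Throughput in records per second (integer division, rounded down).
native_rps=$(( records * 1000 / native_ms ))
flink_rps=$(( records * 1000 / flink_ms ))

echo "speedup: ${speedup}x"
echo "native:  ${native_rps} records/s"
echo "flink:   ${flink_rps} records/s"
```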

## Notes
Neither Gluten for Flink nor Velox4j currently ships a bundled jar that includes all of its dependencies.
You may therefore have to add these jars yourself. They may include guava-33.4.0-jre.jar, jackson-core-2.18.0.jar,
jackson-databind-2.18.0.jar, jackson-datatype-jdk8-2.18.0.jar, jackson-annotations-2.18.0.jar, arrow-memory-core-18.1.0.jar,
arrow-memory-unsafe-18.1.0.jar, arrow-vector-18.1.0.jar, flatbuffers-java-24.3.25.jar, arrow-format-18.1.0.jar and arrow-c-data-18.1.0.jar.
Bundled jars will be provided soon.
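Since these jars have to be added by hand, a quick check for missing ones can save a failed job submission. The sketch below is not part of Gluten: the function name `missing_jars` is hypothetical, and the jar names are copied from the note above, so adjust them to the versions you actually use.

```bash
#!/usr/bin/env bash
# Hypothetical helper, not shipped with Gluten.

# Jar names copied from the note above; versions may differ in your setup.
REQUIRED_JARS=(
  guava-33.4.0-jre.jar
  jackson-core-2.18.0.jar
  jackson-databind-2.18.0.jar
  jackson-datatype-jdk8-2.18.0.jar
  jackson-annotations-2.18.0.jar
  arrow-memory-core-18.1.0.jar
  arrow-memory-unsafe-18.1.0.jar
  arrow-vector-18.1.0.jar
  flatbuffers-java-24.3.25.jar
  arrow-format-18.1.0.jar
  arrow-c-data-18.1.0.jar
)

# Print every required jar that is absent from the given directory.
# Usage: missing_jars <flink-lib-dir>
missing_jars() {
  local lib_dir="$1" jar
  for jar in "${REQUIRED_JARS[@]}"; do
    [ -f "$lib_dir/$jar" ] || echo "$jar"
  done
}
```

Running `missing_jars /path/to/flink/lib` (a placeholder path) prints nothing when every listed jar is present.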
3 changes: 3 additions & 0 deletions gluten-flink/docs/data-generator.sql
@@ -0,0 +1,3 @@
CREATE TABLE srcTbl (id INT, price INT, name STRING) WITH ('connector'='datagen');
CREATE TABLE snkTbl (id INT, price INT) WITH ('connector'='blackhole');
INSERT INTO snkTbl SELECT id, price FROM srcTbl WHERE price > 10;
82 changes: 82 additions & 0 deletions gluten-flink/loader/pom.xml
@@ -0,0 +1,82 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink</artifactId>
<version>1.4.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

<artifactId>gluten-flink-loader</artifactId>
<name>Gluten Flink Loader</name>
<packaging>jar</packaging>

<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink-planner</artifactId>
<version>${project.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-table-planner-jars</id>
<phase>prepare-package</phase>
<goals>
<goal>copy</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.apache.gluten</groupId>
<artifactId>gluten-flink-planner</artifactId>
<version>${project.version}</version>
<type>jar</type>
<overWrite>true</overWrite>
<destFileName>gluten-flink-planner.jar</destFileName>
</artifactItem>
</artifactItems>
<outputDirectory>${project.build.directory}/classes</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>