Skip to content

Commit 94c76e2

Browse files
authored
add desc for flink sql (#100)
1 parent 4afb041 commit 94c76e2

File tree

1 file changed

+137
-1
lines changed

1 file changed

+137
-1
lines changed

README.md

+137-1
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ Add the dependency to your pom.xml.
3131

3232
## Example
3333

34-
To write data into Nebula Graph using Flink.
34+
To write data into NebulaGraph using Flink.
3535
```
3636
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
3737
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
@@ -63,6 +63,142 @@ DataStream<Row> dataStream = playerSource.map(row -> {
6363
dataStream.addSink(nebulaSinkFunction);
6464
env.execute("write nebula")
6565
```
66+
67+
To read data from NebulaGraph using Flink.
68+
```
69+
NebulaClientOptions nebulaClientOptions = new NebulaClientOptions.NebulaClientOptionsBuilder()
70+
.setMetaAddress("127.0.0.1:9559")
71+
.build();
72+
storageConnectionProvider = new NebulaStorageConnectionProvider(nebulaClientOptions);
73+
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
74+
env.setParallelism(1);
75+
76+
NebulaSourceFunction sourceFunction = new NebulaSourceFunction(storageConnectionProvider)
77+
.setExecutionOptions(vertexExecutionOptions);
78+
DataStreamSource<BaseTableRow> dataStreamSource = env.addSource(sourceFunction);
79+
dataStreamSource.map(row -> {
80+
List<ValueWrapper> values = row.getValues();
81+
Row record = new Row(15);
82+
record.setField(0, values.get(0).asLong());
83+
record.setField(1, values.get(1).asString());
84+
record.setField(2, values.get(2).asString());
85+
record.setField(3, values.get(3).asLong());
86+
record.setField(4, values.get(4).asLong());
87+
record.setField(5, values.get(5).asLong());
88+
record.setField(6, values.get(6).asLong());
89+
record.setField(7, values.get(7).asDate());
90+
record.setField(8, values.get(8).asDateTime().getUTCDateTimeStr());
91+
record.setField(9, values.get(9).asLong());
92+
record.setField(10, values.get(10).asBoolean());
93+
record.setField(11, values.get(11).asDouble());
94+
record.setField(12, values.get(12).asDouble());
95+
record.setField(13, values.get(13).asTime().getUTCTimeStr());
96+
record.setField(14, values.get(14).asGeography());
97+
return record;
98+
}).print();
99+
env.execute("NebulaStreamSource");
100+
```
101+
102+
To operate Schema and data using Flink SQL.
103+
104+
1. create graph space
105+
```
106+
NebulaCatalog nebulaCatalog = NebulaCatalogUtils.createNebulaCatalog(
107+
"NebulaCatalog",
108+
"default",
109+
"root",
110+
"nebula",
111+
"127.0.0.1:9559",
112+
"127.0.0.1:9669");
113+
114+
EnvironmentSettings settings = EnvironmentSettings.newInstance()
115+
.inStreamingMode()
116+
.build();
117+
TableEnvironment tableEnv = TableEnvironment.create(settings);
118+
119+
tableEnv.registerCatalog(CATALOG_NAME, nebulaCatalog);
120+
tableEnv.useCatalog(CATALOG_NAME);
121+
122+
String createDataBase = "CREATE DATABASE IF NOT EXISTS `db1`"
123+
+ " COMMENT 'space 1'"
124+
+ " WITH ("
125+
+ " 'partition_num' = '100',"
126+
+ " 'replica_factor' = '3',"
127+
+ " 'vid_type' = 'FIXED_STRING(10)'"
128+
+ ")";
129+
tableEnv.executeSql(createDataBase);
130+
```
131+
2. create tag
132+
```
133+
tableEnvironment.executeSql("CREATE TABLE `person` ("
134+
+ " vid BIGINT,"
135+
+ " col1 STRING,"
136+
+ " col2 STRING,"
137+
+ " col3 BIGINT,"
138+
+ " col4 BIGINT,"
139+
+ " col5 BIGINT,"
140+
+ " col6 BIGINT,"
141+
+ " col7 DATE,"
142+
+ " col8 TIMESTAMP,"
143+
+ " col9 BIGINT,"
144+
+ " col10 BOOLEAN,"
145+
+ " col11 DOUBLE,"
146+
+ " col12 DOUBLE,"
147+
+ " col13 TIME,"
148+
+ " col14 STRING"
149+
+ ") WITH ("
150+
+ " 'connector' = 'nebula',"
151+
+ " 'meta-address' = '127.0.0.1:9559',"
152+
+ " 'graph-address' = '127.0.0.1:9669',"
153+
+ " 'username' = 'root',"
154+
+ " 'password' = 'nebula',"
155+
+ " 'data-type' = 'vertex',"
156+
+ " 'graph-space' = 'flink_test',"
157+
+ " 'label-name' = 'person'"
158+
+ ")"
159+
);
160+
```
161+
3. create edge
162+
```
163+
tableEnvironment.executeSql("CREATE TABLE `friend` ("
164+
+ " sid BIGINT,"
165+
+ " did BIGINT,"
166+
+ " rid BIGINT,"
167+
+ " col1 STRING,"
168+
+ " col2 STRING,"
169+
+ " col3 BIGINT,"
170+
+ " col4 BIGINT,"
171+
+ " col5 BIGINT,"
172+
+ " col6 BIGINT,"
173+
+ " col7 DATE,"
174+
+ " col8 TIMESTAMP,"
175+
+ " col9 BIGINT,"
176+
+ " col10 BOOLEAN,"
177+
+ " col11 DOUBLE,"
178+
+ " col12 DOUBLE,"
179+
+ " col13 TIME,"
180+
+ " col14 STRING"
181+
+ ") WITH ("
182+
+ " 'connector' = 'nebula',"
183+
+ " 'meta-address' = '127.0.0.1:9559',"
184+
+ " 'graph-address' = '127.0.0.1:9669',"
185+
+ " 'username' = 'root',"
186+
+ " 'password' = 'nebula',"
187+
+ " 'graph-space' = 'flink_test',"
188+
+ " 'label-name' = 'friend',"
189+
+ " 'data-type'='edge',"
190+
+ " 'src-id-index'='0',"
191+
+ " 'dst-id-index'='1',"
192+
+ " 'rank-id-index'='2'"
193+
+ ")"
194+
);
195+
```
196+
4. query edge data and insert into another edge type
197+
```
198+
Table table = tableEnvironment.sqlQuery("SELECT * FROM `friend`");
199+
table.executeInsert("`friend_sink`").await();
200+
```
201+
66202
## Version match
67203

68204
There are the version correspondence between Nebula Flink Connector and Nebula:

0 commit comments

Comments
 (0)