Skip to content

Commit 755c88f

Browse files
Add vertex delete with edge (#103)
* update version to 3.8.0 * Added vertex with edges deletion --------- Co-authored-by: Anqi <anqi.wang@vesoft.com>
1 parent 13df954 commit 755c88f

File tree

8 files changed

+64
-7
lines changed

8 files changed

+64
-7
lines changed

connector/pom.xml

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
<parent>
66
<artifactId>nebula-flink</artifactId>
77
<groupId>com.vesoft</groupId>
8-
<version>3.0-SNAPSHOT</version>
8+
<version>3.8.0</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<modelVersion>4.0.0</modelVersion>
1212

1313
<artifactId>nebula-flink-connector</artifactId>
1414

1515
<properties>
16-
<nebula.version>3.0-SNAPSHOT</nebula.version>
16+
<nebula.version>3.8.0</nebula.version>
1717
<flink.version>1.14.4</flink.version>
1818
<scala.binary.version>2.11</scala.binary.version>
1919
<compiler.source.version>1.8</compiler.source.version>

connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaVertexBatchExecutor.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,8 @@ public void executeBatch(Session session) throws IOException {
6262
return;
6363
}
6464
NebulaVertices nebulaVertices = new NebulaVertices(executionOptions.getLabel(),
65-
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy());
65+
executionOptions.getFields(), nebulaVertexList, executionOptions.getPolicy(),
66+
executionOptions.isDeleteExecutedWithEdges());
6667
// generate the write ngql statement
6768
String statement = null;
6869
switch (executionOptions.getWriteMode()) {

connector/src/main/java/org.apache.flink/connector/nebula/statement/VertexExecutionOptions.java

+19-1
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public class VertexExecutionOptions extends ExecutionOptions {
3232
*/
3333
private int idIndex;
3434

35+
private boolean isDeleteExecutedWithEdges = false;
36+
3537
public VertexExecutionOptions(String graphSpace,
3638
String executeStatement,
3739
List<String> fields,
@@ -44,6 +46,7 @@ public VertexExecutionOptions(String graphSpace,
4446
PolicyEnum policy,
4547
WriteModeEnum mode,
4648
String tag,
49+
boolean isDeleteExecutedWithEdges,
4750
int idIndex,
4851
int batchIntervalMs,
4952
FailureHandlerEnum failureHandler,
@@ -54,6 +57,7 @@ public VertexExecutionOptions(String graphSpace,
5457
failureHandler, maxRetries, retryDelayMs);
5558
this.tag = tag;
5659
this.idIndex = idIndex;
60+
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
5761
}
5862

5963
public int getIdIndex() {
@@ -65,6 +69,10 @@ public String getLabel() {
6569
return tag;
6670
}
6771

72+
public boolean isDeleteExecutedWithEdges() {
73+
return isDeleteExecutedWithEdges;
74+
}
75+
6876
@Override
6977
public DataTypeEnum getDataType() {
7078
return DataTypeEnum.VERTEX;
@@ -78,6 +86,7 @@ public ExecutionOptionBuilder toBuilder() {
7886
.setFields(this.getFields())
7987
.setPositions(this.getPositions())
8088
.setNoColumn(this.isNoColumn())
89+
.setDeleteExecutedWithEdges(this.isDeleteExecutedWithEdges())
8190
.setLimit(this.getLimit())
8291
.setStartTime(this.getStartTime())
8392
.setEndTime(this.getEndTime())
@@ -99,6 +108,7 @@ public static class ExecutionOptionBuilder {
99108
private List<String> fields;
100109
private List<Integer> positions;
101110
private boolean noColumn = false;
111+
private boolean isDeleteExecutedWithEdges = false;
102112
private int limit = DEFAULT_SCAN_LIMIT;
103113
private long startTime = 0;
104114
private long endTime = Long.MAX_VALUE;
@@ -144,6 +154,13 @@ public ExecutionOptionBuilder setNoColumn(boolean noColumn) {
144154
return this;
145155
}
146156

157+
public ExecutionOptionBuilder setDeleteExecutedWithEdges(
158+
boolean isDeleteExecutedWithEdges
159+
) {
160+
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
161+
return this;
162+
}
163+
147164
public ExecutionOptionBuilder setLimit(int limit) {
148165
this.limit = limit;
149166
return this;
@@ -220,7 +237,8 @@ public VertexExecutionOptions build() {
220237
}
221238
return new VertexExecutionOptions(graphSpace, executeStatement, fields,
222239
positions, noColumn, limit, startTime, endTime, batchSize, policy, mode, tag,
223-
idIndex, batchIntervalMs, failureHandler, maxRetries, retryDelayMs);
240+
isDeleteExecutedWithEdges, idIndex, batchIntervalMs,
241+
failureHandler, maxRetries, retryDelayMs);
224242
}
225243
}
226244
}

connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaConstant.java

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ public class NebulaConstant {
2121

2222
// template for delete statement
2323
public static String DELETE_VERTEX_TEMPLATE = "DELETE VERTEX %s";
24+
public static String DELETE_VERTEX_TEMPLATE_WITH_EDGE = "DELETE VERTEX %s WITH EDGE";
2425
public static String DELETE_EDGE_TEMPLATE = "DELETE EDGE `%s` %s";
2526
public static String EDGE_ENDPOINT_TEMPLATE = "%s->%s@%d";
2627

connector/src/main/java/org.apache.flink/connector/nebula/utils/NebulaVertices.java

+16-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
import static org.apache.flink.connector.nebula.utils.NebulaConstant.BATCH_INSERT_TEMPLATE;
99
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE;
10+
import static org.apache.flink.connector.nebula.utils.NebulaConstant.DELETE_VERTEX_TEMPLATE_WITH_EDGE;
1011
import static org.apache.flink.connector.nebula.utils.NebulaConstant.ENDPOINT_TEMPLATE;
1112
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VALUE_TEMPLATE;
1213
import static org.apache.flink.connector.nebula.utils.NebulaConstant.UPDATE_VERTEX_TEMPLATE;
@@ -23,6 +24,8 @@ public class NebulaVertices implements Serializable {
2324
private List<NebulaVertex> vertices;
2425
private PolicyEnum policy = null;
2526

27+
private boolean isDeleteExecutedWithEdges;
28+
2629
public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
2730
PolicyEnum policy) {
2831
this.tagName = tagName;
@@ -31,6 +34,15 @@ public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex>
3134
this.policy = policy;
3235
}
3336

37+
public NebulaVertices(String tagName, List<String> propNames, List<NebulaVertex> vertices,
38+
PolicyEnum policy, boolean isDeleteExecutedWithEdges) {
39+
this.tagName = tagName;
40+
this.propNames = propNames;
41+
this.vertices = vertices;
42+
this.policy = policy;
43+
this.isDeleteExecutedWithEdges = isDeleteExecutedWithEdges;
44+
}
45+
3446
public String getPropNames() {
3547
List<String> escapePropNames = new ArrayList<>();
3648
for (String propName : propNames) {
@@ -110,7 +122,10 @@ public String getDeleteStatement() {
110122
String vertexId = getVertexId(vertex);
111123
vertexIds.add(vertexId);
112124
}
113-
return String.format(DELETE_VERTEX_TEMPLATE, String.join(",", vertexIds));
125+
String template = isDeleteExecutedWithEdges
126+
? DELETE_VERTEX_TEMPLATE_WITH_EDGE
127+
: DELETE_VERTEX_TEMPLATE;
128+
return String.format(template, String.join(",", vertexIds));
114129
}
115130

116131
/**

connector/src/test/java/org/apache/flink/connector/nebula/utils/NebulaVerticesTest.java

+22
Original file line numberDiff line numberDiff line change
@@ -112,4 +112,26 @@ public void testGetDeleteStatementWithPolicy() {
112112
assert (vertexStatement.equals(expectStatement));
113113
}
114114

115+
public void testGetDeleteStatementWithEdges() {
116+
vertices.add(new NebulaVertex("\"vid1\"", props1));
117+
vertices.add(new NebulaVertex("\"vid2\"", props2));
118+
119+
NebulaVertices nebulaVertices = new NebulaVertices(
120+
tagName, propNames, vertices, null, true
121+
);
122+
String vertexStatement = nebulaVertices.getDeleteStatement();
123+
String expectStatement = "DELETE VERTEX \"vid1\",\"vid2\" WITH EDGE";
124+
assert (vertexStatement.equals(expectStatement));
125+
}
126+
127+
public void testGetDeleteStatementWithPolicyAndEdges() {
128+
vertices.add(new NebulaVertex("vid1", props1));
129+
vertices.add(new NebulaVertex("vid2", props2));
130+
131+
NebulaVertices nebulaVertices = new NebulaVertices(tagName, propNames, vertices,
132+
PolicyEnum.HASH, true);
133+
String vertexStatement = nebulaVertices.getDeleteStatement();
134+
String expectStatement = "DELETE VERTEX HASH(\"vid1\"),HASH(\"vid2\") WITH EDGE";
135+
assert (vertexStatement.equals(expectStatement));
136+
}
115137
}

example/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
<parent>
66
<artifactId>nebula-flink</artifactId>
77
<groupId>com.vesoft</groupId>
8-
<version>3.0-SNAPSHOT</version>
8+
<version>3.8.0</version>
99
<relativePath>../pom.xml</relativePath>
1010
</parent>
1111
<modelVersion>4.0.0</modelVersion>

pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
<groupId>com.vesoft</groupId>
88
<artifactId>nebula-flink</artifactId>
99
<packaging>pom</packaging>
10-
<version>3.0-SNAPSHOT</version>
10+
<version>3.8.0</version>
1111

1212
<modules>
1313
<module>connector</module>

0 commit comments

Comments
 (0)