37
37
import org .apache .hugegraph .backend .cache .CacheNotifier .SchemaCacheNotifier ;
38
38
import org .apache .hugegraph .backend .cache .CachedGraphTransaction ;
39
39
import org .apache .hugegraph .backend .cache .CachedSchemaTransaction ;
40
+ import org .apache .hugegraph .backend .cache .CachedSchemaTransactionV2 ;
40
41
import org .apache .hugegraph .backend .id .Id ;
41
42
import org .apache .hugegraph .backend .id .IdGenerator ;
42
43
import org .apache .hugegraph .backend .id .SnowflakeIdGenerator ;
52
53
import org .apache .hugegraph .backend .store .raft .RaftGroupManager ;
53
54
import org .apache .hugegraph .backend .store .ram .RamTable ;
54
55
import org .apache .hugegraph .backend .tx .GraphTransaction ;
55
- import org .apache .hugegraph .backend .tx .SchemaTransaction ;
56
+ import org .apache .hugegraph .backend .tx .ISchemaTransaction ;
56
57
import org .apache .hugegraph .config .CoreOptions ;
57
58
import org .apache .hugegraph .config .HugeConfig ;
58
59
import org .apache .hugegraph .config .TypedOption ;
69
70
import org .apache .hugegraph .masterelection .RoleElectionStateMachine ;
70
71
import org .apache .hugegraph .masterelection .StandardClusterRoleStore ;
71
72
import org .apache .hugegraph .masterelection .StandardRoleElectionStateMachine ;
73
+ import org .apache .hugegraph .meta .MetaManager ;
72
74
import org .apache .hugegraph .perf .PerfUtil .Watched ;
73
75
import org .apache .hugegraph .rpc .RpcServiceConfig4Client ;
74
76
import org .apache .hugegraph .rpc .RpcServiceConfig4Server ;
@@ -176,6 +178,8 @@ public class StandardHugeGraph implements HugeGraph {
176
178
177
179
private final RamTable ramtable ;
178
180
181
+ private final String schedulerType ;
182
+
179
183
public StandardHugeGraph (HugeConfig config ) {
180
184
this .params = new StandardHugeGraphParams ();
181
185
this .configuration = config ;
@@ -209,6 +213,7 @@ public StandardHugeGraph(HugeConfig config) {
209
213
this .closed = false ;
210
214
this .mode = GraphMode .NONE ;
211
215
this .readMode = GraphReadMode .OLTP_ONLY ;
216
+ this .schedulerType = config .get (CoreOptions .SCHEDULER_TYPE );
212
217
213
218
LockUtil .init (this .name );
214
219
@@ -221,6 +226,13 @@ public StandardHugeGraph(HugeConfig config) {
221
226
throw new HugeException (message , e );
222
227
}
223
228
229
+ if (isHstore ()) {
230
+ // TODO: parameterize the remaining configurations
231
+ MetaManager .instance ().connect ("hg" , MetaManager .MetaDriverType .PD ,
232
+ "ca" , "ca" , "ca" ,
233
+ config .get (CoreOptions .PD_PEERS ));
234
+ }
235
+
224
236
try {
225
237
this .tx = new TinkerPopTransaction (this );
226
238
boolean supportsPersistence = this .backendStoreFeatures ().supportsPersistence ();
@@ -457,9 +469,18 @@ private void clearVertexCache() {
457
469
}
458
470
}
459
471
460
- private SchemaTransaction openSchemaTransaction () throws HugeException {
472
+ private boolean isHstore () {
473
+ return this .storeProvider .isHstore ();
474
+ }
475
+
476
+ private ISchemaTransaction openSchemaTransaction () throws HugeException {
461
477
this .checkGraphNotClosed ();
462
478
try {
479
+ if (isHstore ()) {
480
+ return new CachedSchemaTransactionV2 (
481
+ MetaManager .instance ().metaDriver (),
482
+ MetaManager .instance ().cluster (), this .params );
483
+ }
463
484
return new CachedSchemaTransaction (this .params , loadSchemaStore ());
464
485
} catch (BackendException e ) {
465
486
String message = "Failed to open schema transaction" ;
@@ -504,11 +525,14 @@ private BackendStore loadGraphStore() {
504
525
}
505
526
506
527
private BackendStore loadSystemStore () {
528
+ if (isHstore ()) {
529
+ return this .storeProvider .loadGraphStore (this .configuration );
530
+ }
507
531
return this .storeProvider .loadSystemStore (this .configuration );
508
532
}
509
533
510
534
@ Watched
511
- private SchemaTransaction schemaTransaction () {
535
+ private ISchemaTransaction schemaTransaction () {
512
536
this .checkGraphNotClosed ();
513
537
/*
514
538
* NOTE: each schema operation will be auto committed,
@@ -1196,7 +1220,7 @@ public GraphReadMode readMode() {
1196
1220
}
1197
1221
1198
1222
@ Override
1199
- public SchemaTransaction schemaTransaction () {
1223
+ public ISchemaTransaction schemaTransaction () {
1200
1224
return StandardHugeGraph .this .schemaTransaction ();
1201
1225
}
1202
1226
@@ -1316,6 +1340,11 @@ public RamTable ramtable() {
1316
1340
public <T > void submitEphemeralJob (EphemeralJob <T > job ) {
1317
1341
this .ephemeralJobQueue .add (job );
1318
1342
}
1343
+
1344
+ @ Override
1345
+ public String schedulerType () {
1346
+ return StandardHugeGraph .this .schedulerType ;
1347
+ }
1319
1348
}
1320
1349
1321
1350
private class TinkerPopTransaction extends AbstractThreadLocalTransaction {
@@ -1447,7 +1476,7 @@ private void setClosed() {
1447
1476
}
1448
1477
}
1449
1478
1450
- private SchemaTransaction schemaTransaction () {
1479
+ private ISchemaTransaction schemaTransaction () {
1451
1480
return this .getOrNewTransaction ().schemaTx ;
1452
1481
}
1453
1482
@@ -1468,7 +1497,7 @@ private Txs getOrNewTransaction() {
1468
1497
1469
1498
Txs txs = this .transactions .get ();
1470
1499
if (txs == null ) {
1471
- SchemaTransaction schemaTransaction = null ;
1500
+ ISchemaTransaction schemaTransaction = null ;
1472
1501
SysTransaction sysTransaction = null ;
1473
1502
GraphTransaction graphTransaction = null ;
1474
1503
try {
@@ -1511,12 +1540,12 @@ private void destroyTransaction() {
1511
1540
1512
1541
private static final class Txs {
1513
1542
1514
- private final SchemaTransaction schemaTx ;
1543
+ private final ISchemaTransaction schemaTx ;
1515
1544
private final SysTransaction systemTx ;
1516
1545
private final GraphTransaction graphTx ;
1517
1546
private long openedTime ;
1518
1547
1519
- public Txs (SchemaTransaction schemaTx , SysTransaction systemTx ,
1548
+ public Txs (ISchemaTransaction schemaTx , SysTransaction systemTx ,
1520
1549
GraphTransaction graphTx ) {
1521
1550
assert schemaTx != null && systemTx != null && graphTx != null ;
1522
1551
this .schemaTx = schemaTx ;
0 commit comments