Skip to content

Commit 30b1961

Browse files
committed
test
1 parent 68c2918 commit 30b1961

File tree

2 files changed

+79
-25
lines changed

2 files changed

+79
-25
lines changed

server/src/main/java/org/opensearch/index/engine/Engine.java

+31-4
Original file line numberDiff line numberDiff line change
@@ -1518,15 +1518,17 @@ public String getLowercase() {
15181518
private final VersionType versionType;
15191519
private final Origin origin;
15201520
private final long startTime;
1521+
private final InternalEngine.IndexingStrategy indexingStrategy;
15211522

1522-
public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) {
1523+
public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime, InternalEngine.IndexingStrategy indexingStrategy) {
15231524
this.uid = uid;
15241525
this.seqNo = seqNo;
15251526
this.primaryTerm = primaryTerm;
15261527
this.version = version;
15271528
this.versionType = versionType;
15281529
this.origin = origin;
15291530
this.startTime = startTime;
1531+
this.indexingStrategy = indexingStrategy;
15301532
}
15311533

15321534
/**
@@ -1587,6 +1589,10 @@ public long startTime() {
15871589
abstract String id();
15881590

15891591
public abstract TYPE operationType();
1592+
1593+
public InternalEngine.IndexingStrategy indexingStrategy() {
1594+
return indexingStrategy;
1595+
};
15901596
}
15911597

15921598
/**
@@ -1617,7 +1623,25 @@ public Index(
16171623
long ifSeqNo,
16181624
long ifPrimaryTerm
16191625
) {
1620-
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1626+
this(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry, ifSeqNo, ifPrimaryTerm, null);
1627+
}
1628+
1629+
public Index(
1630+
Term uid,
1631+
ParsedDocument doc,
1632+
long seqNo,
1633+
long primaryTerm,
1634+
long version,
1635+
VersionType versionType,
1636+
Origin origin,
1637+
long startTime,
1638+
long autoGeneratedIdTimestamp,
1639+
boolean isRetry,
1640+
long ifSeqNo,
1641+
long ifPrimaryTerm,
1642+
InternalEngine.IndexingStrategy indexingStrategy
1643+
) {
1644+
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, indexingStrategy);
16211645
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
16221646
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
16231647
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
@@ -1630,6 +1654,7 @@ public Index(
16301654
this.ifPrimaryTerm = ifPrimaryTerm;
16311655
}
16321656

1657+
16331658
public Index(Term uid, long primaryTerm, ParsedDocument doc) {
16341659
this(uid, primaryTerm, doc, Versions.MATCH_ANY);
16351660
} // TEST ONLY
@@ -1706,6 +1731,8 @@ public long getIfSeqNo() {
17061731
public long getIfPrimaryTerm() {
17071732
return ifPrimaryTerm;
17081733
}
1734+
1735+
17091736
}
17101737

17111738
/**
@@ -1732,7 +1759,7 @@ public Delete(
17321759
long ifSeqNo,
17331760
long ifPrimaryTerm
17341761
) {
1735-
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1762+
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime, null);
17361763
assert (origin == Origin.PRIMARY) == (versionType != null) : "invalid version_type=" + versionType + " for origin=" + origin;
17371764
assert ifPrimaryTerm >= 0 : "ifPrimaryTerm [" + ifPrimaryTerm + "] must be non negative";
17381765
assert ifSeqNo == UNASSIGNED_SEQ_NO || ifSeqNo >= 0 : "ifSeqNo [" + ifSeqNo + "] must be non negative or unset";
@@ -1812,7 +1839,7 @@ public String reason() {
18121839
}
18131840

18141841
public NoOp(final long seqNo, final long primaryTerm, final Origin origin, final long startTime, final String reason) {
1815-
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime);
1842+
super(null, seqNo, primaryTerm, Versions.NOT_FOUND, null, origin, startTime, null);
18161843
this.reason = reason;
18171844
}
18181845

server/src/main/java/org/opensearch/index/engine/InternalEngine.java

+48-21
Original file line numberDiff line numberDiff line change
@@ -684,16 +684,16 @@ enum OpVsLuceneDocStatus {
684684
/** the op is more recent than the one that last modified the doc found in lucene*/
685685
OP_NEWER,
686686
/** the op is older or the same as the one that last modified the doc found in lucene*/
687-
OP_STALE_OR_EQUAL,
687+
OP_STALE_OR_EQUAL,// 一样,或者更旧的
688688
/** no doc was found in lucene */
689689
LUCENE_DOC_NOT_FOUND
690690
}
691691

692692
private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long seqNo, long primaryTerm, VersionValue versionValue) {
693693
Objects.requireNonNull(versionValue);
694694
if (seqNo > versionValue.seqNo) {
695-
return OpVsLuceneDocStatus.OP_NEWER;
696-
} else if (seqNo == versionValue.seqNo) {
695+
return OpVsLuceneDocStatus.OP_NEWER;// 新的
696+
} else if (seqNo == versionValue.seqNo) {// 一样
697697
assert versionValue.term == primaryTerm : "primary term not matched; id="
698698
+ id
699699
+ " seq_no="
@@ -711,12 +711,13 @@ private static OpVsLuceneDocStatus compareOpToVersionMapOnSeqNo(String id, long
711711
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
712712
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
713713
final OpVsLuceneDocStatus status;
714-
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
714+
715+
VersionValue versionValue = getVersionFromMap(op.uid().bytes());// 从maps中找下,看可以找到吗
715716
assert incrementVersionLookup();
716717
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
717-
if (versionValue != null) {
718+
if (versionValue != null) {// maps中找到了
718719
status = compareOpToVersionMapOnSeqNo(op.id(), op.seqNo(), op.primaryTerm(), versionValue);
719-
} else {
720+
} else {// 没找到
720721
// load from index
721722
assert incrementIndexVersionLookup();
722723
try (Searcher searcher = acquireSearcher("load_seq_no", SearcherScope.INTERNAL)) {
@@ -906,10 +907,11 @@ public IndexResult index(Index index) throws IOException {
906907
index.getAutoGeneratedIdTimestamp(),
907908
index.isRetry(),
908909
index.getIfSeqNo(),
909-
index.getIfPrimaryTerm()
910+
index.getIfPrimaryTerm(),
911+
plan
910912
);
911913

912-
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
914+
final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;// 不带主键的写入
913915
if (toAppend == false) {
914916
advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
915917
}
@@ -922,6 +924,9 @@ public IndexResult index(Index index) throws IOException {
922924
if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
923925
indexResult = indexIntoLucene(index, plan);
924926
} else {
927+
if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted) {
928+
throw new RuntimeException("plan.versionForIndexing != index.indexingStrategy().versionForIndexing || plan.currentNotFoundOrDeleted != index.indexingStrategy().currentNotFoundOrDeleted");
929+
}
925930
indexResult = new IndexResult(
926931
plan.versionForIndexing,
927932
index.primaryTerm(),
@@ -956,6 +961,9 @@ public IndexResult index(Index index) throws IOException {
956961
index.uid().bytes(),
957962
new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm())
958963
);
964+
if (plan.versionForIndexing != index.indexingStrategy().versionForIndexing) {
965+
System.out.println("wrong");
966+
}
959967
}
960968
localCheckpointTracker.markSeqNoAsProcessed(indexResult.getSeqNo());
961969
if (indexResult.getTranslogLocation() == null) {
@@ -987,7 +995,7 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
987995
assert assertNonPrimaryOrigin(index);
988996
// needs to maintain the auto_id timestamp in case this replica becomes primary
989997
if (canOptimizeAddDocument(index)) {// 如果不带主键的写入,肯定可以优化
990-
mayHaveBeenIndexedBefore(index);
998+
mayHaveBeenIndexedBefore(index);// 可能更新maxUnsafeAutoIdTimestamp和必须更新maxSeenAutoIdTimestamp
991999
}
9921000
final IndexingStrategy plan;
9931001
// unlike the primary, replicas don't really care to about creation status of documents
@@ -1010,19 +1018,24 @@ protected final IndexingStrategy planIndexingAsNonPrimary(Index index) throws IO
10101018
} else {
10111019
boolean segRepEnabled = engineConfig.getIndexSettings().isSegRepEnabledOrRemoteNode();
10121020
versionMap.enforceSafeAccess();
1013-
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
1014-
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {
1015-
if (segRepEnabled) {
1016-
// For segrep based indices, we can't completely rely on localCheckpointTracker
1017-
// as the preserved checkpoint may not have all the operations present in lucene
1018-
// we don't need to index it again as stale op as it would create multiple documents for same seq no
1019-
plan = IndexingStrategy.processButSkipLucene(false, index.version());
1020-
} else {
1021-
plan = IndexingStrategy.processAsStaleOp(index.version());
1021+
1022+
// if (index.IndexingStrategy() != null && segRepEnabled == false) {
1023+
// return index.IndexingStrategy();
1024+
// } else {
1025+
final OpVsLuceneDocStatus opVsLucene = compareOpToLuceneDocBasedOnSeqNo(index);
1026+
if (opVsLucene == OpVsLuceneDocStatus.OP_STALE_OR_EQUAL) {// seqNo小于等于当前已经写入的
1027+
if (segRepEnabled) {
1028+
// For segrep based indices, we can't completely rely on localCheckpointTracker
1029+
// as the preserved checkpoint may not have all the operations present in lucene
1030+
// we don't need to index it again as stale op as it would create multiple documents for same seq no
1031+
plan = IndexingStrategy.processButSkipLucene(false, index.version());
1032+
} else {
1033+
plan = IndexingStrategy.processAsStaleOp(index.version());
1034+
}
1035+
} else {// 更新的,或者没找到
1036+
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
10221037
}
1023-
} else {
1024-
plan = IndexingStrategy.processNormally(opVsLucene == OpVsLuceneDocStatus.LUCENE_DOC_NOT_FOUND, index.version(), 0);
1025-
}
1038+
// }
10261039
}
10271040
return plan;
10281041
}
@@ -1111,6 +1124,11 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
11111124
assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();
11121125
assert plan.versionForIndexing >= 0 : "version must be set. got " + plan.versionForIndexing;
11131126
assert plan.indexIntoLucene || plan.addStaleOpToLucene;
1127+
InternalEngine.IndexingStrategy indexingStrategy = index.indexingStrategy();
1128+
if (indexingStrategy.versionForIndexing != plan.versionForIndexing || (indexingStrategy.indexIntoLucene || indexingStrategy.addStaleOpToLucene) == false) {
1129+
throw new RuntimeException("dddddd 0");
1130+
}
1131+
11141132
/* Update the document's sequence number and primary term; the sequence number here is derived here from either the sequence
11151133
* number service if this is on the primary, or the existing document's sequence number if this is on the replica. The
11161134
* primary term here has already been set, see IndexShard#prepareIndex where the Engine$Index operation is created.
@@ -1119,15 +1137,24 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan) throws I
11191137
index.parsedDoc().version().setLongValue(plan.versionForIndexing);
11201138
try {
11211139
if (plan.addStaleOpToLucene) {
1140+
if (indexingStrategy.addStaleOpToLucene == false) {
1141+
throw new RuntimeException("indexingStrategy.addStaleOpToLucene == false");
1142+
}
11221143
addStaleDocs(index.docs(), indexWriter);
11231144
} else if (plan.useLuceneUpdateDocument) {
1145+
if (indexingStrategy.useLuceneUpdateDocument == false) {
1146+
throw new RuntimeException("indexingStrategy.useLuceneUpdateDocument == false");
1147+
}
11241148
assert assertMaxSeqNoOfUpdatesIsAdvanced(index.uid(), index.seqNo(), true, true);
11251149
updateDocs(index.uid(), index.docs(), indexWriter);
11261150
} else {
11271151
// document does not exists, we can optimize for create, but double check if assertions are running
11281152
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
11291153
addDocs(index.docs(), indexWriter);
11301154
}
1155+
if (plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted) {
1156+
throw new RuntimeException("plan.versionForIndexing != indexingStrategy.versionForIndexing || plan.currentNotFoundOrDeleted != indexingStrategy.currentNotFoundOrDeleted");
1157+
}
11311158
return new IndexResult(plan.versionForIndexing, index.primaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
11321159
} catch (Exception ex) {
11331160
if (ex instanceof AlreadyClosedException == false

0 commit comments

Comments
 (0)