Skip to content

Commit 7b816a6

Browse files
committed
initial spillover shard impl
1 parent 7f92eee commit 7b816a6

17 files changed

+250
-98
lines changed

scripts/indexTaxis.py

+7-4
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import pprint
12
import random
23
import gzip
34
import socket
@@ -259,8 +260,7 @@ def main():
259260
if len(replicaPorts) > 0:
260261
refreshSec = 1.0
261262
else:
262-
# Turn off refreshes to maximize indexing throughput:
263-
refreshSec = 100000.0
263+
refreshSec = 5.0
264264
send(LOCALHOST, primaryPorts[0], "liveSettings", {'indexName': 'index', 'index.ramBufferSizeMB': 256., 'maxRefreshSec': refreshSec})
265265

266266
fields = {'indexName': 'index',
@@ -361,7 +361,7 @@ def main():
361361
dps = float(dps)
362362

363363
if USE_JSON:
364-
c = ChunkedHTTPSend(LOCALHOST, primaryPorts[0], 'bulkAddDocument2', {'indexName': 'index'})
364+
c = ChunkedHTTPSend(LOCALHOST, primaryPorts[0], 'bulkAddDocument2', {'indexName': 'index'})
365365
elif BINARY_CSV:
366366
b1 = BinarySend(LOCALHOST, primaryPorts[1], 'bulkCSVAddDocument')
367367
b1.add(b',index\n')
@@ -421,7 +421,10 @@ def main():
421421
print('%6.1f sec: %.2fM hits on replica, %.2f M docs... %.1f docs/sec, %.1f MB/sec' % (delay, x['totalHits']/1000000., id/1000000., dps, (totBytes/1024./1024.)/delay))
422422
else:
423423
print('%6.1f sec: %.2f M docs... %.1f docs/sec, %.1f MB/sec' % (delay, id/1000000., dps, (totBytes/1024./1024.)/delay))
424-
424+
result = json.loads(send(LOCALHOST, primaryPorts[0], 'stats', {'indexName': 'index'}))
425+
print(json.dumps(result, sort_keys=True, indent=4))
426+
x = json.loads(send(LOCALHOST, primaryPorts[0], 'search2', {'indexNames': ['index'], 'queryText': '*:*'}));
427+
print('%d hits...' % x['totalHits'])
425428
while nextPrint <= id:
426429
nextPrint += 250000
427430

src/java/org/apache/lucene/server/GlobalState.java

+6-4
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353

5454
import org.apache.lucene.facet.taxonomy.SearcherTaxonomyManager.SearcherAndTaxonomy;
5555
import org.apache.lucene.index.Term;
56+
import org.apache.lucene.search.MatchAllDocsQuery;
5657
import org.apache.lucene.search.ScoreDoc;
5758
import org.apache.lucene.search.TermQuery;
5859
import org.apache.lucene.search.TimeLimitingCollector;
@@ -168,12 +169,14 @@ private void _run() throws Exception {
168169
}
169170
//System.out.println("N" + StringHelper.idToString(nodeID) + ": run query Q" + query.id);
170171

171-
IndexState indexState = get(query.indexName);
172+
IndexState indexState = getIndex(query.indexName);
172173
ShardState shardState = indexState.getShard(query.shardOrd);
173174
SearcherAndTaxonomy searcher = shardState.acquire();
175+
shardState.slm.record(searcher.searcher);
176+
System.out.println("searcher for " + shardState);
174177
TopDocs hits;
175178
try {
176-
hits = searcher.searcher.search(new TermQuery(new Term("title", query.text)), 10);
179+
hits = searcher.searcher.search(new MatchAllDocsQuery(), 10);
177180
} finally {
178181
shardState.release(searcher);
179182
}
@@ -282,8 +285,7 @@ public Map<String,Handler> getHandlers() {
282285
}
283286

284287
/** Get the {@link IndexState} by index name. */
285-
// nocommit rename to getIndex
286-
public IndexState get(String name) throws IOException {
288+
public IndexState getIndex(String name) throws IOException {
287289
synchronized(indices) {
288290
IndexState state = indices.get(name);
289291
if (state == null) {

src/java/org/apache/lucene/server/IndexState.java

+45-3
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
import org.apache.lucene.index.IndexWriterConfig;
7575
import org.apache.lucene.index.IndexableField;
7676
import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
77+
import org.apache.lucene.index.MergePolicy;
7778
import org.apache.lucene.index.PersistentSnapshotDeletionPolicy;
7879
import org.apache.lucene.index.SegmentInfos;
7980
import org.apache.lucene.index.SimpleMergedSegmentWarmer;
@@ -183,7 +184,9 @@ public class IndexState implements Closeable {
183184
/** {@link Bindings} to pass when evaluating expressions. */
184185
public final Bindings exprBindings = new FieldDefBindings(fields);
185186

186-
private final Map<Integer,ShardState> shards = new ConcurrentHashMap<>();
187+
public final Map<Integer,ShardState> shards = new ConcurrentHashMap<>();
188+
189+
private ShardState writableShard;
187190

188191
/** Built suggest implementations */
189192
public final Map<String,Lookup> suggesters = new ConcurrentHashMap<String,Lookup>();
@@ -362,6 +365,7 @@ public IndexState(GlobalState globalState, String name, Path rootDir, boolean do
362365
this.globalState = globalState;
363366
this.name = name;
364367
this.rootDir = rootDir;
368+
// nocommit require rootDir != null! no RAMDirectory!
365369
if (rootDir != null) {
366370
if (Files.exists(rootDir) == false) {
367371
Files.createDirectories(rootDir);
@@ -435,7 +439,7 @@ public boolean hasCommit() throws IOException {
435439
return saveLoadState.getNextWriteGen() != 0;
436440
}
437441

438-
IndexWriterConfig getIndexWriterConfig(OpenMode openMode, Directory origIndexDir) throws IOException {
442+
IndexWriterConfig getIndexWriterConfig(OpenMode openMode, Directory origIndexDir, int shardOrd) throws IOException {
439443
IndexWriterConfig iwc = new IndexWriterConfig(indexAnalyzer);
440444
iwc.setOpenMode(openMode);
441445
if (getBooleanSetting("index.verbose")) {
@@ -452,7 +456,15 @@ IndexWriterConfig getIndexWriterConfig(OpenMode openMode, Directory origIndexDir
452456
// nocommit in primary case we can't do this?
453457
iwc.setMergedSegmentWarmer(new SimpleMergedSegmentWarmer(iwc.getInfoStream()));
454458

455-
ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) iwc.getMergeScheduler();
459+
ConcurrentMergeScheduler cms = new ConcurrentMergeScheduler() {
460+
@Override
461+
public synchronized MergeThread getMergeThread(IndexWriter writer, MergePolicy.OneMerge merge) throws IOException {
462+
MergeThread thread = super.getMergeThread(writer, merge);
463+
thread.setName(thread.getName() + " [" + name + ":" + shardOrd + "]");
464+
return thread;
465+
}
466+
};
467+
iwc.setMergeScheduler(cms);
456468

457469
if (getBooleanSetting("index.merge.scheduler.auto_throttle")) {
458470
cms.enableAutoIOThrottle();
@@ -710,8 +722,12 @@ public void setIndexSort(Sort sort, Object saveState) {
710722

711723
public void start() throws Exception {
712724
// start all local shards
725+
713726
for(ShardState shard : shards.values()) {
714727
shard.start();
728+
if (writableShard == null || shard.shardOrd > writableShard.shardOrd) {
729+
writableShard = shard;
730+
}
715731
}
716732

717733
if (suggesterSettings != null) {
@@ -1009,10 +1025,36 @@ public ShardState addShard(int shardOrd, boolean doCreate) {
10091025
throw new IllegalArgumentException("shardOrd=" + shardOrd + " already exists in index + \"" + name + "\"");
10101026
}
10111027
ShardState shard = new ShardState(this, shardOrd, doCreate);
1028+
// nocommit fail if there is already a shard here?
10121029
shards.put(shardOrd, shard);
10131030
return shard;
10141031
}
10151032

1033+
/** Returns the shard that should be used for writing new documents, possibly creating and starting a new shard if the current one is full. */
1034+
public ShardState getWritableShard() throws Exception {
1035+
verifyStarted(null);
1036+
1037+
synchronized (this) {
1038+
1039+
// nocommit make this tunable:
1040+
if (writableShard.writer.maxDoc() > 50000000) {
1041+
System.out.println("NEW SHARD ORD " + (writableShard.shardOrd+1));
1042+
ShardState nextShard = addShard(writableShard.shardOrd+1, true);
1043+
1044+
// nocommit hmm we can't do this now ... we need to get all in-flight ops to finish first
1045+
writableShard.finishWriting();
1046+
// nocommit we should close IW after forceMerge finishes
1047+
1048+
nextShard.start();
1049+
writableShard = nextShard;
1050+
}
1051+
1052+
writableShard.startIndexingChunk();
1053+
}
1054+
1055+
return writableShard;
1056+
}
1057+
10161058
public ShardState getShard(int shardOrd) {
10171059
ShardState shardState = shards.get(shardOrd);
10181060
if (shardState == null) {

src/java/org/apache/lucene/server/Server.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -443,7 +443,7 @@ private void _handle(HttpExchange x) throws Exception {
443443
IndexState state;
444444
if (handler.requiresIndexName) {
445445
String indexName = request.getString("indexName");
446-
state = globalState.get(indexName);
446+
state = globalState.getIndex(indexName);
447447
} else {
448448
state = null;
449449
}
@@ -903,6 +903,7 @@ private class BinaryServer extends Thread {
903903
private volatile boolean stop;
904904

905905
public BinaryServer(String host, int port) throws IOException {
906+
setName("BinaryServer " + host + ":" + port);
906907
serverSocket = new ServerSocket();
907908
serverSocket.bind(new InetSocketAddress(host, port));
908909
actualPort = serverSocket.getLocalPort();

src/java/org/apache/lucene/server/ShardState.java

+92-3
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public class ShardState implements Closeable {
9696
public final Path rootDir;
9797

9898
/** Which shard we are in this index */
99-
private final int shardOrd;
99+
public final int shardOrd;
100100

101101
/** Base directory */
102102
public Directory origIndexDir;
@@ -434,7 +434,7 @@ protected IndexWriter openIndexWriter(Directory dir, IndexWriterConfig iwc) thro
434434
}
435435
};
436436

437-
writer = new IndexWriter(indexDir, indexState.getIndexWriterConfig(openMode, origIndexDir));
437+
writer = new IndexWriter(indexDir, indexState.getIndexWriterConfig(openMode, origIndexDir, shardOrd));
438438
snapshots = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
439439

440440
// NOTE: must do this after writer, because SDP only
@@ -586,7 +586,7 @@ public synchronized void startPrimary(long primaryGen) throws Exception {
586586

587587
boolean verbose = indexState.getBooleanSetting("index.verbose");
588588

589-
writer = new IndexWriter(indexDir, indexState.getIndexWriterConfig(openMode, origIndexDir));
589+
writer = new IndexWriter(indexDir, indexState.getIndexWriterConfig(openMode, origIndexDir, shardOrd));
590590
snapshots = (PersistentSnapshotDeletionPolicy) writer.getConfig().getIndexDeletionPolicy();
591591

592592
// NOTE: must do this after writer, because SDP only
@@ -995,4 +995,93 @@ public OrdRange getOrdRange(String dim) {
995995
return ssdvState;
996996
}
997997
}
998+
999+
@Override
1000+
public String toString() {
1001+
return indexState.name + ":" + shardOrd;
1002+
}
1003+
1004+
private final AtomicInteger runningChunks = new AtomicInteger();
1005+
private volatile int finishWriting;
1006+
1007+
public void startIndexingChunk() {
1008+
if (finishWriting != 0) {
1009+
throw new IllegalStateException("cannot start indexing when finish is called");
1010+
}
1011+
runningChunks.incrementAndGet();
1012+
}
1013+
1014+
public void finishIndexingChunk() throws InterruptedException {
1015+
if (runningChunks.decrementAndGet() == 0) {
1016+
if (finishWriting == 1) {
1017+
finishIndexing();
1018+
}
1019+
}
1020+
}
1021+
1022+
public String getState() {
1023+
if (finishWriting == 2) {
1024+
return "readonly";
1025+
} else if (finishWriting == 1) {
1026+
return "finishing";
1027+
} else {
1028+
return "active";
1029+
}
1030+
}
1031+
1032+
public void finishIndexing() throws InterruptedException {
1033+
System.out.println(this + ": now finish indexing");
1034+
indexState.globalState.submitIndexingTask(new Runnable() {
1035+
@Override
1036+
public void run() {
1037+
// nocommit make this tunable
1038+
try {
1039+
System.out.println(ShardState.this + ": now force merge");
1040+
writer.forceMerge(1);
1041+
System.out.println(ShardState.this + ": done force merge; now refresh");
1042+
reopenThread.close();
1043+
maybeRefreshBlocking();
1044+
System.out.println(ShardState.this + ": done force merge; now close writers");
1045+
finishWriting = 2;
1046+
1047+
// nocommit register this, somehow, in a long running task
1048+
// nocommit close searcher pruning thread too
1049+
1050+
writer.close();
1051+
taxoWriter.close();
1052+
System.out.println(ShardState.this + ": done force merge; done close writers");
1053+
// nocommit what about refreshing readers after this? can i "reopen to last reader" somehow?
1054+
indexState.globalState.indexingJobsRunning.release();
1055+
} catch (Throwable t) {
1056+
System.out.println("ShardState.finishIndexing " + this + " FAILED:");
1057+
t.printStackTrace(System.out);
1058+
throw new RuntimeException(t);
1059+
}
1060+
}
1061+
});
1062+
}
1063+
1064+
public int maxDoc() throws IOException {
1065+
synchronized (this) {
1066+
if (finishWriting == 0) {
1067+
return writer.maxDoc();
1068+
}
1069+
}
1070+
1071+
SearcherAndTaxonomy searcher = acquire();
1072+
try {
1073+
return searcher.searcher.getIndexReader().maxDoc();
1074+
} finally {
1075+
release(searcher);
1076+
}
1077+
}
1078+
1079+
public void finishWriting() throws InterruptedException {
1080+
synchronized(this) {
1081+
finishWriting = 1;
1082+
}
1083+
if (runningChunks.get() == 0) {
1084+
finishIndexing();
1085+
}
1086+
}
9981087
}

src/java/org/apache/lucene/server/handlers/AddReplicaHandler.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public boolean binaryRequest() {
5050
@Override
5151
public void handleBinary(InputStream streamIn, DataInput in, DataOutput out, OutputStream streamOut) throws Exception {
5252
String indexName = in.readString();
53-
IndexState indexState = globalState.get(indexName);
53+
IndexState indexState = globalState.getIndex(indexName);
5454
ShardState shardState = indexState.getShard(0);
5555
if (shardState.isPrimary() == false) {
5656
throw new IllegalArgumentException("index \"" + indexName + "\" was not started or is not a primary");

0 commit comments

Comments
 (0)