Skip to content

Commit 3125b94

Browse files
[Searchable Snapshot] Fix bug of Searchable Snapshot Dependency on repository chunk_size (opensearch-project#12277)
* implement logic of fetching blocks from multiple chunks of snapshot file. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Refactor and address comments. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * apply spotless check Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Address comments of using a different data structure to fetch blob parts. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * remove unnecessary code. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Refactor outputstream usage. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * refactor blobpart logic into a separate method and add unit tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> * Add new unit tests. Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com> --------- Signed-off-by: Rishikesh1159 <rishireddy1159@gmail.com>
1 parent 7921175 commit 3125b94

File tree

6 files changed

+196
-95
lines changed

6 files changed

+196
-95
lines changed

server/src/internalClusterTest/java/org/opensearch/snapshots/SearchableSnapshotIT.java

+31-5
Original file line numberDiff line numberDiff line change
@@ -85,10 +85,10 @@ protected Settings.Builder randomRepositorySettings() {
8585
return settings;
8686
}
8787

88-
private Settings.Builder chunkedRepositorySettings() {
88+
private Settings.Builder chunkedRepositorySettings(long chunkSize) {
8989
final Settings.Builder settings = Settings.builder();
9090
settings.put("location", randomRepoPath()).put("compress", randomBoolean());
91-
settings.put("chunk_size", 2 << 23, ByteSizeUnit.BYTES);
91+
settings.put("chunk_size", chunkSize, ByteSizeUnit.BYTES);
9292
return settings;
9393
}
9494

@@ -194,18 +194,44 @@ public void testSnapshottingSearchableSnapshots() throws Exception {
194194
}
195195

196196
/**
197-
* Tests a chunked repository scenario for searchable snapshots by creating an index,
197+
* Tests a default 8mib chunked repository scenario for searchable snapshots by creating an index,
198198
* taking a snapshot, restoring it as a searchable snapshot index.
199199
*/
200-
public void testCreateSearchableSnapshotWithChunks() throws Exception {
200+
public void testCreateSearchableSnapshotWithDefaultChunks() throws Exception {
201201
final int numReplicasIndex = randomIntBetween(1, 4);
202202
final String indexName = "test-idx";
203203
final String restoredIndexName = indexName + "-copy";
204204
final String repoName = "test-repo";
205205
final String snapshotName = "test-snap";
206206
final Client client = client();
207207

208-
Settings.Builder repositorySettings = chunkedRepositorySettings();
208+
Settings.Builder repositorySettings = chunkedRepositorySettings(2 << 23);
209+
210+
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
211+
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);
212+
createRepositoryWithSettings(repositorySettings, repoName);
213+
takeSnapshot(client, snapshotName, repoName, indexName);
214+
215+
deleteIndicesAndEnsureGreen(client, indexName);
216+
restoreSnapshotAndEnsureGreen(client, snapshotName, repoName);
217+
assertRemoteSnapshotIndexSettings(client, restoredIndexName);
218+
219+
assertDocCount(restoredIndexName, 1000L);
220+
}
221+
222+
/**
223+
* Tests a small 1000 bytes chunked repository scenario for searchable snapshots by creating an index,
224+
* taking a snapshot, restoring it as a searchable snapshot index.
225+
*/
226+
public void testCreateSearchableSnapshotWithSmallChunks() throws Exception {
227+
final int numReplicasIndex = randomIntBetween(1, 4);
228+
final String indexName = "test-idx";
229+
final String restoredIndexName = indexName + "-copy";
230+
final String repoName = "test-repo";
231+
final String snapshotName = "test-snap";
232+
final Client client = client();
233+
234+
Settings.Builder repositorySettings = chunkedRepositorySettings(1000);
209235

210236
internalCluster().ensureAtLeastNumSearchAndDataNodes(numReplicasIndex + 1);
211237
createIndexWithDocsAndEnsureGreen(numReplicasIndex, 1000, indexName);

server/src/main/java/org/opensearch/index/store/remote/file/OnDemandBlockSnapshotIndexInput.java

+34-12
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import org.opensearch.index.store.remote.utils.TransferManager;
1616

1717
import java.io.IOException;
18+
import java.util.ArrayList;
19+
import java.util.List;
1820

1921
/**
2022
* This is an implementation of {@link OnDemandBlockIndexInput} where this class provides the main IndexInput using shard snapshot files.
@@ -136,25 +138,45 @@ protected IndexInput fetchBlock(int blockId) throws IOException {
136138
final long blockStart = getBlockStart(blockId);
137139
final long blockEnd = blockStart + getActualBlockSize(blockId);
138140

139-
// If the snapshot file is chunked, we must account for this by
140-
// choosing the appropriate file part and updating the position
141-
// accordingly.
142-
final int part = (int) (blockStart / partSize);
143-
final long partStart = part * partSize;
144-
145-
final long position = blockStart - partStart;
146-
final long length = blockEnd - blockStart;
147-
141+
// Block may be present on multiple chunks of a file, so we need
142+
// to fetch each chunk/blob part separately to fetch an entire block.
148143
BlobFetchRequest blobFetchRequest = BlobFetchRequest.builder()
149-
.position(position)
150-
.length(length)
151-
.blobName(fileInfo.partName(part))
144+
.blobParts(getBlobParts(blockStart, blockEnd))
152145
.directory(directory)
153146
.fileName(blockFileName)
154147
.build();
155148
return transferManager.fetchBlob(blobFetchRequest);
156149
}
157150

151+
/**
152+
* Returns list of blob parts/chunks in a file for a given block.
153+
*/
154+
protected List<BlobFetchRequest.BlobPart> getBlobParts(long blockStart, long blockEnd) {
155+
// If the snapshot file is chunked, we must account for this by
156+
// choosing the appropriate file part and updating the position
157+
// accordingly.
158+
int partNum = (int) (blockStart / partSize);
159+
long pos = blockStart;
160+
long diff = (blockEnd - blockStart);
161+
162+
List<BlobFetchRequest.BlobPart> blobParts = new ArrayList<>();
163+
while (diff > 0) {
164+
long partStart = pos % partSize;
165+
long partEnd;
166+
if ((partStart + diff) > partSize) {
167+
partEnd = partSize;
168+
} else {
169+
partEnd = (partStart + diff);
170+
}
171+
long fetchBytes = partEnd - partStart;
172+
blobParts.add(new BlobFetchRequest.BlobPart(fileInfo.partName(partNum), partStart, fetchBytes));
173+
partNum++;
174+
pos = pos + fetchBytes;
175+
diff = (blockEnd - pos);
176+
}
177+
return blobParts;
178+
}
179+
158180
@Override
159181
public OnDemandBlockSnapshotIndexInput clone() {
160182
OnDemandBlockSnapshotIndexInput clone = buildSlice("clone", 0L, this.length);

server/src/main/java/org/opensearch/index/store/remote/utils/BlobFetchRequest.java

+50-45
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.apache.lucene.store.FSDirectory;
1313

1414
import java.nio.file.Path;
15+
import java.util.List;
1516

1617
/**
1718
* The specification to fetch specific block from blob store
@@ -20,37 +21,22 @@
2021
*/
2122
public class BlobFetchRequest {
2223

23-
private final long position;
24-
25-
private final long length;
26-
27-
private final String blobName;
28-
2924
private final Path filePath;
3025

3126
private final Directory directory;
3227

3328
private final String fileName;
3429

30+
private final List<BlobPart> blobParts;
31+
32+
private final long blobLength;
33+
3534
private BlobFetchRequest(Builder builder) {
36-
this.position = builder.position;
37-
this.length = builder.length;
38-
this.blobName = builder.blobName;
3935
this.fileName = builder.fileName;
4036
this.filePath = builder.directory.getDirectory().resolve(fileName);
4137
this.directory = builder.directory;
42-
}
43-
44-
public long getPosition() {
45-
return position;
46-
}
47-
48-
public long getLength() {
49-
return length;
50-
}
51-
52-
public String getBlobName() {
53-
return blobName;
38+
this.blobParts = builder.blobParts;
39+
this.blobLength = builder.blobParts.stream().mapToLong(o -> o.getLength()).sum();
5440
}
5541

5642
public Path getFilePath() {
@@ -65,19 +51,23 @@ public String getFileName() {
6551
return fileName;
6652
}
6753

54+
public List<BlobPart> blobParts() {
55+
return blobParts;
56+
}
57+
58+
public long getBlobLength() {
59+
return blobLength;
60+
}
61+
6862
public static Builder builder() {
6963
return new Builder();
7064
}
7165

7266
@Override
7367
public String toString() {
7468
return "BlobFetchRequest{"
75-
+ "position="
76-
+ position
77-
+ ", length="
78-
+ length
79-
+ ", blobName='"
80-
+ blobName
69+
+ "blobParts="
70+
+ blobParts
8171
+ '\''
8272
+ ", filePath="
8373
+ filePath
@@ -90,35 +80,45 @@ public String toString() {
9080
}
9181

9282
/**
93-
* Builder for BlobFetchRequest
83+
* BlobPart represents a single chunk of a file
9484
*/
95-
public static final class Builder {
85+
public static class BlobPart {
86+
private String blobName;
9687
private long position;
9788
private long length;
98-
private String blobName;
99-
private FSDirectory directory;
100-
private String fileName;
101-
102-
private Builder() {}
10389

104-
public Builder position(long position) {
105-
this.position = position;
106-
return this;
107-
}
108-
109-
public Builder length(long length) {
90+
public BlobPart(String blobName, long position, long length) {
91+
this.blobName = blobName;
11092
if (length <= 0) {
111-
throw new IllegalArgumentException("Length for blob fetch request needs to be non-negative");
93+
throw new IllegalArgumentException("Length for blob part fetch request needs to be non-negative");
11294
}
11395
this.length = length;
114-
return this;
96+
this.position = position;
11597
}
11698

117-
public Builder blobName(String blobName) {
118-
this.blobName = blobName;
119-
return this;
99+
public String getBlobName() {
100+
return blobName;
101+
}
102+
103+
public long getPosition() {
104+
return position;
120105
}
121106

107+
public long getLength() {
108+
return length;
109+
}
110+
}
111+
112+
/**
113+
* Builder for BlobFetchRequest
114+
*/
115+
public static final class Builder {
116+
private List<BlobPart> blobParts;
117+
private FSDirectory directory;
118+
private String fileName;
119+
120+
private Builder() {}
121+
122122
public Builder directory(FSDirectory directory) {
123123
this.directory = directory;
124124
return this;
@@ -129,6 +129,11 @@ public Builder fileName(String fileName) {
129129
return this;
130130
}
131131

132+
public Builder blobParts(List<BlobPart> blobParts) {
133+
this.blobParts = blobParts;
134+
return this;
135+
}
136+
132137
public BlobFetchRequest build() {
133138
return new BlobFetchRequest(this);
134139
}

server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java

+14-8
Original file line numberDiff line numberDiff line change
@@ -48,11 +48,12 @@ public TransferManager(final BlobContainer blobContainer, final FileCache fileCa
4848
}
4949

5050
/**
51-
* Given a blobFetchRequest, return it's corresponding IndexInput.
51+
* Given a blobFetchRequestList, return it's corresponding IndexInput.
5252
* @param blobFetchRequest to fetch
5353
* @return future of IndexInput augmented with internal caching maintenance tasks
5454
*/
5555
public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOException {
56+
5657
final Path key = blobFetchRequest.getFilePath();
5758

5859
final CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
@@ -85,15 +86,20 @@ private static FileCachedIndexInput createIndexInput(FileCache fileCache, BlobCo
8586
try {
8687
if (Files.exists(request.getFilePath()) == false) {
8788
try (
88-
InputStream snapshotFileInputStream = blobContainer.readBlob(
89-
request.getBlobName(),
90-
request.getPosition(),
91-
request.getLength()
92-
);
9389
OutputStream fileOutputStream = Files.newOutputStream(request.getFilePath());
9490
OutputStream localFileOutputStream = new BufferedOutputStream(fileOutputStream)
9591
) {
96-
snapshotFileInputStream.transferTo(localFileOutputStream);
92+
for (BlobFetchRequest.BlobPart blobPart : request.blobParts()) {
93+
try (
94+
InputStream snapshotFileInputStream = blobContainer.readBlob(
95+
blobPart.getBlobName(),
96+
blobPart.getPosition(),
97+
blobPart.getLength()
98+
);
99+
) {
100+
snapshotFileInputStream.transferTo(localFileOutputStream);
101+
}
102+
}
97103
}
98104
}
99105
final IndexInput luceneIndexInput = request.getDirectory().openInput(request.getFileName(), IOContext.READ);
@@ -153,7 +159,7 @@ public IndexInput getIndexInput() throws IOException {
153159

154160
@Override
155161
public long length() {
156-
return request.getLength();
162+
return request.getBlobLength();
157163
}
158164

159165
@Override

0 commit comments

Comments
 (0)