Skip to content

Commit 2757776

Browse files
committed
fix loop in update rep
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com>
1 parent 6b45972 commit 2757776

File tree

8 files changed

+183
-13
lines changed

8 files changed

+183
-13
lines changed

CHANGELOG.md

+1
Original file line number | Diff line number | Diff line change
@@ -42,6 +42,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
4242
- Fix exists queries on nested flat_object fields throws exception ([#16803](https://github.com/opensearch-project/OpenSearch/pull/16803))
4343
- Add highlighting for wildcard search on `match_only_text` field ([#17101](https://github.com/opensearch-project/OpenSearch/pull/17101))
4444
- Fix illegal argument exception when creating a PIT ([#16781](https://github.com/opensearch-project/OpenSearch/pull/16781))
45+
- Fix an infinite loop that could be triggered by simultaneously creating a snapshot and updating the repository ([#17532](https://github.com/opensearch-project/OpenSearch/pull/17532))
4546

4647
### Security
4748

server/src/internalClusterTest/java/org/opensearch/repositories/RepositoriesServiceIT.java

+66
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,9 @@
3232

3333
package org.opensearch.repositories;
3434

35+
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesRequest;
3536
import org.opensearch.action.admin.cluster.repositories.get.GetRepositoriesResponse;
37+
import org.opensearch.action.admin.cluster.snapshots.create.CreateSnapshotResponse;
3638
import org.opensearch.cluster.metadata.RepositoryMetadata;
3739
import org.opensearch.common.settings.Settings;
3840
import org.opensearch.plugins.Plugin;
@@ -45,6 +47,8 @@
4547
import java.util.Collection;
4648
import java.util.Collections;
4749

50+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
51+
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
4852
import static org.hamcrest.Matchers.equalTo;
4953
import static org.hamcrest.Matchers.hasSize;
5054
import static org.hamcrest.Matchers.instanceOf;
@@ -122,4 +126,66 @@ public void testSystemRepositoryCantBeCreated() {
122126

123127
assertThrows(RepositoryException.class, () -> createRepository(repositoryName, FsRepository.TYPE, repoSettings));
124128
}
129+
130+
public void testCreatSnapAndUpdateReposityCauseInfiniteLoop() throws InterruptedException {
131+
// create index
132+
internalCluster();
133+
String indexName = "test-index";
134+
createIndex(indexName, Settings.builder().put(SETTING_NUMBER_OF_REPLICAS, 0).put(SETTING_NUMBER_OF_SHARDS, 1).build());
135+
index(indexName, "_doc", "1", Collections.singletonMap("user", generateRandomStringArray(1, 10, false, false)));
136+
flush(indexName);
137+
138+
// create repository
139+
final String repositoryName = "test-repo";
140+
Settings.Builder repoSettings = Settings.builder()
141+
.put("location", randomRepoPath())
142+
.put("max_snapshot_bytes_per_sec", "10mb")
143+
.put("max_restore_bytes_per_sec", "10mb");
144+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
145+
client().admin().cluster(),
146+
repositoryName,
147+
FsRepository.TYPE,
148+
true,
149+
repoSettings
150+
);
151+
152+
Thread thread = new Thread(() -> {
153+
String snapshotName = "test-snapshot";
154+
logger.info("--> starting snapshot");
155+
CreateSnapshotResponse createSnapshotResponse = client().admin()
156+
.cluster()
157+
.prepareCreateSnapshot(repositoryName, snapshotName)
158+
.setWaitForCompletion(true)
159+
.setIndices(indexName)
160+
.get();
161+
logger.info("--> finishing snapshot");
162+
});
163+
thread.start();
164+
165+
logger.info("--> begin to reset repository");
166+
repoSettings = Settings.builder().put("location", randomRepoPath()).put("max_snapshot_bytes_per_sec", "300mb");
167+
OpenSearchIntegTestCase.putRepositoryWithNoSettingOverrides(
168+
client().admin().cluster(),
169+
repositoryName,
170+
FsRepository.TYPE,
171+
true,
172+
repoSettings
173+
);
174+
logger.info("--> finish to reset repository");
175+
176+
GetRepositoriesRequest getRepositoriesRequest = new GetRepositoriesRequest(new String[] { repositoryName });
177+
try {
178+
GetRepositoriesResponse getRepositoriesResponse = client().admin().cluster().getRepositories(getRepositoriesRequest).get();
179+
assertThat(getRepositoriesResponse.repositories(), hasSize(1));
180+
RepositoryMetadata repositoryMetadata = getRepositoriesResponse.repositories().get(0);
181+
assertThat(repositoryMetadata.type(), equalTo(FsRepository.TYPE));
182+
assertThat(repositoryMetadata.settings().get("max_snapshot_bytes_per_sec"), equalTo("300mb"));
183+
assertThat(repositoryMetadata.settings().hasValue("max_restore_bytes_per_sec"), equalTo(false));
184+
} catch (Exception e) {
185+
throw new RuntimeException(e);
186+
}
187+
logger.info("--> finish to get response about repository");
188+
thread.join();
189+
}
190+
125191
}

server/src/main/java/org/opensearch/repositories/FilterRepository.java

+18-1
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,7 @@
5858
import java.util.Map;
5959
import java.util.function.Consumer;
6060
import java.util.function.Function;
61+
import java.util.function.Supplier;
6162

6263
/**
6364
* Repository that is filtered
@@ -284,13 +285,24 @@ public void updateState(ClusterState state) {
284285
in.updateState(state);
285286
}
286287

288+
@Deprecated
287289
@Override
288290
public void executeConsistentStateUpdate(
289291
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
290292
String source,
291293
Consumer<Exception> onFailure
292294
) {
293-
in.executeConsistentStateUpdate(createUpdateTask, source, onFailure);
295+
executeConsistentStateUpdate(createUpdateTask, source, () -> this, onFailure);
296+
}
297+
298+
@Override
299+
public void executeConsistentStateUpdate(
300+
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
301+
String source,
302+
Supplier<Repository> currentRepositeSupplier,
303+
Consumer<Exception> onFailure
304+
) {
305+
in.executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);
294306
}
295307

296308
@Override
@@ -345,4 +357,9 @@ public void stop() {
345357
public void close() {
346358
in.close();
347359
}
360+
361+
@Override
362+
public boolean isOpen() {
363+
return in.isOpen();
364+
}
348365
}

server/src/main/java/org/opensearch/repositories/Repository.java

+12
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,7 @@
6565
import java.util.Map;
6666
import java.util.function.Consumer;
6767
import java.util.function.Function;
68+
import java.util.function.Supplier;
6869

6970
/**
7071
* An interface for interacting with a repository in snapshot and restore.
@@ -542,6 +543,14 @@ default IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotInfo snapshotInf
542543
* @param source the source of the cluster state update task
543544
* @param onFailure error handler invoked on failure to get a consistent view of the current {@link RepositoryData}
544545
*/
546+
void executeConsistentStateUpdate(
547+
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
548+
String source,
549+
Supplier<Repository> currentRepositeSupplier,
550+
Consumer<Exception> onFailure
551+
);
552+
553+
@Deprecated
545554
void executeConsistentStateUpdate(
546555
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
547556
String source,
@@ -611,4 +620,7 @@ default void reload(RepositoryMetadata repositoryMetadata) {}
611620
* Validate the repository metadata
612621
*/
613622
default void validateMetadata(RepositoryMetadata repositoryMetadata) {}
623+
624+
boolean isOpen();
625+
614626
}

server/src/main/java/org/opensearch/repositories/blobstore/BlobStoreRepository.java

+33-3
Original file line number | Diff line number | Diff line change
@@ -551,6 +551,8 @@ protected static long calculateMaxWithinIntLimit(long defaultThresholdOfHeap, lo
551551
*/
552552
protected volatile int bufferSize;
553553

554+
private volatile boolean closed;
555+
554556
/**
555557
* Constructs new BlobStoreRepository
556558
* @param repositoryMetadata The metadata for this repository including name and settings
@@ -630,21 +632,44 @@ protected void doClose() {
630632
}
631633
if (store != null) {
632634
try {
635+
closed = true;
633636
store.close();
634637
} catch (Exception t) {
635638
logger.warn("cannot close blob store", t);
636639
}
637640
}
638641
}
639642

643+
@Deprecated
644+
@Override
645+
public void executeConsistentStateUpdate(
646+
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
647+
String source,
648+
Consumer<Exception> onFailure
649+
) {
650+
executeConsistentStateUpdate(createUpdateTask, source, () -> this, onFailure);
651+
}
652+
640653
@Override
641654
public void executeConsistentStateUpdate(
642655
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
643656
String source,
657+
Supplier<Repository> currentRepositeSupplier,
644658
Consumer<Exception> onFailure
645659
) {
646-
final RepositoryMetadata repositoryMetadataStart = metadata;
647-
getRepositoryData(ActionListener.wrap(repositoryData -> {
660+
Repository currentRepository = this;
661+
final RepositoryMetadata repositoryMetadataStart;
662+
if (currentRepository != currentRepositeSupplier.get()) {
663+
if (this.isOpen()) {
664+
throw new IllegalStateException("the repository should be closed");
665+
}
666+
currentRepository = currentRepositeSupplier.get();
667+
repositoryMetadataStart = currentRepository.getMetadata();
668+
} else {
669+
repositoryMetadataStart = metadata;
670+
}
671+
672+
currentRepository.getRepositoryData(ActionListener.wrap(repositoryData -> {
648673
final ClusterStateUpdateTask updateTask = createUpdateTask.apply(repositoryData);
649674
clusterService.submitStateUpdateTask(source, new ClusterStateUpdateTask(updateTask.priority()) {
650675

@@ -679,7 +704,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
679704
if (executedTask) {
680705
updateTask.clusterStateProcessed(source, oldState, newState);
681706
} else {
682-
executeConsistentStateUpdate(createUpdateTask, source, onFailure);
707+
executeConsistentStateUpdate(createUpdateTask, source, currentRepositeSupplier, onFailure);
683708
}
684709
}
685710

@@ -4690,6 +4715,11 @@ private void checkAborted() {
46904715
}
46914716
}
46924717

4718+
@Override
4719+
public boolean isOpen() {
4720+
return closed == false;
4721+
}
4722+
46934723
private static void failStoreIfCorrupted(Store store, Exception e) {
46944724
if (Lucene.isCorruptionException(e)) {
46954725
try {

server/src/main/java/org/opensearch/snapshots/SnapshotsService.java

+16-8
Original file line number | Diff line number | Diff line change
@@ -451,7 +451,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, final Cl
451451
public TimeValue timeout() {
452452
return request.clusterManagerNodeTimeout();
453453
}
454-
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
454+
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(request.repository()), listener::onFailure);
455455
}
456456

457457
/**
@@ -640,7 +640,7 @@ public TimeValue timeout() {
640640
return request.clusterManagerNodeTimeout();
641641
}
642642

643-
}, "create_snapshot [" + snapshotName + ']', listener::onFailure);
643+
}, "create_snapshot [" + snapshotName + ']', () -> repositoriesService.repository(repositoryName), listener::onFailure);
644644
}
645645

646646
private void cleanOrphanTimestamp(String repoName, RepositoryData repositoryData) {
@@ -1062,7 +1062,11 @@ public void onFailure(Exception e) {
10621062
public TimeValue timeout() {
10631063
return request.clusterManagerNodeTimeout();
10641064
}
1065-
}, "clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
1065+
},
1066+
"clone_snapshot_v2 [" + request.source() + "][" + snapshotName + ']',
1067+
() -> repositoriesService.repository(repositoryName),
1068+
listener::onFailure
1069+
);
10661070
}
10671071

10681072
// TODO: It is worth revisiting the design choice of creating a placeholder entry in snapshots-in-progress here once we have a cache
@@ -1148,14 +1152,18 @@ public void onFailure(String source, Exception e) {
11481152
public void clusterStateProcessed(String source, ClusterState oldState, final ClusterState newState) {
11491153
logger.info("snapshot clone [{}] started", snapshot);
11501154
addListener(snapshot, ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure));
1151-
startCloning(repository, newEntry);
1155+
startCloning(repository, repositoryName, newEntry);
11521156
}
11531157

11541158
@Override
11551159
public TimeValue timeout() {
11561160
return request.clusterManagerNodeTimeout();
11571161
}
1158-
}, "clone_snapshot [" + request.source() + "][" + snapshotName + ']', listener::onFailure);
1162+
},
1163+
"clone_snapshot [" + request.source() + "][" + snapshotName + ']',
1164+
() -> repositoriesService.repository(repositoryName),
1165+
listener::onFailure
1166+
);
11591167
}
11601168

11611169
private static void ensureNoCleanupInProgress(ClusterState currentState, String repositoryName, String snapshotName) {
@@ -1189,7 +1197,7 @@ private static void ensureSnapshotNameAvailableInRepo(RepositoryData repositoryD
11891197
* @param repository repository to run operation on
11901198
* @param cloneEntry clone operation in the cluster state
11911199
*/
1192-
private void startCloning(Repository repository, SnapshotsInProgress.Entry cloneEntry) {
1200+
private void startCloning(Repository repository, String repositoryName, SnapshotsInProgress.Entry cloneEntry) {
11931201
final List<IndexId> indices = cloneEntry.indices();
11941202
final SnapshotId sourceSnapshot = cloneEntry.source();
11951203
final Snapshot targetSnapshot = cloneEntry.snapshot();
@@ -1310,7 +1318,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
13101318
logger.warn("Did not find expected entry [{}] in the cluster state", cloneEntry);
13111319
}
13121320
}
1313-
}, "start snapshot clone", onFailure), onFailure);
1321+
}, "start snapshot clone", () -> repositoriesService.repository(repositoryName), onFailure), onFailure);
13141322
}
13151323

13161324
private final Set<RepositoryShardId> currentlyCloning = Collections.synchronizedSet(new HashSet<>());
@@ -2639,7 +2647,7 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
26392647
public TimeValue timeout() {
26402648
return request.clusterManagerNodeTimeout();
26412649
}
2642-
}, "delete snapshot", listener::onFailure);
2650+
}, "delete snapshot", () -> repositoriesService.repository(repoName), listener::onFailure);
26432651
}
26442652

26452653
private static List<SnapshotId> matchingSnapshotIds(

server/src/test/java/org/opensearch/repositories/RepositoriesServiceTests.java

+15
Original file line number | Diff line number | Diff line change
@@ -95,6 +95,7 @@
9595
import java.util.concurrent.atomic.AtomicBoolean;
9696
import java.util.function.Consumer;
9797
import java.util.function.Function;
98+
import java.util.function.Supplier;
9899

99100
import static org.opensearch.repositories.blobstore.BlobStoreRepository.SYSTEM_REPOSITORY_SETTING;
100101
import static org.hamcrest.Matchers.equalTo;
@@ -811,13 +812,22 @@ public IndexShardSnapshotStatus getShardSnapshotStatus(SnapshotId snapshotId, In
811812
@Override
812813
public void updateState(final ClusterState state) {}
813814

815+
@Deprecated
814816
@Override
815817
public void executeConsistentStateUpdate(
816818
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
817819
String source,
818820
Consumer<Exception> onFailure
819821
) {}
820822

823+
@Override
824+
public void executeConsistentStateUpdate(
825+
Function<RepositoryData, ClusterStateUpdateTask> createUpdateTask,
826+
String source,
827+
Supplier<Repository> currentRepositeSupplier,
828+
Consumer<Exception> onFailure
829+
) {}
830+
821831
@Override
822832
public void cloneShardSnapshot(
823833
SnapshotId source,
@@ -841,6 +851,11 @@ public void cloneRemoteStoreIndexShardSnapshot(
841851

842852
}
843853

854+
@Override
855+
public boolean isOpen() {
856+
return isClosed == false;
857+
}
858+
844859
@Override
845860
public Lifecycle.State lifecycleState() {
846861
return null;

0 commit comments

Comments
 (0)