Skip to content

Commit 1c208d5

Browse files
authored
[Remote Store] Cleanup local-only translog files if no metadata in remote (#12691)
Signed-off-by: Sachin Kale <kalsac@amazon.com>
1 parent 02f9d74 commit 1c208d5

File tree

4 files changed

+205
-6
lines changed

4 files changed

+205
-6
lines changed

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreIT.java

+71-1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
import org.opensearch.indices.recovery.RecoverySettings;
3838
import org.opensearch.indices.recovery.RecoveryState;
3939
import org.opensearch.plugins.Plugin;
40+
import org.opensearch.remotestore.multipart.mocks.MockFsRepositoryPlugin;
4041
import org.opensearch.test.InternalTestCluster;
4142
import org.opensearch.test.OpenSearchIntegTestCase;
4243
import org.opensearch.test.transport.MockTransportService;
@@ -59,6 +60,7 @@
5960
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_REPLICAS;
6061
import static org.opensearch.cluster.metadata.IndexMetadata.SETTING_NUMBER_OF_SHARDS;
6162
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.SEGMENTS;
63+
import static org.opensearch.index.remote.RemoteStoreEnums.DataCategory.TRANSLOG;
6264
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.DATA;
6365
import static org.opensearch.index.remote.RemoteStoreEnums.DataType.METADATA;
6466
import static org.opensearch.indices.RemoteStoreSettings.CLUSTER_REMOTE_TRANSLOG_BUFFER_INTERVAL_SETTING;
@@ -77,7 +79,7 @@ public class RemoteStoreIT extends RemoteStoreBaseIntegTestCase {
7779

7880
@Override
7981
protected Collection<Class<? extends Plugin>> nodePlugins() {
80-
return Arrays.asList(MockTransportService.TestPlugin.class);
82+
return Arrays.asList(MockTransportService.TestPlugin.class, MockFsRepositoryPlugin.class);
8183
}
8284

8385
@Override
@@ -789,4 +791,72 @@ public void testResumeUploadAfterFailedPrimaryRelocation() throws ExecutionExcep
789791
docs + moreDocs + uncommittedOps
790792
);
791793
}
794+
795+
// Test local only translog files which are not uploaded to remote store (no metadata present in remote)
796+
// Without the cleanup change in RemoteFsTranslog.createEmptyTranslog, this test fails with NPE.
797+
public void testLocalOnlyTranslogCleanupOnNodeRestart() throws Exception {
798+
clusterSettingsSuppliedByTest = true;
799+
800+
// Overriding settings to use AsyncMultiStreamBlobContainer
801+
Settings settings = Settings.builder()
802+
.put(super.nodeSettings(1))
803+
.put(
804+
remoteStoreClusterSettings(
805+
REPOSITORY_NAME,
806+
segmentRepoPath,
807+
MockFsRepositoryPlugin.TYPE,
808+
REPOSITORY_2_NAME,
809+
translogRepoPath,
810+
MockFsRepositoryPlugin.TYPE
811+
)
812+
)
813+
.build();
814+
815+
internalCluster().startClusterManagerOnlyNode(settings);
816+
String dataNode = internalCluster().startDataOnlyNode(settings);
817+
818+
// 1. Create index with 0 replica
819+
createIndex(INDEX_NAME, remoteStoreIndexSettings(0, 10000L, -1));
820+
ensureGreen(INDEX_NAME);
821+
822+
// 2. Index docs
823+
int searchableDocs = 0;
824+
for (int i = 0; i < randomIntBetween(1, 5); i++) {
825+
indexBulk(INDEX_NAME, 15);
826+
refresh(INDEX_NAME);
827+
searchableDocs += 15;
828+
}
829+
indexBulk(INDEX_NAME, 15);
830+
831+
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);
832+
833+
// 3. Delete metadata from remote translog
834+
String indexUUID = client().admin()
835+
.indices()
836+
.prepareGetSettings(INDEX_NAME)
837+
.get()
838+
.getSetting(INDEX_NAME, IndexMetadata.SETTING_INDEX_UUID);
839+
840+
String shardPath = getShardLevelBlobPath(client(), INDEX_NAME, BlobPath.cleanPath(), "0", TRANSLOG, METADATA).buildAsString();
841+
Path translogMetaDataPath = Path.of(translogRepoPath + "/" + shardPath);
842+
843+
try (Stream<Path> files = Files.list(translogMetaDataPath)) {
844+
files.forEach(p -> {
845+
try {
846+
Files.delete(p);
847+
} catch (IOException e) {
848+
// Ignore
849+
}
850+
});
851+
}
852+
853+
internalCluster().restartNode(dataNode);
854+
855+
ensureGreen(INDEX_NAME);
856+
857+
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs);
858+
indexBulk(INDEX_NAME, 15);
859+
refresh(INDEX_NAME);
860+
assertHitCount(client(dataNode).prepareSearch(INDEX_NAME).setSize(0).get(), searchableDocs + 15);
861+
}
792862
}

server/src/main/java/org/opensearch/index/translog/RemoteFsTranslog.java

+24-1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.opensearch.core.util.FileSystemUtils;
2222
import org.opensearch.index.remote.RemoteStorePathStrategy;
2323
import org.opensearch.index.remote.RemoteTranslogTransferTracker;
24+
import org.opensearch.index.seqno.SequenceNumbers;
2425
import org.opensearch.index.translog.transfer.BlobStoreTransferService;
2526
import org.opensearch.index.translog.transfer.FileTransferTracker;
2627
import org.opensearch.index.translog.transfer.TransferSnapshot;
@@ -219,7 +220,7 @@ static void download(TranslogTransferManager translogTransferManager, Path locat
219220
throw ex;
220221
}
221222

222-
static private void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
223+
private static void downloadOnce(TranslogTransferManager translogTransferManager, Path location, Logger logger) throws IOException {
223224
logger.debug("Downloading translog files from remote");
224225
RemoteTranslogTransferTracker statsTracker = translogTransferManager.getRemoteTranslogTransferTracker();
225226
long prevDownloadBytesSucceeded = statsTracker.getDownloadBytesSucceeded();
@@ -254,10 +255,32 @@ static private void downloadOnce(TranslogTransferManager translogTransferManager
254255
location.resolve(Translog.getCommitCheckpointFileName(translogMetadata.getGeneration())),
255256
location.resolve(Translog.CHECKPOINT_FILE_NAME)
256257
);
258+
} else {
259+
// When code flow reaches this block, it means we don't have any translog files uploaded to remote store.
260+
// If local filesystem contains empty translog or no translog, we don't do anything.
261+
// If local filesystem contains non-empty translog, we clean up these files and create empty translog.
262+
logger.debug("No translog files found on remote, checking local filesystem for cleanup");
263+
if (FileSystemUtils.exists(location.resolve(CHECKPOINT_FILE_NAME))) {
264+
final Checkpoint checkpoint = readCheckpoint(location);
265+
if (isEmptyTranslog(checkpoint) == false) {
266+
logger.debug("Translog files exist on local without any metadata in remote, cleaning up these files");
267+
// Creating empty translog will cleanup the older un-referenced tranlog files, we don't have to explicitly delete
268+
Translog.createEmptyTranslog(location, translogTransferManager.getShardId(), checkpoint);
269+
} else {
270+
logger.debug("Empty translog on local, skipping clean-up");
271+
}
272+
}
257273
}
258274
logger.debug("downloadOnce execution completed");
259275
}
260276

277+
private static boolean isEmptyTranslog(Checkpoint checkpoint) {
278+
return checkpoint.generation == checkpoint.minTranslogGeneration
279+
&& checkpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED
280+
&& checkpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED
281+
&& checkpoint.numOps == 0;
282+
}
283+
261284
public static TranslogTransferManager buildTranslogTransferManager(
262285
BlobStoreRepository blobStoreRepository,
263286
ThreadPool threadPool,

server/src/main/java/org/opensearch/index/translog/Translog.java

+34-4
Original file line numberDiff line numberDiff line change
@@ -2011,17 +2011,47 @@ public static String createEmptyTranslog(
20112011
final long primaryTerm,
20122012
@Nullable final String translogUUID,
20132013
@Nullable final ChannelFactory factory
2014+
) throws IOException {
2015+
return createEmptyTranslog(location, shardId, initialGlobalCheckpoint, primaryTerm, translogUUID, factory, 1);
2016+
}
2017+
2018+
public static String createEmptyTranslog(final Path location, final ShardId shardId, Checkpoint checkpoint) throws IOException {
2019+
final Path highestGenTranslogFile = location.resolve(getFilename(checkpoint.generation));
2020+
final TranslogHeader translogHeader;
2021+
try (FileChannel channel = FileChannel.open(highestGenTranslogFile, StandardOpenOption.READ)) {
2022+
translogHeader = TranslogHeader.read(highestGenTranslogFile, channel);
2023+
}
2024+
final String translogUUID = translogHeader.getTranslogUUID();
2025+
final long primaryTerm = translogHeader.getPrimaryTerm();
2026+
final ChannelFactory channelFactory = FileChannel::open;
2027+
return Translog.createEmptyTranslog(
2028+
location,
2029+
shardId,
2030+
SequenceNumbers.NO_OPS_PERFORMED,
2031+
primaryTerm,
2032+
translogUUID,
2033+
channelFactory,
2034+
checkpoint.generation + 1
2035+
);
2036+
}
2037+
2038+
public static String createEmptyTranslog(
2039+
final Path location,
2040+
final ShardId shardId,
2041+
final long initialGlobalCheckpoint,
2042+
final long primaryTerm,
2043+
@Nullable final String translogUUID,
2044+
@Nullable final ChannelFactory factory,
2045+
final long generation
20142046
) throws IOException {
20152047
IOUtils.rm(location);
20162048
Files.createDirectories(location);
20172049

2018-
final long generation = 1L;
2019-
final long minTranslogGeneration = 1L;
20202050
final ChannelFactory channelFactory = factory != null ? factory : FileChannel::open;
20212051
final String uuid = Strings.hasLength(translogUUID) ? translogUUID : UUIDs.randomBase64UUID();
20222052
final Path checkpointFile = location.resolve(CHECKPOINT_FILE_NAME);
20232053
final Path translogFile = location.resolve(getFilename(generation));
2024-
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, minTranslogGeneration);
2054+
final Checkpoint checkpoint = Checkpoint.emptyTranslogCheckpoint(0, generation, initialGlobalCheckpoint, generation);
20252055

20262056
Checkpoint.write(channelFactory, checkpointFile, checkpoint, StandardOpenOption.WRITE, StandardOpenOption.CREATE_NEW);
20272057
final TranslogWriter writer = TranslogWriter.create(
@@ -2031,7 +2061,7 @@ public static String createEmptyTranslog(
20312061
translogFile,
20322062
channelFactory,
20332063
EMPTY_TRANSLOG_BUFFER_SIZE,
2034-
minTranslogGeneration,
2064+
generation,
20352065
initialGlobalCheckpoint,
20362066
() -> {
20372067
throw new UnsupportedOperationException();

server/src/test/java/org/opensearch/index/translog/RemoteFsTranslogTests.java

+76
Original file line numberDiff line numberDiff line change
@@ -1716,6 +1716,82 @@ public void testDownloadWithRetries() throws IOException {
17161716
RemoteFsTranslog.download(mockTransfer, location, logger);
17171717
}
17181718

1719+
// No translog data in local as well as remote, we skip creating empty translog
1720+
public void testDownloadWithNoTranslogInLocalAndRemote() throws IOException {
1721+
Path location = createTempDir();
1722+
1723+
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
1724+
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
1725+
when(mockTransfer.readMetadata()).thenReturn(null);
1726+
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
1727+
1728+
Path[] filesBeforeDownload = FileSystemUtils.files(location);
1729+
RemoteFsTranslog.download(mockTransfer, location, logger);
1730+
assertEquals(filesBeforeDownload, FileSystemUtils.files(location));
1731+
}
1732+
1733+
// No translog data in remote but non-empty translog is present in local. In this case, we delete all the files
1734+
// from local file system and create empty translog
1735+
public void testDownloadWithTranslogOnlyInLocal() throws IOException {
1736+
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
1737+
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
1738+
when(mockTransfer.readMetadata()).thenReturn(null);
1739+
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
1740+
1741+
Path location = createTempDir();
1742+
for (Path file : FileSystemUtils.files(translogDir)) {
1743+
Files.copy(file, location.resolve(file.getFileName()));
1744+
}
1745+
1746+
Checkpoint existingCheckpoint = Translog.readCheckpoint(location);
1747+
1748+
TranslogTransferManager finalMockTransfer = mockTransfer;
1749+
RemoteFsTranslog.download(finalMockTransfer, location, logger);
1750+
1751+
Path[] filesPostDownload = FileSystemUtils.files(location);
1752+
assertEquals(2, filesPostDownload.length);
1753+
assertTrue(
1754+
filesPostDownload[0].getFileName().toString().contains("translog.ckp")
1755+
|| filesPostDownload[1].getFileName().toString().contains("translog.ckp")
1756+
);
1757+
1758+
Checkpoint newEmptyTranslogCheckpoint = Translog.readCheckpoint(location);
1759+
// Verify that the new checkpoint points to empty translog
1760+
assertTrue(
1761+
newEmptyTranslogCheckpoint.generation == newEmptyTranslogCheckpoint.minTranslogGeneration
1762+
&& newEmptyTranslogCheckpoint.minSeqNo == SequenceNumbers.NO_OPS_PERFORMED
1763+
&& newEmptyTranslogCheckpoint.maxSeqNo == SequenceNumbers.NO_OPS_PERFORMED
1764+
&& newEmptyTranslogCheckpoint.numOps == 0
1765+
);
1766+
assertTrue(newEmptyTranslogCheckpoint.generation > existingCheckpoint.generation);
1767+
assertEquals(newEmptyTranslogCheckpoint.globalCheckpoint, existingCheckpoint.globalCheckpoint);
1768+
}
1769+
1770+
// No translog data in remote and empty translog in local. We skip creating another empty translog
1771+
public void testDownloadWithEmptyTranslogOnlyInLocal() throws IOException {
1772+
TranslogTransferManager mockTransfer = mock(TranslogTransferManager.class);
1773+
RemoteTranslogTransferTracker remoteTranslogTransferTracker = mock(RemoteTranslogTransferTracker.class);
1774+
when(mockTransfer.readMetadata()).thenReturn(null);
1775+
when(mockTransfer.getRemoteTranslogTransferTracker()).thenReturn(remoteTranslogTransferTracker);
1776+
1777+
Path location = createTempDir();
1778+
for (Path file : FileSystemUtils.files(translogDir)) {
1779+
Files.copy(file, location.resolve(file.getFileName()));
1780+
}
1781+
1782+
TranslogTransferManager finalMockTransfer = mockTransfer;
1783+
1784+
// download first time will ensure creating empty translog
1785+
RemoteFsTranslog.download(finalMockTransfer, location, logger);
1786+
Path[] filesPostFirstDownload = FileSystemUtils.files(location);
1787+
1788+
// download on empty translog should be a no-op
1789+
RemoteFsTranslog.download(finalMockTransfer, location, logger);
1790+
Path[] filesPostSecondDownload = FileSystemUtils.files(location);
1791+
1792+
assertArrayEquals(filesPostFirstDownload, filesPostSecondDownload);
1793+
}
1794+
17191795
public class ThrowingBlobRepository extends FsRepository {
17201796

17211797
private final Environment environment;

0 commit comments

Comments
 (0)