diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
index be849452c0f5e..7c521927b4dce 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationBaseIT.java
@@ -42,6 +42,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CountDownLatch;
@@ -49,8 +50,6 @@
 import java.util.stream.Collectors;
 
 import static java.util.Arrays.asList;
-import static org.opensearch.test.OpenSearchIntegTestCase.client;
-import static org.opensearch.test.OpenSearchTestCase.assertBusy;
 import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertHitCount;
 
 public class SegmentReplicationBaseIT extends OpenSearchIntegTestCase {
@@ -245,4 +244,11 @@ protected SegmentInfos getLatestSegmentInfos(IndexShard shard) throws IOExceptio
             return closeable.get();
         }
     }
+
+    protected boolean warmIndexSegmentReplicationEnabled() {
+        return Objects.equals(
+            IndexModule.INDEX_STORE_LOCALITY_SETTING.get(indexSettings()).toString(),
+            IndexModule.DataLocalityType.PARTIAL.name()
+        );
+    }
 }
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
index 2421a1a507372..dc16c0c4db439 100644
--- a/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/SegmentReplicationIT.java
@@ -430,7 +430,6 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
         ) {
             indexer.start(initialDocCount);
             waitForDocs(initialDocCount, indexer);
-
             flush(INDEX_NAME);
             waitForSearchableDocs(initialDocCount, nodeA, nodeB);
 
@@ -450,7 +449,11 @@ private void performReplicationAfterForceMerge(boolean primaryOnly, int expected
             assertThat(forceMergeResponse.getFailedShards(), is(0));
             assertThat(forceMergeResponse.getSuccessfulShards(), is(expectedSuccessfulShards));
             refresh(INDEX_NAME);
-            verifyStoreContent();
+            // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
+            // store.
+            if (!warmIndexSegmentReplicationEnabled()) {
+                verifyStoreContent();
+            }
         }
     }
 
@@ -623,7 +626,7 @@ private void cancelDuringReplicaAction(String actionToblock) throws Exception {
         // this test stubs transport calls specific to node-node replication.
         assumeFalse(
             "Skipping the test as its not compatible with segment replication with remote store.",
-            segmentReplicationWithRemoteEnabled()
+            segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
         );
         final String primaryNode = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME, Settings.builder().put(indexSettings()).put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1).build());
@@ -957,7 +960,11 @@ public void testReplicaHasDiffFilesThanPrimary() throws Exception {
         }
         ensureGreen(INDEX_NAME);
         waitForSearchableDocs(docCount, primaryNode, replicaNode);
-        verifyStoreContent();
+        // skipping verify store content over here as readLastCommittedSegmentsInfo files are not present in latest metadata of remote
+        // store.
+        if (!warmIndexSegmentReplicationEnabled()) {
+            verifyStoreContent();
+        }
         final IndexShard replicaAfterFailure = getIndexShard(replicaNode, INDEX_NAME);
         assertNotEquals(replicaAfterFailure.routingEntry().allocationId().getId(), replicaShard.routingEntry().allocationId().getId());
     }
@@ -1068,6 +1075,12 @@ private void assertAllocationIdsInReplicaShardStats(Set<String> expected, Set<Se
      * @throws Exception when issue is encountered
      */
     public void testScrollCreatedOnReplica() throws Exception {
+        // Skipping this test in case of remote store enabled warm index
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            warmIndexSegmentReplicationEnabled()
+        );
+
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
         final String primary = internalCluster().startDataOnlyNode();
         prepareCreate(
@@ -1179,7 +1192,7 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
         // this test stubs transport calls specific to node-node replication.
         assumeFalse(
             "Skipping the test as its not compatible with segment replication with remote store.",
-            segmentReplicationWithRemoteEnabled()
+            segmentReplicationWithRemoteEnabled() || warmIndexSegmentReplicationEnabled()
         );
 
         // create the cluster with one primary node containing primary shard and replica node containing replica shard
@@ -1306,6 +1319,12 @@ public void testScrollWithOngoingSegmentReplication() throws Exception {
     }
 
     public void testPitCreatedOnReplica() throws Exception {
+        //// Skipping this test in case of remote store enabled warm index
+        assumeFalse(
+            "Skipping the test as its not compatible with segment replication with remote store.",
+            warmIndexSegmentReplicationEnabled()
+        );
+
         final String primary = internalCluster().startDataOnlyNode();
         createIndex(INDEX_NAME);
         ensureYellowAndNoInitializingShards(INDEX_NAME);
diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java
new file mode 100644
index 0000000000000..61b21df12b514
--- /dev/null
+++ b/server/src/internalClusterTest/java/org/opensearch/indices/replication/WarmIndexRemoteStoreSegmentReplicationIT.java
@@ -0,0 +1,82 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
+
+package org.opensearch.indices.replication;
+
+import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters;
+
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
+import org.opensearch.index.IndexModule;
+import org.opensearch.index.store.remote.file.CleanerDaemonThreadLeakFilter;
+import org.opensearch.index.store.remote.filecache.FileCache;
+import org.opensearch.node.Node;
+import org.opensearch.test.OpenSearchIntegTestCase;
+import org.junit.After;
+import org.junit.Before;
+
+import java.nio.file.Path;
+
+@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0)
+@ThreadLeakFilters(filters = CleanerDaemonThreadLeakFilter.class)
+public class WarmIndexRemoteStoreSegmentReplicationIT extends SegmentReplicationIT {
+
+    protected static final String REPOSITORY_NAME = "test-remote-store-repo";
+    protected Path absolutePath;
+
+    @Before
+    private void setup() {
+        internalCluster().startClusterManagerOnlyNode();
+    }
+
+    @Override
+    public Settings indexSettings() {
+        return Settings.builder()
+            .put(super.indexSettings())
+            .put(IndexModule.INDEX_STORE_LOCALITY_SETTING.getKey(), IndexModule.DataLocalityType.PARTIAL.name())
+            .build();
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+        if (absolutePath == null) {
+            absolutePath = randomRepoPath().toAbsolutePath();
+        }
+        return Settings.builder()
+            .put(super.nodeSettings(nodeOrdinal))
+            .put(remoteStoreClusterSettings(REPOSITORY_NAME, absolutePath))
+            .build();
+    }
+
+    @Override
+    protected Settings featureFlagSettings() {
+        Settings.Builder featureSettings = Settings.builder();
+        featureSettings.put(FeatureFlags.TIERED_REMOTE_INDEX, true);
+
+        return featureSettings.build();
+    }
+
+    @Override
+    protected boolean addMockIndexStorePlugin() {
+        return false;
+    }
+
+    protected boolean warmIndexSegmentReplicationEnabled() {
+        return true;
+    }
+
+    @After
+    public void teardown() {
+        for (String nodeName : internalCluster().getNodeNames()) {
+            logger.info("file cache node name is {}", nodeName);
+            FileCache fileCache = internalCluster().getInstance(Node.class, nodeName).fileCache();
+            fileCache.clear();
+        }
+        clusterAdmin().prepareCleanupRepository(REPOSITORY_NAME).get();
+    }
+}
diff --git a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
index d759423ce5a55..c63846ffccf4c 100644
--- a/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
+++ b/server/src/main/java/org/opensearch/index/engine/NRTReplicationEngine.java
@@ -171,7 +171,9 @@ public synchronized void updateSegments(final SegmentInfos infos) throws IOExcep
             // a lower gen from a newly elected primary shard that is behind this shard's last commit gen.
             // In that case we still commit into the next local generation.
             if (incomingGeneration != this.lastReceivedPrimaryGen) {
-                flush(false, true);
+                if (engineConfig.getIndexSettings().isStoreLocalityPartial() == false) {
+                    flush(false, true);
+                }
                 translogManager.getDeletionPolicy().setLocalCheckpointOfSafeCommit(maxSeqNo);
                 translogManager.rollTranslogGeneration();
             }
diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
index 82b68b32f3bf8..0c6ab28a2853d 100644
--- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java
+++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java
@@ -5030,6 +5030,8 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal) throws IOE
      */
     public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runnable onFileSync) throws IOException {
         boolean syncSegmentSuccess = false;
+        boolean shouldOverrideLocalFiles = overrideLocal && indexSettings.isStoreLocalityPartial() == false;
+
         long startTimeMs = System.currentTimeMillis();
         assert indexSettings.isRemoteStoreEnabled() || this.isRemoteSeeded();
         logger.trace("Downloading segments from remote segment store");
@@ -5052,7 +5054,7 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
                 storeDirectory = new StoreRecovery.StatsDirectoryWrapper(store.directory(), recoveryState.getIndex());
                 for (String file : uploadedSegments.keySet()) {
                     long checksum = Long.parseLong(uploadedSegments.get(file).getChecksum());
-                    if (overrideLocal || localDirectoryContains(storeDirectory, file, checksum) == false) {
+                    if (shouldOverrideLocalFiles || localDirectoryContains(storeDirectory, file, checksum) == false) {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), false);
                     } else {
                         recoveryState.getIndex().addFileDetail(file, uploadedSegments.get(file).getLength(), true);
@@ -5061,7 +5063,9 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
             } else {
                 storeDirectory = store.directory();
             }
-            copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            if (indexSettings.isStoreLocalityPartial() == false) {
+                copySegmentFiles(storeDirectory, remoteDirectory, null, uploadedSegments, overrideLocal, onFileSync);
+            }
 
             if (remoteSegmentMetadata != null) {
                 final SegmentInfos infosSnapshot = store.buildSegmentInfos(
@@ -5071,13 +5075,15 @@ public void syncSegmentsFromRemoteSegmentStore(boolean overrideLocal, final Runn
                 long processedLocalCheckpoint = Long.parseLong(infosSnapshot.getUserData().get(LOCAL_CHECKPOINT_KEY));
                 // delete any other commits, we want to start the engine only from a new commit made with the downloaded infos bytes.
                 // Extra segments will be wiped on engine open.
-                for (String file : List.of(store.directory().listAll())) {
-                    if (file.startsWith(IndexFileNames.SEGMENTS)) {
-                        store.deleteQuiet(file);
+                if (indexSettings.isStoreLocalityPartial() == false) {
+                    for (String file : List.of(store.directory().listAll())) {
+                        if (file.startsWith(IndexFileNames.SEGMENTS)) {
+                            store.deleteQuiet(file);
+                        }
                     }
+                    assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
+                        : "There should not be any segments file in the dir";
                 }
-                assert Arrays.stream(store.directory().listAll()).filter(f -> f.startsWith(IndexFileNames.SEGMENTS)).findAny().isEmpty()
-                    : "There should not be any segments file in the dir";
                 store.commitSegmentInfos(infosSnapshot, processedLocalCheckpoint, processedLocalCheckpoint);
             }
             syncSegmentSuccess = true;
diff --git a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
index b06b3e0497cf7..a9f026badeee0 100644
--- a/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
+++ b/server/src/main/java/org/opensearch/indices/replication/RemoteStoreReplicationSource.java
@@ -117,9 +117,14 @@ public void getSegmentFiles(
                 final List<String> toDownloadSegmentNames = new ArrayList<>();
                 for (StoreFileMetadata fileMetadata : filesToFetch) {
                     String file = fileMetadata.name();
-                    assert directoryFiles.contains(file) == false : "Local store already contains the file " + file;
+                    assert directoryFiles.contains(file) == false || indexShard.indexSettings().isStoreLocalityPartial()
+                        : "Local store already contains the file " + file;
                     toDownloadSegmentNames.add(file);
                 }
+                if (indexShard.indexSettings().isStoreLocalityPartial()) {
+                    listener.onResponse(new GetSegmentFilesResponse(filesToFetch));
+                    return;
+                }
                 indexShard.getFileDownloader()
                     .downloadAsync(
                         cancellableThreads,
diff --git a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
index af764556b7549..175bb6592af10 100644
--- a/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
+++ b/server/src/main/java/org/opensearch/indices/replication/SegmentReplicationTarget.java
@@ -38,6 +38,7 @@
 import java.io.UncheckedIOException;
 import java.util.List;
 import java.util.Locale;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -170,14 +171,22 @@ public void startReplication(ActionListener<Void> listener) {
         final StepListener<CheckpointInfoResponse> checkpointInfoListener = new StepListener<>();
         final StepListener<GetSegmentFilesResponse> getFilesListener = new StepListener<>();
 
+        Map<String, StoreFileMetadata> replicaMd = null;
+        try {
+            replicaMd = indexShard.getSegmentMetadataMap();
+        } catch (IOException e) {
+            listener.onFailure(new RuntimeException(e));
+        }
+
         logger.trace(new ParameterizedMessage("Starting Replication Target: {}", description()));
         // Get list of files to copy from this checkpoint.
         state.setStage(SegmentReplicationState.Stage.GET_CHECKPOINT_INFO);
         cancellableThreads.checkForCancel();
         source.getCheckpointMetadata(getId(), checkpoint, checkpointInfoListener);
 
+        Map<String, StoreFileMetadata> finalReplicaMd = replicaMd;
         checkpointInfoListener.whenComplete(checkpointInfo -> {
-            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo);
+            final List<StoreFileMetadata> filesToFetch = getFiles(checkpointInfo, finalReplicaMd);
             state.setStage(SegmentReplicationState.Stage.GET_FILES);
             cancellableThreads.checkForCancel();
             source.getSegmentFiles(
@@ -196,31 +205,37 @@ public void startReplication(ActionListener<Void> listener) {
         }, listener::onFailure);
     }
 
-    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo) throws IOException {
+    private List<StoreFileMetadata> getFiles(CheckpointInfoResponse checkpointInfo, Map<String, StoreFileMetadata> finalReplicaMd)
+        throws IOException {
         cancellableThreads.checkForCancel();
         state.setStage(SegmentReplicationState.Stage.FILE_DIFF);
-        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), indexShard.getSegmentMetadataMap());
+        final Store.RecoveryDiff diff = Store.segmentReplicationDiff(checkpointInfo.getMetadataMap(), finalReplicaMd);
         // local files
         final Set<String> localFiles = Set.of(indexShard.store().directory().listAll());
-        // set of local files that can be reused
-        final Set<String> reuseFiles = diff.missing.stream()
-            .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
-            .filter(this::validateLocalChecksum)
-            .map(StoreFileMetadata::name)
-            .collect(Collectors.toSet());
+        final List<StoreFileMetadata> missingFiles;
+        // Skip reuse logic for warm indices
+        if (indexShard.indexSettings().isStoreLocalityPartial() == true) {
+            missingFiles = diff.missing;
+        } else {
+            // set of local files that can be reused
+            final Set<String> reuseFiles = diff.missing.stream()
+                .filter(storeFileMetadata -> localFiles.contains(storeFileMetadata.name()))
+                .filter(this::validateLocalChecksum)
+                .map(StoreFileMetadata::name)
+                .collect(Collectors.toSet());
 
-        final List<StoreFileMetadata> missingFiles = diff.missing.stream()
-            .filter(md -> reuseFiles.contains(md.name()) == false)
-            .collect(Collectors.toList());
+            missingFiles = diff.missing.stream().filter(md -> reuseFiles.contains(md.name()) == false).collect(Collectors.toList());
+
+            logger.trace(
+                () -> new ParameterizedMessage(
+                    "Replication diff for checkpoint {} {} {}",
+                    checkpointInfo.getCheckpoint(),
+                    missingFiles,
+                    diff.different
+                )
+            );
+        }
 
-        logger.trace(
-            () -> new ParameterizedMessage(
-                "Replication diff for checkpoint {} {} {}",
-                checkpointInfo.getCheckpoint(),
-                missingFiles,
-                diff.different
-            )
-        );
         /*
          * Segments are immutable. So if the replica has any segments with the same name that differ from the one in the incoming
          * snapshot from source that means the local copy of the segment has been corrupted/changed in some way and we throw an