|
15 | 15 | import static org.mockito.Mockito.spy;
|
16 | 16 | import static org.mockito.Mockito.verify;
|
17 | 17 | import static org.mockito.Mockito.when;
|
18 |
| -import static org.opensearch.ml.settings.MLCommonsSettings.*; |
| 18 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_CANCELLED_REGEX; |
| 19 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_CANCELLING_REGEX; |
| 20 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_COMPLETED_REGEX; |
| 21 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_EXPIRED_REGEX; |
| 22 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_FAILED_REGEX; |
| 23 | +import static org.opensearch.ml.settings.MLCommonsSettings.ML_COMMONS_REMOTE_JOB_STATUS_FIELD; |
19 | 24 |
|
20 | 25 | import java.io.IOException;
|
21 | 26 | import java.util.Arrays;
|
|
67 | 72 | import org.opensearch.ml.common.transport.task.MLTaskGetResponse;
|
68 | 73 | import org.opensearch.ml.engine.MLEngine;
|
69 | 74 | import org.opensearch.ml.engine.encryptor.EncryptorImpl;
|
70 |
| -import org.opensearch.ml.engine.ingest.S3DataIngestion; |
71 | 75 | import org.opensearch.ml.helper.ConnectorAccessControlHelper;
|
72 | 76 | import org.opensearch.ml.helper.ModelAccessControlHelper;
|
73 | 77 | import org.opensearch.ml.model.MLModelManager;
|
@@ -120,9 +124,6 @@ public class GetTaskTransportActionTests extends OpenSearchTestCase {
|
120 | 124 | @Mock
|
121 | 125 | private MLModelManager mlModelManager;
|
122 | 126 |
|
123 |
| - @Mock |
124 |
| - private S3DataIngestion s3DataIngestion; |
125 |
| - |
126 | 127 | @Mock
|
127 | 128 | private MLTaskManager mlTaskManager;
|
128 | 129 |
|
@@ -460,6 +461,10 @@ public void test_processTaskResponse_expired() {
|
460 | 461 | processTaskResponse("status", "expired", MLTaskState.EXPIRED);
|
461 | 462 | }
|
462 | 463 |
|
| 464 | + public void test_processTaskResponse_failed() { |
| 465 | + processTaskResponse("status", "failed", MLTaskState.FAILED); |
| 466 | + } |
| 467 | + |
463 | 468 | public void test_processTaskResponse_WrongStatusField() {
|
464 | 469 | processTaskResponse("wrong_status_field", "expired", null);
|
465 | 470 | }
|
@@ -500,4 +505,78 @@ private void processTaskResponse(String statusField, String remoteJobResponseSta
|
500 | 505 | assertEquals(remoteJobResponseStatus, updatedRemoteJob.get(statusField));
|
501 | 506 | assertEquals(remoteJobName, updatedRemoteJob.get("name"));
|
502 | 507 | }
|
| 508 | + |
| 509 | + public void testUpdateDLQ_Success() throws IOException { |
| 510 | + // Setup test data |
| 511 | + Map<String, Object> remoteJob = new HashMap<>(); |
| 512 | + remoteJob.put("TransformJobName", "test-job"); |
| 513 | + Map<String, String> dlq = new HashMap<>(); |
| 514 | + dlq.put("bucket", "test-bucket"); |
| 515 | + dlq.put("region", "us-west-2"); |
| 516 | + remoteJob.put("dlq", dlq); |
| 517 | + |
| 518 | + MLTask mlTask = MLTask |
| 519 | + .builder() |
| 520 | + .taskId("test-task") |
| 521 | + .state(MLTaskState.FAILED) |
| 522 | + .error("Test error message") |
| 523 | + .remoteJob(remoteJob) |
| 524 | + .build(); |
| 525 | + |
| 526 | + // Setup decrypted credentials |
| 527 | + Map<String, String> decryptedCredential = new HashMap<>(); |
| 528 | + decryptedCredential.put("aws_access_key", "test-key"); |
| 529 | + decryptedCredential.put("aws_secret_key", "test-secret"); |
| 530 | + decryptedCredential.put("aws_session_token", "test-token"); |
| 531 | + |
| 532 | + // Call the method |
| 533 | + getTaskTransportAction.updateDLQ(mlTask, decryptedCredential); |
| 534 | + |
| 535 | + // Verify remoteJob DLQ is removed |
| 536 | + assertNull(mlTask.getRemoteJob().get("dlq")); |
| 537 | + } |
| 538 | + |
| 539 | + public void testUpdateDLQ_MissingBucketOrRegion() { |
| 540 | + // Setup test data with missing bucket/region |
| 541 | + Map<String, Object> remoteJob = new HashMap<>(); |
| 542 | + remoteJob.put("TransformJobName", "test-job"); |
| 543 | + Map<String, String> dlq = new HashMap<>(); |
| 544 | + // Intentionally missing bucket and region |
| 545 | + remoteJob.put("dlq", dlq); |
| 546 | + |
| 547 | + MLTask mlTask = MLTask |
| 548 | + .builder() |
| 549 | + .taskId("test-task") |
| 550 | + .state(MLTaskState.FAILED) |
| 551 | + .error("Test error message") |
| 552 | + .remoteJob(remoteJob) |
| 553 | + .build(); |
| 554 | + |
| 555 | + // Call the method - should not throw exception but log error |
| 556 | + getTaskTransportAction.updateDLQ(mlTask, Collections.emptyMap()); |
| 557 | + |
| 558 | + // Verify DLQ still exists since update failed |
| 559 | + assertNotNull(mlTask.getRemoteJob().get("dlq")); |
| 560 | + } |
| 561 | + |
| 562 | + public void testUpdateDLQ_NullDLQ() { |
| 563 | + // Setup test data with null DLQ |
| 564 | + Map<String, Object> remoteJob = new HashMap<>(); |
| 565 | + remoteJob.put("TransformJobName", "test-job"); |
| 566 | + // No DLQ configuration |
| 567 | + |
| 568 | + MLTask mlTask = MLTask |
| 569 | + .builder() |
| 570 | + .taskId("test-task") |
| 571 | + .state(MLTaskState.FAILED) |
| 572 | + .error("Test error message") |
| 573 | + .remoteJob(remoteJob) |
| 574 | + .build(); |
| 575 | + |
| 576 | + // Call the method - should do nothing |
| 577 | + getTaskTransportAction.updateDLQ(mlTask, null); |
| 578 | + |
| 579 | + // Verify remoteJob is unchanged |
| 580 | + assertEquals("test-job", mlTask.getRemoteJob().get("TransformJobName")); |
| 581 | + } |
503 | 582 | }
|
0 commit comments