6
6
package org .opensearch .ml .action .batch ;
7
7
8
8
import static org .mockito .ArgumentMatchers .any ;
9
+ import static org .mockito .ArgumentMatchers .anyBoolean ;
10
+ import static org .mockito .ArgumentMatchers .anyLong ;
11
+ import static org .mockito .ArgumentMatchers .anyString ;
9
12
import static org .mockito .ArgumentMatchers .isA ;
10
13
import static org .mockito .Mockito .doAnswer ;
14
+ import static org .mockito .Mockito .doReturn ;
11
15
import static org .mockito .Mockito .doThrow ;
16
+ import static org .mockito .Mockito .never ;
12
17
import static org .mockito .Mockito .verify ;
13
18
import static org .mockito .Mockito .when ;
14
19
import static org .opensearch .ml .common .MLTask .ERROR_FIELD ;
15
20
import static org .opensearch .ml .common .MLTask .STATE_FIELD ;
16
21
import static org .opensearch .ml .common .MLTaskState .COMPLETED ;
17
22
import static org .opensearch .ml .common .MLTaskState .FAILED ;
18
23
import static org .opensearch .ml .engine .ingest .S3DataIngestion .SOURCE ;
24
+ import static org .opensearch .ml .plugin .MachineLearningPlugin .INGEST_THREAD_POOL ;
19
25
import static org .opensearch .ml .task .MLTaskManager .TASK_SEMAPHORE_TIMEOUT ;
20
26
21
27
import java .util .ArrayList ;
22
28
import java .util .Arrays ;
23
29
import java .util .HashMap ;
24
30
import java .util .Map ;
31
+ import java .util .concurrent .ExecutorService ;
25
32
26
33
import org .junit .Before ;
27
34
import org .mockito .ArgumentCaptor ;
45
52
import org .opensearch .threadpool .ThreadPool ;
46
53
import org .opensearch .transport .TransportService ;
47
54
55
+ import com .jayway .jsonpath .PathNotFoundException ;
56
+
48
57
public class TransportBatchIngestionActionTests extends OpenSearchTestCase {
49
58
@ Mock
50
59
private Client client ;
@@ -62,6 +71,8 @@ public class TransportBatchIngestionActionTests extends OpenSearchTestCase {
62
71
ActionListener <MLBatchIngestionResponse > actionListener ;
63
72
@ Mock
64
73
ThreadPool threadPool ;
74
+ @ Mock
75
+ ExecutorService executorService ;
65
76
66
77
private TransportBatchIngestionAction batchAction ;
67
78
private MLBatchIngestionInput batchInput ;
@@ -105,9 +116,42 @@ public void test_doExecute_success() {
105
116
listener .onResponse (indexResponse );
106
117
return null ;
107
118
}).when (mlTaskManager ).createMLTask (isA (MLTask .class ), isA (ActionListener .class ));
119
+ doReturn (executorService ).when (threadPool ).executor (INGEST_THREAD_POOL );
120
+ doAnswer (invocation -> {
121
+ Runnable runnable = invocation .getArgument (0 );
122
+ runnable .run ();
123
+ return null ;
124
+ }).when (executorService ).execute (any (Runnable .class ));
125
+
108
126
batchAction .doExecute (task , mlBatchIngestionRequest , actionListener );
109
127
110
128
verify (actionListener ).onResponse (any (MLBatchIngestionResponse .class ));
129
+ verify (threadPool ).executor (INGEST_THREAD_POOL );
130
+ }
131
+
132
+ public void test_doExecute_ExecuteWithNoErrorHandling () {
133
+ batchAction .executeWithErrorHandling (() -> {}, "taskId" );
134
+
135
+ verify (mlTaskManager , never ()).updateMLTask (anyString (), isA (Map .class ), anyLong (), anyBoolean ());
136
+ }
137
+
138
+ public void test_doExecute_ExecuteWithPathNotFoundException () {
139
+ batchAction .executeWithErrorHandling (() -> { throw new PathNotFoundException ("jsonPath not found!" ); }, "taskId" );
140
+
141
+ verify (mlTaskManager )
142
+ .updateMLTask ("taskId" , Map .of (STATE_FIELD , FAILED , ERROR_FIELD , "jsonPath not found!" ), TASK_SEMAPHORE_TIMEOUT , true );
143
+ }
144
+
145
+ public void test_doExecute_RuntimeException () {
146
+ batchAction .executeWithErrorHandling (() -> { throw new RuntimeException ("runtime exception in the ingestion!" ); }, "taskId" );
147
+
148
+ verify (mlTaskManager )
149
+ .updateMLTask (
150
+ "taskId" ,
151
+ Map .of (STATE_FIELD , FAILED , ERROR_FIELD , "runtime exception in the ingestion!" ),
152
+ TASK_SEMAPHORE_TIMEOUT ,
153
+ true
154
+ );
111
155
}
112
156
113
157
public void test_doExecute_handleSuccessRate100 () {
0 commit comments