62
62
import org .opensearch .ml .common .transport .task .MLTaskGetAction ;
63
63
import org .opensearch .ml .common .transport .task .MLTaskGetRequest ;
64
64
import org .opensearch .ml .common .transport .task .MLTaskGetResponse ;
65
- import org .opensearch .ml .common .utils .S3Utils ;
66
65
import org .opensearch .ml .engine .MLEngine ;
67
66
import org .opensearch .ml .engine .MLEngineClassLoader ;
68
67
import org .opensearch .ml .engine .algorithms .remote .ConnectorUtils ;
69
68
import org .opensearch .ml .engine .algorithms .remote .RemoteConnectorExecutor ;
70
69
import org .opensearch .ml .engine .encryptor .EncryptorImpl ;
70
+ import org .opensearch .ml .engine .utils .S3Utils ;
71
71
import org .opensearch .ml .helper .ConnectorAccessControlHelper ;
72
72
import org .opensearch .ml .helper .ModelAccessControlHelper ;
73
73
import org .opensearch .ml .model .MLModelManager ;
85
85
import org .opensearch .transport .TransportService ;
86
86
87
87
import lombok .extern .log4j .Log4j2 ;
88
- import software .amazon .awssdk .services .s3 .S3Client ;
89
- import software .amazon .awssdk .services .s3 .model .S3Exception ;
90
88
91
89
@ Log4j2
92
90
public class GetTaskTransportAction extends HandledTransportAction <ActionRequest , MLTaskGetResponse > {
@@ -239,7 +237,6 @@ private void handleAsyncResponse(
239
237
handleThrowable (throwable , taskId , actionListener );
240
238
return ;
241
239
}
242
-
243
240
processResponse (response , taskId , isUserInitiatedGetTaskRequest , tenantId , actionListener );
244
241
}
245
242
@@ -531,7 +528,7 @@ protected void processTaskResponse(
531
528
}
532
529
}
533
530
534
- private void updateDLQ (MLTask mlTask ) {
531
+ protected void updateDLQ (MLTask mlTask ) {
535
532
Map <String , Object > remoteJob = mlTask .getRemoteJob ();
536
533
Map <String , String > dlq = (Map <String , String >) remoteJob .get ("dlq" );
537
534
if (dlq != null && !dlq .isEmpty ()) {
@@ -542,33 +539,22 @@ private void updateDLQ(MLTask mlTask) {
542
539
String secretKey = this .decryptedCredential .get (SECRET_KEY_FIELD );
543
540
String sessionToken = this .decryptedCredential .get (SESSION_TOKEN_FIELD );
544
541
545
- if (dlq != null ) {
546
- String bucketName = dlq .get ("bucket" );
547
- String region = dlq .get ("region" );
542
+ String bucketName = dlq .get ("bucket" );
543
+ String region = dlq .get ("region" );
548
544
549
- if (bucketName == null || region == null ) {
550
- log .error ("Failed to get the bucket name and region from batch predict request" );
551
- }
552
- remoteJobDetails .remove ("dlq" );
553
- S3Client s3Client = S3Utils .initS3Client (accessKey , secretKey , sessionToken , region );
554
- try {
555
-
556
- String jobName = (String ) remoteJobDetails .getOrDefault ("TransformJobName" , remoteJob .get ("job_name" ));
557
- String s3ObjectKey = "BatchJobFailure_" + jobName ;
558
- String content = mlTask .getState ().equals (UNREACHABLE )
559
- ? String .format ("Unable to reach the Job: %s. Error Message: %s" , jobName , mlTask .getError ())
560
- : remoteJobDetails .toString ();
561
-
562
- S3Utils .putObject (s3Client , bucketName , s3ObjectKey , content );
563
- log .debug ("Task status successfully uploaded to S3 for task ID: {} at {}" , taskId , Instant .now ());
564
- } catch (S3Exception e ) {
565
- log .error ("S3 Exception: " + e .awsErrorDetails ().errorMessage ());
566
- } catch (Exception e ) {
567
- e .printStackTrace ();
568
- } finally {
569
- s3Client .close ();
570
- }
545
+ if (bucketName == null || region == null ) {
546
+ log .error ("Failed to get the bucket name and region from batch predict request" );
571
547
}
548
+ remoteJobDetails .remove ("dlq" );
549
+
550
+ String jobName = (String ) remoteJobDetails .getOrDefault ("TransformJobName" , remoteJob .get ("job_name" ));
551
+ String s3ObjectKey = "BatchJobFailure_" + jobName ;
552
+ String content = mlTask .getState ().equals (UNREACHABLE )
553
+ ? String .format ("Unable to reach the Job: %s. Error Message: %s" , jobName , mlTask .getError ())
554
+ : remoteJobDetails .toString ();
555
+
556
+ S3Utils .putObject (accessKey , secretKey , sessionToken , region , bucketName , s3ObjectKey , content );
557
+ log .debug ("Task status successfully uploaded to S3 for task ID: {} at {}" , taskId , Instant .now ());
572
558
} catch (Exception e ) {
573
559
log .error ("Failed to update task status for task: " + taskId , e );
574
560
}
0 commit comments