Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SCMOD-12730: Initial implementation #1

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open

Conversation

rorytorneymf
Copy link
Contributor

@rorytorneymf rorytorneymf self-assigned this Apr 20, 2021
Comment on lines 50 to 53
<dependency>
<groupId>com.github.cafdataprocessing</groupId>
<artifactId>worker-document-shared</artifactId>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is worker-document-shared directly required?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, removed now

Comment on lines 87 to 94
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-sqlobject</artifactId>
</dependency>
<dependency>
<groupId>org.jdbi</groupId>
<artifactId>jdbi3-postgres</artifactId>
</dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should these be declared test scope?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One can be test scope, the other needs to be runtime scoped, updated now.

stowedTaskRow.getTaskData(),
TaskStatus.valueOf(stowedTaskRow.getTaskStatus()),
stowedTaskRow.getContext() != null
? OBJECT_MAPPER.readValue(stowedTaskRow.getContext(), Map.class)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This deserializes to a Map<String, byte[]> correctly even though it was only told to deserialize to a Map?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've updated this to use a TypeReference now to specify that the type should be Map<String, byte[]>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, I've updated this to use a TypeReference now to specify that the type should be Map<String, byte[]>

I need to double-check and test this a bit more, I'm not entirely sure I'm serializing and deserializing the context properly. Do you have any examples of actual context objects that are used? I've only ever seen context being null or an empty map in my testing.

Copy link
Contributor Author

@rorytorneymf rorytorneymf May 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the problem here is this:

  1. A task message is sent to any worker with a context that looks like this:
  "context": {
    "caf/worker": "ew0KICAia2V5MSI6ICJ2YWx1ZTEiDQp9"
  },

The key must be caf/worker (or whatever the service path is for that particular worker) otherwise won't get passed to the worker.

The value is a base64 encoded map:

https://github.com/WorkerFramework/worker-framework/blob/develop/worker-core/src/main/java/com/hpe/caf/worker/core/WorkerTaskImpl.java#L122

  1. The worker framework uses the caf/worker key to pull the value out of this Map<String, byte[]>, passing on just the byte[] to the worker:

https://github.com/WorkerFramework/worker-framework/blob/develop/worker-core/src/main/java/com/hpe/caf/worker/core/WorkerTaskImpl.java#L122

  1. The Task Stowing Worker stores this byte[] in the database:

https://github.com/JobService/worker-taskstowing/blob/develop/worker-taskstowing/src/main/java/com/github/jobservice/workers/taskstowing/TaskStowingWorker.java#L102

  1. The Task Unstowing Worker reads this byte[] from the database, however, the Task Unstowing Worker actually needs a Map<String, byte[]> in order to create a TaskMessage to sent onto the intended worker:

https://github.com/JobService/worker-taskunstowing/blob/SCMOD-12730/worker-taskunstowing/src/main/java/com/github/jobservice/workers/taskunstowing/TaskUnstowingWorker.java#L174

So basically, I think the Task Stowing Worker needs to know the servicePath key for the worker that sent it the message to be stowed, and would also need a new db column to store it maybe - as the Task Unstowing Worker will need to know the servicePath in order to reconstruct the context for the TaskMessage, I'm just not sure the best way to go about it? The ServicePath is currently not exposed anywhere as far as I can see:

https://github.com/WorkerFramework/worker-framework/blob/develop/worker-core/src/main/java/com/hpe/caf/worker/core/WorkerTaskImpl.java#L52

My initial thoughts were that when a worker sends a task message to the paused queue, it would have to include its servicePath in the taskData somehow, maybe as custom data, but that feels like it's getting a bit messy?

https://github.com/WorkerFramework/worker-framework/blob/develop/worker-core/src/main/java/com/hpe/caf/worker/core/WorkerCore.java#L573

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed context as discussed

processFailure(failure, exception, document);
}
}
} catch (final Exception exception) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What exception is this trying to catch here? As is it this defeats the transient issue handling above by catching the DocumentWorkerTransientException and turning it into a non-transient failure. I suspect the catch block shouldn't be present at all.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm catching the exception that might be thrown when trying to send a message to rabbit, but its in the wrong place, I've moved it now.

final int numStowedTaskRowsBeforeExcluding = stowedTaskRows.size();
stowedTaskRows = stowedTaskRows.stream()
.filter(stowedTask -> !stowedTaskRowsToExclude.contains(stowedTask))
.collect(Collectors.toList());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may have been more efficient in SQL but I couldn't figure out exactly how to do it. I think it's probably ok as this is an error case scenario and hopefully unlikely to happen often.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants