Now message-box/bt class will restart thread if it was aborted because of an error. #101
Changes from all commits
4eecb9b
c66a672
a40221e
3df36a7
da61abf
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -113,18 +113,37 @@ This is used to break the environment possibly captured as closure at 'submit' s | |
(defclass message-box/bt (message-box-base) | ||
((queue-thread :initform nil | ||
:documentation | ||
"The thread that pops queue items.")) | ||
"The thread that pops queue items.") | ||
(thread-lock :initform (bt2:make-lock) | ||
:documentation | ||
"A lock which should be taken when queue-thread slot is set.")) | ||
(:documentation | ||
"Bordeaux-Threads based message-box with a single thread operating on a message queue. | ||
This is used when the actor is created using a `:pinned` dispatcher type. | ||
There is a limit on the maximum number of actors/agents that can be created with | ||
this kind of queue because each message-box (and with that each actor) requires exactly one thread.")) | ||
|
||
|
||
(declaim (ftype (function (message-box/bt &key (:thread-name (or null string))) | ||
(values &optional)) | ||
start-thread)) | ||
|
||
(defun start-thread (msgbox &key thread-name) | ||
(with-slots (name queue-thread) | ||
msgbox | ||
(flet ((run-processing-loop () | ||
(message-processing-loop msgbox))) | ||
(setf queue-thread | ||
(bt2:make-thread | ||
#'run-processing-loop | ||
:name (or thread-name | ||
(mkstr "message-thread-" name)))))) | ||
(values)) | ||
|
||
|
||
(defmethod initialize-instance :after ((self message-box/bt) &key) | ||
(with-slots (name queue-thread) self | ||
(setf queue-thread (bt2:make-thread | ||
(lambda () (message-processing-loop self)) | ||
:name (mkstr "message-thread-" name)))) | ||
(start-thread self) | ||
|
||
(when (next-method-p) | ||
(call-next-method))) | ||
|
||
|
@@ -179,6 +198,23 @@ This function sets the result as `handler-result' in `item'. The return of this | |
(bt2:condition-notify withreply-cvar))) | ||
(handler-fun))))) | ||
|
||
|
||
(declaim (ftype (function (message-box/bt) | ||
(values &optional)) | ||
ensure-thread-is-running)) | ||
|
||
(defun ensure-thread-is-running (msgbox) | ||
(with-slots (queue-thread thread-lock) | ||
msgbox | ||
(bt2:with-lock-held (thread-lock) | ||
|
||
(unless (bt2:thread-alive-p queue-thread) | ||
(log:trace "Restarting thread ~A" | ||
I'd like to have this as a |
||
(bt2:thread-name queue-thread)) | ||
(start-thread msgbox | ||
:thread-name (bt2:thread-name queue-thread))) | ||
(values)))) | ||
|
||
|
||
(defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args) | ||
"The `handler-fun-args` argument must contain a handler function as first list item. | ||
It will be apply'ed with the rest of the args when the message was 'popped' from queue." | ||
|
@@ -200,14 +236,26 @@ It will be apply'ed with the rest of the args when the message was 'popped' from | |
:time-out time-out | ||
:handler-fun-args handler-fun-args | ||
:handler-result 'no-result))) | ||
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) | ||
(bt2:with-lock-held (withreply-lock) | ||
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) | ||
(queue:pushq queue push-item) | ||
|
||
(if time-out | ||
(wait-and-probe-for-msg-handler-result msgbox push-item) | ||
(bt2:condition-wait withreply-cvar withreply-lock))) | ||
(cond | ||
(time-out | ||
(bt2:with-lock-held (withreply-lock) | ||
Please try to remove the duplicated code.
I could move these three lines into a local function:
Is that OK with you?
A local function must still be redefined at runtime for each call of
I've refactored this code and made these two branches outer for bt2:with-lock-held not because of
The problem is that this call:
when you specify both
If I'd call this situation an "almost dead-lock" :( That is why I've made these two branches separate, and for the branch where time-out is given, we release the lock before
By the way, I've just found
Yeah, indeed there is a problem when the timeout is big: the message is only processed after the wait time is over. I need to wrap my head around this code. I'm wondering right now why not just remove the lock altogether and why it is needed.
The lock is needed to use a condition variable. Its API requires a lock to be used. Without the condition you will have to use assert-cond with unlimited timeout.
OK, got it I think. |
||
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) | ||
(queue:pushq queue push-item) | ||
(ensure-thread-is-running msgbox)) | ||
|
||
;; It is important to release withreply-lock | ||
;; before we wait for the result. Otherwise handler-fun | ||
;; will not be able to do its job: | ||
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) | ||
(wait-and-probe-for-msg-handler-result msgbox push-item)) | ||
(t | ||
(bt2:with-lock-held (withreply-lock) | ||
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item) | ||
(queue:pushq queue push-item) | ||
(ensure-thread-is-running msgbox) | ||
We also need to add this to In which case then, we indeed need the additional lock. |
||
|
||
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox)) | ||
(bt2:condition-wait withreply-cvar withreply-lock)))) | ||
|
||
(with-slots (handler-result) push-item | ||
(log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result) | ||
|
@@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i | |
processed-messages | ||
dispatcher) self | ||
(incf processed-messages) | ||
|
||
(let ((push-item (make-message-item/dp | ||
:message message | ||
:handler-fun-args handler-fun-args | ||
|
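The review thread on `withreply-lock` makes two points: a condition variable can only be waited on while its lock is held, and the waiter must give the lock up so the handler can publish its result. A minimal sketch of that interplay, assuming Quicklisp is available to load Bordeaux-Threads with its `bt2` API; the function `ask-with-condition-variable` and the thread name are hypothetical and not part of Sento:

```lisp
;; Assumption: Quicklisp is installed and can load Bordeaux-Threads (APIv2).
(eval-when (:compile-toplevel :load-toplevel :execute)
  (ql:quickload :bordeaux-threads))

(defun ask-with-condition-variable ()
  "Hypothetical sketch: hand work to another thread and wait for its reply."
  (let ((lock (bt2:make-lock))
        (cvar (bt2:make-condition-variable))
        (result 'no-result))
    (bt2:with-lock-held (lock)
      ;; The worker needs LOCK to publish its result. It can only acquire
      ;; the lock once CONDITION-WAIT below has released it.
      (bt2:make-thread
       (lambda ()
         (bt2:with-lock-held (lock)
           (setf result :done)
           (bt2:condition-notify cvar)))
       :name "sketch-worker")
      ;; CONDITION-WAIT releases LOCK while sleeping and re-acquires it
      ;; before returning. The loop guards against spurious wakeups.
      (loop while (eq result 'no-result)
            do (bt2:condition-wait cvar lock)))
    result))
```

If the waiting side polled for the result while still holding the lock instead of calling `bt2:condition-wait`, the worker in this sketch could not enter its `with-lock-held` body until the poll gave up, which resembles the "almost dead-lock" described in the thread.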
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
(defpackage :sento.message-box-test | ||
(:use :cl :fiveam :cl-mock :sento.actor :sento.future) | ||
(:shadow #:! #:?) | ||
(:import-from #:miscutils | ||
#:assert-cond | ||
#:await-cond | ||
#:filter) | ||
(:import-from #:timeutils | ||
#:ask-timeout) | ||
(:import-from #:sento.messageb | ||
#:message-box/bt | ||
#:submit | ||
#:no-result | ||
#:queue-thread | ||
#:stop) | ||
(:import-from #:ac | ||
#:actor-of)) | ||
|
||
(in-package :sento.message-box-test) | ||
|
||
(def-suite message-box-tests | ||
:description "message-box tests" | ||
:in sento.tests:test-suite) | ||
|
||
(in-suite message-box-tests) | ||
|
||
|
||
(defun wait-while-thread-will-die (msgbox &key (timeout 10)) | ||
I think you can use one of the |
||
(let ((wait-until (+ (get-internal-real-time) (* timeout | ||
internal-time-units-per-second)))) | ||
(with-slots (queue-thread) | ||
msgbox | ||
(loop while (bt2:thread-alive-p queue-thread) | ||
do (sleep 0.1) | ||
(when (< wait-until | ||
(get-internal-real-time)) | ||
(error "Thread didn't die in ~A seconds." | ||
timeout)))))) | ||
|
||
|
||
(test bt-box-resurrects-thread-after-error | ||
"Tests that if an error happens during message processing, a thread will remain running." | ||
|
||
(let ((box (make-instance 'message-box/bt | ||
:name "foo"))) | ||
(unwind-protect | ||
(progn | ||
(let ((first-reply | ||
(submit box "The Message" | ||
t | ||
;; Don't wait for the result here, because we | ||
;; intentionally raise an error and will never | ||
;; return a result: | ||
nil | ||
(list (lambda (msg) | ||
(declare (ignore msg)) | ||
(handler-bind ((serious-condition #'abort)) | ||
(error "Die, thread, die!"))))))) | ||
(is (equal first-reply | ||
'no-result))) | ||
|
||
(wait-while-thread-will-die box) | ||
|
||
(let ((result (handler-case | ||
(submit box "The Message" t 1 | ||
(list (lambda (msg) | ||
(reverse msg)))) | ||
(ask-timeout () | ||
:timeout)))) | ||
(is (string= "egasseM ehT" result)))) | ||
|
||
;; Cleanup a thread: | ||
(stop box t)))) | ||
|
||
|
||
(test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals | ||
|
||
"Tests that if an error happens during message processing, a thread will remain running." | ||
|
||
(let ((box (make-instance 'message-box/bt | ||
:name "foo"))) | ||
(unwind-protect | ||
(progn | ||
(let ((first-reply | ||
(submit box "The Message" | ||
t | ||
;; Don't wait for the result here, because we | ||
;; intentionally raise an error and will never | ||
;; return a result: | ||
nil | ||
(list (lambda (msg) | ||
(declare (ignore msg)) | ||
(handler-case | ||
;; This way we simulate the user choosing | ||
;; an ABORT restart in the IDE during a debug session: | ||
(handler-bind ((serious-condition #'abort)) | ||
(error "Die, thread, die!")) | ||
;; This part is the same as the error handling code in the | ||
;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function: | ||
;; | ||
;; TODO: t was used to check if it is able to | ||
;; catch stack unwinding because of INVOKE-RESTART, | ||
;; but it can't. | ||
(t (c) | ||
(log:error "error condition was raised: ~%~a~%" | ||
c) | ||
(cons :handler-error c)))))))) | ||
(is (equal first-reply | ||
'no-result))) | ||
|
||
(wait-while-thread-will-die box) | ||
|
||
(let ((result (handler-case | ||
(submit box "The Message" t 1 | ||
(list (lambda (msg) | ||
(reverse msg)))) | ||
(ask-timeout () | ||
:timeout)))) | ||
(is (string= "egasseM ehT" result)))) | ||
|
||
;; Cleanup a thread: | ||
(stop box t)))) |
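The TODO in the second test notes that a `handler-case` clause cannot intercept the unwinding caused by invoking a restart. A minimal stand-alone sketch of that behavior in plain Common Lisp; `run-with-abort` is a hypothetical name, and the `with-simple-restart` form stands in for whatever ABORT restart the thread implementation provides:

```lisp
(defun run-with-abort ()
  "Hypothetical sketch: an inner handler invokes ABORT before the
surrounding HANDLER-CASE clause is ever consulted."
  ;; Stand-in for the ABORT restart that normally wraps a thread's function.
  (with-simple-restart (abort "Return to the caller.")
    (handler-case
        ;; The innermost handler runs first and transfers control to the
        ;; ABORT restart, unwinding past the HANDLER-CASE below.
        (handler-bind ((serious-condition #'abort))
          (error "Die, thread, die!"))
      ;; Never reached here: the condition is not handled by this clause,
      ;; because the stack is already being unwound by the restart.
      (t (c)
        (cons :handler-error c)))))

;; WITH-SIMPLE-RESTART returns NIL and T when its restart is invoked.
(multiple-value-list (run-with-abort)) ; => (NIL T)
```

In the message box the restart belongs to the thread rather than to a local form, so the same transfer of control ends the queue thread, which is the situation `ensure-thread-is-running` is meant to recover from.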
What's the purpose of the declaim?
Recently I've discovered that such type information sometimes helps to find issues during compilation. So I started adding declarations to the code I touch in my own projects, and decided it would not hurt to use them here.
But if you wish, I'll remove it.
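As a rough illustration of the answer above, here is a sketch of how an `ftype` declamation can surface a mistake at compile time. `report-restart` and `compile-call` are hypothetical names, not part of Sento, and whether a compiler acts on the declaration is implementation-dependent (SBCL generally does; the standard does not require it):

```lisp
;; Hypothetical function with the same shape as the declaimed functions in
;; this PR: one typed argument and no return values.
(declaim (ftype (function (string) (values &optional))
                report-restart))

(defun report-restart (thread-name)
  (format t "Restarting thread ~A~%" thread-name)
  (values))

(defun compile-call (argument)
  "Compile a call to REPORT-RESTART with ARGUMENT and return the three
values of COMPILE as a list: the function, WARNINGS-P and FAILURE-P."
  (multiple-value-list
   (compile nil `(lambda () (report-restart ,argument)))))

;; A well-typed call.
(compile-call "message-thread-foo")

;; A call that contradicts the declared argument type. A compiler that
;; checks FTYPE declarations may report this via WARNINGS-P.
(compile-call 42)
```

The check costs nothing at the call sites that are already correct, which is the trade-off the reply describes: the declaration is optional, but it gives the compiler enough information to flag some mismatches before the code runs.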