Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Now message-box/bt class will restart thread if it was aborted because of an error. #101

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sento.asd
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@
(:file "actor-system-test")
(:file "actor-tree-test")
(:file "spawn-in-receive-test")
(:file "message-box-test")
)))
:description "Test system for sento"
:perform (test-op (op c) (symbol-call :fiveam :run!
Expand Down
75 changes: 62 additions & 13 deletions src/mbox/message-box.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -113,18 +113,37 @@ This is used to break the environment possibly captured as closure at 'submit' s
(defclass message-box/bt (message-box-base)
((queue-thread :initform nil
:documentation
"The thread that pops queue items."))
"The thread that pops queue items.")
(thread-lock :initform (bt2:make-lock)
:documentation
"A lock which should be taken when queue-thread slot is set."))
(:documentation
"Bordeaux-Threads based message-box with a single thread operating on a message queue.
This is used when the actor is created using a `:pinned` dispatcher type.
There is a limit on the maximum number of actors/agents that can be created with
this kind of queue because each message-box (and with that each actor) requires exactly one thread."))


(declaim (ftype (function (message-box/bt &key (:thread-name (or null string)))
(values &optional))
start-thread))

(defun start-thread (msgbox &key thread-name)
(with-slots (name queue-thread)
msgbox
(flet ((run-processing-loop ()
(message-processing-loop msgbox)))
(setf queue-thread
(bt2:make-thread
#'run-processing-loop
:name (or thread-name
(mkstr "message-thread-" name))))))
(values))


(defmethod initialize-instance :after ((self message-box/bt) &key)
(with-slots (name queue-thread) self
(setf queue-thread (bt2:make-thread
(lambda () (message-processing-loop self))
:name (mkstr "message-thread-" name))))
(start-thread self)

(when (next-method-p)
(call-next-method)))

Expand Down Expand Up @@ -179,6 +198,23 @@ This function sets the result as `handler-result' in `item'. The return of this
(bt2:condition-notify withreply-cvar)))
(handler-fun)))))


(declaim (ftype (function (message-box/bt)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of the declaim?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recently I've discovered that such type information sometimes helps to find issues during the compilation. So I started to add declarations to the code I'm touching in my own projects and also decided it would not harm to use here.

But if you wish, I'll remove it.

(values &optional))
ensure-thread-is-running))

(defun ensure-thread-is-running (msgbox)
(with-slots (queue-thread thread-lock)
msgbox
(bt2:with-lock-held (thread-lock)
(unless (bt2:thread-alive-p queue-thread)
(log:trace "Restarting thread ~A"
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to have this as a warn logging to know exactly when it happens. Should be an exception.

(bt2:thread-name queue-thread))
(start-thread msgbox
:thread-name (bt2:thread-name queue-thread)))
(values))))


(defmethod submit ((self message-box/bt) message withreply-p time-out handler-fun-args)
"The `handler-fun-args` argument must contain a handler function as first list item.
It will be apply'ed with the rest of the args when the message was 'popped' from queue."
Expand All @@ -200,14 +236,26 @@ It will be apply'ed with the rest of the args when the message was 'popped' from
:time-out time-out
:handler-fun-args handler-fun-args
:handler-result 'no-result)))
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
(bt2:with-lock-held (withreply-lock)
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)

(if time-out
(wait-and-probe-for-msg-handler-result msgbox push-item)
(bt2:condition-wait withreply-cvar withreply-lock)))
(cond
(time-out
(bt2:with-lock-held (withreply-lock)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please try to remove the duplicated code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move these three lines into a local function:

         (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
         (queue:pushq queue push-item)
         (ensure-thread-is-running msgbox)

is it ok for you?

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Local function must still be redefined on runtime for each call of submit, and then a function call in itself must safe and restore registers and so on.
We're just talking about a handful of lines of code. I would say it was OK as before with the factored-out common code and just leave the differences. Effectively we just have the additional (ensure-thread-is-running msgbox) call, no?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've refactored this code and made these two branches outer for bt2:with-lock-held not because of (ensure-thread-is-running msgbox) it is a fix of another problem I've encounter trying to write a test.

The problem is that this call:

(submit box "The Message" t 1
                                     (list (lambda (msg)
                                             (reverse msg))))

when you specify both withreply-p = t and timeout != nil, then this submit call hangs. Why it hangs? Because of this old version of code in submit/reply:

(bt2:with-lock-held (withreply-lock)
      (log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
      (queue:pushq queue push-item)

      (if time-out
          (wait-and-probe-for-msg-handler-result msgbox push-item)
          (bt2:condition-wait withreply-cvar withreply-lock)))

If time-out is specified, then withreply-lock is held during (wait-and-probe-for-msg-handler-result msgbox push-item). But process-queue-item function called in the box's thread, also tries to acquire withreply-lock (here). And it can't because the lock is already held by another thread. And because of this, box's thread can't execute handler and return a value which leads to a situation where (wait-and-probe-for-msg-handler-result) is running during given timeout seconds and then fails because didn't receive any results.

I'd call this situation an "almost dead-lock" :(

That is why I've made these two branches separate and for branch where time-out is given, we release lock before wait-and-probe-for-msg-handler-result call, whereas in branch where timeout is NIL, lock is released by call to bt2:condition-wait.

By the way, I've just found bt2:condition-wait has it's own timeout argument. Why you didn't use it instead of call to wait-and-probe-for-msg-handler-result? Probably this code could be simpler, if we handle timeout error from bt2:condition-wait.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, indeed there is a problem when timeout is big, then message is only processed after wait time is over.

I need to wrap my head around this code. I'm wondering right now why not just remove the lock altogether and why it is needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lock is needed to use a condition variable. It's API requires lock to be used. Without condition you will have to use assert-cond with unlimited timeout.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, got it I think.
So I think this code is OK. We can keep that bit of duplication on each branch.

(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)
(ensure-thread-is-running msgbox))

;; It is important to leave lock withreply-lock
;; before we will wait for result. Otherwisee handler-fun
;; will not be able to do it's job:
(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
(wait-and-probe-for-msg-handler-result msgbox push-item))
(t
(bt2:with-lock-held (withreply-lock)
(log:trace "~a: pushing item to queue: ~a" (name msgbox) push-item)
(queue:pushq queue push-item)
(ensure-thread-is-running msgbox)
Copy link
Owner

@mdbergmann mdbergmann Jan 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We also need to add this to submit/no-reply. And then, probably it's better to put it to submit, to catch both ways of submitting.

In which case then, we indeed need the additional lock.


(log:trace "~a: withreply: waiting for arrival of result..." (name msgbox))
(bt2:condition-wait withreply-cvar withreply-lock))))

(with-slots (handler-result) push-item
(log:trace "~a: withreply: result should be available: ~a" (name msgbox) handler-result)
Expand Down Expand Up @@ -301,6 +349,7 @@ Returns the handler-result if `withreply-p' is eq to `T', otherwise the return i
processed-messages
dispatcher) self
(incf processed-messages)

(let ((push-item (make-message-item/dp
:message message
:handler-fun-args handler-fun-args
Expand Down
121 changes: 121 additions & 0 deletions tests/message-box-test.lisp
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
(defpackage :sento.message-box-test
(:use :cl :fiveam :cl-mock :sento.actor :sento.future)
(:shadow #:! #:?)
(:import-from #:miscutils
#:assert-cond
#:await-cond
#:filter)
(:import-from #:timeutils
#:ask-timeout)
(:import-from #:sento.messageb
#:message-box/bt
#:submit
#:no-result
#:queue-thread
#:stop)
(:import-from #:ac
#:actor-of))

(in-package :sento.message-box-test)

(def-suite message-box-tests
:description "message-box tests"
:in sento.tests:test-suite)

(in-suite message-box-tests)


(defun wait-while-thread-will-die (msgbox &key (timeout 10))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can use one of the assert-cond or await-cond utils in miscutils package.

(let ((wait-until (+ (get-internal-real-time) (* timeout
internal-time-units-per-second))))
(with-slots (queue-thread)
msgbox
(loop while (bt2:thread-alive-p queue-thread)
do (sleep 0.1)
(when (< wait-until
(get-internal-real-time))
(error "Thread didn't die in ~A seconds."
timeout))))))


(test bt-box-resurrects-thread-after-error
"Tests that if an error happends during message processing, a thread will remain running."

(let ((box (make-instance 'message-box/bt
:name "foo")))
(unwind-protect
(progn
(let ((first-reply
(submit box "The Message"
t
;; Don't wait for result here, because we are
;; intentionally raise error here and will never
;; return a result:
nil
(list (lambda (msg)
(declare (ignore msg))
(handler-bind ((serious-condition #'abort))
(error "Die, thread, die!")))))))
(is (equal first-reply
'no-result)))

(wait-while-thread-will-die box)

(let ((result (handler-case
(submit box "The Message" t 1
(list (lambda (msg)
(reverse msg))))
(ask-timeout ()
:timeout))))
(is (string= "egasseM ehT" result))))

;; Cleanup a thread:
(stop box t))))


(test bt-box-resurrects-thread-after-abort-if-handler-catches-all-signals
"Tests that if an error happends during message processing, a thread will remain running."

(let ((box (make-instance 'message-box/bt
:name "foo")))
(unwind-protect
(progn
(let ((first-reply
(submit box "The Message"
t
;; Don't wait for result here, because we are
;; intentionally raise error here and will never
;; return a result:
nil
(list (lambda (msg)
(declare (ignore msg))
(handler-case
;; This way we are simulating that the user choose
;; an ABORT restart in the IDE during debug session:
(handler-bind ((serious-condition #'abort))
(error "Die, thread, die!"))
;; This part the same as error handling code in the
;; SENTO.ACTOR-CELL:HANDLE-MESSAGE function:
;;
;; TODO: t was used to check if it is able to
;; catch stack unwinding because of INVOKE-RESTART,
;; but it cant.
(t (c)
(log:error "error condition was raised: ~%~a~%"
c)
(cons :handler-error c))))))))
(is (equal first-reply
'no-result)))

(wait-while-thread-will-die box)

(let ((result (handler-case
(submit box "The Message" t 1
(list (lambda (msg)
(reverse msg))))
(ask-timeout ()
:timeout))))
(is (string= "egasseM ehT" result))))

;; Cleanup a thread:
(stop box t))))
Loading