diff --git a/bench.lisp b/bench.lisp index 04728d6..8219871 100644 --- a/bench.lisp +++ b/bench.lisp @@ -1,92 +1,203 @@ -;;(push #P"~/Development/MySources/sento/" asdf:*central-registry*) -(asdf:load-system "sento") - -(log:config :warn) - -(defparameter *starttime* 0) -(defparameter *endtime* 0) - -(defparameter *withreply-p* nil) - -(defparameter *system* nil) -(defparameter *actor* nil) -(defparameter *counter* 0) -(defparameter +threads+ 8) -(defparameter *per-thread* nil) - -(defun max-loop () (* *per-thread* +threads+)) - -(defun runner-bt (&optional (withreply-p nil) (asyncask nil) (queue-size 0)) - (declare (ignore queue-size)) - ;; dispatchers used for the async-ask - (setf *per-thread* 125000) - (setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8))))) - (setf *actor* (ac:actor-of *system* - :receive (lambda (msg) - (declare (ignore msg)) - (incf *counter*)) - :dispatcher :pinned)) - (setf *withreply-p* withreply-p) - (setf *counter* 0) - (setf *starttime* (get-universal-time)) - (format t "Times: ~a~%" (max-loop)) - (time - (progn - (map nil #'bt2:join-thread - (mapcar (lambda (x) - (bt2:make-thread - (lambda () - (dotimes (n *per-thread*) - (if withreply-p - (if asyncask - (act:ask *actor* :foo) - (act:ask-s *actor* :foo)) - (act:tell *actor* :foo)))) - :name x)) - (mapcar (lambda (n) (format nil "thread-~a" n)) - (loop :for n :from 1 :to +threads+ :collect n)))) - (miscutils:assert-cond (lambda () (= *counter* (max-loop))) 20))) - (setf *endtime* (get-universal-time)) - (format t "Counter: ~a~%" *counter*) - (format t "Elapsed: ~a~%" (- *endtime* *starttime*)) - (print *system*) - (ac:shutdown *system*)) - -(defun runner-dp (&optional (withreply-p nil) (asyncask nil) (queue-size 0)) - (declare (ignore queue-size)) - (setf *per-thread* 125000) - (setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8))))) - (setf *actor* (ac:actor-of *system* - :receive (lambda (msg) - (declare (ignore msg)) - (incf *counter*)) - :dispatcher :shared)) - ;;(print *actor*) - (setf *withreply-p* withreply-p) - (setf *counter* 0) - (setf *starttime* (get-universal-time)) - (format t "Times: ~a~%" (max-loop)) - (time - (progn - (map nil #'bt2:join-thread - (mapcar (lambda (x) - (bt2:make-thread - (lambda () - (dotimes (n *per-thread*) - (if withreply-p - (if asyncask - (act:ask *actor* :foo) - (act:ask-s *actor* :foo)) - (act:tell *actor* :foo)))) - :name x)) - (mapcar (lambda (n) (format nil "thread-~a" n)) - (loop for n from 1 to +threads+ collect n)))) - (miscutils:assert-cond (lambda () (= *counter* (max-loop))) 120))) - (setf *endtime* (get-universal-time)) - (format t "Counter: ~a~%" *counter*) - (format t "Elapsed: ~a~%" (- *endtime* *starttime*)) - ;;(print *system*) - (ac:shutdown *system*)) +(uiop:define-package #:sento/bench + (:use #:cl) + (:import-from #:org.shirakumo.trivial-benchmark + #:*default-samplers* + #:define-sampler + #:with-timing) + (:import-from #:serapeum + #:eval-always + #:defvar-unbound) + (:import-from #:alexandria + #:with-gensyms) + (:import-from #:sento.queue + #:queue-full-error)) +(in-package #:sento/bench) + + +(defun actor-queue-size (a) + (let* ((msgbox (sento.actor-cell:msgbox a)) + (queue (slot-value msgbox + 'sento.messageb::queue))) + (sento.queue:queued-count queue))) + + +(defun total-queues-size (system) + (+ (loop for actor in (sento.actor-system::%all-actors system :user) + summing (actor-queue-size actor)) + (loop for actor in (sento.actor-system::%all-actors system :internal) + summing (actor-queue-size actor)))) + + +(defvar-unbound *num-processed-messages* + "We will bind this varible during a benchmark.") + + +(eval-always + (define-sampler messages-per-second (test-duration processed-messages-count) + (:measure (form) + (with-gensyms (test-started-at) + `(let ((*num-processed-messages* 0) + (,test-started-at (get-internal-real-time))) + + ;; Benchmark code will be called here: + ,form + + (setf ,test-duration + (- (get-internal-real-time) + ,test-started-at)) + (setf ,processed-messages-count + *num-processed-messages*)))) + (:commit (commit-fn) + `(,commit-fn messages-per-second + (/ (float ,processed-messages-count 0d0) + (/ ,test-duration + internal-time-units-per-second)))))) + + +(eval-always + (defparameter *samplers* + (list* 'messages-per-second + *default-samplers*))) + + +;; Use only our message counter. +;; *default-samplers* includes different system +;; metrics similar to metrics TIME macro collects. +;; (eval-always +;; (defparameter *samplers* +;; (list 'messages-per-second))) + + +(defun run-benchmark (&key + (dispatcher :pinned) + (with-reply-p nil) + (async-ask-p nil) + (num-shared-workers 8) + ;; When queue-size is given, then Actor will be created + ;; with bound-queue. Otherwise, queue will be unbound. + ;; To prevent unbound-queue grow, set wait-if-queue-large-than + ;; argument to some value. + (queue-size nil queue-size-given-p) + ;; When actor's goes abover this value, + ;; generator threads will stop and wait while + ;; actor will process some messages from the queue. + ;; Can be turned off if set to NIL, but this could + ;; lead to a high memory consumption and probably + ;; program failure. + + ;; This setting applies some kind of backpressure, + ;; when queue-size is 0 and no other way + ;; to keep generators from filling all the memory + ;; with messages. + (wait-if-queue-large-than 10000 wait-if-queue-large-than-given-p) + (duration 10) + (num-iterations 60) + (load-threads 8)) + + (log:config :warn) + + (check-type dispatcher (member :shared :pinned)) + + ;; Leave only one default + (when (and queue-size-given-p + (not wait-if-queue-large-than-given-p)) + (setf wait-if-queue-large-than nil)) + + (when (and (not queue-size-given-p) + wait-if-queue-large-than-given-p) + (setf queue-size nil)) + + (when (and queue-size + (not (zerop queue-size)) + wait-if-queue-large-than) + (error "Argument WAIT-IF-QUEUE-LARGE-THAN does not makes sense when QUEUE-SIZE is not zero.")) + + (when (and async-ask-p + (not with-reply-p)) + (error "Argument ASYNC-ASK-P should be given together with WITH-REPLY-P argument.")) + + ;; It is useful to save benchmark results along with all params + ;; used to run the benchmark. + (format t "~2&Results for benchmark: ~S~%" + (list :dispatcher dispatcher + :with-reply-p with-reply-p + :async-ask-p async-ask-p + :num-shared-workers num-shared-workers + :queue-size queue-size + :wait-if-queue-large-than wait-if-queue-large-than)) + (force-output) + + (with-timing (num-iterations + :samplers *samplers*) + (let ((counter 0) + (stop-at (+ (get-internal-real-time) + (* duration internal-time-units-per-second)))) + (flet ((receiver (msg) + (declare (ignore msg)) + (incf counter))) + (let* ((system (asys:make-actor-system `(:dispatchers (:shared (:workers ,num-shared-workers))))) + (actor (ac:actor-of system + :receive #'receiver + :dispatcher dispatcher + :queue-size queue-size))) + (flet ((sender () + (loop with check-every = 1000 + for iteration upfrom 0 + while (< (get-internal-real-time) + stop-at) + do (cond + ((and wait-if-queue-large-than + ;; Calling queue-size function + ;; requires lock acquisition which hits performance + ;; and makes message generation up to 10 times slower + ;; depending on generator threads cound. + ;; That is why each thread checks this count only + ;; at some iterations: + (zerop + (mod iteration + check-every)) + (< wait-if-queue-large-than + (actor-queue-size actor))) + (sleep (random 0.1))) + (t + (if with-reply-p + (if async-ask-p + (act:ask actor :foo) + (act:ask-s actor :foo)) + (handler-case + (act:tell actor :foo) + (queue-full-error () + ;; For this test it is ok to just sleep a little + ;; before the next attempt to send message + (sleep (random 0.1)))))))))) + + (unwind-protect + (progn + (let ((threads + (loop for thread-id from 1 upto load-threads + for thread-name = (format nil "thread-~a" thread-id) + collect (bt2:make-thread #'sender + :name thread-name)))) + + (unwind-protect (mapc #'bt2:join-thread threads) + ;; If user will interrupt execution while we are waiting for threads, + ;; we need to clean rest threads: + (loop for thread in threads + when (bt2:thread-alive-p thread) + do (bt2:destroy-thread thread)))) + + ;; Wait while receiver will process all messages in the queue + (miscutils:assert-cond + (lambda () + (zerop (total-queues-size system))) + 60) + + (trivial-garbage:gc :full t) + + ;; To make trivial-benchmark collector see our counter. + (setf *num-processed-messages* + counter)) + (ac:shutdown system)))))))) ;; (defun runner-lp () @@ -141,3 +252,36 @@ ;; (format t "Counter: ~a~%" *counter*) ;; (lparallel:end-kernel) ;; (sento.messageb::stop *msgbox*))) + + +(defun run-all (&key + (num-iterations 10) + (duration 10)) + (run-benchmark :num-iterations num-iterations + :duration duration) + + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil)) + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t :async-ask-p nil) + + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t)) + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t :async-ask-p t) + + (format t "Running ~A:~%" '(run-benchmark :queue-size 100)) + (run-benchmark :num-iterations num-iterations + :duration duration + :queue-size 100) + + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100)) + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t :async-ask-p nil :queue-size 100) + + (format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100)) + (run-benchmark :num-iterations num-iterations + :duration duration + :with-reply-p t :async-ask-p t :queue-size 100)) + diff --git a/sento.asd b/sento.asd index 14ef62f..16bffde 100644 --- a/sento.asd +++ b/sento.asd @@ -111,6 +111,19 @@ "mgl-pax/full") :components ((:file "documentation"))) + +;; -------------------------------- +;; benchmark +;; -------------------------------- + +(defsystem "sento/bench" + :author "Manfred Bergmann" + :description "Benchmark for Sento" + :depends-on ("sento" + "trivial-benchmark" + "trivial-garbage") + :components ((:file "bench"))) + ;; load system ;; (asdf:load-system "sento") ;; diff --git a/src/actor-context.lisp b/src/actor-context.lisp index 61549ef..3f0019f 100644 --- a/src/actor-context.lisp +++ b/src/actor-context.lisp @@ -45,7 +45,8 @@ The `actor-system` and the `actor` itself are composed of an `actor-context`.")) (defun %message-box-for-dispatcher-id (context dispatcher-id queue-size) (case dispatcher-id - (:pinned (make-instance 'mesgb:message-box/bt)) + (:pinned (make-instance 'mesgb:message-box/bt + :max-queue-size queue-size)) (otherwise (let ((dispatcher (%get-shared-dispatcher (system context) dispatcher-id))) (unless dispatcher (error (format nil "No such dispatcher identifier '~a' exists!" dispatcher-id))) diff --git a/src/queue/queue-locked.lisp b/src/queue/queue-locked.lisp index af5bc0d..5c3ffd3 100644 --- a/src/queue/queue-locked.lisp +++ b/src/queue/queue-locked.lisp @@ -40,6 +40,12 @@ than the 'queue' implementation of lparallel. (not (or (queue-head queue) (queue-tail queue)))) +(defun size (queue) + (let ((head (queue-head queue)) + (tail (queue-tail queue))) + (+ (length head) + (length tail)))) + #| queue implementation from lparallel. @@ -104,3 +110,9 @@ Copyright (c) 2011-2012, James M. Lawrence. All rights reserved. (defmethod emptyq-p ((self queue-unbounded)) (with-slots (queue) self (emptyp queue))) + + +(defmethod queued-count ((self queue-unbounded)) + (with-slots (queue lock) self + (bt2:with-lock-held (lock) + (size queue))))