Skip to content

Commit

Permalink
Benchmark was refactored to not overflow memory
Browse files Browse the repository at this point in the history
Now threads generating load are pause generation if actor's queue
size growth above given threshold.

Now benchmark uses trivial-benchmark library which formats
results in a unified way on different lisp implementations.
However the most wide variety of metrics are supported for SBCL only.

The pro of using trivial-benchmark here is that it automates
repeated tests. Now you can set a number of runs for the benchmark
and a duration of the each run. And trivial-benchmark will automatically
calculate average, median and deviation of each metric across all runs.

For Sento benchmark I've added a special metric - a message per second.
During my tests satisfactory results were obtained when test duration was set to
10 seconds and a number of runs to 60.

On my Macbook M1 with 16G of memory benchmark results are:

Results for benchmark: (:DISPATCHER :PINNED :WITH-REPLY-P NIL :ASYNC-ASK-P NIL
                        :NUM-SHARED-WORKERS 8 :QUEUE-SIZE NIL
                        :WAIT-IF-QUEUE-LARGE-THAN 10000)
┌─────────────────────┬───────────────┬─────────────┬─────────────┬─────────────┬─────────────┬─────────────┐
│ -                   │ TOTAL         │ MINIMUM     │ MAXIMUM     │ MEDIAN      │ AVERAGE     │ DEVIATION   │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ MESSAGES-PER-SECOND │    59763090.0 │    657702.9 │   1103853.4 │   1014389.2 │   996051.44 │    76637.07 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ USER-RUN-TIME       │      707.3372 │    7.839694 │   12.719797 │   11.985142 │   11.788954 │    0.839375 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ SYSTEM-RUN-TIME     │     232.37593 │    2.988574 │    4.180502 │    3.896854 │    3.872932 │    0.177386 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ REAL-TIME           │      635.9478 │      10.508 │      10.637 │      10.608 │   10.599131 │    0.029722 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ GC-RUN-TIME         │         3.924 │        0.05 │       0.075 │       0.067 │      0.0654 │    0.005194 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ GC-REAL-TIME        │         4.052 │        0.05 │       0.128 │       0.067 │    0.067533 │    0.009718 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ PROCESSOR-CYCLES    │           0.0 │         0.0 │         0.0 │         0.0 │         0.0 │         0.0 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ EVAL-CALLS          │       26520.0 │       442.0 │       442.0 │       442.0 │       442.0 │         0.0 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ LAMBDAS-CONVERTED   │           0.0 │         0.0 │         0.0 │         0.0 │         0.0 │         0.0 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ PAGE-FAULTS         │           0.0 │         0.0 │         0.0 │         0.0 │         0.0 │         0.0 │
├─────────────────────┼───────────────┼─────────────┼─────────────┼─────────────┼─────────────┼─────────────┤
│ BYTES-CONSED        │ 121484440000. │ 1338653400. │ 2245857000. │ 2063355600. │ 2024740700. │ 155061730.0 │
└─────────────────────┴───────────────┴─────────────┴─────────────┴─────────────┴─────────────┴─────────────┘
  • Loading branch information
svetlyak40wt committed Jan 24, 2025
1 parent c721de6 commit 1973723
Show file tree
Hide file tree
Showing 4 changed files with 260 additions and 90 deletions.
322 changes: 233 additions & 89 deletions bench.lisp
Original file line number Diff line number Diff line change
@@ -1,92 +1,203 @@
;;(push #P"~/Development/MySources/sento/" asdf:*central-registry*)
(asdf:load-system "sento")

(log:config :warn)

(defparameter *starttime* 0)
(defparameter *endtime* 0)

(defparameter *withreply-p* nil)

(defparameter *system* nil)
(defparameter *actor* nil)
(defparameter *counter* 0)
(defparameter +threads+ 8)
(defparameter *per-thread* nil)

(defun max-loop () (* *per-thread* +threads+))

(defun runner-bt (&optional (withreply-p nil) (asyncask nil) (queue-size 0))
(declare (ignore queue-size))
;; dispatchers used for the async-ask
(setf *per-thread* 125000)
(setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8)))))
(setf *actor* (ac:actor-of *system*
:receive (lambda (msg)
(declare (ignore msg))
(incf *counter*))
:dispatcher :pinned))
(setf *withreply-p* withreply-p)
(setf *counter* 0)
(setf *starttime* (get-universal-time))
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
(if asyncask
(act:ask *actor* :foo)
(act:ask-s *actor* :foo))
(act:tell *actor* :foo))))
:name x))
(mapcar (lambda (n) (format nil "thread-~a" n))
(loop :for n :from 1 :to +threads+ :collect n))))
(miscutils:assert-cond (lambda () (= *counter* (max-loop))) 20)))
(setf *endtime* (get-universal-time))
(format t "Counter: ~a~%" *counter*)
(format t "Elapsed: ~a~%" (- *endtime* *starttime*))
(print *system*)
(ac:shutdown *system*))

(defun runner-dp (&optional (withreply-p nil) (asyncask nil) (queue-size 0))
(declare (ignore queue-size))
(setf *per-thread* 125000)
(setf *system* (asys:make-actor-system '(:dispatchers (:shared (:workers 8)))))
(setf *actor* (ac:actor-of *system*
:receive (lambda (msg)
(declare (ignore msg))
(incf *counter*))
:dispatcher :shared))
;;(print *actor*)
(setf *withreply-p* withreply-p)
(setf *counter* 0)
(setf *starttime* (get-universal-time))
(format t "Times: ~a~%" (max-loop))
(time
(progn
(map nil #'bt2:join-thread
(mapcar (lambda (x)
(bt2:make-thread
(lambda ()
(dotimes (n *per-thread*)
(if withreply-p
(if asyncask
(act:ask *actor* :foo)
(act:ask-s *actor* :foo))
(act:tell *actor* :foo))))
:name x))
(mapcar (lambda (n) (format nil "thread-~a" n))
(loop for n from 1 to +threads+ collect n))))
(miscutils:assert-cond (lambda () (= *counter* (max-loop))) 120)))
(setf *endtime* (get-universal-time))
(format t "Counter: ~a~%" *counter*)
(format t "Elapsed: ~a~%" (- *endtime* *starttime*))
;;(print *system*)
(ac:shutdown *system*))
(uiop:define-package #:sento/bench
(:use #:cl)
(:import-from #:org.shirakumo.trivial-benchmark
#:*default-samplers*
#:define-sampler
#:with-timing)
(:import-from #:serapeum
#:eval-always
#:defvar-unbound)
(:import-from #:alexandria
#:with-gensyms)
(:import-from #:sento.queue
#:queue-full-error))
(in-package #:sento/bench)


(defun actor-queue-size (a)
(let* ((msgbox (sento.actor-cell:msgbox a))
(queue (slot-value msgbox
'sento.messageb::queue)))
(sento.queue:queued-count queue)))


(defun total-queues-size (system)
(+ (loop for actor in (sento.actor-system::%all-actors system :user)
summing (actor-queue-size actor))
(loop for actor in (sento.actor-system::%all-actors system :internal)
summing (actor-queue-size actor))))


(defvar-unbound *num-processed-messages*
"We will bind this varible during a benchmark.")


(eval-always
(define-sampler messages-per-second (test-duration processed-messages-count)
(:measure (form)
(with-gensyms (test-started-at)
`(let ((*num-processed-messages* 0)
(,test-started-at (get-internal-real-time)))

;; Benchmark code will be called here:
,form

(setf ,test-duration
(- (get-internal-real-time)
,test-started-at))
(setf ,processed-messages-count
*num-processed-messages*))))
(:commit (commit-fn)
`(,commit-fn messages-per-second
(/ (float ,processed-messages-count 0d0)
(/ ,test-duration
internal-time-units-per-second))))))


(eval-always
(defparameter *samplers*
(list* 'messages-per-second
*default-samplers*)))


;; Use only our message counter.
;; *default-samplers* includes different system
;; metrics similar to metrics TIME macro collects.
;; (eval-always
;; (defparameter *samplers*
;; (list 'messages-per-second)))


(defun run-benchmark (&key
(dispatcher :pinned)
(with-reply-p nil)
(async-ask-p nil)
(num-shared-workers 8)
;; When queue-size is given, then Actor will be created
;; with bound-queue. Otherwise, queue will be unbound.
;; To prevent unbound-queue grow, set wait-if-queue-large-than
;; argument to some value.
(queue-size nil queue-size-given-p)
;; When actor's goes abover this value,
;; generator threads will stop and wait while
;; actor will process some messages from the queue.
;; Can be turned off if set to NIL, but this could
;; lead to a high memory consumption and probably
;; program failure.

;; This setting applies some kind of backpressure,
;; when queue-size is 0 and no other way
;; to keep generators from filling all the memory
;; with messages.
(wait-if-queue-large-than 10000 wait-if-queue-large-than-given-p)
(duration 10)
(num-iterations 60)
(load-threads 8))

(log:config :warn)

(check-type dispatcher (member :shared :pinned))

;; Leave only one default
(when (and queue-size-given-p
(not wait-if-queue-large-than-given-p))
(setf wait-if-queue-large-than nil))

(when (and (not queue-size-given-p)
wait-if-queue-large-than-given-p)
(setf queue-size nil))

(when (and queue-size
(not (zerop queue-size))
wait-if-queue-large-than)
(error "Argument WAIT-IF-QUEUE-LARGE-THAN does not makes sense when QUEUE-SIZE is not zero."))

(when (and async-ask-p
(not with-reply-p))
(error "Argument ASYNC-ASK-P should be given together with WITH-REPLY-P argument."))

;; It is useful to save benchmark results along with all params
;; used to run the benchmark.
(format t "~2&Results for benchmark: ~S~%"
(list :dispatcher dispatcher
:with-reply-p with-reply-p
:async-ask-p async-ask-p
:num-shared-workers num-shared-workers
:queue-size queue-size
:wait-if-queue-large-than wait-if-queue-large-than))
(force-output)

(with-timing (num-iterations
:samplers *samplers*)
(let ((counter 0)
(stop-at (+ (get-internal-real-time)
(* duration internal-time-units-per-second))))
(flet ((receiver (msg)
(declare (ignore msg))
(incf counter)))
(let* ((system (asys:make-actor-system `(:dispatchers (:shared (:workers ,num-shared-workers)))))
(actor (ac:actor-of system
:receive #'receiver
:dispatcher dispatcher
:queue-size queue-size)))
(flet ((sender ()
(loop with check-every = 1000
for iteration upfrom 0
while (< (get-internal-real-time)
stop-at)
do (cond
((and wait-if-queue-large-than
;; Calling queue-size function
;; requires lock acquisition which hits performance
;; and makes message generation up to 10 times slower
;; depending on generator threads cound.
;; That is why each thread checks this count only
;; at some iterations:
(zerop
(mod iteration
check-every))
(< wait-if-queue-large-than
(actor-queue-size actor)))
(sleep (random 0.1)))
(t
(if with-reply-p
(if async-ask-p
(act:ask actor :foo)
(act:ask-s actor :foo))
(handler-case
(act:tell actor :foo)
(queue-full-error ()
;; For this test it is ok to just sleep a little
;; before the next attempt to send message
(sleep (random 0.1))))))))))

(unwind-protect
(progn
(let ((threads
(loop for thread-id from 1 upto load-threads
for thread-name = (format nil "thread-~a" thread-id)
collect (bt2:make-thread #'sender
:name thread-name))))

(unwind-protect (mapc #'bt2:join-thread threads)
;; If user will interrupt execution while we are waiting for threads,
;; we need to clean rest threads:
(loop for thread in threads
when (bt2:thread-alive-p thread)
do (bt2:destroy-thread thread))))

;; Wait while receiver will process all messages in the queue
(miscutils:assert-cond
(lambda ()
(zerop (total-queues-size system)))
60)

(trivial-garbage:gc :full t)

;; To make trivial-benchmark collector see our counter.
(setf *num-processed-messages*
counter))
(ac:shutdown system))))))))


;; (defun runner-lp ()
Expand Down Expand Up @@ -141,3 +252,36 @@
;; (format t "Counter: ~a~%" *counter*)
;; (lparallel:end-kernel)
;; (sento.messageb::stop *msgbox*)))


(defun run-all (&key
(num-iterations 10)
(duration 10))
(run-benchmark :num-iterations num-iterations
:duration duration)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t)

(format t "Running ~A:~%" '(run-benchmark :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:queue-size 100)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p nil :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p nil :queue-size 100)

(format t "Running ~A:~%" '(run-benchmark :with-reply-p t :async-ask-p t :queue-size 100))
(run-benchmark :num-iterations num-iterations
:duration duration
:with-reply-p t :async-ask-p t :queue-size 100))

13 changes: 13 additions & 0 deletions sento.asd
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,19 @@
"mgl-pax/full")
:components ((:file "documentation")))


;; --------------------------------
;; benchmark
;; --------------------------------

(defsystem "sento/bench"
:author "Manfred Bergmann"
:description "Benchmark for Sento"
:depends-on ("sento"
"trivial-benchmark"
"trivial-garbage")
:components ((:file "bench")))

;; load system
;; (asdf:load-system "sento")
;;
Expand Down
3 changes: 2 additions & 1 deletion src/actor-context.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ The `actor-system` and the `actor` itself are composed of an `actor-context`."))

(defun %message-box-for-dispatcher-id (context dispatcher-id queue-size)
(case dispatcher-id
(:pinned (make-instance 'mesgb:message-box/bt))
(:pinned (make-instance 'mesgb:message-box/bt
:max-queue-size queue-size))
(otherwise (let ((dispatcher (%get-shared-dispatcher (system context) dispatcher-id)))
(unless dispatcher
(error (format nil "No such dispatcher identifier '~a' exists!" dispatcher-id)))
Expand Down
12 changes: 12 additions & 0 deletions src/queue/queue-locked.lisp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ than the 'queue' implementation of lparallel.
(not (or (queue-head queue)
(queue-tail queue))))

(defun size (queue)
(let ((head (queue-head queue))
(tail (queue-tail queue)))
(+ (length head)
(length tail))))


#|
queue implementation from lparallel.
Expand Down Expand Up @@ -104,3 +110,9 @@ Copyright (c) 2011-2012, James M. Lawrence. All rights reserved.
(defmethod emptyq-p ((self queue-unbounded))
(with-slots (queue) self
(emptyp queue)))


(defmethod queued-count ((self queue-unbounded))
(with-slots (queue lock) self
(bt2:with-lock-held (lock)
(size queue))))

0 comments on commit 1973723

Please sign in to comment.