0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-04-12 23:11:23 -05:00

Move events batching to a util/async ns.

This commit is contained in:
Andrey Antukh 2021-05-09 14:00:23 +02:00 committed by Andrés Moya
parent 0f8e2a9b1b
commit 7cf120e2e1
2 changed files with 34 additions and 32 deletions

View file

@ -106,7 +106,6 @@
;; --- STATE INIT: SESSION UPDATER
(declare batch-events)
(declare update-sessions)
(s/def ::session map?)
@ -129,7 +128,9 @@
(l/info :action "initialize session updater"
:max-batch-age (str (:max-batch-age cfg))
:max-batch-size (str (:max-batch-size cfg)))
(let [input (batch-events cfg (::events-ch session))
(let [input (aa/batch (::events-ch session)
{:max-batch-size (:max-batch-size cfg)
:max-batch-age (inst-ms (:max-batch-age cfg))})
mcnt (mtx/create
{:name "http_session_update_total"
:help "A counter of session update batch events."
@ -149,36 +150,6 @@
:count result))
(recur))))))
(defn- timeout-chan
[cfg]
(a/timeout (inst-ms (:max-batch-age cfg))))
(defn- batch-events
[cfg in]
(let [out (a/chan)]
(a/go-loop [tch (timeout-chan cfg)
buf #{}]
(let [[val port] (a/alts! [tch in])]
(cond
(identical? port tch)
(if (empty? buf)
(recur (timeout-chan cfg) buf)
(do
(a/>! out [:timeout buf])
(recur (timeout-chan cfg) #{})))
(nil? val)
(a/close! out)
(identical? port in)
(let [buf (conj buf val)]
(if (>= (count buf) (:max-batch-size cfg))
(do
(a/>! out [:size buf])
(recur (timeout-chan cfg) #{}))
(recur tch buf))))))
out))
(defn- update-sessions
[{:keys [pool executor]} ids]
(aa/with-thread executor

View file

@ -60,3 +60,34 @@
(if (= executor ::default)
`(a/thread-call (^:once fn* [] (try ~@body (catch Exception e# e#))))
`(thread-call ~executor (^:once fn* [] ~@body))))
(defn batch
[in {:keys [max-batch-size
max-batch-age
init]
:or {max-batch-size 200
max-batch-age (* 30 1000)
init #{}}
:as opts}]
(let [out (a/chan)]
(a/go-loop [tch (a/timeout max-batch-age) buf init]
(let [[val port] (a/alts! [tch in])]
(cond
(identical? port tch)
(if (empty? buf)
(recur (a/timeout max-batch-age) buf)
(do
(a/>! out [:timeout buf])
(recur (a/timeout max-batch-age) init)))
(nil? val)
(a/close! out)
(identical? port in)
(let [buf (conj buf val)]
(if (>= (count buf) max-batch-size)
(do
(a/>! out [:size buf])
(recur (a/timeout max-batch-age) init))
(recur tch buf))))))
out))