0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-13 02:28:18 -05:00

♻️ Refactor semaphore and executors

This commit is contained in:
Andrey Antukh 2022-09-19 12:25:44 +02:00
parent 12b98c22bc
commit 6f42f4ec45
12 changed files with 293 additions and 207 deletions

View file

@ -170,12 +170,11 @@
(s/def ::redis-uri ::us/string)
(s/def ::registration-domain-whitelist ::us/set-of-strings)
(s/def ::semaphore-font-process ::us/integer)
(s/def ::semaphore-file-update ::us/integer)
(s/def ::semaphore-image-process ::us/integer)
(s/def ::semaphore-authentication ::us/integer)
(s/def ::rpc-semaphore-permits-font ::us/integer)
(s/def ::rpc-semaphore-permits-file-update ::us/integer)
(s/def ::rpc-semaphore-permits-image ::us/integer)
(s/def ::rpc-semaphore-permits-password ::us/integer)
(s/def ::smtp-default-from ::us/string)
(s/def ::smtp-default-reply-to ::us/string)
(s/def ::smtp-host ::us/string)
@ -278,10 +277,12 @@
::public-uri
::redis-uri
::registration-domain-whitelist
::rpc-semaphore-permits-font
::rpc-semaphore-permits-file-update
::rpc-semaphore-permits-image
::rpc-semaphore-permits-password
::semaphore-process-font
::semaphore-process-image
::semaphore-update-file
::semaphore-auth
::rpc-rlimit-config
::sentry-dsn
::sentry-debug

View file

@ -27,32 +27,22 @@
;; Default thread pool for IO operations
[::default :app.worker/executor]
{:parallelism (cf/get :default-executor-parallelism 60)
:prefix :default}
;; Constrained thread pool. Should only be used from high resources
;; demanding operations.
[::blocking :app.worker/executor]
{:parallelism (cf/get :blocking-executor-parallelism 10)
:prefix :blocking}
{:parallelism (cf/get :default-executor-parallelism 70)}
;; Dedicated thread pool for backround tasks execution.
[::worker :app.worker/executor]
{:parallelism (cf/get :worker-executor-parallelism 10)
:prefix :worker}
{:parallelism (cf/get :worker-executor-parallelism 20)}
:app.worker/scheduler
{:parallelism 1
:prefix :scheduler}
:app.worker/executors
{:default (ig/ref [::default :app.worker/executor])
:worker (ig/ref [::worker :app.worker/executor])
:blocking (ig/ref [::blocking :app.worker/executor])}
{:default (ig/ref [::default :app.worker/executor])
:worker (ig/ref [::worker :app.worker/executor])}
:app.worker/executors-monitor
:app.worker/executor-monitor
{:metrics (ig/ref :app.metrics/metrics)
:scheduler (ig/ref :app.worker/scheduler)
:executors (ig/ref :app.worker/executors)}
:app.migrations/migrations
@ -216,6 +206,10 @@
{:pool (ig/ref :app.db/pool)
:executor (ig/ref [::default :app.worker/executor])}
:app.rpc/semaphores
{:metrics (ig/ref :app.metrics/metrics)
:executor (ig/ref [::default :app.worker/executor])}
:app.rpc/rlimit
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)}
@ -234,7 +228,10 @@
:http-client (ig/ref :app.http/client)
:rlimit (ig/ref :app.rpc/rlimit)
:executors (ig/ref :app.worker/executors)
:templates (ig/ref :app.setup/builtin-templates)}
:executor (ig/ref [::default :app.worker/executor])
:templates (ig/ref :app.setup/builtin-templates)
:semaphores (ig/ref :app.rpc/semaphores)
}
:app.rpc.doc/routes
{:methods (ig/ref :app.rpc/methods)}
@ -359,7 +356,7 @@
(def worker-config
{ :app.worker/cron
{:app.worker/cron
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)
:tasks (ig/ref :app.worker/registry)

View file

@ -100,23 +100,23 @@
::mdef/labels ["name"]
::mdef/type :summary}
:rpc-semaphore-queued-submissions
{::mdef/name "penpot_rpc_semaphore_queued_submissions"
::mdef/help "Current number of queued submissions on RPC-SEMAPHORE."
:semaphore-queued-submissions
{::mdef/name "penpot_semaphore_queued_submissions"
::mdef/help "Current number of queued submissions on SEMAPHORE."
::mdef/labels ["name"]
::mdef/type :gauge}
:rpc-semaphore-used-permits
{::mdef/name "penpot_rpc_semaphore_used_permits"
::mdef/help "Current number of used permits on RPC-SEMAPHORE."
:semaphore-used-permits
{::mdef/name "penpot_semaphore_used_permits"
::mdef/help "Current number of used permits on SEMAPHORE."
::mdef/labels ["name"]
::mdef/type :gauge}
:rpc-semaphore-acquires-total
{::mdef/name "penpot_rpc_semaphore_acquires_total"
::mdef/help "Total number of acquire operations on RPC-SEMAPHORE."
:semaphore-timing
{::mdef/name "penpot_semaphore_timing"
::mdef/help "Total timing of SEMAPHORE."
::mdef/labels ["name"]
::mdef/type :counter}
::mdef/type :summary}
:executors-active-threads
{::mdef/name "penpot_executors_active_threads"

View file

@ -16,10 +16,9 @@
[app.msgbus :as-alias mbus]
[app.rpc.retry :as retry]
[app.rpc.rlimit :as rlimit]
[app.rpc.semaphore :as rsem]
[app.util.async :as async]
[app.rpc.semaphore :as-alias rsem]
[app.util.services :as sv]
[app.worker :as wrk]
[app.util.time :as ts]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
@ -107,38 +106,25 @@
"Wrap service method with metrics measurement."
[{:keys [metrics ::metrics-id]} f mdata]
(let [labels (into-array String [(::sv/name mdata)])]
(fn [cfg params]
(let [start (System/nanoTime)]
(let [tp (ts/tpoint)]
(p/finally
(f cfg params)
(fn [_ _]
(mtx/run! metrics
{:id metrics-id
:val (/ (- (System/nanoTime) start) 1000000)
:labels labels})))))))
:id metrics-id
:val (inst-ms (tp))
:labels labels)))))))
(defn- wrap-dispatch
"Wraps service method into async flow, with the ability to dispatching
it to a preconfigured executor service."
[{:keys [executors] :as cfg} f mdata]
(let [dname (::async/dispatch mdata :default)]
(if (= :none dname)
(with-meta
(fn [cfg params]
(p/do (f cfg params)))
mdata)
(let [executor (get executors dname)]
(when-not executor
(ex/raise :type :internal
:code :executor-not-configured
:hint (format "executor %s not configured" dname)))
(with-meta
(fn [cfg params]
(-> (px/submit! executor #(f cfg params))
(p/bind p/wrap)))
mdata)))))
[{:keys [executor] :as cfg} f mdata]
(with-meta
(fn [cfg params]
(-> (px/submit! executor #(f cfg params))
(p/bind p/wrap)))
mdata))
(defn- wrap-audit
[{:keys [audit] :as cfg} f mdata]
@ -171,8 +157,8 @@
[cfg f mdata]
(let [f (as-> f $
(wrap-dispatch cfg $ mdata)
(rsem/wrap cfg $ mdata)
(rlimit/wrap cfg $ mdata)
(rsem/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
(wrap-audit cfg $ mdata)
(wrap-metrics cfg $ mdata)
@ -245,8 +231,6 @@
(into {}))))
(s/def ::audit (s/nilable fn?))
(s/def ::executors (s/map-of keyword? ::wrk/executor))
(s/def ::executors map?)
(s/def ::http-client fn?)
(s/def ::ldap (s/nilable map?))
(s/def ::msgbus ::mbus/msgbus)
@ -260,10 +244,10 @@
::session
::sprops
::audit
::executors
::public-uri
::msgbus
::http-client
::rsem/semaphores
::rlimit/rlimit
::mtx/metrics
::db/pool

View file

@ -136,7 +136,7 @@
(sv/defmethod ::login-with-password
"Performs authentication using penpot password."
{:auth false
::rsem/permits (cf/get :rpc-semaphore-permits-password)
::rsem/queue :auth
::doc/added "1.15"}
[cfg params]
(login-with-password cfg params))
@ -177,7 +177,7 @@
(sv/defmethod ::recover-profile
{:auth false
::rsem/permits (cf/get :rpc-semaphore-permits-password)
::rsem/queue :auth
::doc/added "1.15"}
[cfg params]
(recover-profile cfg params))
@ -368,7 +368,7 @@
(sv/defmethod ::register-profile
{:auth false
::rsem/permits (cf/get :rpc-semaphore-permits-password)
::rsem/queue :auth
::doc/added "1.15"}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]

View file

@ -315,7 +315,7 @@
(contains? o :changes-with-metadata)))))
(sv/defmethod ::update-file
{::rsem/permits (cf/get :rpc-semaphore-permits-file-update)}
{::rsem/queue :update-file}
[{:keys [pool] :as cfg} {:keys [id profile-id] :as params}]
(db/with-atomic [conn pool]
(db/xact-lock! conn id)

View file

@ -10,7 +10,6 @@
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.media :as media]
[app.rpc.doc :as-alias doc]
@ -20,8 +19,7 @@
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.core :as p]))
(declare create-font-variant)
@ -42,24 +40,21 @@
::font-id ::font-family ::font-weight ::font-style]))
(sv/defmethod ::create-font-variant
{::rsem/permits (cf/get :rpc-semaphore-permits-font)}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(teams/check-edition-permissions! pool profile-id team-id)
(create-font-variant cfg params)))
(defn create-font-variant
[{:keys [storage pool executors] :as cfg} {:keys [data] :as params}]
[{:keys [storage pool executor semaphores] :as cfg} {:keys [data] :as params}]
(letfn [(generate-fonts [data]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-font semaphores)
(media/run {:cmd :generate-fonts :input data})))
;; Function responsible of calculating cryptographyc hash of
;; the provided data. Even though it uses the hight
;; performance BLAKE2b algorithm, we prefer to schedule it
;; to be executed on the blocking executor.
;; the provided data.
(calculate-hash [data]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-font semaphores)
(sto/calculate-hash data)))
(validate-data [data]
@ -110,8 +105,8 @@
(-> (generate-fonts data)
(p/then validate-data)
(p/then persist-fonts (:default executors))
(p/then insert-into-db (:default executors)))))
(p/then persist-fonts executor)
(p/then insert-into-db executor))))
;; --- UPDATE FONT FAMILY

View file

@ -23,8 +23,7 @@
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.io :as io]
[promesa.core :as p]
[promesa.exec :as px]))
[promesa.core :as p]))
(def default-max-file-size (* 1024 1024 10)) ; 10 MiB
@ -53,7 +52,6 @@
:opt-un [::id]))
(sv/defmethod ::upload-file-media-object
{::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}]
(let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)]
@ -106,26 +104,25 @@
;; inverse, soft referential integrity).
(defn create-file-media-object
[{:keys [storage pool executors] :as cfg} {:keys [id file-id is-local name content] :as params}]
[{:keys [storage pool semaphores] :as cfg}
{:keys [id file-id is-local name content] :as params}]
(letfn [;; Function responsible to retrieve the file information, as
;; it is synchronous operation it should be wrapped into
;; with-dispatch macro.
(get-info [content]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(media/run {:cmd :info :input content})))
;; Function responsible of calculating cryptographyc hash of
;; the provided data. Even though it uses the hight
;; performance BLAKE2b algorithm, we prefer to schedule it
;; to be executed on the blocking executor.
;; the provided data.
(calculate-hash [data]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(sto/calculate-hash data)))
;; Function responsible of generating thumnail. As it is synchronous
;; opetation, it should be wrapped into with-dispatch macro
(generate-thumbnail [info]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(media/run (assoc thumbnail-options
:cmd :generic-thumbnail
:input info))))
@ -157,15 +154,14 @@
:bucket "file-media-object"})))
(insert-into-database [info image thumb]
(px/with-dispatch (:default executors)
(db/exec-one! pool [sql:create-file-media-object
(or id (uuid/next))
file-id is-local name
(:id image)
(:id thumb)
(:width info)
(:height info)
(:mtype info)])))]
(db/exec-one! pool [sql:create-file-media-object
(or id (uuid/next))
file-id is-local name
(:id image)
(:id thumb)
(:width info)
(:height info)
(:mtype info)]))]
(p/let [info (get-info content)
thumb (create-thumbnail info)
@ -181,7 +177,6 @@
:opt-un [::id ::name]))
(sv/defmethod ::create-file-media-object-from-url
{::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(let [file (select-file pool file-id)
cfg (update cfg :storage media/configure-assets-storage)]

View file

@ -15,6 +15,7 @@
[app.loggers.audit :as audit]
[app.media :as media]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.doc :as-alias doc]
[app.rpc.mutations.teams :as teams]
[app.rpc.queries.profile :as profile]
[app.rpc.semaphore :as rsem]
@ -87,7 +88,7 @@
(s/keys :req-un [::profile-id ::password ::old-password]))
(sv/defmethod ::update-profile-password
{::rsem/permits (cf/get :rpc-semaphore-permits-password)}
{::rsem/queue :auth}
[{:keys [pool] :as cfg} {:keys [password] :as params}]
(db/with-atomic [conn pool]
(let [profile (validate-password! conn params)
@ -130,7 +131,6 @@
(s/keys :req-un [::profile-id ::file]))
(sv/defmethod ::update-profile-photo
{::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@ -138,8 +138,8 @@
(update-profile-photo cfg params)))
(defn update-profile-photo
[{:keys [pool storage executors] :as cfg} {:keys [profile-id] :as params}]
(p/let [profile (px/with-dispatch (:default executors)
[{:keys [pool storage executor] :as cfg} {:keys [profile-id] :as params}]
(p/let [profile (px/with-dispatch executor
(db/get-by-id pool :profile profile-id))
photo (teams/upload-photo cfg params)]
@ -305,7 +305,10 @@
(s/def ::login ::cmd.auth/login-with-password)
(sv/defmethod ::login
{:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
{:auth false
::rsem/queue :auth
::doc/added "1.0"
::doc/deprecated "1.15"}
[cfg params]
(cmd.auth/login-with-password cfg params))
@ -313,7 +316,10 @@
(s/def ::logout ::cmd.auth/logout)
(sv/defmethod ::logout {:auth false}
(sv/defmethod ::logout
{:auth false
::doc/added "1.0"
::doc/deprecated "1.15"}
[{:keys [session] :as cfg} _]
(with-meta {}
{:transform-response (:delete session)}))
@ -323,7 +329,8 @@
(s/def ::recover-profile ::cmd.auth/recover-profile)
(sv/defmethod ::recover-profile
{:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
{::doc/added "1.0"
::doc/deprecated "1.15"}
[cfg params]
(cmd.auth/recover-profile cfg params))
@ -331,7 +338,10 @@
(s/def ::prepare-register-profile ::cmd.auth/prepare-register-profile)
(sv/defmethod ::prepare-register-profile {:auth false}
(sv/defmethod ::prepare-register-profile
{:auth false
::doc/added "1.0"
::doc/deprecated "1.15"}
[cfg params]
(cmd.auth/prepare-register cfg params))
@ -340,7 +350,10 @@
(s/def ::register-profile ::cmd.auth/register-profile)
(sv/defmethod ::register-profile
{:auth false ::rsem/permits (cf/get :rpc-semaphore-permits-password)}
{:auth false
::rsem/queue :auth
::doc/added "1.0"
::doc/deprecated "1.15"}
[{:keys [pool] :as cfg} params]
(db/with-atomic [conn pool]
(-> (assoc cfg :conn conn)
@ -350,6 +363,9 @@
(s/def ::request-profile-recovery ::cmd.auth/request-profile-recovery)
(sv/defmethod ::request-profile-recovery {:auth false}
(sv/defmethod ::request-profile-recovery
{:auth false
::doc/added "1.0"
::doc/deprecated "1.15"}
[cfg params]
(cmd.auth/request-profile-recovery cfg params))

View file

@ -290,7 +290,6 @@
(s/keys :req-un [::profile-id ::team-id ::file]))
(sv/defmethod ::update-team-photo
{::rsem/permits (cf/get :rpc-semaphore-permits-image)}
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
@ -298,8 +297,8 @@
(update-team-photo cfg params)))
(defn update-team-photo
[{:keys [pool storage executors] :as cfg} {:keys [profile-id team-id] :as params}]
(p/let [team (px/with-dispatch (:default executors)
[{:keys [pool storage executor] :as cfg} {:keys [profile-id team-id] :as params}]
(p/let [team (px/with-dispatch executor
(teams/retrieve-team pool profile-id team-id))
photo (upload-photo cfg params)]
@ -316,13 +315,13 @@
(assoc team :photo-id (:id photo))))
(defn upload-photo
[{:keys [storage executors] :as cfg} {:keys [file]}]
[{:keys [storage semaphores] :as cfg} {:keys [file]}]
(letfn [(get-info [content]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(media/run {:cmd :info :input content})))
(generate-thumbnail [info]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(media/run {:cmd :profile-thumbnail
:format :jpeg
:quality 85
@ -331,11 +330,9 @@
:input info})))
;; Function responsible of calculating cryptographyc hash of
;; the provided data. Even though it uses the hight
;; performance BLAKE2b algorithm, we prefer to schedule it
;; to be executed on the blocking executor.
;; the provided data.
(calculate-hash [data]
(px/with-dispatch (:blocking executors)
(rsem/with-dispatch (:process-image semaphores)
(sto/calculate-hash data)))]
(p/let [info (get-info file)
@ -343,11 +340,11 @@
hash (calculate-hash (:data thumb))
content (-> (sto/content (:data thumb) (:size thumb))
(sto/wrap-with-hash hash))]
(sto/put-object! storage {::sto/content content
::sto/deduplicate? true
:bucket "profile"
:content-type (:mtype thumb)}))))
(rsem/with-dispatch (:process-image semaphores)
(sto/put-object! storage {::sto/content content
::sto/deduplicate? true
:bucket "profile"
:content-type (:mtype thumb)})))))
;; --- Mutation: Invite Member

View file

@ -9,23 +9,38 @@
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.config :as cf]
[app.metrics :as mtx]
[app.rpc :as-alias rpc]
[app.util.locks :as locks]
[app.util.services :as-alias sv]
[app.util.time :as ts]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; ASYNC SEMAPHORE IMPL
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defprotocol IAsyncSemaphore
(acquire! [_])
(release! [_]))
(release! [_ tp]))
(defn create
[& {:keys [permits metrics name]}]
(let [name (d/name name)
used (volatile! 0)
queue (volatile! (d/queue))
labels (into-array String [name])
lock (locks/create)]
[& {:keys [permits metrics name executor]}]
(let [used (volatile! 0)
queue (volatile! (d/queue))
labels (into-array String [(d/name name)])
lock (locks/create)
permits (or permits Long/MAX_VALUE)]
(when (>= permits Long/MAX_VALUE)
(l/warn :hint "permits value too hight" :permits permits :semaphore name))
^{::wrk/executor executor
::name name}
(reify IAsyncSemaphore
(acquire! [_]
(let [d (p/deferred)]
@ -36,12 +51,17 @@
(p/resolve! d))
(vswap! queue conj d)))
(mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels })
(mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})
(mtx/run! metrics {:id :rpc-semaphore-acquires-total :inc 1 :labels labels})
(mtx/run! metrics
:id :semaphore-used-permits
:val @used
:labels labels)
(mtx/run! metrics
:id :semaphore-queued-submissions
:val (count @queue)
:labels labels)
d))
(release! [_]
(release! [_ tp]
(locks/locking lock
(if-let [item (peek @queue)]
(do
@ -50,19 +70,80 @@
(when (pos? @used)
(vswap! used dec))))
(mtx/run! metrics {:id :rpc-semaphore-used-permits :val @used :labels labels})
(mtx/run! metrics {:id :rpc-semaphore-queued-submissions :val (count @queue) :labels labels})))))
(mtx/run! metrics
:id :semaphore-timing
:val (inst-ms (tp))
:labels labels)
(mtx/run! metrics
:id :semaphore-used-permits
:val @used
:labels labels)
(mtx/run! metrics
:id :semaphore-queued-submissions
:val (count @queue)
:labels labels)))))
(defn semaphore?
[v]
(satisfies? IAsyncSemaphore v))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PREDEFINED SEMAPHORES
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::semaphore semaphore?)
(s/def ::semaphores
(s/map-of ::us/keyword ::semaphore))
(defmethod ig/pre-init-spec ::rpc/semaphores [_]
(s/keys :req-un [::mtx/metrics]))
(defn- create-default-semaphores
[metrics executor]
[(create :permits (cf/get :semaphore-process-font)
:metrics metrics
:name :process-font
:executor executor)
(create :permits (cf/get :semaphore-update-file)
:metrics metrics
:name :update-file
:executor executor)
(create :permits (cf/get :semaphore-process-image)
:metrics metrics
:name :process-image
:executor executor)
(create :permits (cf/get :semaphore-auth)
:metrics metrics
:name :auth
:executor executor)])
(defmethod ig/init-key ::rpc/semaphores
[_ {:keys [metrics executor]}]
(->> (create-default-semaphores metrics executor)
(d/index-by (comp ::name meta))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmacro with-dispatch
[queue & body]
`(let [tpoint# (ts/tpoint)
queue# ~queue
executor# (-> queue# meta ::wrk/executor)]
(-> (acquire! queue#)
(p/then (fn [_#] ~@body) executor#)
(p/finally (fn [_# _#]
(release! queue# tpoint#))))))
(defn wrap
[{:keys [metrics executors] :as cfg} f mdata]
(if-let [permits (::permits mdata)]
(let [sem (create {:permits permits
:metrics metrics
:name (::sv/name mdata)})]
(l/debug :hint "wrapping semaphore" :handler (::sv/name mdata) :permits permits)
[{:keys [semaphores]} f {:keys [::queue]}]
(let [queue' (get semaphores queue)]
(if (semaphore? queue')
(fn [cfg params]
(-> (acquire! sem)
(p/then (fn [_] (f cfg params)) (:default executors))
(p/finally (fn [_ _] (release! sem))))))
f))
(with-dispatch queue'
(f cfg params)))
(do
(when (some? queue)
(l/warn :hint "undefined semaphore" :name queue))
f))))

View file

@ -44,20 +44,17 @@
(declare ^:private get-fj-thread-factory)
(declare ^:private get-thread-factory)
(s/def ::prefix keyword?)
(s/def ::parallelism ::us/integer)
(s/def ::idle-timeout ::us/integer)
(defmethod ig/pre-init-spec ::executor [_]
(s/keys :req-un [::prefix]
:opt-un [::parallelism]))
(s/keys :opt-un [::parallelism]))
(defmethod ig/init-key ::executor
[_ {:keys [parallelism prefix]}]
(let [counter (AtomicLong. 0)]
[skey {:keys [parallelism]}]
(let [prefix (if (vector? skey) (-> skey first name keyword) :default)]
(if parallelism
(ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix counter) nil false)
(Executors/newCachedThreadPool (get-thread-factory prefix counter)))))
(ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix) nil false)
(Executors/newCachedThreadPool (get-thread-factory prefix)))))
(defmethod ig/halt-key! ::executor
[_ instance]
@ -69,8 +66,7 @@
(defmethod ig/init-key ::scheduler
[_ {:keys [parallelism prefix] :or {parallelism 1}}]
(let [counter (AtomicLong. 0)]
(px/scheduled-pool parallelism (get-thread-factory prefix counter))))
(px/scheduled-pool parallelism (get-thread-factory prefix)))
(defmethod ig/halt-key! ::scheduler
[_ instance]
@ -78,66 +74,90 @@
(defn- get-fj-thread-factory
^ForkJoinPool$ForkJoinWorkerThreadFactory
[prefix counter]
(reify ForkJoinPool$ForkJoinWorkerThreadFactory
(newThread [_ pool]
(let [^ForkJoinWorkerThread thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)
^String thread-name (str "penpot/" (name prefix) "-" (.getAndIncrement ^AtomicLong counter))]
(.setName thread thread-name)
thread))))
[prefix]
(let [^AtomicLong counter (AtomicLong. 0)]
(reify ForkJoinPool$ForkJoinWorkerThreadFactory
(newThread [_ pool]
(let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool)
tname (str "penpot/" (name prefix) "-" (.getAndIncrement counter))]
(.setName ^ForkJoinWorkerThread thread ^String tname)
thread)))))
(defn- get-thread-factory
^ThreadFactory
[prefix counter]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable)
(.setDaemon true)
(.setName (str "penpot/" (name prefix) "-" (.getAndIncrement ^AtomicLong counter)))))))
[prefix]
(let [^AtomicLong counter (AtomicLong. 0)]
(reify ThreadFactory
(newThread [_ runnable]
(doto (Thread. runnable)
(.setDaemon true)
(.setName (str "penpot/" (name prefix) "-" (.getAndIncrement counter))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Executor Monitor
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::executors (s/map-of keyword? ::executor))
(s/def ::executors
(s/map-of keyword? ::executor))
(defmethod ig/pre-init-spec ::executors-monitor [_]
(s/keys :req-un [::executors ::scheduler ::mtx/metrics]))
(defmethod ig/pre-init-spec ::executor-monitor [_]
(s/keys :req-un [::executors ::mtx/metrics]))
(defmethod ig/init-key ::executors-monitor
[_ {:keys [executors metrics interval scheduler] :or {interval 3000}}]
(letfn [(log-stats [state]
(doseq [[key ^ForkJoinPool executor] executors]
(let [labels (into-array String [(name key)])
running (.getRunningThreadCount executor)
queued (.getQueuedSubmissionCount executor)
active (.getPoolSize executor)
steals (.getStealCount executor)
steals-increment (- steals (or (get-in @state [key :steals]) 0))
steals-increment (if (neg? steals-increment) 0 steals-increment)]
(defmethod ig/init-key ::executor-monitor
[_ {:keys [executors metrics interval] :or {interval 3000}}]
(letfn [(monitor! [state skey ^ForkJoinPool executor]
(let [prev-steals (get state skey 0)
running (.getRunningThreadCount executor)
queued (.getQueuedSubmissionCount executor)
active (.getPoolSize executor)
steals (.getStealCount executor)
labels (into-array String [(name skey)])
(mtx/run! metrics {:id :executors-active-threads :labels labels :val active})
(mtx/run! metrics {:id :executors-running-threads :labels labels :val running})
(mtx/run! metrics {:id :executors-queued-submissions :labels labels :val queued})
(mtx/run! metrics {:id :executors-completed-tasks :labels labels :inc steals-increment})
steals-increment (- steals prev-steals)
steals-increment (if (neg? steals-increment) 0 steals-increment)]
(swap! state update key assoc
:running running
:active active
:queued queued
:steals steals)))
(mtx/run! metrics
:id :executor-active-threads
:labels labels
:val active)
(mtx/run! metrics
:id :executor-running-threads
:labels labels :val running)
(mtx/run! metrics
:id :executors-queued-submissions
:labels labels
:val queued)
(mtx/run! metrics
:id :executors-completed-tasks
:labels labels
:inc steals-increment)
(when (and (not (.isShutdown scheduler))
(not (:shutdown @state)))
(px/schedule! scheduler interval (partial log-stats state))))]
(aa/thread-sleep interval)
(if (.isShutdown executor)
(l/debug :hint "stoping monitor; cause: executor is shutdown")
(assoc state skey steals))))
(let [state (atom {})]
(px/schedule! scheduler interval (partial log-stats state))
{:state state})))
(monitor-fn []
(try
(loop [items (into (d/queue) executors)
state {}]
(when-let [[skey executor :as item] (peek items)]
(if-let [state (monitor! state skey executor)]
(recur (conj items item) state)
(recur items state))))
(catch InterruptedException _cause
(l/debug :hint "stoping monitor; interrupted"))))]
(defmethod ig/halt-key! ::executors-monitor
[_ {:keys [state]}]
(swap! state assoc :shutdown true))
(let [thread (Thread. monitor-fn)]
(.setDaemon thread true)
(.setName thread "penpot/executor-monitor")
(.start thread)
thread)))
(defmethod ig/halt-key! ::executor-monitor
[_ thread]
(.interrupt ^Thread thread))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Worker