From c64e14859caa2d8b460b5ddb96514a1d4967f601 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 24 Nov 2023 15:35:59 +0100 Subject: [PATCH] :sparkles: Simplify internal executor module --- backend/src/app/main.clj | 14 ++---- backend/src/app/storage/impl.clj | 4 +- backend/src/app/worker.clj | 78 ++++++++++++++++---------------- 3 files changed, 43 insertions(+), 53 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index a37836a71..4411bea85 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -161,12 +161,7 @@ ::mdef/help "Current number of threads with state RUNNING." ::mdef/labels ["name"] ::mdef/type :gauge} - - :executors-queued-submissions - {::mdef/name "penpot_executors_queued_submissions" - ::mdef/help "Current number of queued submissions." - ::mdef/labels ["name"] - ::mdef/type :gauge}}) + }) (def system-config {::db/pool @@ -180,13 +175,12 @@ ;; Default thread pool for IO operations ::wrk/executor - {::wrk/parallelism (cf/get :default-executor-parallelism - (+ 3 (* (px/get-available-processors) 3)))} + {} ::wrk/monitor {::mtx/metrics (ig/ref ::mtx/metrics) - ::wrk/name "default" - ::wrk/executor (ig/ref ::wrk/executor)} + ::wrk/executor (ig/ref ::wrk/executor) + ::wrk/name "default"} :app.migrations/migrations {::db/pool (ig/ref ::db/pool)} diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index 4a564b58f..9dc7facc1 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -11,7 +11,6 @@ [app.common.exceptions :as ex] [app.db :as-alias db] [app.storage :as-alias sto] - [app.worker :as-alias wrk] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] [clojure.java.io :as jio] @@ -201,7 +200,7 @@ (str "blake2b:" result))) (defn resolve-backend - [{:keys [::db/pool ::wrk/executor] :as storage} backend-id] + [{:keys [::db/pool] :as storage} backend-id] (let [backend (get-in storage [::sto/backends backend-id])] (when-not backend (ex/raise :type :internal @@ -209,7 +208,6 @@ :hint (dm/fmt "backend '%' not configured" backend-id))) (-> backend (assoc ::sto/id backend-id) - (assoc ::wrk/executor executor) (assoc ::db/pool pool)))) (defrecord StorageObject [id size created-at expired-at touched-at backend]) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 448fbaaec..a6f920220 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -25,43 +25,45 @@ [promesa.core :as p] [promesa.exec :as px]) (:import - java.util.concurrent.ExecutorService - java.util.concurrent.ForkJoinPool + java.util.concurrent.ThreadPoolExecutor + java.util.concurrent.Executor java.util.concurrent.Future)) (set! *warn-on-reflection* true) -(s/def ::executor #(instance? ExecutorService %)) +(s/def ::executor #(instance? Executor %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::parallelism ::us/integer) - (defmethod ig/pre-init-spec ::executor [_] - (s/keys :req [::parallelism])) + (s/keys :req [])) (defmethod ig/init-key ::executor - [skey {:keys [::parallelism]}] - (let [prefix (if (vector? skey) (-> skey first name) "default") - tname (str "penpot/" prefix "/%s") - ttype (cf/get :worker-executor-type :fjoin)] - (case ttype - :fjoin - (let [factory (px/forkjoin-thread-factory :name tname)] - (px/forkjoin-executor {:factory factory - :core-size (px/get-available-processors) - :parallelism parallelism - :async true})) + [_ _] + (let [factory (px/thread-factory :prefix "penpot/default/") + executor (px/cached-executor :factory factory :keepalive 30000)] + (l/inf :hint "starting executor") + (reify + java.lang.AutoCloseable + (close [_] + (l/inf :hint "stoping executor") + (px/shutdown! executor)) - :cached - (let [factory (px/thread-factory :name tname)] - (px/cached-executor :factory factory))))) + clojure.lang.IDeref + (deref [_] + {:active (.getPoolSize ^ThreadPoolExecutor executor) + :running (.getActiveCount ^ThreadPoolExecutor executor) + :completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)}) + + Executor + (execute [_ runnable] + (.execute ^Executor executor ^Runnable runnable))))) (defmethod ig/halt-key! ::executor [_ instance] - (px/shutdown! instance)) + (.close ^java.lang.AutoCloseable instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; TASKS REGISTRY @@ -111,42 +113,38 @@ (defmethod ig/init-key ::monitor [_ {:keys [::executor ::mtx/metrics ::interval ::name]}] - (letfn [(monitor! [^ForkJoinPool executor prev-steals] - (let [running (.getRunningThreadCount executor) - queued (.getQueuedSubmissionCount executor) - active (.getPoolSize executor) - steals (.getStealCount executor) - labels (into-array String [(d/name name)]) + (letfn [(monitor! [executor prev-completed] + (let [labels (into-array String [(d/name name)]) + stats (deref executor) - steals-inc (- steals prev-steals) - steals-inc (if (neg? steals-inc) 0 steals-inc)] + completed (:completed stats) + completed-inc (- completed prev-completed) + completed-inc (if (neg? completed-inc) 0 completed-inc)] (mtx/run! metrics :id :executor-active-threads :labels labels - :val active) + :val (:active stats)) + (mtx/run! metrics :id :executor-running-threads - :labels labels :val running) - (mtx/run! metrics - :id :executors-queued-submissions :labels labels - :val queued) + :val (:running stats)) + (mtx/run! metrics :id :executors-completed-tasks :labels labels - :inc steals-inc) + :inc completed-inc) - steals))] + completed-inc))] (px/thread {:name "penpot/executors-monitor" :virtual true} (l/inf :hint "monitor: started" :name name) (try - (loop [steals 0] - (when-not (px/shutdown? executor) - (px/sleep interval) - (recur (long (monitor! executor steals))))) + (loop [completed 0] + (px/sleep interval) + (recur (long (monitor! executor completed)))) (catch InterruptedException _cause (l/trc :hint "monitor: interrupted" :name name)) (catch Throwable cause