From 13a092b1926ef1cf4df2962948ddbc10058e18f2 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Tue, 22 Nov 2022 14:51:26 +0100 Subject: [PATCH] :recycle: Normalize internal naming on the worker module --- backend/src/app/main.clj | 17 +++-- backend/src/app/rpc/rlimit.clj | 6 +- backend/src/app/storage/tmp.clj | 7 +- backend/src/app/worker.clj | 111 +++++++++++++------------------- common/deps.edn | 2 +- 5 files changed, 59 insertions(+), 84 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ae32a063c..a5cb51673 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -139,9 +139,8 @@ [::worker :app.worker/executor] {:parallelism (cf/get :worker-executor-parallelism 20)} - :app.worker/scheduler - {:parallelism 1 - :prefix :scheduler} + :app.worker/scheduled-executor + {:parallelism 1} :app.worker/executors {:default (ig/ref [::default :app.worker/executor]) @@ -171,7 +170,7 @@ :app.storage.tmp/cleaner {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler)} + :scheduled-executor (ig/ref :app.worker/scheduled-executor)} :app.storage/gc-deleted-task {:pool (ig/ref :app.db/pool) @@ -315,7 +314,7 @@ :app.rpc/rlimit {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler)} + :scheduled-executor (ig/ref :app.worker/scheduled-executor)} :app.rpc/methods {:pool (ig/ref :app.db/pool) @@ -464,10 +463,10 @@ (def worker-config {:app.worker/cron - {:executor (ig/ref [::worker :app.worker/executor]) - :scheduler (ig/ref :app.worker/scheduler) - :tasks (ig/ref :app.worker/registry) - :pool (ig/ref :app.db/pool) + {:executor (ig/ref [::worker :app.worker/executor]) + :scheduled-executor (ig/ref :app.worker/scheduled-executor) + :tasks (ig/ref :app.worker/registry) + :pool (ig/ref :app.db/pool) :entries [{:cron #app/cron "0 0 * * * ?" ;; hourly :task :file-xlog-gc} diff --git a/backend/src/app/rpc/rlimit.clj b/backend/src/app/rpc/rlimit.clj index 390892e33..7c07d6758 100644 --- a/backend/src/app/rpc/rlimit.clj +++ b/backend/src/app/rpc/rlimit.clj @@ -332,7 +332,7 @@ ::limits limits})))) (defn- refresh-config - [{:keys [state path executor scheduler] :as params}] + [{:keys [state path executor scheduled-executor] :as params}] (letfn [(update-config [{:keys [::updated-at] :as state}] (let [updated-at' (fs/last-modified-time path)] (merge state @@ -347,7 +347,7 @@ state))))) (schedule-next [state] - (px/schedule! scheduler + (px/schedule! scheduled-executor (inst-ms (::refresh state)) (partial refresh-config params)) state)] @@ -371,7 +371,7 @@ (and (fs/exists? path) (fs/regular-file? path) path))) (defmethod ig/pre-init-spec :app.rpc/rlimit [_] - (s/keys :req-un [::wrk/executor ::wrk/scheduler])) + (s/keys :req-un [::wrk/executor ::wrk/scheduled-executor])) (defmethod ig/init-key ::rpc/rlimit [_ {:keys [executor] :as params}] diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index 89382a121..1fd069ef3 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -24,9 +24,8 @@ (defonce queue (a/chan 128)) (s/def ::min-age ::dt/duration) - (defmethod ig/pre-init-spec ::cleaner [_] - (s/keys :req-un [::min-age ::wrk/scheduler ::wrk/executor])) + (s/keys :req-un [::min-age ::wrk/scheduled-executor ::wrk/executor])) (defmethod ig/prep-key ::cleaner [_ cfg] @@ -34,7 +33,7 @@ (d/without-nils cfg))) (defmethod ig/init-key ::cleaner - [_ {:keys [scheduler executor min-age] :as cfg}] + [_ {:keys [scheduled-executor executor min-age] :as cfg}] (l/info :hint "starting tempfile cleaner service") (let [cch (a/chan)] (a/go-loop [] @@ -42,7 +41,7 @@ (when (not= port cch) (l/trace :hint "schedule tempfile deletion" :path path :expires-at (dt/plus (dt/now) min-age)) - (px/schedule! scheduler + (px/schedule! scheduled-executor (inst-ms min-age) (partial remove-temp-file executor path)) (recur)))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index ea9df91af..b6f0ed859 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -26,73 +26,49 @@ java.util.concurrent.Executors java.util.concurrent.ForkJoinPool java.util.concurrent.Future - java.util.concurrent.ForkJoinPool$ForkJoinWorkerThreadFactory - java.util.concurrent.ForkJoinWorkerThread - java.util.concurrent.ScheduledExecutorService - java.util.concurrent.ThreadFactory - java.util.concurrent.atomic.AtomicLong)) + java.util.concurrent.ScheduledExecutorService)) (set! *warn-on-reflection* true) (s/def ::executor #(instance? ExecutorService %)) -(s/def ::scheduler #(instance? ScheduledExecutorService %)) +(s/def ::scheduled-executor #(instance? ScheduledExecutorService %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare ^:private get-fj-thread-factory) -(declare ^:private get-thread-factory) - (s/def ::parallelism ::us/integer) (defmethod ig/pre-init-spec ::executor [_] - (s/keys :opt-un [::parallelism])) + (s/keys :req-un [::parallelism])) (defmethod ig/init-key ::executor [skey {:keys [parallelism]}] - (let [prefix (if (vector? skey) (-> skey first name keyword) :default)] - (if parallelism - (ForkJoinPool. (int parallelism) (get-fj-thread-factory prefix) nil false) - (Executors/newCachedThreadPool (get-thread-factory prefix))))) + (let [prefix (if (vector? skey) (-> skey first name keyword) :default) + tname (str "penpot/" prefix "/%s") + factory (px/forkjoin-thread-factory :name tname)] + (px/forkjoin-executor + :factory factory + :parallelism parallelism + :async? true))) (defmethod ig/halt-key! ::executor [_ instance] (.shutdown ^ExecutorService instance)) -(defmethod ig/pre-init-spec ::scheduler [_] - (s/keys :req-un [::prefix] - :opt-un [::parallelism])) +(defmethod ig/pre-init-spec ::scheduled-executor [_] + (s/keys :opt-un [::parallelism])) -(defmethod ig/init-key ::scheduler - [_ {:keys [parallelism prefix] :or {parallelism 1}}] - (px/scheduled-pool parallelism (get-thread-factory prefix))) +(defmethod ig/init-key ::scheduled-executor + [_ {:keys [parallelism] :or {parallelism 1}}] + (px/scheduled-executor + :parallelism parallelism + :factory (px/thread-factory :name "penpot/scheduled-executor/%s"))) (defmethod ig/halt-key! ::scheduler [_ instance] (.shutdown ^ExecutorService instance)) -(defn- get-fj-thread-factory - ^ForkJoinPool$ForkJoinWorkerThreadFactory - [prefix] - (let [^AtomicLong counter (AtomicLong. 0)] - (reify ForkJoinPool$ForkJoinWorkerThreadFactory - (newThread [_ pool] - (let [thread (.newThread ForkJoinPool/defaultForkJoinWorkerThreadFactory pool) - tname (str "penpot/" (name prefix) "-" (.getAndIncrement counter))] - (.setName ^ForkJoinWorkerThread thread ^String tname) - thread))))) - -(defn- get-thread-factory - ^ThreadFactory - [prefix] - (let [^AtomicLong counter (AtomicLong. 0)] - (reify ThreadFactory - (newThread [_ runnable] - (doto (Thread. runnable) - (.setDaemon true) - (.setName (str "penpot/" (name prefix) "-" (.getAndIncrement counter)))))))) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Executor Monitor ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -190,6 +166,30 @@ :queue :default} (d/without-nils cfg))) +(defmethod ig/init-key ::worker + [_ {:keys [pool name queue] :as cfg}] + (let [close-ch (a/chan 1) + cfg (assoc cfg :close-ch close-ch)] + + (if (db/read-only? pool) + (l/warn :hint "worker not started, db is read-only" + :name (d/name name) + :queue (d/name queue)) + (do + (l/info :hint "worker initialized" + :name (d/name name) + :queue (d/name queue)) + (event-loop cfg))) + + (reify + java.lang.AutoCloseable + (close [_] + (a/close! close-ch))))) + +(defmethod ig/halt-key! ::worker + [_ instance] + (.close ^java.lang.AutoCloseable instance)) + (defn- event-loop "Main, worker eventloop" [{:keys [pool poll-interval close-ch] :as cfg}] @@ -235,29 +235,6 @@ (a/