0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-24 23:49:45 -05:00

Change execution model of storage.tmp cleaner

This commit is contained in:
Andrey Antukh 2022-11-23 23:45:10 +01:00
parent 0600b2abe4
commit 69011007ac
2 changed files with 24 additions and 23 deletions

View file

@ -166,10 +166,9 @@
:executor (ig/ref ::wrk/executor) :executor (ig/ref ::wrk/executor)
:redis (ig/ref ::rds/redis)} :redis (ig/ref ::rds/redis)}
;; TODO: refactor execution model
:app.storage.tmp/cleaner :app.storage.tmp/cleaner
{:executor (ig/ref ::wrk/executor) {::wrk/executor (ig/ref ::wrk/executor)
:scheduled-executor (ig/ref ::wrk/scheduled-executor)} ::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)}
::sto/gc-deleted-task ::sto/gc-deleted-task
{:pool (ig/ref ::db/pool) {:pool (ig/ref ::db/pool)

View file

@ -12,6 +12,7 @@
(:require (:require
[app.common.data :as d] [app.common.data :as d]
[app.common.logging :as l] [app.common.logging :as l]
[app.storage :as-alias sto]
[app.util.time :as dt] [app.util.time :as dt]
[app.worker :as wrk] [app.worker :as wrk]
[clojure.core.async :as a] [clojure.core.async :as a]
@ -23,42 +24,43 @@
(declare remove-temp-file) (declare remove-temp-file)
(defonce queue (a/chan 128)) (defonce queue (a/chan 128))
(s/def ::min-age ::dt/duration)
(defmethod ig/pre-init-spec ::cleaner [_] (defmethod ig/pre-init-spec ::cleaner [_]
(s/keys :req-un [::min-age ::wrk/scheduled-executor ::wrk/executor])) (s/keys :req [::sto/min-age ::wrk/scheduled-executor]))
(defmethod ig/prep-key ::cleaner (defmethod ig/prep-key ::cleaner
[_ cfg] [_ cfg]
(merge {:min-age (dt/duration {:minutes 30})} (merge {::sto/min-age (dt/duration "30m")}
(d/without-nils cfg))) (d/without-nils cfg)))
(defmethod ig/init-key ::cleaner (defmethod ig/init-key ::cleaner
[_ {:keys [scheduled-executor executor min-age] :as cfg}] [_ {:keys [::sto/min-age ::wrk/scheduled-executor] :as cfg}]
(l/info :hint "starting tempfile cleaner service") (px/thread
(let [cch (a/chan)] {:name "penpot/storage-tmp-cleaner"}
(a/go-loop [] (try
(let [[path port] (a/alts! [queue cch])] (l/info :hint "started tmp file cleaner")
(when (not= port cch) (loop []
(when-let [path (a/<!! queue)]
(l/trace :hint "schedule tempfile deletion" :path path (l/trace :hint "schedule tempfile deletion" :path path
:expires-at (dt/plus (dt/now) min-age)) :expires-at (dt/plus (dt/now) min-age))
(px/schedule! scheduled-executor (px/schedule! scheduled-executor
(inst-ms min-age) (inst-ms min-age)
(partial remove-temp-file executor path)) (partial remove-temp-file path))
(recur)))) (recur)))
cch)) (catch InterruptedException _
(l/debug :hint "interrupted"))
(finally
(l/info :hint "terminated tmp file cleaner")))))
(defmethod ig/halt-key! ::cleaner (defmethod ig/halt-key! ::cleaner
[_ close-ch] [_ thread]
(l/info :hint "stopping tempfile cleaner service") (px/interrupt! thread))
(some-> close-ch a/close!))
(defn- remove-temp-file (defn- remove-temp-file
"Permanently delete tempfile" "Permanently delete tempfile"
[executor path] [path]
(px/with-dispatch executor (l/trace :hint "permanently delete tempfile" :path path)
(l/trace :hint "permanently delete tempfile" :path path) (when (fs/exists? path)
(when (fs/exists? path) (fs/delete path)))
(fs/delete path))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API ;; API