0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-23 23:18:48 -05:00

Make storage tasks more testable and traceable.

This commit is contained in:
Andrey Antukh 2021-01-31 17:00:22 +01:00 committed by Alonso Torres
parent 586d95fb55
commit 26948fb68b

View file

@ -23,6 +23,7 @@
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[clojure.tools.logging :as log]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig]
@ -285,25 +286,34 @@
(defmethod ig/init-key ::gc-deleted-task
[_ {:keys [pool storage min-age] :as cfg}]
(letfn [(retrieve-deleted-objects [conn]
(let [min-age (db/interval min-age)
result (db/exec! conn [sql:retrieve-deleted-objects min-age])]
(when (seq result)
(as-> (group-by (comp keyword :backend) result) $
(reduce-kv #(assoc %1 %2 (map :id %3)) $ $)))))
(letfn [(group-by-backend [rows]
(let [conj (fnil conj [])]
[(reduce (fn [acc {:keys [id backend]}]
(update acc (keyword backend) conj id))
{}
rows)
(count rows)]))
(delete-in-bulk [conn backend ids]
(retrieve-deleted-objects [conn]
(let [min-age (db/interval min-age)
rows (db/exec! conn [sql:retrieve-deleted-objects min-age])]
(some-> (seq rows) (group-by-backend))))
(delete-in-bulk [conn [backend ids]]
(let [backend (resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))]
(fn [task]
(db/with-atomic [conn pool]
(loop [groups (retrieve-deleted-objects conn)]
(when groups
(doseq [[sid objects] groups]
(delete-in-bulk conn sid objects))
(recur (retrieve-deleted-objects conn))))))))
(loop [n 0]
(if-let [[groups total] (retrieve-deleted-objects conn)]
(do
(run! (partial delete-in-bulk conn) groups)
(recur (+ n total)))
(do
(log/infof "gc-deleted: processed %s items" n)
{:deleted n})))))))
(def sql:retrieve-deleted-objects
"with items_part as (
@ -342,10 +352,7 @@
(defmethod ig/init-key ::gc-touched-task
[_ {:keys [pool] :as cfg}]
(letfn [(retrieve-touched-objects [conn]
(seq (db/exec! conn [sql:retrieve-touched-objects])))
(group-resuls [rows]
(letfn [(group-results [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id nrefs]}]
(if (pos? nrefs)
@ -354,6 +361,10 @@
{}
rows)))
(retrieve-touched [conn]
(let [rows (db/exec! conn [sql:retrieve-touched-objects])]
(some-> (seq rows) (group-results))))
(mark-delete-in-bulk [conn ids]
(db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)"
(db/create-array conn "uuid" (into-array java.util.UUID ids))]))
@ -364,16 +375,17 @@
(fn [task]
(db/with-atomic [conn pool]
(loop []
(when-let [touched (retrieve-touched-objects conn)]
(let [{:keys [to-delete to-freeze]} (group-resuls touched)]
(when (seq to-delete)
(mark-delete-in-bulk conn to-delete))
(when (seq to-freeze)
(mark-freeze-in-bulk conn to-freeze))
(Thread/sleep 100)
(recur))))
nil))))
(loop [cntf 0
cntd 0]
(if-let [{:keys [to-delete to-freeze]} (retrieve-touched conn)]
(do
(when (seq to-delete) (mark-delete-in-bulk conn to-delete))
(when (seq to-freeze) (mark-freeze-in-bulk conn to-freeze))
(recur (+ cntf (count to-freeze))
(+ cntd (count to-delete))))
(do
(log/infof "gc-touched: %s objects marked as freeze and %s marked to be deleted" cntf cntd)
{:freeze cntf :delete cntd})))))))
(def sql:retrieve-touched-objects
"select so.id,
@ -400,7 +412,9 @@
;; and is inmediatelly deleted. The responsability of this task is
;; check that write log for possible leaked files.
(declare sql:retrieve-pending)
(def recheck-min-age (dt/duration {:hours 1}))
(declare sql:retrieve-pending-to-recheck)
(declare sql:exists-storage-object)
(defmethod ig/pre-init-spec ::recheck-task [_]
@ -408,39 +422,59 @@
(defmethod ig/init-key ::recheck-task
[_ {:keys [pool storage] :as cfg}]
(letfn [(retrieve-pending [conn]
(->> (db/exec! conn [sql:retrieve-pending])
(map (fn [{:keys [backend] :as row}]
(assoc row :backend (keyword backend))))
(seq)))
(letfn [(group-results [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id backend exist] :as row}]
(cond-> (update acc :all conj id)
(false? exist)
(update :to-delete conj (dissoc row :exist))))
{}
rows)))
(exists-on-database? [conn id]
(:exists (db/exec-one! conn [sql:exists-storage-object id])))
(group-by-backend [rows]
(let [conj (fnil conj [])]
(reduce (fn [acc {:keys [id backend]}]
(update acc (keyword backend) conj id))
{}
rows)))
(recheck-item [conn {:keys [id backend]}]
(when-not (exists-on-database? conn id)
(let [backend (resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend [id]))))]
(retrieve-pending [conn]
(let [rows (db/exec! conn [sql:retrieve-pending-to-recheck (db/interval recheck-min-age)])]
(some-> (seq rows) (group-results))))
(delete-group [conn [backend ids]]
(let [backend (resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))
(delete-all [conn ids]
(let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))]
(db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))]
(fn [task]
(db/with-atomic [conn pool]
(loop [items (retrieve-pending conn)]
(when items
(run! (partial recheck-item conn) items)
(recur (retrieve-pending conn))))))))
(loop [n 0 d 0]
(if-let [{:keys [all to-delete]} (retrieve-pending conn)]
(let [groups (group-by-backend to-delete)]
(run! (partial delete-group conn) groups)
(delete-all conn all)
(recur (+ n (count all))
(+ d (count to-delete))))
(do
(log/infof "recheck: processed %s items, %s deleted" n d)
{:processed n :deleted d})))))))
(def sql:retrieve-pending
"with items_part as (
select s.id
from storage_pending as s
where s.created_at < now() - '1 hour'::interval
order by s.created_at
limit 100
)
delete from storage_pending
where id in (select id from items_part)
returning *;")
(def sql:exists-storage-object
"select exists (select id from storage_object where id = ?) as exists")
(def sql:retrieve-pending-to-recheck
"select sp.id,
sp.backend,
sp.created_at,
(case when count(so.id) > 0 then true
else false
end) as exist
from storage_pending as sp
left join storage_object as so
on (so.id = sp.id)
where sp.created_at < now() - ?::interval
group by 1,2,3
order by sp.created_at asc
limit 100")