diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index e0aa9f51d..c18356830 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -23,6 +23,7 @@ [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] + [clojure.tools.logging :as log] [cuerdas.core :as str] [datoteka.core :as fs] [integrant.core :as ig] @@ -285,25 +286,34 @@ (defmethod ig/init-key ::gc-deleted-task [_ {:keys [pool storage min-age] :as cfg}] - (letfn [(retrieve-deleted-objects [conn] - (let [min-age (db/interval min-age) - result (db/exec! conn [sql:retrieve-deleted-objects min-age])] - (when (seq result) - (as-> (group-by (comp keyword :backend) result) $ - (reduce-kv #(assoc %1 %2 (map :id %3)) $ $))))) + (letfn [(group-by-backend [rows] + (let [conj (fnil conj [])] + [(reduce (fn [acc {:keys [id backend]}] + (update acc (keyword backend) conj id)) + {} + rows) + (count rows)])) - (delete-in-bulk [conn backend ids] + (retrieve-deleted-objects [conn] + (let [min-age (db/interval min-age) + rows (db/exec! conn [sql:retrieve-deleted-objects min-age])] + (some-> (seq rows) (group-by-backend)))) + + (delete-in-bulk [conn [backend ids]] (let [backend (resolve-backend storage backend) backend (assoc backend :conn conn)] (impl/del-objects-in-bulk backend ids)))] (fn [task] (db/with-atomic [conn pool] - (loop [groups (retrieve-deleted-objects conn)] - (when groups - (doseq [[sid objects] groups] - (delete-in-bulk conn sid objects)) - (recur (retrieve-deleted-objects conn)))))))) + (loop [n 0] + (if-let [[groups total] (retrieve-deleted-objects conn)] + (do + (run! (partial delete-in-bulk conn) groups) + (recur (+ n total))) + (do + (log/infof "gc-deleted: processed %s items" n) + {:deleted n}))))))) (def sql:retrieve-deleted-objects "with items_part as ( @@ -342,10 +352,7 @@ (defmethod ig/init-key ::gc-touched-task [_ {:keys [pool] :as cfg}] - (letfn [(retrieve-touched-objects [conn] - (seq (db/exec! conn [sql:retrieve-touched-objects]))) - - (group-resuls [rows] + (letfn [(group-results [rows] (let [conj (fnil conj [])] (reduce (fn [acc {:keys [id nrefs]}] (if (pos? nrefs) @@ -354,6 +361,10 @@ {} rows))) + (retrieve-touched [conn] + (let [rows (db/exec! conn [sql:retrieve-touched-objects])] + (some-> (seq rows) (group-results)))) + (mark-delete-in-bulk [conn ids] (db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)" (db/create-array conn "uuid" (into-array java.util.UUID ids))])) @@ -364,16 +375,17 @@ (fn [task] (db/with-atomic [conn pool] - (loop [] - (when-let [touched (retrieve-touched-objects conn)] - (let [{:keys [to-delete to-freeze]} (group-resuls touched)] - (when (seq to-delete) - (mark-delete-in-bulk conn to-delete)) - (when (seq to-freeze) - (mark-freeze-in-bulk conn to-freeze)) - (Thread/sleep 100) - (recur)))) - nil)))) + (loop [cntf 0 + cntd 0] + (if-let [{:keys [to-delete to-freeze]} (retrieve-touched conn)] + (do + (when (seq to-delete) (mark-delete-in-bulk conn to-delete)) + (when (seq to-freeze) (mark-freeze-in-bulk conn to-freeze)) + (recur (+ cntf (count to-freeze)) + (+ cntd (count to-delete)))) + (do + (log/infof "gc-touched: %s objects marked as freeze and %s marked to be deleted" cntf cntd) + {:freeze cntf :delete cntd}))))))) (def sql:retrieve-touched-objects "select so.id, @@ -400,7 +412,9 @@ ;; and is inmediatelly deleted. The responsability of this task is ;; check that write log for possible leaked files. -(declare sql:retrieve-pending) +(def recheck-min-age (dt/duration {:hours 1})) + +(declare sql:retrieve-pending-to-recheck) (declare sql:exists-storage-object) (defmethod ig/pre-init-spec ::recheck-task [_] @@ -408,39 +422,59 @@ (defmethod ig/init-key ::recheck-task [_ {:keys [pool storage] :as cfg}] - (letfn [(retrieve-pending [conn] - (->> (db/exec! conn [sql:retrieve-pending]) - (map (fn [{:keys [backend] :as row}] - (assoc row :backend (keyword backend)))) - (seq))) + (letfn [(group-results [rows] + (let [conj (fnil conj [])] + (reduce (fn [acc {:keys [id backend exist] :as row}] + (cond-> (update acc :all conj id) + (false? exist) + (update :to-delete conj (dissoc row :exist)))) + {} + rows))) - (exists-on-database? [conn id] - (:exists (db/exec-one! conn [sql:exists-storage-object id]))) + (group-by-backend [rows] + (let [conj (fnil conj [])] + (reduce (fn [acc {:keys [id backend]}] + (update acc (keyword backend) conj id)) + {} + rows))) - (recheck-item [conn {:keys [id backend]}] - (when-not (exists-on-database? conn id) - (let [backend (resolve-backend storage backend) - backend (assoc backend :conn conn)] - (impl/del-objects-in-bulk backend [id]))))] + (retrieve-pending [conn] + (let [rows (db/exec! conn [sql:retrieve-pending-to-recheck (db/interval recheck-min-age)])] + (some-> (seq rows) (group-results)))) + + (delete-group [conn [backend ids]] + (let [backend (resolve-backend storage backend) + backend (assoc backend :conn conn)] + (impl/del-objects-in-bulk backend ids))) + + (delete-all [conn ids] + (let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))] + (db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))] (fn [task] (db/with-atomic [conn pool] - (loop [items (retrieve-pending conn)] - (when items - (run! (partial recheck-item conn) items) - (recur (retrieve-pending conn)))))))) + (loop [n 0 d 0] + (if-let [{:keys [all to-delete]} (retrieve-pending conn)] + (let [groups (group-by-backend to-delete)] + (run! (partial delete-group conn) groups) + (delete-all conn all) + (recur (+ n (count all)) + (+ d (count to-delete)))) + (do + (log/infof "recheck: processed %s items, %s deleted" n d) + {:processed n :deleted d}))))))) -(def sql:retrieve-pending - "with items_part as ( - select s.id - from storage_pending as s - where s.created_at < now() - '1 hour'::interval - order by s.created_at - limit 100 - ) - delete from storage_pending - where id in (select id from items_part) - returning *;") - -(def sql:exists-storage-object - "select exists (select id from storage_object where id = ?) as exists") +(def sql:retrieve-pending-to-recheck + "select sp.id, + sp.backend, + sp.created_at, + (case when count(so.id) > 0 then true + else false + end) as exist + from storage_pending as sp + left join storage_object as so + on (so.id = sp.id) + where sp.created_at < now() - ?::interval + group by 1,2,3 + order by sp.created_at asc + limit 100")