2020-12-30 14:38:00 +01:00
|
|
|
;; This Source Code Form is subject to the terms of the Mozilla Public
|
|
|
|
;; License, v. 2.0. If a copy of the MPL was not distributed with this
|
|
|
|
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
|
|
|
|
;;
|
2021-04-10 09:43:04 +02:00
|
|
|
;; Copyright (c) UXBOX Labs SL
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
(ns app.storage
|
|
|
|
"File Storage abstraction layer."
|
|
|
|
(:require
|
|
|
|
[app.common.data :as d]
|
|
|
|
[app.common.exceptions :as ex]
|
|
|
|
[app.common.spec :as us]
|
|
|
|
[app.common.uuid :as uuid]
|
|
|
|
[app.db :as db]
|
2021-01-04 18:41:05 +01:00
|
|
|
[app.storage.db :as sdb]
|
2020-12-30 14:38:00 +01:00
|
|
|
[app.storage.fs :as sfs]
|
|
|
|
[app.storage.impl :as impl]
|
|
|
|
[app.storage.s3 :as ss3]
|
2021-04-06 23:25:34 +02:00
|
|
|
[app.util.logging :as l]
|
2020-12-30 14:38:00 +01:00
|
|
|
[app.util.time :as dt]
|
2021-01-04 18:41:05 +01:00
|
|
|
[app.worker :as wrk]
|
2020-12-30 14:38:00 +01:00
|
|
|
[clojure.spec.alpha :as s]
|
|
|
|
[cuerdas.core :as str]
|
2021-01-25 16:14:54 +01:00
|
|
|
[datoteka.core :as fs]
|
2021-01-04 18:41:05 +01:00
|
|
|
[integrant.core :as ig]
|
2021-01-19 14:54:34 +01:00
|
|
|
[promesa.exec :as px])
|
|
|
|
(:import
|
|
|
|
java.io.InputStream))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Storage Module State
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
;; Keyword naming the backend that `put-object` and `clone-object`
;; write new objects to.
(s/def ::backend ::us/keyword)

;; Each backend's configuration spec is owned by its implementation
;; namespace; here they are only aliased under short keys.
(s/def ::db ::sdb/backend)
(s/def ::fs ::sfs/backend)
(s/def ::s3 ::ss3/backend)

;; Map of backend key -> backend configuration. Every entry is
;; optional; using a backend that is not configured is rejected at
;; call time by `resolve-backend`.
(s/def ::backends
  (s/keys :opt-un [::s3 ::fs ::db]))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;; Configuration required to initialize the storage component.
(defmethod ig/pre-init-spec ::storage [_]
  (s/keys :req-un [::backend ::wrk/executor ::db/pool ::backends]))

;; Removes nil-valued entries both from the top-level config and from
;; the nested `:backends` map (presumably so that backends left
;; unconfigured are simply absent — TODO confirm).
(defmethod ig/prep-key ::storage
  [_ config]
  (update (d/without-nils config) :backends d/without-nils))

;; The storage component value is the prepared configuration map itself.
(defmethod ig/init-key ::storage
  [_ config]
  config)

;; Shape of the storage value accepted by the public API functions of
;; this namespace (checked with `us/assert`).
(s/def ::storage
  (s/keys :req-un [::backends ::wrk/executor ::db/pool ::backend]))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Database Objects
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
2021-01-25 15:22:39 +01:00
|
|
|
;; A stored object. The constructor calls in this namespace attach the
;; object's user metadata as the record's Clojure metadata (read back
;; with `meta`), not as a record field. `expired-at` is filled from the
;; database `deleted_at` column.
(defrecord StorageObject [id size created-at expired-at backend])

(defn storage-object?
  "Returns true when `o` is a `StorageObject` record instance, false
  otherwise."
  [o]
  (instance? StorageObject o))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
;; Inserts a new storage_object row without expiration and returns the
;; inserted row (`returning *`). The metadata parameter is cast to jsonb.
(def ^:private
  sql:insert-storage-object
  "insert into storage_object (id, size, backend, metadata)
   values (?, ?, ?, ?::jsonb)
   returning *")

;; Same insert, but also sets `deleted_at` to the given expiration
;; instant. `sql:retrieve-storage-object` ignores rows whose
;; `deleted_at` is already in the past, so the object stops being
;; retrievable once that instant passes.
(def ^:private
  sql:insert-storage-object-with-expiration
  "insert into storage_object (id, size, backend, metadata, deleted_at)
   values (?, ?, ?, ?::jsonb, ?)
   returning *")
|
|
|
|
|
|
|
|
(defn- insert-object
  "Inserts a `storage_object` row and returns the row produced by the
  query's `returning *`.

  When `expiration` is truthy it is stored as the row's `deleted_at`;
  otherwise the plain insert (without `deleted_at`) is used. `mdata` is
  expected to already be JSON-encodable for the `?::jsonb` cast."
  [conn id size backend mdata expiration]
  (let [params [id size backend mdata]
        query  (if expiration
                 (-> (into [sql:insert-storage-object-with-expiration] params)
                     (conj expiration))
                 (into [sql:insert-storage-object] params))]
    (db/exec-one! conn query)))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
(defn- create-database-object
  "Inserts the database row for a new storage object and returns the
  corresponding `StorageObject`.

  `object` is one of:
  - an existing `StorageObject` (the `clone-object` path): a new row
    is inserted with the same size, metadata (taken from the record's
    Clojure metadata) and expiration, and the same record is returned
    with the new `:id`, `:created-at` and the storage's `:backend`;
  - a params map (the `put-object` path) holding `:content` and,
    optionally, `:expired-at`. Every other key is stored as the object
    metadata and the size is `(count content)`."
  [{:keys [conn backend]} {:keys [content] :as object}]
  (let [id        (uuid/random)
        ;; Coming from clone-object: just re-register the existing
        ;; object under a new id and the current backend.
        clone?    (instance? StorageObject object)
        mdata     (if clone?
                    (meta object)
                    (dissoc object :content :expired-at))
        size      (if clone?
                    (:size object)
                    (count content))
        result    (insert-object conn
                                 id
                                 size
                                 (name backend)
                                 (db/tjson mdata)
                                 (:expired-at object))]
    (if clone?
      (assoc object
             :id (:id result)
             :backend backend
             :created-at (:created-at result))
      (StorageObject. (:id result)
                      (:size result)
                      (:created-at result)
                      (:deleted-at result)
                      backend
                      mdata
                      nil))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;; Fetches a storage object row by id, ignoring rows whose `deleted_at`
;; is already in the past.
(def ^:private sql:retrieve-storage-object
  "select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())")

(defn row->storage-object
  "Converts a `storage_object` database row into a `StorageObject`.

  The `metadata` column, when present, is decoded with
  `db/decode-transit-pgobject` and attached as the record's Clojure
  metadata. The `deleted_at` column becomes `:expired-at`, and
  `backend` is keywordized."
  [res]
  (let [{:keys [id size created-at deleted-at backend metadata]} res
        mdata (some-> metadata db/decode-transit-pgobject)]
    (StorageObject. id size created-at deleted-at (keyword backend) mdata nil)))

(defn- retrieve-database-object
  "Returns the `StorageObject` with the given `id`, or nil when
  `sql:retrieve-storage-object` matches no row."
  [{:keys [conn]} id]
  (let [row (db/exec-one! conn [sql:retrieve-storage-object id])]
    (when row
      (row->storage-object row))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;; Soft-deletes a storage object by stamping `deleted_at` with the
;; current time; the row itself is removed later by the gc-deleted task.
(def sql:delete-storage-object
  "update storage_object set deleted_at=now() where id=?")

(defn- delete-database-object
  "Marks the storage object `id` as deleted. Returns true when at least
  one row was updated, false otherwise."
  [{:keys [conn]} id]
  (-> (db/exec-one! conn [sql:delete-storage-object id])
      (:next.jdbc/update-count)
      (pos?)))

(defn- register-recheck
  "Records `id` and `backend` in `storage_pending` so that the recheck
  task can later look for physical objects without a database row.
  It uses `pool` rather than the caller's `conn`, presumably so that
  the entry survives a rollback of the caller's transaction — TODO
  confirm."
  [{:keys [pool]} backend id]
  (let [row {:id id :backend (name backend)}]
    (db/insert! pool :storage-pending row)))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; API
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
(declare resolve-backend)

(defn object->relative-path
  "Returns the relative path that `impl/id->path` derives from the
  object's `:id`."
  [obj]
  (impl/id->path (:id obj)))

(defn file-url->path
  "Converts `url` (anything whose string form is a valid URI,
  presumably `file:` URLs) into a datoteka path."
  [url]
  (-> url
      (str)
      (java.net.URI.)
      (fs/path)))

(defn content
  "Wraps `data` as storage content through `impl/content`. The optional
  `size` defaults to nil."
  ([data]
   (content data nil))
  ([data size]
   (impl/content data size)))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
(defn get-object
  "Returns the `StorageObject` with the given `id`, or nil when there is
  no such row or its `deleted_at` is already in the past. Uses the
  storage's `:conn` when present, otherwise its `:pool`."
  [{:keys [conn pool] :as storage} id]
  (us/assert ::storage storage)
  (let [storage (assoc storage :conn (or conn pool))]
    (retrieve-database-object storage id)))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
(defn put-object
  "Creates a new storage object from `params` (which must contain an
  `impl/content?` value under `:content`), writes its content to the
  storage's configured `:backend` and returns the new `StorageObject`."
  [{:keys [pool conn backend executor] :as storage} {:keys [content] :as params}]
  (us/assert ::storage storage)
  (us/assert impl/content? content)
  (let [storage (assoc storage :conn (or conn pool))
        object  (create-database-object storage params)
        obj-id  (:id object)]
    ;; Register the new id for a later recheck. This runs in the
    ;; background on `executor` (see `register-recheck`), because the
    ;; physical write below is not transactional.
    (px/run! executor (fn [] (register-recheck storage backend obj-id)))

    ;; Finally store the data on the underlying storage subsystem.
    (impl/put-object (resolve-backend storage backend) object content)
    object))
|
|
|
|
|
2021-01-04 18:41:05 +01:00
|
|
|
(defn clone-object
  "Creates a clone of the provided object using a backend-based
  efficient method when possible. Always clones objects to the
  storage's configured `:backend`. Returns the new `StorageObject`."
  [{:keys [pool conn executor] :as storage} object]
  (us/assert ::storage storage)
  (let [storage (assoc storage :conn (or conn pool))
        backend (:backend storage)
        object* (create-database-object storage object)]

    ;; As in `put-object`: the physical write below is not
    ;; transactional, so register the new id for a later recheck. This
    ;; runs in the background on `executor`. Without it, a clone whose
    ;; surrounding transaction aborts would leave the copied data
    ;; behind with no database row referencing it.
    (px/run! executor #(register-recheck storage backend (:id object*)))

    (if (= (:backend object) backend)
      ;; Same source and destination backend: use the backend-specific
      ;; (fast path) copy implementation.
      (-> (resolve-backend storage backend)
          (impl/copy-object object object*))

      ;; Different backends: obtain the source stream and perform a
      ;; full copy of the data into the destination backend.
      (with-open [^InputStream input
                  (-> (resolve-backend storage (:backend object))
                      (impl/get-object-data object))]
        (-> (resolve-backend storage backend)
            (impl/put-object object* (impl/content input (:size object))))))

    object*))
|
2021-01-04 18:41:05 +01:00
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
(defn get-object-data
  "Returns the data of `object` as provided by its backend's
  `impl/get-object-data` (presumably an `InputStream` that the caller
  must close — TODO confirm). Returns nil when the object has an
  `:expired-at` that is not after now."
  [{:keys [pool conn] :as storage} object]
  (us/assert ::storage storage)
  (let [expired-at (:expired-at object)]
    (when-not (and (some? expired-at)
                   (not (dt/is-after? expired-at (dt/now))))
      (let [backend (-> (assoc storage :conn (or conn pool))
                        (resolve-backend (:backend object)))]
        (impl/get-object-data backend object)))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
(defn get-object-url
  "Returns the URL of `object` as produced by its backend's
  `impl/get-object-url`, passing `options` through (nil by default).
  Returns nil when the object has an `:expired-at` that is not after
  now."
  ([storage object]
   (get-object-url storage object nil))
  ([{:keys [conn pool] :as storage} object options]
   (us/assert ::storage storage)
   (let [expired-at (:expired-at object)]
     (when-not (and (some? expired-at)
                    (not (dt/is-after? expired-at (dt/now))))
       (let [backend (-> (assoc storage :conn (or conn pool))
                         (resolve-backend (:backend object)))]
         (impl/get-object-url backend object options))))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
2021-01-30 11:28:11 +01:00
|
|
|
(defn get-object-path
  "Returns the filesystem Path of `object`, derived from the URL its
  backend reports. Raises an internal `:operation-not-allowed` error
  when the object's backend `:type` is not `:fs`. Returns nil when the
  object has an `:expired-at` that is not after now."
  [storage object]
  (let [backend    (resolve-backend storage (:backend object))
        expired-at (:expired-at object)]
    (cond
      (not= :fs (:type backend))
      (ex/raise :type :internal
                :code :operation-not-allowed
                :hint "get-object-path only works with fs type backends")

      (and (some? expired-at)
           (not (dt/is-after? expired-at (dt/now))))
      nil

      :else
      (file-url->path (impl/get-object-url backend object nil)))))
|
2021-01-25 16:14:54 +01:00
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
(defn del-object
  "Marks a storage object as deleted. `id-or-obj` is either the object
  id (a UUID) or a map/record carrying it under `:id`. Returns true
  when a database row was updated."
  [{:keys [conn pool] :as storage} id-or-obj]
  (us/assert ::storage storage)
  (let [id (cond-> id-or-obj
             (not (uuid? id-or-obj)) :id)]
    (delete-database-object (assoc storage :conn (or conn pool)) id)))
|
|
|
|
|
2020-12-30 14:38:00 +01:00
|
|
|
;; --- impl
|
|
|
|
|
2021-01-04 18:41:05 +01:00
|
|
|
(defn resolve-backend
  "Returns the configuration of backend `backend-id` from the storage's
  `:backends`, with `:conn` set to the storage's `:conn` (or `:pool`
  when no conn is present). Raises an internal
  `:backend-not-configured` error when the backend is missing."
  [{:keys [conn pool] :as storage} backend-id]
  (if-let [backend (get-in storage [:backends backend-id])]
    (assoc backend :conn (or conn pool))
    (ex/raise :type :internal
              :code :backend-not-configured
              :hint (str/fmt "backend '%s' not configured" backend-id))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
2021-01-29 23:56:11 +01:00
|
|
|
;; Garbage Collection: Permanently delete objects
|
2020-12-30 14:38:00 +01:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
;; A task responsible for permanently deleting storage objects that
;; are already marked as deleted.
|
|
|
|
|
|
|
|
(declare sql:retrieve-deleted-objects)

;; How long an object must have been marked as deleted (`deleted_at`)
;; before this task removes it permanently.
(s/def ::min-age ::dt/duration)

(defmethod ig/pre-init-spec ::gc-deleted-task [_]
  (s/keys :req-un [::storage ::db/pool ::min-age]))

;; Returns the task function. Inside a single transaction it repeatedly
;; deletes a batch of rows that were marked as deleted more than
;; `min-age` ago (see `sql:retrieve-deleted-objects`) and removes their
;; physical data from the corresponding backend. Once no rows are left,
;; it logs and returns the total number of deleted rows.
(defmethod ig/init-key ::gc-deleted-task
  [_ {:keys [pool storage min-age] :as cfg}]
  (letfn [;; Returns [backend-keyword -> ids, number-of-rows].
          (group-by-backend [rows]
            (let [append (fnil conj [])
                  groups (reduce (fn [acc {:keys [id backend]}]
                                   (update acc (keyword backend) append id))
                                 {}
                                 rows)]
              [groups (count rows)]))

          ;; Deletes the next batch of rows; nil when none were left.
          (retrieve-deleted-objects [conn]
            (let [interval (db/interval min-age)
                  rows     (db/exec! conn [sql:retrieve-deleted-objects interval])]
              (some-> (seq rows) (group-by-backend))))

          ;; Removes the physical data of `ids` from `backend-id`.
          (delete-in-bulk [conn [backend-id ids]]
            (-> (resolve-backend storage backend-id)
                (assoc :conn conn)
                (impl/del-objects-in-bulk ids)))]

    (fn [_]
      (db/with-atomic [conn pool]
        (loop [total-deleted 0]
          (if-let [[groups batch-size] (retrieve-deleted-objects conn)]
            (do
              (run! #(delete-in-bulk conn %) groups)
              (recur (+ total-deleted ^long batch-size)))
            (do
              (l/info :task "gc-deleted"
                      :action "permanently delete items"
                      :count total-deleted)
              {:deleted total-deleted})))))))
|
2020-12-30 14:38:00 +01:00
|
|
|
|
|
|
|
;; Permanently deletes, in batches of at most 100 (oldest `deleted_at`
;; first), the storage_object rows whose `deleted_at` is older than the
;; given interval. The deleted rows are returned (`returning *`) so the
;; caller can remove their physical data as well.
(def sql:retrieve-deleted-objects
  "with items_part as (
     select s.id
       from storage_object as s
      where s.deleted_at is not null
        and s.deleted_at < (now() - ?::interval)
      order by s.deleted_at
      limit 100
   )
   delete from storage_object
    where id in (select id from items_part)
   returning *;")
|
|
|
|
|
2021-01-29 23:56:11 +01:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Garbage Collection: Analyze touched objects
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
|
|
|
;; This task is part of the garbage collection of storage objects and
;; is responsible for analyzing the touched objects and marking them
;; for deletion when appropriate.
;;
;; When a file_media_object is deleted, the storage_object rows it
;; depends on are marked as touched. This means that some files that
;; depended on a concrete storage_object no longer exist, so that
;; storage_object may no longer be necessary and can be eligible for
;; deletion. This task periodically analyzes touched objects and marks
;; them either as frozen (they still have other references, so the
;; object is still valid) or as deleted (there are no more references
;; to the object, so it is ready to be deleted).
|
|
|
|
|
|
|
|
(declare sql:retrieve-touched-objects)

(defmethod ig/pre-init-spec ::gc-touched-task [_]
  (s/keys :req-un [::db/pool]))

;; Returns the task function. Inside a single transaction it repeatedly
;; fetches a batch of touched objects. Objects still referenced
;; (nrefs > 0) only have their touched mark cleared ("freeze").
;; Unreferenced ones also get `deleted_at` set. Once no touched objects
;; remain, it logs and returns both counts.
(defmethod ig/init-key ::gc-touched-task
  [_ {:keys [pool] :as cfg}]
  (letfn [;; Splits row ids into :to-freeze (nrefs > 0) and :to-delete.
          (classify [rows]
            (let [append (fnil conj [])]
              (reduce (fn [acc {:keys [id nrefs]}]
                        (let [bucket (if (pos? nrefs) :to-freeze :to-delete)]
                          (update acc bucket append id)))
                      {}
                      rows)))

          ;; Next batch of touched objects, classified; nil when none.
          (retrieve-touched [conn]
            (some-> (seq (db/exec! conn [sql:retrieve-touched-objects]))
                    (classify)))

          ;; Builds a SQL uuid[] array from a collection of UUIDs.
          (uuid-array [conn ids]
            (db/create-array conn "uuid" (into-array java.util.UUID ids)))

          (mark-delete-in-bulk [conn ids]
            (db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)"
                                (uuid-array conn ids)]))

          (mark-freeze-in-bulk [conn ids]
            (db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)"
                                (uuid-array conn ids)]))]

    (fn [_]
      (db/with-atomic [conn pool]
        (loop [frozen  0
               deleted 0]
          (if-let [{:keys [to-delete to-freeze]} (retrieve-touched conn)]
            (do
              (when (seq to-delete) (mark-delete-in-bulk conn to-delete))
              (when (seq to-freeze) (mark-freeze-in-bulk conn to-freeze))
              (recur (+ frozen (count to-freeze))
                     (+ deleted (count to-delete))))
            (do
              (l/info :task "gc-touched"
                      :action "mark freeze"
                      :count frozen)
              (l/info :task "gc-touched"
                      :action "mark for deletion"
                      :count deleted)
              {:freeze frozen :delete deleted})))))))
|
2021-01-29 23:56:11 +01:00
|
|
|
|
|
|
|
;; Selects up to 100 touched storage objects (oldest `touched_at`
;; first), together with `nrefs`: the number of file_media_object rows
;; that reference each one as `media_id` plus those referencing it as
;; `thumbnail_id`.
(def sql:retrieve-touched-objects
  "select so.id,
          ((select count(*) from file_media_object where media_id = so.id) +
           (select count(*) from file_media_object where thumbnail_id = so.id)) as nrefs
     from storage_object as so
    where so.touched_at is not null
    order by so.touched_at
    limit 100;")
|
2020-12-30 14:38:00 +01:00
|
|
|
|
2021-01-04 18:41:05 +01:00
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
;; Recheck Stalled Task
|
|
|
|
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
|
|
|
|
2021-01-25 16:14:54 +01:00
|
|
|
;; Because the physical storage (filesystem, s3, ... except db) is not
;; transactional, in some situations we can end up with leaked physical
;; objects. This happens when the transaction that writes the file
;; aborts: the file stays written to the underlying storage, while the
;; reference in the database is lost with the rollback.
;;
;; For these situations we need to write a "log" of inserted files,
;; which is checked at some point in the future. If the physical file
;; exists but the database reference does not, the file has leaked and
;; is immediately deleted. The responsibility of this task is to check
;; that write log for possible leaked files.
|
|
|
|
|
2021-01-31 17:00:22 +01:00
|
|
|
;; Minimum age a `storage_pending` entry must reach before the recheck
;; task inspects it.
(def recheck-min-age (dt/duration {:hours 1}))

(declare sql:retrieve-pending-to-recheck)
(declare sql:exists-storage-object)

(defmethod ig/pre-init-spec ::recheck-task [_]
  (s/keys :req-un [::storage ::db/pool]))

;; Returns the task function. Inside a single transaction it repeatedly
;; fetches a batch of `storage_pending` entries older than
;; `recheck-min-age`. For entries with no matching storage_object row
;; it deletes the physical data from their backend. All processed
;; entries are then removed from `storage_pending`. Finally it logs and
;; returns the number of processed and deleted entries.
(defmethod ig/init-key ::recheck-task
  [_ {:keys [pool storage] :as cfg}]
  (letfn [;; Returns {:all [ids] :to-delete [rows]}, where :to-delete
          ;; holds the rows (without :exist) lacking a storage_object.
          (group-results [rows]
            (let [conj (fnil conj [])]
              (reduce (fn [acc {:keys [id exist] :as row}]
                        (cond-> (update acc :all conj id)
                          (false? exist)
                          (update :to-delete conj (dissoc row :exist))))
                      {}
                      rows)))

          ;; Returns a map of backend keyword -> vector of ids.
          (group-by-backend [rows]
            (let [conj (fnil conj [])]
              (reduce (fn [acc {:keys [id backend]}]
                        (update acc (keyword backend) conj id))
                      {}
                      rows)))

          ;; Next batch of pending entries, grouped; nil when none.
          (retrieve-pending [conn]
            (let [rows (db/exec! conn [sql:retrieve-pending-to-recheck (db/interval recheck-min-age)])]
              (some-> (seq rows) (group-results))))

          ;; Deletes the physical objects `ids` from `backend`.
          (delete-group [conn [backend ids]]
            (let [backend (resolve-backend storage backend)
                  backend (assoc backend :conn conn)]
              (impl/del-objects-in-bulk backend ids)))

          ;; Removes the processed entries from storage_pending.
          (delete-all [conn ids]
            (let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))]
              (db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))]

    (fn [_]
      (db/with-atomic [conn pool]
        ;; n: processed entries; d: entries whose physical data was deleted.
        (loop [n 0 d 0]
          (if-let [{:keys [all to-delete]} (retrieve-pending conn)]
            (let [groups (group-by-backend to-delete)]
              (run! (partial delete-group conn) groups)
              (delete-all conn all)
              (recur (+ n (count all))
                     (+ d (count to-delete))))
            (do
              (l/info :task "recheck"
                      :action "recheck items"
                      :processed n
                      :deleted d)
              {:processed n :deleted d})))))))
|
|
|
|
|
|
|
|
;; Selects up to 100 storage_pending entries created before
;; `now() - <interval>` (oldest first). `exist` is true when a
;; storage_object row with the same id is present (left join + count),
;; false otherwise.
(def sql:retrieve-pending-to-recheck
  "select sp.id,
          sp.backend,
          sp.created_at,
          (case when count(so.id) > 0 then true
                else false
            end) as exist
     from storage_pending as sp
     left join storage_object as so
            on (so.id = sp.id)
    where sp.created_at < now() - ?::interval
    group by 1,2,3
    order by sp.created_at asc
    limit 100")
|