diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index 457bad1f5..8871764b1 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -146,9 +146,9 @@ (instance? javax.sql.DataSource v)) (s/def ::conn some?) -(s/def ::pool pool?) (s/def ::nilable-pool (s/nilable ::pool)) -(s/def ::conn-or-pool some?) +(s/def ::pool pool?) +(s/def ::pool-or-conn some?) (defn closed? [pool] diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index a5362bbf6..56584e37f 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -7,18 +7,17 @@ (ns app.http.assets "Assets related handlers." (:require + [app.common.data :as d] [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uri :as u] [app.db :as db] - [app.metrics :as mtx] [app.storage :as sto] [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] [promesa.core :as p] - [promesa.exec :as px] [yetti.response :as yrs])) (def ^:private cache-max-age @@ -27,104 +26,96 @@ (def ^:private signature-max-age (dt/duration {:hours 24 :minutes 15})) -(defn coerce-id - [id] - (let [res (parse-uuid id)] - (when-not (uuid? res) - (ex/raise :type :not-found - :hint "object not found")) - res)) +(defn get-id + [{:keys [path-params]}] + (if-let [id (some-> path-params :id d/parse-uuid)] + (p/resolved id) + (p/rejected (ex/error :type :not-found + :hint "object not found")))) (defn- get-file-media-object - [{:keys [pool executor] :as storage} id] - (px/with-dispatch executor - (let [id (coerce-id id) - mobj (db/exec-one! pool ["select * from file_media_object where id=?" id])] - (when-not mobj - (ex/raise :type :not-found - :hint "object does not found")) - mobj))) + [pool id] + (db/get pool :file-media-object {:id id})) + +(defn- serve-object-from-s3 + [{:keys [::sto/storage] :as cfg} obj] + (let [mdata (meta obj)] + (->> (sto/get-object-url storage obj {:max-age signature-max-age}) + (p/fmap (fn [{:keys [host port] :as url}] + (let [headers {"location" (str url) + "x-host" (cond-> host port (str ":" port)) + "x-mtype" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))}] + (yrs/response + :status 307 + :headers headers))))))) + +(defn- serve-object-from-fs + [{:keys [::path]} obj] + (let [purl (u/join (u/uri path) + (sto/object->relative-path obj)) + mdata (meta obj) + headers {"x-accel-redirect" (:path purl) + "content-type" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))}] + (p/resolved + (yrs/response :status 204 :headers headers)))) (defn- serve-object "Helper function that returns the appropriate response depending on the storage object backend type."
- [{:keys [storage] :as cfg} obj] - (let [mdata (meta obj) - backend (sto/resolve-backend storage (:backend obj))] - (case (:type backend) - :s3 - (p/let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] - (yrs/response :status 307 - :headers {"location" (str url) - "x-host" (cond-> host port (str ":" port)) - "x-mtype" (:content-type mdata) - "cache-control" (str "max-age=" (inst-ms cache-max-age))})) - - :fs - (p/let [purl (u/uri (:assets-path cfg)) - purl (u/join purl (sto/object->relative-path obj))] - (yrs/response :status 204 - :headers {"x-accel-redirect" (:path purl) - "content-type" (:content-type mdata) - "cache-control" (str "max-age=" (inst-ms cache-max-age))}))))) + [{:keys [::sto/storage] :as cfg} {:keys [backend] :as obj}] + (let [backend (sto/resolve-backend storage backend)] + (case (::sto/type backend) + :s3 (serve-object-from-s3 cfg obj) + :fs (serve-object-from-fs cfg obj)))) (defn objects-handler "Handler that servers storage objects by id." - [{:keys [storage executor] :as cfg} request respond raise] - (-> (px/with-dispatch executor - (p/let [id (get-in request [:path-params :id]) - id (coerce-id id) - obj (sto/get-object storage id)] - (if obj - (serve-object cfg obj) - (yrs/response 404)))) - - (p/bind p/wrap) - (p/then' respond) - (p/catch raise))) + [{:keys [::sto/storage ::wrk/executor] :as cfg} request respond raise] + (->> (get-id request) + (p/mcat executor (fn [id] (sto/get-object storage id))) + (p/mcat executor (fn [obj] + (if (some? obj) + (serve-object cfg obj) + (p/resolved (yrs/response 404))))) + (p/fnly executor (fn [result cause] + (if cause (raise cause) (respond result)))))) (defn- generic-handler "A generic handler helper/common code for file-media based handlers." - [{:keys [storage] :as cfg} request kf] - (p/let [id (get-in request [:path-params :id]) - mobj (get-file-media-object storage id) - obj (sto/get-object storage (kf mobj))] - (if obj - (serve-object cfg obj) - (yrs/response 404)))) + [{:keys [::sto/storage ::wrk/executor] :as cfg} request kf] + (let [pool (::db/pool storage)] + (->> (get-id request) + (p/fmap executor (fn [id] (get-file-media-object pool id))) + (p/mcat executor (fn [mobj] (sto/get-object storage (kf mobj)))) + (p/mcat executor (fn [sobj] + (if sobj + (serve-object cfg sobj) + (p/resolved (yrs/response 404)))))))) (defn file-objects-handler "Handler that serves storage objects by file media id." [cfg request respond raise] - (-> (generic-handler cfg request :media-id) - (p/then respond) - (p/catch raise))) + (->> (generic-handler cfg request :media-id) + (p/fnly (fn [result cause] + (if cause (raise cause) (respond result)))))) (defn file-thumbnails-handler "Handler that serves storage objects by thumbnail-id and quick fallback to file-media-id if no thumbnail is available." [cfg request respond raise] - (-> (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %))) - (p/then respond) - (p/catch raise))) + (->> (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %))) + (p/fnly (fn [result cause] + (if cause (raise cause) (respond result)))))) ;; --- Initialization -(s/def ::storage some?) -(s/def ::assets-path ::us/string) -(s/def ::cache-max-age ::dt/duration) -(s/def ::signature-max-age ::dt/duration) - +(s/def ::path ::us/string) (s/def ::routes vector?) 
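;; A minimal, standalone sketch (not part of the patch) of the promise-composition
;; style the refactored handlers above rely on: each stage is dispatched on the
;; worker executor with p/mcat / p/fmap, and p/fnly bridges the final result or
;; error back to the ring async callbacks. The namespace, lookup-fn and the plain
;; response maps are hypothetical; only the promesa calls mirror the code above.
(ns sketch.assets-handler
  (:require [promesa.core :as p]))

(defn handler-sketch
  "Hypothetical async handler: lookup-fn returns a promise of an object or nil."
  [lookup-fn executor request respond raise]
  (->> (p/resolved (get-in request [:path-params :id]))
       ;; bind: run the (possibly blocking) lookup on the executor
       (p/mcat executor (fn [id] (lookup-fn id)))
       ;; map: turn the lookup result into a response value
       (p/fmap executor (fn [obj] (if (some? obj)
                                    {:status 200 :body obj}
                                    {:status 404})))
       ;; finally: deliver either the value or the error to the ring callbacks
       (p/fnly executor (fn [result cause]
                          (if cause (raise cause) (respond result))))))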
-;; FIXME: namespace qualified params (defmethod ig/pre-init-spec ::routes [_] - (s/keys :req-un [::storage - ::wrk/executor - ::mtx/metrics - ::assets-path - ::cache-max-age - ::signature-max-age])) + (s/keys :req [::sto/storage ::wrk/executor ::path])) (defmethod ig/init-key ::routes [_ cfg] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 6754365ac..2834f109a 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -33,6 +33,8 @@ [app.rpc.doc :as-alias rpc.doc] [app.srepl :as-alias srepl] [app.storage :as-alias sto] + [app.storage.fs :as-alias sto.fs] + [app.storage.s3 :as-alias sto.s3] [app.util.time :as dt] [app.worker :as-alias wrk] [cuerdas.core :as str] @@ -206,12 +208,11 @@ ::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)} ::sto/gc-deleted-task - {:pool (ig/ref ::db/pool) - :storage (ig/ref ::sto/storage) - :executor (ig/ref ::wrk/executor)} + {::db/pool (ig/ref ::db/pool) + ::sto/storage (ig/ref ::sto/storage)} ::sto/gc-touched-task - {:pool (ig/ref ::db/pool)} + {::db/pool (ig/ref ::db/pool)} ::http.client/client {::wrk/executor (ig/ref ::wrk/executor)} @@ -310,12 +311,11 @@ ::session/manager (ig/ref ::session/manager)} :app.http.assets/routes - {:metrics (ig/ref ::mtx/metrics) - :assets-path (cf/get :assets-path) - :storage (ig/ref ::sto/storage) - :executor (ig/ref ::wrk/executor) - :cache-max-age (dt/duration {:hours 24}) - :signature-max-age (dt/duration {:hours 24 :minutes 5})} + {::http.assets/path (cf/get :assets-path) + ::http.assets/cache-max-age (dt/duration {:hours 24}) + ::http.assets/signature-max-age (dt/duration {:hours 24 :minutes 5}) + ::sto/storage (ig/ref ::sto/storage) + ::wrk/executor (ig/ref ::wrk/executor)} :app.rpc/climit {::mtx/metrics (ig/ref ::mtx/metrics) @@ -358,9 +358,9 @@ ::props (ig/ref :app.setup/props)} ::wrk/registry - {:metrics (ig/ref ::mtx/metrics) - :tasks - {:sendmail (ig/ref :app.emails/handler) + {::mtx/metrics (ig/ref ::mtx/metrics) + ::wrk/tasks + {:sendmail (ig/ref ::email/handler) :objects-gc (ig/ref :app.tasks.objects-gc/handler) :file-gc (ig/ref :app.tasks.file-gc/handler) :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) @@ -392,18 +392,17 @@ ::mtx/metrics (ig/ref ::mtx/metrics)} :app.tasks.tasks-gc/handler - {:pool (ig/ref ::db/pool) - :max-age cf/deletion-delay} + {::db/pool (ig/ref ::db/pool)} :app.tasks.objects-gc/handler {::db/pool (ig/ref ::db/pool) ::sto/storage (ig/ref ::sto/storage)} :app.tasks.file-gc/handler - {:pool (ig/ref ::db/pool)} + {::db/pool (ig/ref ::db/pool)} :app.tasks.file-xlog-gc/handler - {:pool (ig/ref ::db/pool)} + {::db/pool (ig/ref ::db/pool)} :app.tasks.telemetry/handler {::db/pool (ig/ref ::db/pool) @@ -457,25 +456,20 @@ {::db/pool (ig/ref ::db/pool)} ::sto/storage - {:pool (ig/ref ::db/pool) - :executor (ig/ref ::wrk/executor) - - :backends + {::db/pool (ig/ref ::db/pool) + ::wrk/executor (ig/ref ::wrk/executor) + ::sto/backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) - :assets-fs (ig/ref [::assets :app.storage.fs/backend]) - - ;; keep this for backward compatibility - :s3 (ig/ref [::assets :app.storage.s3/backend]) - :fs (ig/ref [::assets :app.storage.fs/backend])}} + :assets-fs (ig/ref [::assets :app.storage.fs/backend])}} [::assets :app.storage.s3/backend] - {:region (cf/get :storage-assets-s3-region) - :endpoint (cf/get :storage-assets-s3-endpoint) - :bucket (cf/get :storage-assets-s3-bucket) - :executor (ig/ref ::wrk/executor)} + {::sto.s3/region (cf/get :storage-assets-s3-region) + ::sto.s3/endpoint (cf/get
:storage-assets-s3-endpoint) + ::sto.s3/bucket (cf/get :storage-assets-s3-bucket) + ::wrk/executor (ig/ref ::wrk/executor)} [::assets :app.storage.fs/backend] - {:directory (cf/get :storage-assets-fs-directory)} + {::sto.fs/directory (cf/get :storage-assets-fs-directory)} }) diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj index 689f50b3a..72dbb83d3 100644 --- a/backend/src/app/media.clj +++ b/backend/src/app/media.clj @@ -12,6 +12,8 @@ [app.common.media :as cm] [app.common.spec :as us] [app.config :as cf] + [app.db :as-alias db] + [app.storage :as-alias sto] [app.storage.tmp :as tmp] [app.util.svg :as svg] [buddy.core.bytes :as bb] @@ -297,8 +299,7 @@ "Given storage map, returns a storage configured with the appropriate backend for assets and optional connection attached." ([storage] - (assoc storage :backend (cf/get :assets-storage-backend :assets-fs))) - ([storage conn] - (-> storage - (assoc :conn conn) - (assoc :backend (cf/get :assets-storage-backend :assets-fs))))) + (assoc storage ::sto/backend (cf/get :assets-storage-backend :assets-fs))) + ([storage pool-or-conn] + (-> (configure-assets-storage storage) + (assoc ::db/pool-or-conn pool-or-conn)))) diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 60cd22cc0..b8b41d0e9 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -317,12 +317,11 @@ ]) -(defn- apply-migrations! - [pool migrations] - ;; (app.common.pprint/pprint migrations) +(defn apply-migrations! + [pool name migrations] (dm/with-open [conn (db/open pool)] (mg/setup! conn) - (mg/migrate! conn {:name "main" :steps migrations}))) + (mg/migrate! conn {:name name :steps migrations}))) (defmethod ig/pre-init-spec ::migrations [_] @@ -332,4 +331,4 @@ [module {:keys [::db/pool]}] (when-not (db/read-only? pool) (l/info :hint "running migrations" :module module) - (some->> (seq migrations) (apply-migrations! pool)))) + (some->> (seq migrations) (apply-migrations! pool "main")))) diff --git a/backend/src/app/rpc.clj b/backend/src/app/rpc.clj index 6afd38339..49a03ae6a 100644 --- a/backend/src/app/rpc.clj +++ b/backend/src/app/rpc.clj @@ -365,9 +365,10 @@ (defmethod ig/init-key ::methods [_ cfg] - {:mutations (resolve-mutation-methods cfg) - :queries (resolve-query-methods cfg) - :commands (resolve-command-methods cfg)}) + (let [cfg (d/without-nils cfg)] + {:mutations (resolve-mutation-methods cfg) + :queries (resolve-query-methods cfg) + :commands (resolve-command-methods cfg)})) (s/def ::mutations (s/map-of keyword? fn?)) diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index e4972432e..3fefa9109 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -516,7 +516,7 @@ (write-obj! output rels))) (defmethod write-section :v1/sobjects - [{:keys [storage ::output]}] + [{:keys [::sto/storage ::output]}] (let [sids (-> *state* deref :sids) storage (media/configure-assets-storage storage)] (l/debug :hint "found sobjects" diff --git a/backend/src/app/rpc/quotes.clj b/backend/src/app/rpc/quotes.clj index 49e2bb71a..4cdc3800d 100644 --- a/backend/src/app/rpc/quotes.clj +++ b/backend/src/app/rpc/quotes.clj @@ -23,7 +23,7 @@ ;; PUBLIC API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::conn ::db/conn-or-pool) +(s/def ::conn ::db/pool-or-conn) (s/def ::file-id ::us/uuid) (s/def ::team-id ::us/uuid) (s/def ::project-id ::us/uuid) @@ -53,7 +53,7 @@ (defn check-quote! 
[conn quote] - (us/assert! ::db/conn-or-pool conn) + (us/assert! ::db/pool-or-conn conn) (us/assert! ::quote quote) (when (contains? cf/flags :quotes) (when @enabled diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 9d0906cb6..dc013261b 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -29,8 +29,10 @@ ;; Storage Module State ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(s/def ::id #{:assets-fs :assets-s3}) (s/def ::s3 ::ss3/backend) (s/def ::fs ::sfs/backend) +(s/def ::type #{:fs :s3}) (s/def ::backends (s/map-of ::us/keyword @@ -39,34 +41,26 @@ :fs ::sfs/backend)))) (defmethod ig/pre-init-spec ::storage [_] - (s/keys :req-un [::db/pool ::wrk/executor ::backends])) - -(defmethod ig/prep-key ::storage - [_ {:keys [backends] :as cfg}] - (-> (d/without-nils cfg) - (assoc :backends (d/without-nils backends)))) + (s/keys :req [::db/pool ::wrk/executor ::backends])) (defmethod ig/init-key ::storage - [_ {:keys [backends] :as cfg}] + [_ {:keys [::backends ::db/pool] :as cfg}] (-> (d/without-nils cfg) - (assoc :backends (d/without-nils backends)))) + (assoc ::backends (d/without-nils backends)) + (assoc ::db/pool-or-conn pool))) +(s/def ::backend keyword?) (s/def ::storage - (s/keys :req-un [::backends ::db/pool])) + (s/keys :req [::backends ::db/pool ::db/pool-or-conn] + :opt [::backend])) + +(s/def ::storage-with-backend + (s/and ::storage #(contains? % ::backend))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Database Objects ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defrecord StorageObject [id size created-at expired-at touched-at backend]) - -(defn storage-object? - [v] - (instance? StorageObject v)) - -(s/def ::storage-object storage-object?) -(s/def ::storage-content impl/content?) - (defn get-metadata [params] (into {} @@ -74,19 +68,18 @@ params)) (defn- get-database-object-by-hash - [conn backend bucket hash] + [pool-or-conn backend bucket hash] (let [sql (str "select * from storage_object " " where (metadata->>'~:hash') = ? " " and (metadata->>'~:bucket') = ? " " and backend = ?" " and deleted_at is null" " limit 1")] - (some-> (db/exec-one! conn [sql hash bucket (name backend)]) + (some-> (db/exec-one! pool-or-conn [sql hash bucket (name backend)]) (update :metadata db/decode-transit-pgobject)))) (defn- create-database-object - [{:keys [conn backend executor]} {:keys [::content ::expired-at ::touched-at] :as params}] - (us/assert ::storage-content content) + [{:keys [::backend ::wrk/executor ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}] (px/with-dispatch executor (let [id (uuid/random) @@ -101,10 +94,10 @@ result (when (and (::deduplicate? params) (:hash mdata) (:bucket mdata)) - (get-database-object-by-hash conn backend (:bucket mdata) (:hash mdata))) + (get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata))) result (or result - (-> (db/insert! conn :storage-object + (-> (db/insert! pool-or-conn :storage-object {:id id :size (impl/get-size content) :backend (name backend) @@ -114,33 +107,33 @@ (update :metadata db/decode-transit-pgobject) (update :metadata assoc ::created? true)))] - (StorageObject. 
(:id result) - (:size result) - (:created-at result) - (:deleted-at result) - (:touched-at result) - backend - (:metadata result) - nil)))) + (impl/storage-object + (:id result) + (:size result) + (:created-at result) + (:deleted-at result) + (:touched-at result) + backend + (:metadata result))))) (def ^:private sql:retrieve-storage-object "select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())") (defn row->storage-object [res] (let [mdata (or (some-> (:metadata res) (db/decode-transit-pgobject)) {})] - (StorageObject. (:id res) - (:size res) - (:created-at res) - (:deleted-at res) - (:touched-at res) - (keyword (:backend res)) - mdata - nil))) + (impl/storage-object + (:id res) + (:size res) + (:created-at res) + (:deleted-at res) + (:touched-at res) + (keyword (:backend res)) + mdata))) (defn- retrieve-database-object - [{:keys [conn] :as storage} id] - (when-let [res (db/exec-one! conn [sql:retrieve-storage-object id])] - (row->storage-object res))) + [conn id] + (some-> (db/exec-one! conn [sql:retrieve-storage-object id]) + (row->storage-object))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; API @@ -152,103 +145,99 @@ (defn file-url->path [url] - (fs/path (java.net.URI. (str url)))) + (when url + (fs/path (java.net.URI. (str url))))) (dm/export impl/content) (dm/export impl/wrap-with-hash) +(dm/export impl/object?) (defn get-object - [{:keys [conn pool] :as storage} id] - (us/assert ::storage storage) - (p/do - (-> (assoc storage :conn (or conn pool)) - (retrieve-database-object id)))) + [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} id] + (us/assert! ::storage storage) + (px/with-dispatch executor + (retrieve-database-object pool-or-conn id))) (defn put-object! "Creates a new object with the provided content." - [{:keys [pool conn backend] :as storage} {:keys [::content] :as params}] - (us/assert ::storage storage) - (us/assert ::storage-content content) - (us/assert ::us/keyword backend) - (p/let [storage (assoc storage :conn (or conn pool)) - object (create-database-object storage params)] - - (when (::created? (meta object)) - ;; Store the data finally on the underlying storage subsystem. - (-> (impl/resolve-backend storage backend) - (impl/put-object object content))) - - object)) + [{:keys [::backend] :as storage} {:keys [::content] :as params}] + (us/assert! ::storage-with-backend storage) + (us/assert! ::impl/content content) + (->> (create-database-object storage params) + (p/mcat (fn [object] + (if (::created? (meta object)) + ;; Store the data finally on the underlying storage subsystem. + (-> (impl/resolve-backend storage backend) + (impl/put-object object content)) + (p/resolved object)))))) (defn touch-object! "Mark object as touched." - [{:keys [pool conn] :as storage} object-or-id] - (p/do - (let [id (if (storage-object? object-or-id) (:id object-or-id) object-or-id) - res (db/update! (or conn pool) :storage-object - {:touched-at (dt/now)} - {:id id} - {::db/return-keys? false})] - (pos? (:next.jdbc/update-count res))))) + [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} object-or-id] + (us/assert! ::storage storage) + (px/with-dispatch executor + (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) + rs (db/update! pool-or-conn :storage-object + {:touched-at (dt/now)} + {:id id} + {::db/return-keys? false})] + (pos? (db/get-update-count rs))))) (defn get-object-data "Return an input stream instance of the object content." 
- [{:keys [pool conn] :as storage} object] - (us/assert ::storage storage) - (p/do - (when (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) - (-> (assoc storage :conn (or conn pool)) - (impl/resolve-backend (:backend object)) - (impl/get-object-data object))))) + [storage object] + (us/assert! ::storage storage) + (if (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) + (-> (impl/resolve-backend storage (:backend object)) + (impl/get-object-data object)) + (p/resolved nil))) (defn get-object-bytes "Returns a byte array of object content." - [{:keys [pool conn] :as storage} object] - (us/assert ::storage storage) - (p/do - (when (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) - (-> (assoc storage :conn (or conn pool)) - (impl/resolve-backend (:backend object)) - (impl/get-object-bytes object))))) + [storage object] + (us/assert! ::storage storage) + (if (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) + (-> (impl/resolve-backend storage (:backend object)) + (impl/get-object-bytes object)) + (p/resolved nil))) (defn get-object-url ([storage object] (get-object-url storage object nil)) - ([{:keys [conn pool] :as storage} object options] - (us/assert ::storage storage) - (p/do - (when (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) - (-> (assoc storage :conn (or conn pool)) - (impl/resolve-backend (:backend object)) - (impl/get-object-url object options)))))) + ([storage object options] + (us/assert! ::storage storage) + (if (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) + (-> (impl/resolve-backend storage (:backend object)) + (impl/get-object-url object options)) + (p/resolved nil)))) (defn get-object-path "Get the Path to the object. Only works with `:fs` type of storages." [storage object] - (p/do - (let [backend (impl/resolve-backend storage (:backend object))] - (when (not= :fs (:type backend)) - (ex/raise :type :internal - :code :operation-not-allowed - :hint "get-object-path only works with fs type backends")) - (when (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) - (p/-> (impl/get-object-url backend object nil) file-url->path))))) + (us/assert! ::storage storage) + (let [backend (impl/resolve-backend storage (:backend object))] + (if (not= :fs (::type backend)) + (p/resolved nil) + (if (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) + (->> (impl/get-object-url backend object nil) + (p/fmap file-url->path)) + (p/resolved nil))))) (defn del-object! - [{:keys [conn pool] :as storage} object-or-id] - (us/assert ::storage storage) - (p/do - (let [id (if (storage-object? object-or-id) (:id object-or-id) object-or-id) - res (db/update! (or conn pool) :storage-object + [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} object-or-id] + (us/assert! ::storage storage) + (px/with-dispatch executor + (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) + res (db/update! pool-or-conn :storage-object {:deleted-at (dt/now)} {:id id} {::db/return-keys? false})] - (pos? (:next.jdbc/update-count res))))) + (pos? 
(db/get-update-count res))))) (dm/export impl/resolve-backend) (dm/export impl/calculate-hash) @@ -265,18 +254,15 @@ (declare sql:retrieve-deleted-objects-chunk) -(s/def ::min-age ::dt/duration) - (defmethod ig/pre-init-spec ::gc-deleted-task [_] - (s/keys :req-un [::storage ::db/pool ::min-age ::wrk/executor])) + (s/keys :req [::storage ::db/pool])) (defmethod ig/prep-key ::gc-deleted-task [_ cfg] - (merge {:min-age (dt/duration {:hours 2})} - (d/without-nils cfg))) + (assoc cfg ::min-age (dt/duration {:hours 2}))) (defmethod ig/init-key ::gc-deleted-task - [_ {:keys [pool storage] :as cfg}] + [_ {:keys [::db/pool ::storage ::min-age]}] (letfn [(retrieve-deleted-objects-chunk [conn min-age cursor] (let [min-age (db/interval min-age) rows (db/exec! conn [sql:retrieve-deleted-objects-chunk min-age cursor])] @@ -289,27 +275,26 @@ :vf second :kf first)) - (delete-in-bulk [conn backend-name ids] - (let [backend (impl/resolve-backend storage backend-name) - backend (assoc backend :conn conn)] + (delete-in-bulk [backend-id ids] + (let [backend (impl/resolve-backend storage backend-id)] (doseq [id ids] - (l/debug :hint "permanently delete storage object" :task "gc-deleted" :backend backend-name :id id)) + (l/debug :hint "gc-deleted: permanently delete storage object" :backend backend-id :id id)) @(impl/del-objects-in-bulk backend ids)))] (fn [params] - (let [min-age (or (:min-age params) (:min-age cfg))] + (let [min-age (or (:min-age params) min-age)] (db/with-atomic [conn pool] (loop [total 0 groups (retrieve-deleted-objects conn min-age)] - (if-let [[backend ids] (first groups)] + (if-let [[backend-id ids] (first groups)] (do - (delete-in-bulk conn backend ids) + (delete-in-bulk backend-id ids) (recur (+ total (count ids)) (rest groups))) (do - (l/info :hint "task finished" :min-age (dt/format-duration min-age) :task "gc-deleted" :total total) + (l/info :hint "gc-deleted: task finished" :min-age (dt/format-duration min-age) :total total) {:deleted total})))))))) (def sql:retrieve-deleted-objects-chunk @@ -349,10 +334,10 @@ (declare sql:retrieve-profile-nrefs) (defmethod ig/pre-init-spec ::gc-touched-task [_] - (s/keys :req-un [::db/pool])) + (s/keys :req [::db/pool])) (defmethod ig/init-key ::gc-touched-task - [_ {:keys [pool] :as cfg}] + [_ {:keys [::db/pool]}] (letfn [(get-team-font-variant-nrefs [conn id] (-> (db/exec-one! conn [sql:retrieve-team-font-variant-nrefs id id id id]) :nrefs)) @@ -409,13 +394,13 @@ (let [nrefs (get-fn conn id)] (if (pos? 
nrefs) (do - (l/debug :hint "processing storage object" - :task "gc-touched" :id id :status "freeze" + (l/debug :hint "gc-touched: processing storage object" + :id id :status "freeze" :bucket bucket :refs nrefs) (recur (conj to-freeze id) to-delete (rest ids))) (do - (l/debug :hint "processing storage object" - :task "gc-touched" :id id :status "delete" + (l/debug :hint "gc-touched: processing storage object" + :id id :status "delete" :bucket bucket :refs nrefs) (recur to-freeze (conj to-delete id) (rest ids))))) (do @@ -441,7 +426,7 @@ (+ to-delete d) (rest groups))) (do - (l/info :hint "task finished" :task "gc-touched" :to-freeze to-freeze :to-delete to-delete) + (l/info :hint "gc-touched: task finished" :to-freeze to-freeze :to-delete to-delete) {:freeze to-freeze :delete to-delete}))))))) (def sql:retrieve-touched-objects-chunk diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj index c88d4b33e..f6240e2ad 100644 --- a/backend/src/app/storage/fs.clj +++ b/backend/src/app/storage/fs.clj @@ -9,7 +9,9 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uri :as u] + [app.storage :as-alias sto] [app.storage.impl :as impl] + [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.fs :as fs] @@ -28,42 +30,49 @@ (s/def ::directory ::us/string) (defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt-un [::directory])) + (s/keys :opt [::directory])) (defmethod ig/init-key ::backend [_ cfg] ;; Return a valid backend data structure only if all optional ;; parameters are provided. - (when (string? (:directory cfg)) - (let [dir (fs/normalize (:directory cfg))] + (when (string? (::directory cfg)) + (let [dir (fs/normalize (::directory cfg))] (assoc cfg - :type :fs - :directory (str dir) - :uri (u/uri (str "file://" dir)))))) + ::sto/type :fs + ::directory (str dir) + ::uri (u/uri (str "file://" dir)))))) -(s/def ::type ::us/keyword) (s/def ::uri u/uri?) (s/def ::backend - (s/keys :req-un [::type ::directory ::uri])) + (s/keys :req [::directory + ::uri] + :opt [::sto/type + ::sto/id + ::wrk/executor])) ;; --- API IMPL (defmethod impl/put-object :fs - [{:keys [executor] :as backend} {:keys [id] :as object} content] + [{:keys [::wrk/executor] :as backend} {:keys [id] :as object} content] + (us/assert! ::backend backend) (px/with-dispatch executor - (let [base (fs/path (:directory backend)) + (let [base (fs/path (::directory backend)) path (fs/path (impl/id->path id)) full (fs/normalize (fs/join base path))] (when-not (fs/exists? (fs/parent full)) (fs/create-dir (fs/parent full))) (with-open [^InputStream src (io/input-stream content) ^OutputStream dst (io/output-stream full)] - (io/copy! src dst))))) + (io/copy! src dst)) + + object))) (defmethod impl/get-object-data :fs - [{:keys [executor] :as backend} {:keys [id] :as object}] + [{:keys [::wrk/executor] :as backend} {:keys [id] :as object}] + (us/assert! ::backend backend) (px/with-dispatch executor - (let [^Path base (fs/path (:directory backend)) + (let [^Path base (fs/path (::directory backend)) ^Path path (fs/path (impl/id->path id)) ^Path full (fs/normalize (fs/join base path))] (when-not (fs/exists? full) @@ -74,33 +83,37 @@ (defmethod impl/get-object-bytes :fs [backend object] - (p/let [input (impl/get-object-data backend object)] - (try - (io/read-as-bytes input) - (finally - (io/close! input))))) + (->> (impl/get-object-data backend object) + (p/fmap (fn [input] + (try + (io/read-as-bytes input) + (finally + (io/close! 
input))))))) (defmethod impl/get-object-url :fs - [{:keys [uri executor] :as backend} {:keys [id] :as object} _] - (px/with-dispatch executor - (update uri :path - (fn [existing] - (if (str/ends-with? existing "/") - (str existing (impl/id->path id)) - (str existing "/" (impl/id->path id))))))) + [{:keys [::uri] :as backend} {:keys [id] :as object} _] + (us/assert! ::backend backend) + (p/resolved + (update uri :path + (fn [existing] + (if (str/ends-with? existing "/") + (str existing (impl/id->path id)) + (str existing "/" (impl/id->path id))))))) (defmethod impl/del-object :fs - [{:keys [executor] :as backend} {:keys [id] :as object}] + [{:keys [::wrk/executor] :as backend} {:keys [id] :as object}] + (us/assert! ::backend backend) (px/with-dispatch executor - (let [base (fs/path (:directory backend)) + (let [base (fs/path (::directory backend)) path (fs/path (impl/id->path id)) path (fs/join base path)] (Files/deleteIfExists ^Path path)))) (defmethod impl/del-objects-in-bulk :fs - [{:keys [executor] :as backend} ids] + [{:keys [::wrk/executor] :as backend} ids] + (us/assert! ::backend backend) (px/with-dispatch executor - (let [base (fs/path (:directory backend))] + (let [base (fs/path (::directory backend))] (doseq [id ids] (let [path (fs/path (impl/id->path id)) path (fs/join base path)] diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index a4b60335b..771ea95e7 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -9,9 +9,13 @@ (:require [app.common.data.macros :as dm] [app.common.exceptions :as ex] + [app.db :as-alias db] + [app.storage :as-alias sto] + [app.worker :as-alias wrk] [buddy.core.codecs :as bc] [buddy.core.hash :as bh] [clojure.java.io :as jio] + [clojure.spec.alpha :as s] [datoteka.io :as io]) (:import java.nio.ByteBuffer @@ -21,7 +25,7 @@ ;; --- API Definition -(defmulti put-object (fn [cfg _ _] (:type cfg))) +(defmulti put-object (fn [cfg _ _] (::sto/type cfg))) (defmethod put-object :default [cfg _ _] @@ -29,7 +33,7 @@ :code :invalid-storage-backend :context cfg)) -(defmulti get-object-data (fn [cfg _] (:type cfg))) +(defmulti get-object-data (fn [cfg _] (::sto/type cfg))) (defmethod get-object-data :default [cfg _] @@ -37,7 +41,7 @@ :code :invalid-storage-backend :context cfg)) -(defmulti get-object-bytes (fn [cfg _] (:type cfg))) +(defmulti get-object-bytes (fn [cfg _] (::sto/type cfg))) (defmethod get-object-bytes :default [cfg _] @@ -45,7 +49,7 @@ :code :invalid-storage-backend :context cfg)) -(defmulti get-object-url (fn [cfg _ _] (:type cfg))) +(defmulti get-object-url (fn [cfg _ _] (::sto/type cfg))) (defmethod get-object-url :default [cfg _ _] @@ -54,7 +58,7 @@ :context cfg)) -(defmulti del-object (fn [cfg _] (:type cfg))) +(defmulti del-object (fn [cfg _] (::sto/type cfg))) (defmethod del-object :default [cfg _] @@ -62,7 +66,7 @@ :code :invalid-storage-backend :context cfg)) -(defmulti del-objects-in-bulk (fn [cfg _] (:type cfg))) +(defmulti del-objects-in-bulk (fn [cfg _] (::sto/type cfg))) (defmethod del-objects-in-bulk :default [cfg _] @@ -189,10 +193,6 @@ (make-output-stream [_ opts] (jio/make-output-stream content opts)))) -(defn content? - [v] - (satisfies? 
IContentObject v)) - (defn calculate-hash [resource] (let [result (with-open [input (io/input-stream resource)] @@ -201,13 +201,37 @@ (str "blake2b:" result))) (defn resolve-backend - [{:keys [conn pool executor] :as storage} backend-id] - (let [backend (get-in storage [:backends backend-id])] + [{:keys [::db/pool ::wrk/executor] :as storage} backend-id] + (let [backend (get-in storage [::sto/backends backend-id])] (when-not backend (ex/raise :type :internal :code :backend-not-configured :hint (dm/fmt "backend '%' not configured" backend-id))) - (assoc backend - :executor executor - :conn (or conn pool) - :id backend-id))) + (-> backend + (assoc ::sto/id backend-id) + (assoc ::wrk/executor executor) + (assoc ::db/pool pool)))) + +(defrecord StorageObject [id size created-at expired-at touched-at backend]) + +(ns-unmap *ns* '->StorageObject) +(ns-unmap *ns* 'map->StorageObject) + +(defn storage-object + ([id size created-at expired-at touched-at backend] + (StorageObject. id size created-at expired-at touched-at backend)) + ([id size created-at expired-at touched-at backend mdata] + (StorageObject. id size created-at expired-at touched-at backend mdata nil))) + +(defn object? + [v] + (instance? StorageObject v)) + +(defn content? + [v] + (satisfies? IContentObject v)) + +(s/def ::object object?) +(s/def ::content content?) + + diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index 6933b3d41..fc26cccb4 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -8,9 +8,12 @@ "S3 Storage backend implementation." (:require [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.exceptions :as ex] + [app.common.logging :as l] [app.common.spec :as us] [app.common.uri :as u] + [app.storage :as-alias sto] [app.storage.impl :as impl] [app.storage.tmp :as tmp] [app.util.time :as dt] @@ -64,6 +67,9 @@ (declare build-s3-client) (declare build-s3-presigner) +;; (set! *warn-on-reflection* true) +;; (set! *unchecked-math* :warn-on-boxed) + ;; --- BACKEND INIT (s/def ::region ::us/keyword) @@ -72,26 +78,26 @@ (s/def ::endpoint ::us/string) (defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt-un [::region ::bucket ::prefix ::endpoint ::wrk/executor])) + (s/keys :opt [::region ::bucket ::prefix ::endpoint ::wrk/executor])) (defmethod ig/prep-key ::backend - [_ {:keys [prefix region] :as cfg}] + [_ {:keys [::prefix ::region] :as cfg}] (cond-> (d/without-nils cfg) - (some? prefix) (assoc :prefix prefix) - (nil? region) (assoc :region :eu-central-1))) + (some? prefix) (assoc ::prefix prefix) + (nil? region) (assoc ::region :eu-central-1))) (defmethod ig/init-key ::backend [_ cfg] ;; Return a valid backend data structure only if all optional ;; parameters are provided. - (when (and (contains? cfg :region) - (string? (:bucket cfg))) + (when (and (contains? cfg ::region) + (string? (::bucket cfg))) (let [client (build-s3-client cfg) presigner (build-s3-presigner cfg)] (assoc cfg - :client @client - :presigner presigner - :type :s3 + ::sto/type :s3 + ::client @client + ::presigner presigner ::close-fn #(.close ^java.lang.AutoCloseable client))))) (defmethod ig/halt-key! ::backend @@ -99,21 +105,27 @@ (when (fn? close-fn) (px/run! close-fn))) -(s/def ::type ::us/keyword) (s/def ::client #(instance? S3AsyncClient %)) (s/def ::presigner #(instance? 
S3Presigner %)) (s/def ::backend - (s/keys :req-un [::region ::bucket ::client ::type ::presigner] - :opt-un [::prefix])) + (s/keys :req [::region + ::bucket + ::client + ::presigner] + :opt [::prefix + ::sto/id + ::wrk/executor])) ;; --- API IMPL (defmethod impl/put-object :s3 [backend object content] + (us/assert! ::backend backend) (put-object backend object content)) (defmethod impl/get-object-data :s3 [backend object] + (us/assert! ::backend backend) (letfn [(no-such-key? [cause] (instance? software.amazon.awssdk.services.s3.model.NoSuchKeyException cause)) (handle-not-found [cause] @@ -127,18 +139,22 @@ (defmethod impl/get-object-bytes :s3 [backend object] + (us/assert! ::backend backend) (get-object-bytes backend object)) (defmethod impl/get-object-url :s3 [backend object options] + (us/assert! ::backend backend) (get-object-url backend object options)) (defmethod impl/del-object :s3 [backend object] + (us/assert! ::backend backend) (del-object backend object)) (defmethod impl/del-objects-in-bulk :s3 [backend ids] + (us/assert! ::backend backend) (del-object-in-bulk backend ids)) ;; --- HELPERS @@ -152,8 +168,8 @@ [region] (Region/of (name region))) -(defn build-s3-client - [{:keys [region endpoint executor]}] +(defn- build-s3-client + [{:keys [::region ::endpoint ::wrk/executor]}] (let [aconfig (-> (ClientAsyncConfiguration/builder) (.advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR executor) (.build)) @@ -188,8 +204,8 @@ (.close ^NettyNioAsyncHttpClient hclient) (.close ^S3AsyncClient client))))) -(defn build-s3-presigner - [{:keys [region endpoint]}] +(defn- build-s3-presigner + [{:keys [::region ::endpoint]}] (let [config (-> (S3Configuration/builder) (cond-> (some? endpoint) (.pathStyleAccessEnabled true)) (.build))] @@ -200,65 +216,87 @@ (.serviceConfiguration ^S3Configuration config) (.build)))) +(defn- upload-thread + [id subscriber sem content] + (px/thread + {:name "penpot/s3/uploader" + :daemon true} + (l/trace :hint "start upload thread" + :object-id (str id) + :size (impl/get-size content) + ::l/sync? true) + (let [stream (io/input-stream content) + bsize (* 1024 64) + tpoint (dt/tpoint)] + (try + (loop [] + (.acquire ^Semaphore sem 1) + (let [buffer (byte-array bsize) + readed (.read ^InputStream stream buffer)] + (when (pos? readed) + (let [data (ByteBuffer/wrap ^bytes buffer 0 readed)] + (.onNext ^Subscriber subscriber ^ByteBuffer data) + (when (= readed bsize) + (recur)))))) + (.onComplete ^Subscriber subscriber) + (catch InterruptedException _ + (l/trace :hint "interrupted upload thread" + :object-id (str id) + ::l/sync? true) + nil) + (catch Throwable cause + (.onError ^Subscriber subscriber cause)) + (finally + (l/trace :hint "end upload thread" + :object-id (str id) + :elapsed (dt/format-duration (tpoint)) + ::l/sync? true) + (.close ^InputStream stream)))))) + (defn- make-request-body - [content] - (let [is (io/input-stream content) - buff-size (* 1024 64) - sem (Semaphore. 0) + [id content] + (reify + AsyncRequestBody + (contentLength [_] + (Optional/of (long (impl/get-size content)))) - writer-fn (fn [^Subscriber s] - (try - (loop [] - (.acquire sem 1) - (let [buffer (byte-array buff-size) - readed (.read is buffer)] - (when (pos?
readed) - (.onNext ^Subscriber s (ByteBuffer/wrap buffer 0 readed)) - (when (= readed buff-size) - (recur))))) - (.onComplete s) - (catch Throwable cause - (.onError s cause)) - (finally - (.close ^InputStream is))))] - - (reify - AsyncRequestBody - (contentLength [_] - (Optional/of (long (impl/get-size content)))) - - (^void subscribe [_ ^Subscriber s] - (let [thread (Thread. #(writer-fn s))] - (.setDaemon thread true) - (.setName thread "penpot/storage:s3") - (.start thread) - - (.onSubscribe s (reify Subscription - (cancel [_] - (.interrupt thread) - (.release sem 1)) - (request [_ n] - (.release sem (int n)))))))))) + (^void subscribe [_ ^Subscriber subscriber] + (let [sem (Semaphore. 0) + thr (upload-thread id subscriber sem content)] + (.onSubscribe subscriber + (reify Subscription + (cancel [_] + (px/interrupt! thr) + (.release sem 1)) + (request [_ n] + (.release sem (int n))))))))) -(defn put-object - [{:keys [client bucket prefix]} {:keys [id] :as object} content] - (p/let [path (str prefix (impl/id->path id)) - mdata (meta object) - mtype (:content-type mdata "application/octet-stream") - request (.. (PutObjectRequest/builder) - (bucket bucket) - (contentType mtype) - (key path) - (build))] +(defn- put-object + [{:keys [::client ::bucket ::prefix]} {:keys [id] :as object} content] + (let [path (dm/str prefix (impl/id->path id)) + mdata (meta object) + mtype (:content-type mdata "application/octet-stream") + rbody (make-request-body id content) + request (.. (PutObjectRequest/builder) + (bucket bucket) + (contentType mtype) + (key path) + (build))] + (->> (.putObject ^S3AsyncClient client + ^PutObjectRequest request + ^AsyncRequestBody rbody) + (p/fmap (constantly object))))) - (let [content (make-request-body content)] - (.putObject ^S3AsyncClient client - ^PutObjectRequest request - ^AsyncRequestBody content)))) +(defn- path->stream + [path] + (proxy [FilterInputStream] [(io/input-stream path)] + (close [] + (fs/delete path) + (proxy-super close)))) -(defn get-object-data - [{:keys [client bucket prefix]} {:keys [id size]}] +(defn- get-object-data + [{:keys [::client ::bucket ::prefix]} {:keys [id size]}] (let [gor (.. (GetObjectRequest/builder) (bucket bucket) (key (str prefix (impl/id->path id))) @@ -267,83 +305,83 @@ ;; If the file size is greater than 2MiB then stream the content ;; to the filesystem and then read with buffered inputstream; if ;; not, read the contento into memory using bytearrays. 
- (if (> size (* 1024 1024 2)) - (p/let [path (tmp/tempfile :prefix "penpot.storage.s3.") - rxf (AsyncResponseTransformer/toFile ^Path path) - _ (.getObject ^S3AsyncClient client - ^GetObjectRequest gor - ^AsyncResponseTransformer rxf)] - (proxy [FilterInputStream] [(io/input-stream path)] - (close [] - (fs/delete path) - (proxy-super close)))) + (if (> ^long size (* 1024 1024 2)) + (let [path (tmp/tempfile :prefix "penpot.storage.s3.") + rxf (AsyncResponseTransformer/toFile ^Path path)] + (->> (.getObject ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf) + (p/fmap (constantly path)) + (p/fmap path->stream))) - (p/let [rxf (AsyncResponseTransformer/toBytes) - obj (.getObject ^S3AsyncClient client - ^GetObjectRequest gor - ^AsyncResponseTransformer rxf)] - (.asInputStream ^ResponseBytes obj))))) + (let [rxf (AsyncResponseTransformer/toBytes)] + (->> (.getObject ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf) + (p/fmap #(.asInputStream ^ResponseBytes %))))))) -(defn get-object-bytes - [{:keys [client bucket prefix]} {:keys [id]}] - (p/let [gor (.. (GetObjectRequest/builder) - (bucket bucket) - (key (str prefix (impl/id->path id))) - (build)) - rxf (AsyncResponseTransformer/toBytes) - obj (.getObject ^S3AsyncClient client - ^GetObjectRequest gor - ^AsyncResponseTransformer rxf)] - (.asByteArray ^ResponseBytes obj))) +(defn- get-object-bytes + [{:keys [::client ::bucket ::prefix]} {:keys [id]}] + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (str prefix (impl/id->path id))) + (build)) + rxf (AsyncResponseTransformer/toBytes)] + (->> (.getObject ^S3AsyncClient client + ^GetObjectRequest gor + ^AsyncResponseTransformer rxf) + (p/fmap #(.asByteArray ^ResponseBytes %))))) (def default-max-age (dt/duration {:minutes 10})) -(defn get-object-url - [{:keys [presigner bucket prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}] +(defn- get-object-url + [{:keys [::presigner ::bucket ::prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}] (us/assert dt/duration? max-age) - (p/do - (let [gor (.. (GetObjectRequest/builder) - (bucket bucket) - (key (str prefix (impl/id->path id))) - (build)) - gopr (.. (GetObjectPresignRequest/builder) - (signatureDuration ^Duration max-age) - (getObjectRequest ^GetObjectRequest gor) - (build)) - pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)] - (u/uri (str (.url ^PresignedGetObjectRequest pgor)))))) + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (dm/str prefix (impl/id->path id))) + (build)) + gopr (.. (GetObjectPresignRequest/builder) + (signatureDuration ^Duration max-age) + (getObjectRequest ^GetObjectRequest gor) + (build)) + pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)] + (p/resolved + (u/uri (str (.url ^PresignedGetObjectRequest pgor)))))) -(defn del-object - [{:keys [bucket client prefix]} {:keys [id] :as obj}] - (p/let [dor (.. (DeleteObjectRequest/builder) - (bucket bucket) - (key (str prefix (impl/id->path id))) - (build))] - (.deleteObject ^S3AsyncClient client - ^DeleteObjectRequest dor))) +(defn- del-object + [{:keys [::bucket ::client ::prefix]} {:keys [id] :as obj}] + (let [dor (.. 
(DeleteObjectRequest/builder) + (bucket bucket) + (key (dm/str prefix (impl/id->path id))) + (build))] + (->> (.deleteObject ^S3AsyncClient client ^DeleteObjectRequest dor) + (p/fmap (constantly nil))))) -(defn del-object-in-bulk - [{:keys [bucket client prefix]} ids] - (p/let [oids (map (fn [id] - (.. (ObjectIdentifier/builder) - (key (str prefix (impl/id->path id))) - (build))) - ids) - delc (.. (Delete/builder) - (objects ^Collection oids) - (build)) - dor (.. (DeleteObjectsRequest/builder) - (bucket bucket) - (delete ^Delete delc) - (build)) - dres (.deleteObjects ^S3AsyncClient client - ^DeleteObjectsRequest dor)] - (when (.hasErrors ^DeleteObjectsResponse dres) - (let [errors (seq (.errors ^DeleteObjectsResponse dres))] - (ex/raise :type :internal - :code :error-on-s3-bulk-delete - :s3-errors (mapv (fn [^S3Error error] - {:key (.key error) - :msg (.message error)}) - errors)))))) +(defn- del-object-in-bulk + [{:keys [::bucket ::client ::prefix]} ids] + + (let [oids (map (fn [id] + (.. (ObjectIdentifier/builder) + (key (str prefix (impl/id->path id))) + (build))) + ids) + delc (.. (Delete/builder) + (objects ^Collection oids) + (build)) + dor (.. (DeleteObjectsRequest/builder) + (bucket bucket) + (delete ^Delete delc) + (build))] + + (->> (.deleteObjects ^S3AsyncClient client ^DeleteObjectsRequest dor) + (p/fmap (fn [dres] + (when (.hasErrors ^DeleteObjectsResponse dres) + (let [errors (seq (.errors ^DeleteObjectsResponse dres))] + (ex/raise :type :internal + :code :error-on-s3-bulk-delete + :s3-errors (mapv (fn [^S3Error error] + {:key (.key error) + :msg (.message error)}) + errors))))))))) diff --git a/backend/src/app/tasks/file_gc.clj b/backend/src/app/tasks/file_gc.clj index 81d14eeca..f673213b3 100644 --- a/backend/src/app/tasks/file_gc.clj +++ b/backend/src/app/tasks/file_gc.clj @@ -32,27 +32,24 @@ ;; HANDLER ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(s/def ::min-age ::dt/duration) - (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool ::min-age])) + (s/keys :req [::db/pool])) (defmethod ig/prep-key ::handler [_ cfg] - (merge {:min-age cf/deletion-delay} - (d/without-nils cfg))) + (assoc cfg ::min-age cf/deletion-delay)) (defmethod ig/init-key ::handler - [_ {:keys [pool] :as cfg}] + [_ {:keys [::db/pool] :as cfg}] (fn [{:keys [file-id] :as params}] (db/with-atomic [conn pool] - (let [min-age (or (:min-age params) (:min-age cfg)) - cfg (assoc cfg :min-age min-age :conn conn :file-id file-id)] + (let [min-age (or (:min-age params) (::min-age cfg)) + cfg (assoc cfg ::min-age min-age ::conn conn ::file-id file-id)] (loop [total 0 files (retrieve-candidates cfg)] (if-let [file (first files)] (do - (process-file cfg file) + (process-file conn file) (recur (inc total) (rest files))) (do @@ -84,7 +81,7 @@ for update skip locked") (defn- retrieve-candidates - [{:keys [conn min-age file-id] :as cfg}] + [{:keys [::conn ::min-age ::file-id]}] (if (uuid? file-id) (do (l/warn :hint "explicit file id passed on params" :file-id file-id) @@ -256,7 +253,7 @@ (db/delete! 
conn :file-data-fragment {:id fragment-id :file-id file-id})))) (defn- process-file - [{:keys [conn] :as cfg} {:keys [id data revn modified-at features] :as file}] + [conn {:keys [id data revn modified-at features] :as file}] (l/debug :hint "processing file" :id id :modified-at modified-at) (binding [pmap/*load-fn* (partial files/load-pointer conn id)] diff --git a/backend/src/app/tasks/file_xlog_gc.clj b/backend/src/app/tasks/file_xlog_gc.clj index 561f0548b..5ee2e1bbc 100644 --- a/backend/src/app/tasks/file_xlog_gc.clj +++ b/backend/src/app/tasks/file_xlog_gc.clj @@ -8,42 +8,36 @@ "A maintenance task that performs a garbage collection of the file change (transaction) log." (:require - [app.common.data :as d] [app.common.logging :as l] [app.db :as db] [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare sql:delete-files-xlog) - -(s/def ::min-age ::dt/duration) +(def ^:private + sql:delete-files-xlog + "delete from file_change + where created_at < now() - ?::interval") (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool] - :opt-un [::min-age])) + (s/keys :req [::db/pool])) (defmethod ig/prep-key ::handler [_ cfg] - (merge {:min-age (dt/duration {:hours 72})} - (d/without-nils cfg))) + (assoc cfg ::min-age (dt/duration {:hours 72}))) (defmethod ig/init-key ::handler - [_ {:keys [pool] :as cfg}] + [_ {:keys [::db/pool] :as cfg}] (fn [params] - (let [min-age (or (:min-age params) (:min-age cfg))] + (let [min-age (or (:min-age params) (::min-age cfg))] (db/with-atomic [conn pool] (let [interval (db/interval min-age) result (db/exec-one! conn [sql:delete-files-xlog interval]) - result (:next.jdbc/update-count result)] + result (db/get-update-count result)] + (l/info :hint "task finished" :min-age (dt/format-duration min-age) :total result) (when (:rollback? params) (db/rollback! conn)) result))))) - -(def ^:private - sql:delete-files-xlog - "delete from file_change - where created_at < now() - ?::interval") diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index eacd52341..4169cd88f 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -25,16 +25,12 @@ (declare ^:private delete-files!) (declare ^:private delete-orphan-teams!) -(s/def ::min-age ::dt/duration) - (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req [::db/pool ::sto/storage] - :opt [::min-age])) + (s/keys :req [::db/pool ::sto/storage])) (defmethod ig/prep-key ::handler [_ cfg] - (merge {::min-age cf/deletion-delay} - (d/without-nils cfg))) + (assoc cfg ::min-age cf/deletion-delay)) (defmethod ig/init-key ::handler [_ {:keys [::db/pool ::sto/storage] :as cfg}] @@ -133,7 +129,6 @@ :kf first :initk (dt/now))))) - (def ^:private sql:get-orphan-teams-chunk "select t.id, t.created_at from team as t @@ -154,14 +149,15 @@ [(some->> rows peek :created-at) rows]))] (reduce (fn [total {:keys [id]}] - (l/debug :hint "mark team for deletion" :id (str id)) + (let [result (db/update! conn :team + {:deleted-at (dt/now)} + {:id id :deleted-at nil} + {::db/return-keys? false}) + count (db/get-update-count result)] + (when (pos? count) + (l/debug :hint "mark team for deletion" :id (str id) )) - ;; And finally, permanently delete the team. - (db/update! 
conn :team - {:deleted-at (dt/now)} - {:id id}) - - (inc total)) + (+ total count))) 0 (d/iteration get-chunk :vf second diff --git a/backend/src/app/tasks/tasks_gc.clj b/backend/src/app/tasks/tasks_gc.clj index 81155f494..69dd11dfd 100644 --- a/backend/src/app/tasks/tasks_gc.clj +++ b/backend/src/app/tasks/tasks_gc.clj @@ -8,35 +8,33 @@ "A maintenance task that performs a cleanup of already executed tasks from the database table." (:require - [app.common.data :as d] [app.common.logging :as l] [app.config :as cf] [app.db :as db] - [app.util.time :as dt] [clojure.spec.alpha :as s] [integrant.core :as ig])) -(declare sql:delete-completed-tasks) - -(s/def ::min-age ::dt/duration) +(def ^:private + sql:delete-completed-tasks + "delete from task_completed + where scheduled_at < now() - ?::interval") (defmethod ig/pre-init-spec ::handler [_] - (s/keys :req-un [::db/pool] - :opt-un [::min-age])) + (s/keys :req [::db/pool])) (defmethod ig/prep-key ::handler [_ cfg] - (merge {:min-age cf/deletion-delay} - (d/without-nils cfg))) + (assoc cfg ::min-age cf/deletion-delay)) (defmethod ig/init-key ::handler - [_ {:keys [pool] :as cfg}] + [_ {:keys [::db/pool ::min-age] :as cfg}] (fn [params] - (let [min-age (or (:min-age params) (:min-age cfg))] + (let [min-age (or (:min-age params) min-age)] (db/with-atomic [conn pool] (let [interval (db/interval min-age) result (db/exec-one! conn [sql:delete-completed-tasks interval]) - result (:next.jdbc/update-count result)] + result (db/get-update-count result)] + (l/debug :hint "task finished" :total result) (when (:rollback? params) @@ -44,7 +42,3 @@ result))))) -(def ^:private - sql:delete-completed-tasks - "delete from task_completed - where scheduled_at < now() - ?::interval") diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index f3534f94e..8f4b9a50f 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -90,10 +90,10 @@ (s/def ::registry (s/map-of ::us/string fn?)) (defmethod ig/pre-init-spec ::registry [_] - (s/keys :req-un [::mtx/metrics ::tasks])) + (s/keys :req [::mtx/metrics ::tasks])) (defmethod ig/init-key ::registry - [_ {:keys [metrics tasks]}] + [_ {:keys [::mtx/metrics ::tasks]}] (l/info :hint "registry initialized" :tasks (count tasks)) (reduce-kv (fn [registry k v] (let [tname (name k)] diff --git a/backend/test/backend_tests/rpc_media_test.clj b/backend/test/backend_tests/rpc_media_test.clj index 138ac4d6a..ab2cd1de9 100644 --- a/backend/test/backend_tests/rpc_media_test.clj +++ b/backend/test/backend_tests/rpc_media_test.clj @@ -44,8 +44,8 @@ (let [storage (:app.storage/storage th/*system*) mobj1 @(sto/get-object storage media-id) mobj2 @(sto/get-object storage thumbnail-id)] - (t/is (sto/storage-object? mobj1)) - (t/is (sto/storage-object? mobj2)) + (t/is (sto/object? mobj1)) + (t/is (sto/object? mobj2)) (t/is (= 122785 (:size mobj1))) ;; This is because in ubuntu 21.04 generates different ;; thumbnail that in ubuntu 22.04. This hack should be removed @@ -85,8 +85,8 @@ (let [storage (:app.storage/storage th/*system*) mobj1 @(sto/get-object storage media-id) mobj2 @(sto/get-object storage thumbnail-id)] - (t/is (sto/storage-object? mobj1)) - (t/is (sto/storage-object? mobj2)) + (t/is (sto/object? mobj1)) + (t/is (sto/object? mobj2)) (t/is (= 312043 (:size mobj1))) (t/is (= 3887 (:size mobj2))))) )) @@ -164,8 +164,8 @@ (let [storage (:app.storage/storage th/*system*) mobj1 @(sto/get-object storage media-id) mobj2 @(sto/get-object storage thumbnail-id)] - (t/is (sto/storage-object? 
mobj1)) - (t/is (sto/storage-object? mobj2)) + (t/is (sto/object? mobj1)) + (t/is (sto/object? mobj2)) (t/is (= 122785 (:size mobj1))) ;; This is because in ubuntu 21.04 generates different ;; thumbnail that in ubuntu 22.04. This hack should be removed @@ -205,8 +205,8 @@ (let [storage (:app.storage/storage th/*system*) mobj1 @(sto/get-object storage media-id) mobj2 @(sto/get-object storage thumbnail-id)] - (t/is (sto/storage-object? mobj1)) - (t/is (sto/storage-object? mobj2)) + (t/is (sto/object? mobj1)) + (t/is (sto/object? mobj2)) (t/is (= 312043 (:size mobj1))) (t/is (= 3887 (:size mobj2))))) )) diff --git a/backend/test/backend_tests/storage_test.clj b/backend/test/backend_tests/storage_test.clj index b47e98b09..032e85c2e 100644 --- a/backend/test/backend_tests/storage_test.clj +++ b/backend/test/backend_tests/storage_test.clj @@ -27,11 +27,11 @@ "Given storage map, returns a storage configured with the appropriate backend for assets." ([storage] - (assoc storage :backend :assets-fs)) + (assoc storage ::sto/backend :assets-fs)) ([storage conn] (-> storage - (assoc :conn conn) - (assoc :backend :assets-fs)))) + (assoc ::db/pool-or-conn conn) + (assoc ::sto/backend :assets-fs)))) (t/deftest put-and-retrieve-object (let [storage (-> (:app.storage/storage th/*system*) @@ -40,8 +40,10 @@ object @(sto/put-object! storage {::sto/content content :content-type "text/plain" :other "data"})] - (t/is (sto/storage-object? object)) + + (t/is (sto/object? object)) (t/is (fs/path? @(sto/get-object-path storage object))) + (t/is (nil? (:expired-at object))) (t/is (= :assets-fs (:backend object))) (t/is (= "data" (:other (meta object)))) @@ -58,7 +60,8 @@ ::sto/expired-at (dt/in-future {:seconds 1}) :content-type "text/plain" })] - (t/is (sto/storage-object? object)) + + (t/is (sto/object? object)) (t/is (dt/instant? (:expired-at object))) (t/is (dt/is-after? (:expired-at object) (dt/now))) (t/is (= object @(sto/get-object storage (:id object)))) @@ -77,7 +80,7 @@ object @(sto/put-object! storage {::sto/content content :content-type "text/plain" :expired-at (dt/in-future {:seconds 1})})] - (t/is (sto/storage-object? object)) + (t/is (sto/object? object)) (t/is (true? @(sto/del-object! storage object))) ;; retrieving the same object should be not nil because the diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index 6a8cf1d23..ff09a005c 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -8,7 +8,8 @@ "A collection if helpers for working with data structures and other data resources." (:refer-clojure :exclude [read-string hash-map merge name update-vals - parse-double group-by iteration concat mapcat]) + parse-double group-by iteration concat mapcat + parse-uuid]) #?(:cljs (:require-macros [app.common.data])) @@ -17,6 +18,7 @@ :clj [clojure.edn :as r]) #?(:cljs [cljs.core :as c] :clj [clojure.core :as c]) + [app.common.exceptions :as ex] [app.common.math :as mth] [clojure.set :as set] [cuerdas.core :as str] @@ -516,6 +518,10 @@ default v)))) +(defn parse-uuid + [v] + (ex/ignoring (c/parse-uuid v))) + (defn num-string? [v] ;; https://stackoverflow.com/questions/175739/built-in-way-in-javascript-to-check-if-a-string-is-a-valid-number #?(:cljs (and (string? v)