From dfdc9c9fa5688680bde528c7968d758a2b19da57 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 3 Mar 2023 14:05:26 +0100 Subject: [PATCH] :recycle: Refactor storage internal concurrency model --- backend/src/app/http/assets.clj | 59 +++---- backend/src/app/rpc/commands/binfile.clj | 7 +- backend/src/app/rpc/commands/fonts.clj | 14 +- backend/src/app/rpc/commands/media.clj | 7 +- backend/src/app/rpc/commands/profile.clj | 7 +- backend/src/app/rpc/commands/teams.clj | 18 +-- backend/src/app/storage.clj | 146 ++++++++---------- backend/src/app/storage/fs.clj | 96 +++++------- backend/src/app/storage/impl.clj | 6 +- backend/src/app/storage/s3.clj | 34 ++-- backend/src/app/tasks/objects_gc.clj | 12 +- backend/test/backend_tests/helpers.clj | 25 +-- backend/test/backend_tests/rpc_file_test.clj | 16 +- .../backend_tests/rpc_management_test.clj | 26 ++-- backend/test/backend_tests/rpc_media_test.clj | 16 +- backend/test/backend_tests/storage_test.clj | 62 ++++---- 16 files changed, 261 insertions(+), 290 deletions(-) diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index 7a318adf8..efd494249 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -14,10 +14,8 @@ [app.db :as db] [app.storage :as sto] [app.util.time :as dt] - [app.worker :as wrk] [clojure.spec.alpha :as s] [integrant.core :as ig] - [promesa.core :as p] [yetti.response :as-alias yrs])) (def ^:private cache-max-age @@ -38,15 +36,12 @@ (defn- serve-object-from-s3 [{:keys [::sto/storage] :as cfg} obj] - (let [mdata (meta obj)] - (->> (sto/get-object-url storage obj {:max-age signature-max-age}) - (p/fmap (fn [{:keys [host port] :as url}] - (let [headers {"location" (str url) - "x-host" (cond-> host port (str ":" port)) - "x-mtype" (:content-type mdata) - "cache-control" (str "max-age=" (inst-ms cache-max-age))}] - {::yrs/status 307 - ::yrs/headers headers})))))) + (let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] + {::yrs/status 307 + ::yrs/headers {"location" (str url) + "x-host" (cond-> host port (str ":" port)) + "x-mtype" (-> obj meta :content-type) + "cache-control" (str "max-age=" (inst-ms cache-max-age))}})) (defn- serve-object-from-fs [{:keys [::path]} obj] @@ -56,9 +51,8 @@ headers {"x-accel-redirect" (:path purl) "content-type" (:content-type mdata) "cache-control" (str "max-age=" (inst-ms cache-max-age))}] - (p/resolved - {::yrs/status 204 - ::yrs/headers headers}))) + {::yrs/status 204 + ::yrs/headers headers})) (defn- serve-object "Helper function that returns the appropriate response depending on @@ -71,37 +65,34 @@ (defn objects-handler "Handler that servers storage objects by id." - [{:keys [::sto/storage ::wrk/executor] :as cfg} request] - (->> (get-id request) - (p/mcat executor (fn [id] (sto/get-object storage id))) - (p/mcat executor (fn [obj] - (if (some? obj) - (serve-object cfg obj) - (p/resolved {::yrs/status 404})))) - (p/await!))) + [{:keys [::sto/storage] :as cfg} request] + (let [id (get-id request) + obj (sto/get-object storage id)] + (if obj + (serve-object cfg obj) + {::yrs/status 404}))) (defn- generic-handler "A generic handler helper/common code for file-media based handlers." - [{:keys [::sto/storage ::wrk/executor] :as cfg} request kf] - (let [pool (::db/pool storage)] - (->> (get-id request) - (p/fmap executor (fn [id] (get-file-media-object pool id))) - (p/mcat executor (fn [mobj] (sto/get-object storage (kf mobj)))) - (p/mcat executor (fn [sobj] - (if sobj - (serve-object cfg sobj) - (p/resolved {::yrs/status 404}))))))) + [{:keys [::sto/storage] :as cfg} request kf] + (let [pool (::db/pool storage) + id (get-id request) + mobj (get-file-media-object pool id) + sobj (sto/get-object storage (kf mobj))] + (if sobj + (serve-object cfg sobj) + {::yrs/status 404}))) (defn file-objects-handler "Handler that serves storage objects by file media id." [cfg request] - (p/await! (generic-handler cfg request :media-id))) + (generic-handler cfg request :media-id)) (defn file-thumbnails-handler "Handler that serves storage objects by thumbnail-id and quick fallback to file-media-id if no thumbnail is available." [cfg request] - (p/await! (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %))))) + (generic-handler cfg request #(or (:thumbnail-id %) (:media-id %)))) ;; --- Initialization @@ -109,7 +100,7 @@ (s/def ::routes vector?) (defmethod ig/pre-init-spec ::routes [_] - (s/keys :req [::sto/storage ::wrk/executor ::path])) + (s/keys :req [::sto/storage ::path])) (defmethod ig/init-key ::routes [_ cfg] diff --git a/backend/src/app/rpc/commands/binfile.clj b/backend/src/app/rpc/commands/binfile.clj index c34331b7a..33e0ed04d 100644 --- a/backend/src/app/rpc/commands/binfile.clj +++ b/backend/src/app/rpc/commands/binfile.clj @@ -37,7 +37,6 @@ [clojure.walk :as walk] [cuerdas.core :as str] [datoteka.io :as io] - [promesa.core :as p] [yetti.adapter :as yt] [yetti.response :as yrs]) (:import @@ -527,13 +526,13 @@ (write-obj! output sids) (doseq [id sids] - (let [{:keys [size] :as obj} (p/await! (sto/get-object storage id))] + (let [{:keys [size] :as obj} (sto/get-object storage id)] (l/debug :hint "write sobject" :id id ::l/sync? true) (doto output (write-uuid! id) (write-obj! (meta obj))) - (with-open [^InputStream stream (p/await! (sto/get-object-data storage obj))] + (with-open [^InputStream stream (sto/get-object-data storage obj)] (let [written (write-stream! output stream size)] (when (not= written size) (ex/raise :type :validation @@ -719,7 +718,7 @@ (assoc ::sto/touched-at (dt/now)) (assoc :bucket "file-media-object")) - sobject (p/await! (sto/put-object! storage params))] + sobject (sto/put-object! storage params)] (l/debug :hint "persisted storage object" :id id :new-id (:id sobject) ::l/sync? true) (vswap! *state* update :index assoc id (:id sobject))))) diff --git a/backend/src/app/rpc/commands/fonts.clj b/backend/src/app/rpc/commands/fonts.clj index c52dcc613..5aab17fde 100644 --- a/backend/src/app/rpc/commands/fonts.clj +++ b/backend/src/app/rpc/commands/fonts.clj @@ -6,6 +6,7 @@ (ns app.rpc.commands.fonts (:require + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] @@ -24,8 +25,7 @@ [app.storage :as sto] [app.util.services :as sv] [app.util.time :as dt] - [clojure.spec.alpha :as s] - [promesa.core :as p])) + [clojure.spec.alpha :as s])) (def valid-weight #{100 200 300 400 500 600 700 800 900 950}) (def valid-style #{"normal" "italic"}) @@ -56,7 +56,7 @@ (sv/defmethod ::get-font-variants {::doc/added "1.18"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id file-id project-id] :as params}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (cond (uuid? team-id) (do @@ -134,13 +134,13 @@ wf2-params (prepare-font data "font/woff2")] (cond-> {} (some? otf-params) - (assoc :otf (p/await! (sto/put-object! storage otf-params))) + (assoc :otf (sto/put-object! storage otf-params)) (some? ttf-params) - (assoc :ttf (p/await! (sto/put-object! storage ttf-params))) + (assoc :ttf (sto/put-object! storage ttf-params)) (some? wf1-params) - (assoc :woff1 (p/await! (sto/put-object! storage wf1-params))) + (assoc :woff1 (sto/put-object! storage wf1-params)) (some? wf2-params) - (assoc :woff2 (p/await! (sto/put-object! storage wf2-params)))))) + (assoc :woff2 (sto/put-object! storage wf2-params))))) (insert-font-variant! [{:keys [woff1 woff2 otf ttf]}] (db/insert! pool :team-font-variant diff --git a/backend/src/app/rpc/commands/media.clj b/backend/src/app/rpc/commands/media.clj index ccd2981cc..e910a7c4d 100644 --- a/backend/src/app/rpc/commands/media.clj +++ b/backend/src/app/rpc/commands/media.clj @@ -24,8 +24,7 @@ [app.util.services :as sv] [clojure.spec.alpha :as s] [cuerdas.core :as str] - [datoteka.io :as io] - [promesa.core :as p])) + [datoteka.io :as io])) (def default-max-file-size (* 1024 1024 10)) ; 10 MiB @@ -151,9 +150,9 @@ (let [result (-> (climit/configure cfg :process-image) (climit/submit! (partial process-image content))) - image (p/await! (sto/put-object! storage (::image result))) + image (sto/put-object! storage (::image result)) thumb (when-let [params (::thumb result)] - (p/await! (sto/put-object! storage params)))] + (sto/put-object! storage params))] (db/exec-one! pool [sql:create-file-media-object (or id (uuid/next)) diff --git a/backend/src/app/rpc/commands/profile.clj b/backend/src/app/rpc/commands/profile.clj index 3d75e3e95..4683d370c 100644 --- a/backend/src/app/rpc/commands/profile.clj +++ b/backend/src/app/rpc/commands/profile.clj @@ -27,8 +27,7 @@ [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] - [cuerdas.core :as str] - [promesa.core :as p])) + [cuerdas.core :as str])) (declare check-profile-existence!) (declare decode-row) @@ -182,7 +181,7 @@ ;; Schedule deletion of old photo (when-let [id (:photo-id profile)] - (p/await! (sto/touch-object! storage id))) + (sto/touch-object! storage id)) ;; Save new photo (db/update! pool :profile @@ -217,7 +216,7 @@ [{:keys [::sto/storage] :as cfg} {:keys [file]}] (let [params (-> (climit/configure cfg :process-image) (climit/submit! (partial generate-thumbnail! file)))] - (p/await! (sto/put-object! storage params)))) + (sto/put-object! storage params))) ;; --- MUTATION: Request Email Change diff --git a/backend/src/app/rpc/commands/teams.clj b/backend/src/app/rpc/commands/teams.clj index 483f20741..6f36b750c 100644 --- a/backend/src/app/rpc/commands/teams.clj +++ b/backend/src/app/rpc/commands/teams.clj @@ -7,6 +7,7 @@ (ns app.rpc.commands.teams (:require [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.logging :as l] [app.common.spec :as us] @@ -28,8 +29,7 @@ [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] - [cuerdas.core :as str] - [promesa.core :as p])) + [cuerdas.core :as str])) ;; --- Helpers & Specs @@ -84,7 +84,7 @@ (sv/defmethod ::get-teams {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (retrieve-teams conn profile-id))) (def sql:teams @@ -129,7 +129,7 @@ (sv/defmethod ::get-team {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (retrieve-team conn profile-id id))) (defn retrieve-team @@ -170,7 +170,7 @@ (sv/defmethod ::get-team-members {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (check-read-permissions! conn profile-id team-id) (retrieve-team-members conn team-id))) @@ -188,7 +188,7 @@ (sv/defmethod ::get-team-users {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id file-id]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (if team-id (do (check-read-permissions! conn profile-id team-id) @@ -246,7 +246,7 @@ (sv/defmethod ::get-team-stats {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (check-read-permissions! conn profile-id team-id) (retrieve-team-stats conn team-id))) @@ -277,7 +277,7 @@ (sv/defmethod ::get-team-invitations {::doc/added "1.17"} [{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (check-read-permissions! conn profile-id team-id) (get-team-invitations conn team-id))) @@ -595,7 +595,7 @@ ;; Mark object as touched for make it ellegible for tentative ;; garbage collection. (when-let [id (:photo-id team)] - (p/await! (sto/touch-object! storage id))) + (sto/touch-object! storage id)) ;; Save new photo (db/update! pool :team diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index dc013261b..20cc8efe6 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -22,8 +22,7 @@ [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] - [promesa.core :as p] - [promesa.exec :as px])) + [promesa.core :as p])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Storage Module State @@ -79,42 +78,40 @@ (update :metadata db/decode-transit-pgobject)))) (defn- create-database-object - [{:keys [::backend ::wrk/executor ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}] - (px/with-dispatch executor - (let [id (uuid/random) + [{:keys [::backend ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}] + (let [id (uuid/random) + mdata (cond-> (get-metadata params) + (satisfies? impl/IContentHash content) + (assoc :hash (impl/get-hash content))) - mdata (cond-> (get-metadata params) - (satisfies? impl/IContentHash content) - (assoc :hash (impl/get-hash content))) + ;; NOTE: for now we don't reuse the deleted objects, but in + ;; futute we can consider reusing deleted objects if we + ;; found a duplicated one and is marked for deletion but + ;; still not deleted. + result (when (and (::deduplicate? params) + (:hash mdata) + (:bucket mdata)) + (get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata))) - ;; NOTE: for now we don't reuse the deleted objects, but in - ;; futute we can consider reusing deleted objects if we - ;; found a duplicated one and is marked for deletion but - ;; still not deleted. - result (when (and (::deduplicate? params) - (:hash mdata) - (:bucket mdata)) - (get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata))) + result (or result + (-> (db/insert! pool-or-conn :storage-object + {:id id + :size (impl/get-size content) + :backend (name backend) + :metadata (db/tjson mdata) + :deleted-at expired-at + :touched-at touched-at}) + (update :metadata db/decode-transit-pgobject) + (update :metadata assoc ::created? true)))] - result (or result - (-> (db/insert! pool-or-conn :storage-object - {:id id - :size (impl/get-size content) - :backend (name backend) - :metadata (db/tjson mdata) - :deleted-at expired-at - :touched-at touched-at}) - (update :metadata db/decode-transit-pgobject) - (update :metadata assoc ::created? true)))] - - (impl/storage-object - (:id result) - (:size result) - (:created-at result) - (:deleted-at result) - (:touched-at result) - backend - (:metadata result))))) + (impl/storage-object + (:id result) + (:size result) + (:created-at result) + (:deleted-at result) + (:touched-at result) + backend + (:metadata result)))) (def ^:private sql:retrieve-storage-object "select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())") @@ -153,45 +150,41 @@ (dm/export impl/object?) (defn get-object - [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} id] + [{:keys [::db/pool-or-conn] :as storage} id] (us/assert! ::storage storage) - (px/with-dispatch executor - (retrieve-database-object pool-or-conn id))) + (retrieve-database-object pool-or-conn id)) (defn put-object! "Creates a new object with the provided content." [{:keys [::backend] :as storage} {:keys [::content] :as params}] (us/assert! ::storage-with-backend storage) (us/assert! ::impl/content content) - (->> (create-database-object storage params) - (p/mcat (fn [object] - (if (::created? (meta object)) - ;; Store the data finally on the underlying storage subsystem. - (-> (impl/resolve-backend storage backend) - (impl/put-object object content)) - (p/resolved object)))))) + (let [object (create-database-object storage params)] + (if (::created? (meta object)) + ;; Store the data finally on the underlying storage subsystem. + (-> (impl/resolve-backend storage backend) + (impl/put-object object content)) + object))) (defn touch-object! "Mark object as touched." - [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} object-or-id] + [{:keys [::db/pool-or-conn] :as storage} object-or-id] (us/assert! ::storage storage) - (px/with-dispatch executor - (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) - rs (db/update! pool-or-conn :storage-object - {:touched-at (dt/now)} - {:id id} - {::db/return-keys? false})] - (pos? (db/get-update-count rs))))) + (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) + rs (db/update! pool-or-conn :storage-object + {:touched-at (dt/now)} + {:id id} + {::db/return-keys? false})] + (pos? (db/get-update-count rs)))) (defn get-object-data "Return an input stream instance of the object content." [storage object] (us/assert! ::storage storage) - (if (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) + (when (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) (-> (impl/resolve-backend storage (:backend object)) - (impl/get-object-data object)) - (p/resolved nil))) + (impl/get-object-data object)))) (defn get-object-bytes "Returns a byte array of object content." @@ -208,11 +201,10 @@ (get-object-url storage object nil)) ([storage object options] (us/assert! ::storage storage) - (if (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) + (when (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) (-> (impl/resolve-backend storage (:backend object)) - (impl/get-object-url object options)) - (p/resolved nil)))) + (impl/get-object-url object options))))) (defn get-object-path "Get the Path to the object. Only works with `:fs` type of @@ -220,24 +212,20 @@ [storage object] (us/assert! ::storage storage) (let [backend (impl/resolve-backend storage (:backend object))] - (if (not= :fs (::type backend)) - (p/resolved nil) - (if (or (nil? (:expired-at object)) - (dt/is-after? (:expired-at object) (dt/now))) - (->> (impl/get-object-url backend object nil) - (p/fmap file-url->path)) - (p/resolved nil))))) + (when (and (= :fs (::type backend)) + (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now)))) + (-> (impl/get-object-url backend object nil) file-url->path)))) (defn del-object! - [{:keys [::db/pool-or-conn ::wrk/executor] :as storage} object-or-id] + [{:keys [::db/pool-or-conn] :as storage} object-or-id] (us/assert! ::storage storage) - (px/with-dispatch executor - (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) - res (db/update! pool-or-conn :storage-object - {:deleted-at (dt/now)} - {:id id} - {::db/return-keys? false})] - (pos? (db/get-update-count res))))) + (let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id) + res (db/update! pool-or-conn :storage-object + {:deleted-at (dt/now)} + {:id id} + {::db/return-keys? false})] + (pos? (db/get-update-count res)))) (dm/export impl/resolve-backend) (dm/export impl/calculate-hash) @@ -281,7 +269,7 @@ (doseq [id ids] (l/debug :hint "gc-deleted: permanently delete storage object" :backend backend-id :id id)) - @(impl/del-objects-in-bulk backend ids)))] + (impl/del-objects-in-bulk backend ids)))] (fn [params] (let [min-age (or (:min-age params) min-age)] @@ -422,8 +410,8 @@ (ex/raise :type :internal :code :unexpected-unknown-reference :hint (dm/fmt "unknown reference %" bucket)))] - (recur (+ to-freeze f) - (+ to-delete d) + (recur (+ to-freeze (long f)) + (+ to-delete (long d)) (rest groups))) (do (l/info :hint "gc-touched: task finished" :to-freeze to-freeze :to-delete to-delete) diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj index f6240e2ad..358fdc1e5 100644 --- a/backend/src/app/storage/fs.clj +++ b/backend/src/app/storage/fs.clj @@ -6,22 +6,18 @@ (ns app.storage.fs (:require + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uri :as u] [app.storage :as-alias sto] [app.storage.impl :as impl] - [app.worker :as-alias wrk] [clojure.spec.alpha :as s] [cuerdas.core :as str] [datoteka.fs :as fs] [datoteka.io :as io] - [integrant.core :as ig] - [promesa.core :as p] - [promesa.exec :as px]) + [integrant.core :as ig]) (:import - java.io.InputStream - java.io.OutputStream java.nio.file.Path java.nio.file.Files)) @@ -48,74 +44,66 @@ (s/keys :req [::directory ::uri] :opt [::sto/type - ::sto/id - ::wrk/executor])) + ::sto/id])) ;; --- API IMPL (defmethod impl/put-object :fs - [{:keys [::wrk/executor] :as backend} {:keys [id] :as object} content] + [backend {:keys [id] :as object} content] (us/assert! ::backend backend) - (px/with-dispatch executor - (let [base (fs/path (::directory backend)) - path (fs/path (impl/id->path id)) - full (fs/normalize (fs/join base path))] - (when-not (fs/exists? (fs/parent full)) - (fs/create-dir (fs/parent full))) - (with-open [^InputStream src (io/input-stream content) - ^OutputStream dst (io/output-stream full)] - (io/copy! src dst)) + (let [base (fs/path (::directory backend)) + path (fs/path (impl/id->path id)) + full (fs/normalize (fs/join base path))] - object))) + (when-not (fs/exists? (fs/parent full)) + (fs/create-dir (fs/parent full))) + + (dm/with-open [src (io/input-stream content) + dst (io/output-stream full)] + (io/copy! src dst)) + + object)) (defmethod impl/get-object-data :fs - [{:keys [::wrk/executor] :as backend} {:keys [id] :as object}] + [backend {:keys [id] :as object}] (us/assert! ::backend backend) - (px/with-dispatch executor - (let [^Path base (fs/path (::directory backend)) - ^Path path (fs/path (impl/id->path id)) - ^Path full (fs/normalize (fs/join base path))] - (when-not (fs/exists? full) - (ex/raise :type :internal - :code :filesystem-object-does-not-exists - :path (str full))) - (io/input-stream full)))) + (let [^Path base (fs/path (::directory backend)) + ^Path path (fs/path (impl/id->path id)) + ^Path full (fs/normalize (fs/join base path))] + (when-not (fs/exists? full) + (ex/raise :type :internal + :code :filesystem-object-does-not-exists + :path (str full))) + (io/input-stream full))) (defmethod impl/get-object-bytes :fs [backend object] - (->> (impl/get-object-data backend object) - (p/fmap (fn [input] - (try - (io/read-as-bytes input) - (finally - (io/close! input))))))) + (dm/with-open [input (impl/get-object-data backend object)] + (io/read-as-bytes input))) (defmethod impl/get-object-url :fs [{:keys [::uri] :as backend} {:keys [id] :as object} _] (us/assert! ::backend backend) - (p/resolved - (update uri :path - (fn [existing] - (if (str/ends-with? existing "/") - (str existing (impl/id->path id)) - (str existing "/" (impl/id->path id))))))) + (update uri :path + (fn [existing] + (if (str/ends-with? existing "/") + (str existing (impl/id->path id)) + (str existing "/" (impl/id->path id)))))) (defmethod impl/del-object :fs - [{:keys [::wrk/executor] :as backend} {:keys [id] :as object}] + [backend {:keys [id] :as object}] (us/assert! ::backend backend) - (px/with-dispatch executor - (let [base (fs/path (::directory backend)) - path (fs/path (impl/id->path id)) - path (fs/join base path)] - (Files/deleteIfExists ^Path path)))) + (let [base (fs/path (::directory backend)) + path (fs/path (impl/id->path id)) + path (fs/join base path)] + (Files/deleteIfExists ^Path path))) (defmethod impl/del-objects-in-bulk :fs - [{:keys [::wrk/executor] :as backend} ids] + [backend ids] (us/assert! ::backend backend) - (px/with-dispatch executor - (let [base (fs/path (::directory backend))] - (doseq [id ids] - (let [path (fs/path (impl/id->path id)) - path (fs/join base path)] - (Files/deleteIfExists ^Path path)))))) + (let [base (fs/path (::directory backend))] + (doseq [id ids] + (let [path (fs/path (impl/id->path id)) + path (fs/join base path)] + (Files/deleteIfExists ^Path path))))) diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index 771ea95e7..4a564b58f 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -153,8 +153,8 @@ (content (.toPath ^java.io.File data) size) (instance? String data) - (let [data (.getBytes data "UTF-8")] - (bytes->content data (alength data))) + (let [data (.getBytes ^String data "UTF-8")] + (bytes->content data (alength ^bytes data))) (bytes? data) (bytes->content data (or size (alength ^bytes data))) @@ -195,7 +195,7 @@ (defn calculate-hash [resource] - (let [result (with-open [input (io/input-stream resource)] + (let [result (dm/with-open [input (io/input-stream resource)] (-> (bh/blake2b-256 input) (bc/bytes->hex)))] (str "blake2b:" result))) diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index fc26cccb4..ffd873c42 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -45,6 +45,7 @@ software.amazon.awssdk.http.nio.netty.SdkEventLoopGroup software.amazon.awssdk.regions.Region software.amazon.awssdk.services.s3.S3AsyncClient + software.amazon.awssdk.services.s3.S3AsyncClientBuilder software.amazon.awssdk.services.s3.S3Configuration software.amazon.awssdk.services.s3.model.Delete software.amazon.awssdk.services.s3.model.DeleteObjectRequest @@ -121,7 +122,7 @@ (defmethod impl/put-object :s3 [backend object content] (us/assert! ::backend backend) - (put-object backend object content)) + (p/await! (put-object backend object content))) (defmethod impl/get-object-data :s3 [backend object] @@ -135,12 +136,13 @@ :cause cause))] (-> (get-object-data backend object) - (p/catch no-such-key? handle-not-found)))) + (p/catch no-such-key? handle-not-found) + (p/await!)))) (defmethod impl/get-object-bytes :s3 [backend object] (us/assert! ::backend backend) - (get-object-bytes backend object)) + (p/await! (get-object-bytes backend object))) (defmethod impl/get-object-url :s3 [backend object options] @@ -150,12 +152,12 @@ (defmethod impl/del-object :s3 [backend object] (us/assert! ::backend backend) - (del-object backend object)) + (p/await! (del-object backend object))) (defmethod impl/del-objects-in-bulk :s3 [backend ids] (us/assert! ::backend backend) - (del-object-in-bulk backend ids)) + (p/await! (del-object-in-bulk backend ids))) ;; --- HELPERS @@ -187,13 +189,17 @@ (.writeTimeout default-timeout) (.build)) - client (-> (S3AsyncClient/builder) - (.serviceConfiguration ^S3Configuration sconfig) - (.asyncConfiguration ^ClientAsyncConfiguration aconfig) - (.httpClient ^NettyNioAsyncHttpClient hclient) - (.region (lookup-region region)) - (cond-> (some? endpoint) (.endpointOverride (URI. endpoint))) - (.build))] + client (let [builder (S3AsyncClient/builder) + builder (.serviceConfiguration ^S3AsyncClientBuilder builder ^S3Configuration sconfig) + builder (.asyncConfiguration ^S3AsyncClientBuilder builder ^ClientAsyncConfiguration aconfig) + builder (.httpClient ^S3AsyncClientBuilder builder ^NettyNioAsyncHttpClient hclient) + builder (.region ^S3AsyncClientBuilder builder (lookup-region region)) + builder (cond-> ^S3AsyncClientBuilder builder + (some? endpoint) + (.endpointOverride (URI. endpoint)))] + (.build ^S3AsyncClientBuilder builder)) + + ] (reify clojure.lang.IDeref @@ -288,6 +294,7 @@ ^AsyncRequestBody rbody) (p/fmap (constantly object))))) +;; FIXME: research how to avoid reflection on close method (defn- path->stream [path] (proxy [FilterInputStream] [(io/input-stream path)] @@ -347,8 +354,7 @@ (getObjectRequest ^GetObjectRequest gor) (build)) pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)] - (p/resolved - (u/uri (str (.url ^PresignedGetObjectRequest pgor)))))) + (u/uri (str (.url ^PresignedGetObjectRequest pgor))))) (defn- del-object [{:keys [::bucket ::client ::prefix]} {:keys [id] :as obj}] diff --git a/backend/src/app/tasks/objects_gc.clj b/backend/src/app/tasks/objects_gc.clj index 4169cd88f..a5b1b0195 100644 --- a/backend/src/app/tasks/objects_gc.clj +++ b/backend/src/app/tasks/objects_gc.clj @@ -85,7 +85,7 @@ ;; Mark as deleted the storage object related with the ;; photo-id field. - (some->> photo-id (sto/touch-object! storage) deref) + (some->> photo-id (sto/touch-object! storage)) ;; And finally, permanently delete the profile. (db/delete! conn :profile {:id id}) @@ -117,7 +117,7 @@ ;; Mark as deleted the storage object related with the ;; photo-id field. - (some->> photo-id (sto/touch-object! storage) deref) + (some->> photo-id (sto/touch-object! storage)) ;; And finally, permanently delete the team. (db/delete! conn :team {:id id}) @@ -184,10 +184,10 @@ (l/debug :hint "permanently delete font variant" :id (str id)) ;; Mark as deleted the all related storage objects - (some->> (:woff1-file-id font) (sto/touch-object! storage) deref) - (some->> (:woff2-file-id font) (sto/touch-object! storage) deref) - (some->> (:otf-file-id font) (sto/touch-object! storage) deref) - (some->> (:ttf-file-id font) (sto/touch-object! storage) deref) + (some->> (:woff1-file-id font) (sto/touch-object! storage)) + (some->> (:woff2-file-id font) (sto/touch-object! storage)) + (some->> (:otf-file-id font) (sto/touch-object! storage)) + (some->> (:ttf-file-id font) (sto/touch-object! storage)) ;; And finally, permanently delete the team font variant (db/delete! conn :team-font-variant {:id id}) diff --git a/backend/test/backend_tests/helpers.clj b/backend/test/backend_tests/helpers.clj index 51ab55e3b..3ca126dc8 100644 --- a/backend/test/backend_tests/helpers.clj +++ b/backend/test/backend_tests/helpers.clj @@ -8,6 +8,7 @@ (:require [app.auth] [app.common.data :as d] + [app.common.data.macros :as dm] [app.common.exceptions :as ex] [app.common.flags :as flags] [app.common.pages :as cp] @@ -208,7 +209,7 @@ :password "123123" :is-demo false} params)] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (->> params (cmd.auth/create-profile! conn) (cmd.auth/create-profile-rels! conn)))))) @@ -218,7 +219,7 @@ ([pool i {:keys [profile-id team-id] :as params}] (us/assert uuid? profile-id) (us/assert uuid? team-id) - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (->> (merge {:id (mk-uuid "project" i) :name (str "project" i)} params) @@ -230,7 +231,7 @@ ([pool i {:keys [profile-id project-id] :as params}] (us/assert uuid? profile-id) (us/assert uuid? project-id) - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (files.create/create-file conn (merge {:id (mk-uuid "file" i) :name (str "file" i) @@ -246,7 +247,7 @@ ([i params] (create-team* *pool* i params)) ([pool i {:keys [profile-id] :as params}] (us/assert uuid? profile-id) - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (let [id (mk-uuid "team" i)] (teams/create-team conn {:id id :profile-id profile-id @@ -257,7 +258,7 @@ ([pool {:keys [name width height mtype file-id is-local media-id] :or {name "sample" width 100 height 100 mtype "image/svg+xml" is-local true}}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (db/insert! conn :file-media-object {:id (uuid/next) :file-id file-id @@ -271,12 +272,12 @@ (defn link-file-to-library* ([params] (link-file-to-library* *pool* params)) ([pool {:keys [file-id library-id] :as params}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (#'files/link-file-to-library conn {:file-id file-id :library-id library-id})))) (defn create-complaint-for [pool {:keys [id created-at type]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (db/insert! conn :profile-complaint-report {:profile-id id :created-at (or created-at (dt/now)) @@ -285,7 +286,7 @@ (defn create-global-complaint-for [pool {:keys [email type created-at]}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (db/insert! conn :global-complaint-report {:email email :type (name type) @@ -295,7 +296,7 @@ (defn create-team-role* ([params] (create-team-role* *pool* params)) ([pool {:keys [team-id profile-id role] :or {role :owner}}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (#'teams/create-team-role conn {:team-id team-id :profile-id profile-id :role role})))) @@ -303,7 +304,7 @@ (defn create-project-role* ([params] (create-project-role* *pool* params)) ([pool {:keys [project-id profile-id role] :or {role :owner}}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (#'teams/create-project-role conn {:project-id project-id :profile-id profile-id :role role})))) @@ -311,7 +312,7 @@ (defn create-file-role* ([params] (create-file-role* *pool* params)) ([pool {:keys [file-id profile-id role] :or {role :owner}}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (files.create/create-file-role! conn {:file-id file-id :profile-id profile-id :role role})))) @@ -320,7 +321,7 @@ ([params] (update-file* *pool* params)) ([pool {:keys [file-id changes session-id profile-id revn] :or {session-id (uuid/next) revn 0}}] - (with-open [conn (db/open pool)] + (dm/with-open [conn (db/open pool)] (let [features #{"components/v2"} cfg (-> (select-keys *system* [::mbus/msgbus ::mtx/metrics]) (assoc ::db/conn conn))] diff --git a/backend/test/backend_tests/rpc_file_test.clj b/backend/test/backend_tests/rpc_file_test.clj index 189f1e631..19e2a474a 100644 --- a/backend/test/backend_tests/rpc_file_test.clj +++ b/backend/test/backend_tests/rpc_file_test.clj @@ -215,10 +215,10 @@ (t/is (= 1 (count rows)))) ;; The underlying storage objects are still available. - (t/is (some? @(sto/get-object storage (:media-id fmo2)))) - (t/is (some? @(sto/get-object storage (:thumbnail-id fmo2)))) - (t/is (some? @(sto/get-object storage (:media-id fmo1)))) - (t/is (some? @(sto/get-object storage (:thumbnail-id fmo1)))) + (t/is (some? (sto/get-object storage (:media-id fmo2)))) + (t/is (some? (sto/get-object storage (:thumbnail-id fmo2)))) + (t/is (some? (sto/get-object storage (:media-id fmo1)))) + (t/is (some? (sto/get-object storage (:thumbnail-id fmo1)))) ;; proceed to remove usage of the file (update-file {:file-id (:id file) @@ -246,10 +246,10 @@ ;; Finally, check that some of the objects that are marked as ;; deleted we are unable to retrieve them using standard storage ;; public api. - (t/is (nil? @(sto/get-object storage (:media-id fmo2)))) - (t/is (nil? @(sto/get-object storage (:thumbnail-id fmo2)))) - (t/is (nil? @(sto/get-object storage (:media-id fmo1)))) - (t/is (nil? @(sto/get-object storage (:thumbnail-id fmo1)))) + (t/is (nil? (sto/get-object storage (:media-id fmo2)))) + (t/is (nil? (sto/get-object storage (:thumbnail-id fmo2)))) + (t/is (nil? (sto/get-object storage (:media-id fmo1)))) + (t/is (nil? (sto/get-object storage (:thumbnail-id fmo1)))) ))) (t/deftest permissions-checks-creating-file diff --git a/backend/test/backend_tests/rpc_management_test.clj b/backend/test/backend_tests/rpc_management_test.clj index e3e0ddbd3..82eb350b4 100644 --- a/backend/test/backend_tests/rpc_management_test.clj +++ b/backend/test/backend_tests/rpc_management_test.clj @@ -26,9 +26,9 @@ (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) - sobject @(sto/put-object! storage {::sto/content (sto/content "content") - :content-type "text/plain" - :other "data"}) + sobject (sto/put-object! storage {::sto/content (sto/content "content") + :content-type "text/plain" + :other "data"}) profile (th/create-profile* 1 {:is-active true}) project (th/create-project* 1 {:team-id (:default-team-id profile) :profile-id (:id profile)}) @@ -98,9 +98,9 @@ (t/deftest duplicate-file-with-deleted-relations (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) - sobject @(sto/put-object! storage {::sto/content (sto/content "content") - :content-type "text/plain" - :other "data"}) + sobject (sto/put-object! storage {::sto/content (sto/content "content") + :content-type "text/plain" + :other "data"}) profile (th/create-profile* 1 {:is-active true}) project (th/create-project* 1 {:team-id (:default-team-id profile) @@ -120,7 +120,7 @@ :media-id (:id sobject)})] (th/mark-file-deleted* {:id (:id file2)}) - @(sto/del-object! storage sobject) + (sto/del-object! storage sobject) (let [data {::th/type :duplicate-file ::rpc/profile-id (:id profile) @@ -157,9 +157,9 @@ (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) - sobject @(sto/put-object! storage {::sto/content (sto/content "content") - :content-type "text/plain" - :other "data"}) + sobject (sto/put-object! storage {::sto/content (sto/content "content") + :content-type "text/plain" + :other "data"}) profile (th/create-profile* 1 {:is-active true}) project (th/create-project* 1 {:team-id (:default-team-id profile) @@ -230,9 +230,9 @@ (t/deftest duplicate-project-with-deleted-files (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) - sobject @(sto/put-object! storage {::sto/content (sto/content "content") - :content-type "text/plain" - :other "data"}) + sobject (sto/put-object! storage {::sto/content (sto/content "content") + :content-type "text/plain" + :other "data"}) profile (th/create-profile* 1 {:is-active true}) project (th/create-project* 1 {:team-id (:default-team-id profile) :profile-id (:id profile)}) diff --git a/backend/test/backend_tests/rpc_media_test.clj b/backend/test/backend_tests/rpc_media_test.clj index ab2cd1de9..49324be42 100644 --- a/backend/test/backend_tests/rpc_media_test.clj +++ b/backend/test/backend_tests/rpc_media_test.clj @@ -42,8 +42,8 @@ (t/is (uuid? media-id)) (t/is (uuid? thumbnail-id)) (let [storage (:app.storage/storage th/*system*) - mobj1 @(sto/get-object storage media-id) - mobj2 @(sto/get-object storage thumbnail-id)] + mobj1 (sto/get-object storage media-id) + mobj2 (sto/get-object storage thumbnail-id)] (t/is (sto/object? mobj1)) (t/is (sto/object? mobj2)) (t/is (= 122785 (:size mobj1))) @@ -83,8 +83,8 @@ (t/is (uuid? media-id)) (t/is (uuid? thumbnail-id)) (let [storage (:app.storage/storage th/*system*) - mobj1 @(sto/get-object storage media-id) - mobj2 @(sto/get-object storage thumbnail-id)] + mobj1 (sto/get-object storage media-id) + mobj2 (sto/get-object storage thumbnail-id)] (t/is (sto/object? mobj1)) (t/is (sto/object? mobj2)) (t/is (= 312043 (:size mobj1))) @@ -162,8 +162,8 @@ (t/is (uuid? media-id)) (t/is (uuid? thumbnail-id)) (let [storage (:app.storage/storage th/*system*) - mobj1 @(sto/get-object storage media-id) - mobj2 @(sto/get-object storage thumbnail-id)] + mobj1 (sto/get-object storage media-id) + mobj2 (sto/get-object storage thumbnail-id)] (t/is (sto/object? mobj1)) (t/is (sto/object? mobj2)) (t/is (= 122785 (:size mobj1))) @@ -203,8 +203,8 @@ (t/is (uuid? media-id)) (t/is (uuid? thumbnail-id)) (let [storage (:app.storage/storage th/*system*) - mobj1 @(sto/get-object storage media-id) - mobj2 @(sto/get-object storage thumbnail-id)] + mobj1 (sto/get-object storage media-id) + mobj2 (sto/get-object storage thumbnail-id)] (t/is (sto/object? mobj1)) (t/is (sto/object? mobj2)) (t/is (= 312043 (:size mobj1))) diff --git a/backend/test/backend_tests/storage_test.clj b/backend/test/backend_tests/storage_test.clj index 032e85c2e..02adaca6a 100644 --- a/backend/test/backend_tests/storage_test.clj +++ b/backend/test/backend_tests/storage_test.clj @@ -37,61 +37,61 @@ (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) content (sto/content "content") - object @(sto/put-object! storage {::sto/content content - :content-type "text/plain" - :other "data"})] + object (sto/put-object! storage {::sto/content content + :content-type "text/plain" + :other "data"})] (t/is (sto/object? object)) - (t/is (fs/path? @(sto/get-object-path storage object))) + (t/is (fs/path? (sto/get-object-path storage object))) (t/is (nil? (:expired-at object))) (t/is (= :assets-fs (:backend object))) (t/is (= "data" (:other (meta object)))) (t/is (= "text/plain" (:content-type (meta object)))) - (t/is (= "content" (slurp @(sto/get-object-data storage object)))) - (t/is (= "content" (slurp @(sto/get-object-path storage object)))) + (t/is (= "content" (slurp (sto/get-object-data storage object)))) + (t/is (= "content" (slurp (sto/get-object-path storage object)))) )) (t/deftest put-and-retrieve-expired-object (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) content (sto/content "content") - object @(sto/put-object! storage {::sto/content content - ::sto/expired-at (dt/in-future {:seconds 1}) - :content-type "text/plain" - })] + object (sto/put-object! storage {::sto/content content + ::sto/expired-at (dt/in-future {:seconds 1}) + :content-type "text/plain" + })] (t/is (sto/object? object)) (t/is (dt/instant? (:expired-at object))) (t/is (dt/is-after? (:expired-at object) (dt/now))) - (t/is (= object @(sto/get-object storage (:id object)))) + (t/is (= object (sto/get-object storage (:id object)))) (th/sleep 1000) - (t/is (nil? @(sto/get-object storage (:id object)))) - (t/is (nil? @(sto/get-object-data storage object))) - (t/is (nil? @(sto/get-object-url storage object))) - (t/is (nil? @(sto/get-object-path storage object))) + (t/is (nil? (sto/get-object storage (:id object)))) + (t/is (nil? (sto/get-object-data storage object))) + (t/is (nil? (sto/get-object-url storage object))) + (t/is (nil? (sto/get-object-path storage object))) )) (t/deftest put-and-delete-object (let [storage (-> (:app.storage/storage th/*system*) (configure-storage-backend)) content (sto/content "content") - object @(sto/put-object! storage {::sto/content content - :content-type "text/plain" - :expired-at (dt/in-future {:seconds 1})})] + object (sto/put-object! storage {::sto/content content + :content-type "text/plain" + :expired-at (dt/in-future {:seconds 1})})] (t/is (sto/object? object)) - (t/is (true? @(sto/del-object! storage object))) + (t/is (true? (sto/del-object! storage object))) ;; retrieving the same object should be not nil because the ;; deletion is not immediate - (t/is (some? @(sto/get-object-data storage object))) - (t/is (some? @(sto/get-object-url storage object))) - (t/is (some? @(sto/get-object-path storage object))) + (t/is (some? (sto/get-object-data storage object))) + (t/is (some? (sto/get-object-url storage object))) + (t/is (some? (sto/get-object-path storage object))) ;; But you can't retrieve the object again because in database is ;; marked as deleted/expired. - (t/is (nil? @(sto/get-object storage (:id object)))) + (t/is (nil? (sto/get-object storage (:id object)))) )) (t/deftest test-deleted-gc-task @@ -99,14 +99,14 @@ (configure-storage-backend)) content1 (sto/content "content1") content2 (sto/content "content2") - object1 @(sto/put-object! storage {::sto/content content1 - ::sto/expired-at (dt/now) - :content-type "text/plain" - }) - object2 @(sto/put-object! storage {::sto/content content2 - ::sto/expired-at (dt/in-past {:hours 2}) - :content-type "text/plain" - })] + object1 (sto/put-object! storage {::sto/content content1 + ::sto/expired-at (dt/now) + :content-type "text/plain" + }) + object2 (sto/put-object! storage {::sto/content content2 + ::sto/expired-at (dt/in-past {:hours 2}) + :content-type "text/plain" + })] (th/sleep 200)