diff --git a/CHANGES.md b/CHANGES.md index 81d16cf7d..4555cc071 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -24,6 +24,7 @@ - Redesign of workspace toolbars [Taiga #2319](https://tree.taiga.io/project/penpot/us/2319) - Graphic Tablet usability improvements [Taiga #1913](https://tree.taiga.io/project/penpot/us/1913) - Improved mouse collision detection for groups and text shapes [Taiga #2452](https://tree.taiga.io/project/penpot/us/2452), [Taiga #2453](https://tree.taiga.io/project/penpot/us/2453) +- Add support for alternative S3 storage providers and all aws regions [#1267](https://github.com/penpot/penpot/issues/1267) ### :bug: Bugs fixed diff --git a/backend/scripts/repl b/backend/scripts/repl index 22bebe8c7..ebb3e1554 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -10,6 +10,18 @@ # export PENPOT_DATABASE_PASSWORD="penpot_pre" # export PENPOT_FLAGS="enable-asserts enable-audit-log $PENPOT_FLAGS" +# Initialize MINIO config +# mc alias set penpot-s3/ http://minio:9000 minioadmin minioadmin +# mc admin user add penpot-s3 penpot-devenv penpot-devenv +# mc admin policy set penpot-s3 readwrite user=penpot-devenv +# mc mb penpot-s3/penpot -p +# export AWS_ACCESS_KEY_ID=penpot-devenv +# export AWS_SECRET_ACCESS_KEY=penpot-devenv +# export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3 +# export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000 +# export PENPOT_STORAGE_ASSETS_S3_REGION=eu-central-1 +# export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot + export OPTIONS=" -A:dev \ -J-Djava.util.logging.manager=org.apache.logging.log4j.jul.LogManager \ diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index b0030a2d1..7083b490e 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -181,9 +181,11 @@ (s/def ::storage-assets-fs-directory ::us/string) (s/def ::storage-assets-s3-bucket ::us/string) (s/def ::storage-assets-s3-region ::us/keyword) +(s/def ::storage-assets-s3-endpoint ::us/string) (s/def ::storage-fdata-s3-bucket 
::us/string) (s/def ::storage-fdata-s3-region ::us/keyword) (s/def ::storage-fdata-s3-prefix ::us/string) +(s/def ::storage-fdata-s3-endpoint ::us/string) (s/def ::telemetry-uri ::us/string) (s/def ::telemetry-with-taiga ::us/boolean) (s/def ::tenant ::us/string) @@ -278,10 +280,12 @@ ::storage-assets-fs-directory ::storage-assets-s3-bucket ::storage-assets-s3-region + ::storage-assets-s3-endpoint ::fdata-storage-backend ::storage-fdata-s3-bucket ::storage-fdata-s3-region ::storage-fdata-s3-prefix + ::storage-fdata-s3-endpoint ::telemetry-enabled ::telemetry-uri ::telemetry-referer diff --git a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index 18c14462e..550bfcdfb 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -52,10 +52,10 @@ :body (sto/get-object-bytes storage obj)} :s3 - (let [url (sto/get-object-url storage obj {:max-age signature-max-age})] + (let [{:keys [host port] :as url} (sto/get-object-url storage obj {:max-age signature-max-age})] {:status 307 :headers {"location" (str url) - "x-host" (:host url) + "x-host" (cond-> host port (str ":" port)) "cache-control" (str "max-age=" (inst-ms cache-max-age))} :body ""}) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index ab9de80a1..fabf61c4d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -49,10 +49,6 @@ :app.storage/gc-touched-task {:pool (ig/ref :app.db/pool)} - :app.storage/recheck-task - {:pool (ig/ref :app.db/pool) - :storage (ig/ref :app.storage/storage)} - :app.http.session/session {:pool (ig/ref :app.db/pool) :tokens (ig/ref :app.tokens/tokens)} @@ -163,9 +159,6 @@ {:cron #app/cron "0 0 0 * * ?" ;; daily :task :session-gc} - {:cron #app/cron "0 0 * * * ?" ;; hourly - :task :storage-recheck} - {:cron #app/cron "0 0 0 * * ?" 
;; daily :task :objects-gc} @@ -198,7 +191,6 @@ :file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler) :storage-deleted-gc (ig/ref :app.storage/gc-deleted-task) :storage-touched-gc (ig/ref :app.storage/gc-touched-task) - :storage-recheck (ig/ref :app.storage/recheck-task) :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) @@ -304,27 +296,28 @@ :app.storage/storage {:pool (ig/ref :app.db/pool) - :executor (ig/ref :app.worker/executor) + :backends + {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) + :assets-db (ig/ref [::assets :app.storage.db/backend]) + :assets-fs (ig/ref [::assets :app.storage.fs/backend]) - :backends { - :assets-s3 (ig/ref [::assets :app.storage.s3/backend]) - :assets-db (ig/ref [::assets :app.storage.db/backend]) - :assets-fs (ig/ref [::assets :app.storage.fs/backend]) - :tmp (ig/ref [::tmp :app.storage.fs/backend]) - :fdata-s3 (ig/ref [::fdata :app.storage.s3/backend]) + :tmp (ig/ref [::tmp :app.storage.fs/backend]) + :fdata-s3 (ig/ref [::fdata :app.storage.s3/backend]) - ;; keep this for backward compatibility - :s3 (ig/ref [::assets :app.storage.s3/backend]) - :fs (ig/ref [::assets :app.storage.fs/backend])}} + ;; keep this for backward compatibility + :s3 (ig/ref [::assets :app.storage.s3/backend]) + :fs (ig/ref [::assets :app.storage.fs/backend])}} [::fdata :app.storage.s3/backend] - {:region (cf/get :storage-fdata-s3-region) - :bucket (cf/get :storage-fdata-s3-bucket) - :prefix (cf/get :storage-fdata-s3-prefix)} + {:region (cf/get :storage-fdata-s3-region) + :bucket (cf/get :storage-fdata-s3-bucket) + :endpoint (cf/get :storage-fdata-s3-endpoint) + :prefix (cf/get :storage-fdata-s3-prefix)} [::assets :app.storage.s3/backend] - {:region (cf/get :storage-assets-s3-region) - :bucket (cf/get :storage-assets-s3-bucket)} + {:region (cf/get :storage-assets-s3-region) + :endpoint (cf/get :storage-assets-s3-endpoint) + :bucket (cf/get 
:storage-assets-s3-bucket)} [::assets :app.storage.fs/backend] {:directory (cf/get :storage-assets-fs-directory)} diff --git a/backend/src/app/media.clj b/backend/src/app/media.clj index 6948df513..176b3cc6f 100644 --- a/backend/src/app/media.clj +++ b/backend/src/app/media.clj @@ -326,8 +326,10 @@ (defn configure-assets-storage "Given storage map, returns a storage configured with the appropriate backend for assets." - [storage conn] - (-> storage - (assoc :conn conn) - (assoc :backend (cf/get :assets-storage-backend :assets-fs)))) + ([storage] + (assoc storage :backend (cf/get :assets-storage-backend :assets-fs))) + ([storage conn] + (-> storage + (assoc :conn conn) + (assoc :backend (cf/get :assets-storage-backend :assets-fs))))) diff --git a/backend/src/app/rpc/mutations/fonts.clj b/backend/src/app/rpc/mutations/fonts.clj index 6416567d0..f36a00ae8 100644 --- a/backend/src/app/rpc/mutations/fonts.clj +++ b/backend/src/app/rpc/mutations/fonts.clj @@ -9,12 +9,10 @@ [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] - [app.config :as cf] [app.db :as db] [app.media :as media] [app.rpc.queries.teams :as teams] [app.storage :as sto] - [app.util.rlimit :as rlimit] [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s])) @@ -39,52 +37,57 @@ ::font-id ::font-family ::font-weight ::font-style])) (sv/defmethod ::create-font-variant - {::rlimit/permits (cf/get :rlimit-font)} [{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}] - (db/with-atomic [conn pool] - (let [cfg (assoc cfg :conn conn)] - (teams/check-edition-permissions! conn profile-id team-id) - (create-font-variant cfg params)))) + (teams/check-edition-permissions! 
pool profile-id team-id) + (create-font-variant cfg params)) (defn create-font-variant - [{:keys [conn storage] :as cfg} {:keys [data] :as params}] + [{:keys [storage pool] :as cfg} {:keys [data] :as params}] (let [data (media/run {:cmd :generate-fonts :input data}) - storage (media/configure-assets-storage storage conn) + storage (media/configure-assets-storage storage)] - otf (when-let [fdata (get data "font/otf")] - (sto/put-object storage {:content (sto/content fdata) - :content-type "font/otf"})) - - ttf (when-let [fdata (get data "font/ttf")] - (sto/put-object storage {:content (sto/content fdata) - :content-type "font/ttf"})) - - woff1 (when-let [fdata (get data "font/woff")] - (sto/put-object storage {:content (sto/content fdata) - :content-type "font/woff"})) - - woff2 (when-let [fdata (get data "font/woff2")] - (sto/put-object storage {:content (sto/content fdata) - :content-type "font/woff2"}))] - - (when (and (nil? otf) - (nil? ttf) - (nil? woff1) - (nil? woff2)) + (when (and (not (contains? data "font/otf")) + (not (contains? data "font/ttf")) + (not (contains? data "font/woff")) + (not (contains? data "font/woff2"))) (ex/raise :type :validation :code :invalid-font-upload)) - (db/insert! 
conn :team-font-variant - {:id (uuid/next) - :team-id (:team-id params) - :font-id (:font-id params) - :font-family (:font-family params) - :font-weight (:font-weight params) - :font-style (:font-style params) - :woff1-file-id (:id woff1) - :woff2-file-id (:id woff2) - :otf-file-id (:id otf) - :ttf-file-id (:id ttf)}))) + (let [otf (when-let [fdata (get data "font/otf")] + (sto/put-object storage {:content (sto/content fdata) + :content-type "font/otf" + :reference :team-font-variant + :touched-at (dt/now)})) + + ttf (when-let [fdata (get data "font/ttf")] + (sto/put-object storage {:content (sto/content fdata) + :content-type "font/ttf" + :touched-at (dt/now) + :reference :team-font-variant})) + + woff1 (when-let [fdata (get data "font/woff")] + (sto/put-object storage {:content (sto/content fdata) + :content-type "font/woff" + :touched-at (dt/now) + :reference :team-font-variant})) + + woff2 (when-let [fdata (get data "font/woff2")] + (sto/put-object storage {:content (sto/content fdata) + :content-type "font/woff2" + :touched-at (dt/now) + :reference :team-font-variant}))] + + (db/insert! 
pool :team-font-variant + {:id (uuid/next) + :team-id (:team-id params) + :font-id (:font-id params) + :font-family (:font-family params) + :font-weight (:font-weight params) + :font-style (:font-style params) + :woff1-file-id (:id woff1) + :woff2-file-id (:id woff2) + :otf-file-id (:id otf) + :ttf-file-id (:id ttf)})))) ;; --- UPDATE FONT FAMILY diff --git a/backend/src/app/rpc/mutations/media.clj b/backend/src/app/rpc/mutations/media.clj index 8f9075cf1..68b5d98ea 100644 --- a/backend/src/app/rpc/mutations/media.clj +++ b/backend/src/app/rpc/mutations/media.clj @@ -10,13 +10,11 @@ [app.common.media :as cm] [app.common.spec :as us] [app.common.uuid :as uuid] - [app.config :as cf] [app.db :as db] [app.media :as media] [app.rpc.queries.teams :as teams] [app.storage :as sto] [app.util.http :as http] - [app.util.rlimit :as rlimit] [app.util.services :as sv] [app.util.time :as dt] [clojure.spec.alpha :as s] @@ -49,13 +47,10 @@ :opt-un [::id])) (sv/defmethod ::upload-file-media-object - {::rlimit/permits (cf/get :rlimit-image)} [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] - (db/with-atomic [conn pool] - (let [file (select-file conn file-id)] - (teams/check-edition-permissions! conn profile-id (:team-id file)) - (-> (assoc cfg :conn conn) - (create-file-media-object params))))) + (let [file (select-file pool file-id)] + (teams/check-edition-permissions! pool profile-id (:team-id file)) + (create-file-media-object cfg params))) (defn- big-enough-for-thumbnail? "Checks if the provided image info is big enough for @@ -77,6 +72,9 @@ :code :unable-to-access-to-url :cause e)))) +;; TODO: we need to check the size before fetch resource, if not we +;; can start downloading very big object and cause OOM errors. 
+ (defn- download-media [{:keys [storage] :as cfg} url] (let [result (fetch-url url) @@ -90,6 +88,8 @@ (-> (assoc storage :backend :tmp) (sto/put-object {:content (sto/content data) :content-type mtype + :reference :file-media-object + :touched-at (dt/now) :expired-at (dt/in-future {:minutes 30})})))) ;; NOTE: we use the `on conflict do update` instead of `do nothing` @@ -102,13 +102,27 @@ on conflict (id) do update set created_at=file_media_object.created_at returning *") +;; NOTE: the following function executes without a transaction, this +;; means that if something fails in the middle of this function, it +;; will probably leave leaked/unreferenced objects in the database and +;; probably in the storage layer. To handle possible object leakage, +;; we create all media objects marked as touched; this ensures that if +;; something fails, all leaked (already created storage objects) will +;; be eventually marked as deleted by the touched-gc task. +;; +;; The touched-gc task performs periodic analysis of all touched +;; storage objects and checks references to them. This is the reason why +;; `reference` metadata exists: it indicates the name of the table +;; which holds the reference to the storage object (it is some kind of +;; inverse, soft referential integrity). + (defn create-file-media-object - [{:keys [conn storage] :as cfg} {:keys [id file-id is-local name content] :as params}] + [{:keys [storage pool] :as cfg} {:keys [id file-id is-local name content] :as params}] (media/validate-media-type (:content-type content)) - (let [storage (media/configure-assets-storage storage conn) - source-path (fs/path (:tempfile content)) + (let [source-path (fs/path (:tempfile content)) source-mtype (:content-type content) source-info (media/run {:cmd :info :input {:path source-path :mtype source-mtype}}) + storage (media/configure-assets-storage storage) thumb (when (and (not (svg-image? source-info)) (big-enough-for-thumbnail? 
source-info)) @@ -119,16 +133,25 @@ image (if (= (:mtype source-info) "image/svg+xml") (let [data (slurp source-path)] - (sto/put-object storage {:content (sto/content data) - :content-type (:mtype source-info)})) - (sto/put-object storage {:content (sto/content source-path) - :content-type (:mtype source-info)})) + (sto/put-object storage + {:content (sto/content data) + :content-type (:mtype source-info) + :reference :file-media-object + :touched-at (dt/now)})) + (sto/put-object storage + {:content (sto/content source-path) + :content-type (:mtype source-info) + :reference :file-media-object + :touched-at (dt/now)})) thumb (when thumb - (sto/put-object storage {:content (sto/content (:data thumb) (:size thumb)) - :content-type (:mtype thumb)}))] + (sto/put-object storage + {:content (sto/content (:data thumb) (:size thumb)) + :content-type (:mtype thumb) + :reference :file-media-object + :touched-at (dt/now)}))] - (db/exec-one! conn [sql:create-file-media-object + (db/exec-one! pool [sql:create-file-media-object (or id (uuid/next)) file-id is-local name (:id image) @@ -145,19 +168,16 @@ (sv/defmethod ::create-file-media-object-from-url [{:keys [pool storage] :as cfg} {:keys [profile-id file-id url name] :as params}] - (db/with-atomic [conn pool] - (let [file (select-file conn file-id)] - (teams/check-edition-permissions! conn profile-id (:team-id file)) - (let [mobj (download-media cfg url) - content {:filename "tempfile" - :size (:size mobj) - :tempfile (sto/get-object-path storage mobj) - :content-type (:content-type (meta mobj))} - params' (merge params {:content content - :name (or name (:filename content))})] - (-> (assoc cfg :conn conn) - (create-file-media-object params')))))) + (let [file (select-file pool file-id)] + (teams/check-edition-permissions! 
pool profile-id (:team-id file)) + (let [mobj (download-media cfg url) + content {:filename "tempfile" + :size (:size mobj) + :tempfile (sto/get-object-path storage mobj) + :content-type (:content-type (meta mobj))}] + (->> (merge params {:content content :name (or name (:filename content))}) + (create-file-media-object cfg))))) ;; --- Clone File Media object (Upload and create from url) @@ -189,7 +209,6 @@ :height (:height mobj) :mtype (:mtype mobj)}))) - ;; --- HELPERS (def ^:private diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index eeee57f46..4891e338d 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -18,11 +18,9 @@ [app.storage.impl :as impl] [app.storage.s3 :as ss3] [app.util.time :as dt] - [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.core :as fs] - [integrant.core :as ig] - [promesa.exec :as px])) + [integrant.core :as ig])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Storage Module State @@ -40,7 +38,7 @@ :db ::sdb/backend)))) (defmethod ig/pre-init-spec ::storage [_] - (s/keys :req-un [::wrk/executor ::db/pool ::backends])) + (s/keys :req-un [::db/pool ::backends])) (defmethod ig/prep-key ::storage [_ {:keys [backends] :as cfg}] @@ -53,78 +51,74 @@ (assoc :backends (d/without-nils backends)))) (s/def ::storage - (s/keys :req-un [::backends ::wrk/executor ::db/pool])) + (s/keys :req-un [::backends ::db/pool])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Database Objects ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(defrecord StorageObject [id size created-at expired-at backend]) +(defrecord StorageObject [id size created-at expired-at touched-at backend]) (defn storage-object? [v] (instance? StorageObject v)) -(def ^:private - sql:insert-storage-object - "insert into storage_object (id, size, backend, metadata) - values (?, ?, ?, ?::jsonb) - returning *") +(s/def ::storage-object storage-object?) 
+(s/def ::storage-content impl/content?) -(def ^:private - sql:insert-storage-object-with-expiration - "insert into storage_object (id, size, backend, metadata, deleted_at) - values (?, ?, ?, ?::jsonb, ?) - returning *") -(defn- insert-object - [conn id size backend mdata expiration] - (if expiration - (db/exec-one! conn [sql:insert-storage-object-with-expiration id size backend mdata expiration]) - (db/exec-one! conn [sql:insert-storage-object id size backend mdata]))) +(defn- clone-database-object + ;; If we in this condition branch, this means we come from the + ;; clone-object, so we just need to clone it with a new backend. + [{:keys [conn backend]} object] + (let [id (uuid/random) + mdata (meta object) + result (db/insert! conn :storage-object + {:id id + :size (:size object) + :backend (name backend) + :metadata (db/tjson mdata) + :deleted-at (:expired-at object) + :touched-at (:touched-at object)})] + (assoc object + :id (:id result) + :backend backend + :created-at (:created-at result) + :touched-at (:touched-at result)))) (defn- create-database-object [{:keys [conn backend]} {:keys [content] :as object}] - (if (instance? StorageObject object) - ;; If we in this condition branch, this means we come from the - ;; clone-object, so we just need to clone it with a new backend. - (let [id (uuid/random) - mdata (meta object) - result (insert-object conn - id - (:size object) - (name backend) - (db/tjson mdata) - (:expired-at object))] - (assoc object - :id (:id result) - :backend backend - :created-at (:created-at result))) - (let [id (uuid/random) - mdata (dissoc object :content :expired-at) - result (insert-object conn - id - (count content) - (name backend) - (db/tjson mdata) - (:expired-at object))] - (StorageObject. 
(:id result) - (:size result) - (:created-at result) - (:deleted-at result) - backend - mdata - nil)))) + (us/assert ::storage-content content) + (let [id (uuid/random) + mdata (dissoc object :content :expired-at :touched-at) + + result (db/insert! conn :storage-object + {:id id + :size (count content) + :backend (name backend) + :metadata (db/tjson mdata) + :deleted-at (:expired-at object) + :touched-at (:touched-at object)})] + + (StorageObject. (:id result) + (:size result) + (:created-at result) + (:deleted-at result) + (:touched-at result) + backend + mdata + nil))) (def ^:private sql:retrieve-storage-object "select * from storage_object where id = ? and (deleted_at is null or deleted_at > now())") (defn row->storage-object [res] - (let [mdata (some-> (:metadata res) (db/decode-transit-pgobject))] + (let [mdata (or (some-> (:metadata res) (db/decode-transit-pgobject)) {})] (StorageObject. (:id res) (:size res) (:created-at res) (:deleted-at res) + (:touched-at res) (keyword (:backend res)) mdata nil))) @@ -142,10 +136,6 @@ (let [result (db/exec-one! conn [sql:delete-storage-object id])] (pos? (:next.jdbc/update-count result)))) -(defn- register-recheck - [{:keys [pool] :as storage} backend id] - (db/insert! pool :storage-pending {:id id :backend (name backend)})) - ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; @@ -170,17 +160,13 @@ (defn put-object "Creates a new object with the provided content." - [{:keys [pool conn backend executor] :as storage} {:keys [content] :as params}] + [{:keys [pool conn backend] :as storage} {:keys [content] :as params}] (us/assert ::storage storage) - (us/assert impl/content? 
content) + (us/assert ::storage-content content) + (us/assert ::us/keyword backend) (let [storage (assoc storage :conn (or conn pool)) object (create-database-object storage params)] - ;; Schedule to execute in background; in an other transaction and - ;; register the currently created storage object id for a later - ;; recheck. - (px/run! executor #(register-recheck storage backend (:id object))) - ;; Store the data finally on the underlying storage subsystem. (-> (impl/resolve-backend storage backend) (impl/put-object object content)) @@ -190,10 +176,12 @@ (defn clone-object "Creates a clone of the provided object using backend based efficient method. Always clones objects to the configured default." - [{:keys [pool conn] :as storage} object] + [{:keys [pool conn backend] :as storage} object] (us/assert ::storage storage) + (us/assert ::storage-object object) + (us/assert ::us/keyword backend) (let [storage (assoc storage :conn (or conn pool)) - object* (create-database-object storage object)] + object* (clone-database-object storage object)] (if (= (:backend object) (:backend storage)) ;; if the source and destination backends are the same, we ;; proceed to use the fast path with specific copy @@ -269,7 +257,7 @@ ;; A task responsible to permanently delete already marked as deleted ;; storage files. -(declare sql:retrieve-deleted-objects) +(declare sql:retrieve-deleted-objects-chunk) (s/def ::min-age ::dt/duration) @@ -278,44 +266,46 @@ (defmethod ig/init-key ::gc-deleted-task [_ {:keys [pool storage min-age] :as cfg}] - (letfn [(group-by-backend [rows] - (let [conj (fnil conj [])] - [(reduce (fn [acc {:keys [id backend]}] - (update acc (keyword backend) conj id)) - {} - rows) - (count rows)])) + (letfn [(retrieve-deleted-objects-chunk [conn cursor] + (let [min-age (db/interval min-age) + rows (db/exec! 
conn [sql:retrieve-deleted-objects-chunk min-age cursor])] + [(some-> rows peek :created-at) + (some->> (seq rows) (d/group-by' #(-> % :backend keyword) :id) seq)])) (retrieve-deleted-objects [conn] - (let [min-age (db/interval min-age) - rows (db/exec! conn [sql:retrieve-deleted-objects min-age])] - (some-> (seq rows) (group-by-backend)))) + (->> (d/iteration (fn [cursor] + (retrieve-deleted-objects-chunk conn cursor)) + :initk (dt/now) + :vf second + :kf first) + (sequence cat))) - (delete-in-bulk [conn [backend ids]] + (delete-in-bulk [conn backend ids] (let [backend (impl/resolve-backend storage backend) backend (assoc backend :conn conn)] (impl/del-objects-in-bulk backend ids)))] (fn [_] (db/with-atomic [conn pool] - (loop [n 0] - (if-let [[groups total] (retrieve-deleted-objects conn)] + (loop [total 0 + groups (retrieve-deleted-objects conn)] + (if-let [[backend ids] (first groups)] (do - (run! (partial delete-in-bulk conn) groups) - (recur (+ n ^long total))) + (delete-in-bulk conn backend ids) + (recur (+ total (count ids)) + (rest groups))) (do - (l/info :task "gc-deleted" - :hint "permanently delete items" - :count n) - {:deleted n}))))))) + (l/info :task "gc-deleted" :count total) + {:deleted total}))))))) -(def sql:retrieve-deleted-objects +(def sql:retrieve-deleted-objects-chunk "with items_part as ( select s.id from storage_object as s where s.deleted_at is not null and s.deleted_at < (now() - ?::interval) - order by s.deleted_at + and s.created_at < ? + order by s.created_at desc limit 100 ) delete from storage_object @@ -326,157 +316,102 @@ ;; Garbage Collection: Analyze touched objects ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; This task is part of the garbage collection of storage objects and -;; is responsible on analyzing the touched objects and mark them for deletion -;; if corresponds. 
+;; This task is part of the garbage collection of storage objects and is responsible for analyzing the touched +;; objects and marking them for deletion when appropriate. ;; -;; When file_media_object is deleted, the depending storage_object are -;; marked as touched. This means that some files that depend on a -;; concrete storage_object are no longer exists and maybe this -;; storage_object is no longer necessary and can be eligible for -;; elimination. This task periodically analyzes touched objects and -;; mark them as freeze (means that has other references and the object -;; is still valid) or deleted (no more references to this object so is -;; ready to be deleted). +;; For example: when a file_media_object is deleted, the depending storage_object rows are marked as touched. This +;; means that some files that depend on a concrete storage_object no longer exist and maybe this +;; storage_object is no longer necessary and can be eligible for elimination. This task periodically analyzes +;; touched objects and marks them as frozen (meaning the object has other references and is still valid) or +;; deleted (no more references to this object, so it is ready to be deleted). -(declare sql:retrieve-touched-objects) +(declare sql:retrieve-touched-objects-chunk) +(declare sql:retrieve-file-media-object-nrefs) +(declare sql:retrieve-team-font-variant-nrefs) (defmethod ig/pre-init-spec ::gc-touched-task [_] (s/keys :req-un [::db/pool])) (defmethod ig/init-key ::gc-touched-task [_ {:keys [pool] :as cfg}] - (letfn [(group-results [rows] - (let [conj (fnil conj [])] - (reduce (fn [acc {:keys [id nrefs]}] - (if (pos? nrefs) - (update acc :to-freeze conj id) - (update acc :to-delete conj id))) - {} - rows))) + (letfn [(has-team-font-variant-nrefs? [conn id] + (-> (db/exec-one! conn [sql:retrieve-team-font-variant-nrefs id id id id]) :nrefs pos?)) - (retrieve-touched [conn] - (let [rows (db/exec! 
conn [sql:retrieve-touched-objects])] - (some-> (seq rows) (group-results)))) - - (mark-delete-in-bulk [conn ids] - (db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)" - (db/create-array conn "uuid" (into-array java.util.UUID ids))])) + (has-file-media-object-nrefs? [conn id] + (-> (db/exec-one! conn [sql:retrieve-file-media-object-nrefs id id]) :nrefs pos?)) (mark-freeze-in-bulk [conn ids] (db/exec-one! conn ["update storage_object set touched_at=null where id = ANY(?)" - (db/create-array conn "uuid" (into-array java.util.UUID ids))]))] + (db/create-array conn "uuid" ids)])) + + (mark-delete-in-bulk [conn ids] + (db/exec-one! conn ["update storage_object set deleted_at=now(), touched_at=null where id = ANY(?)" + (db/create-array conn "uuid" ids)])) + + (retrieve-touched-chunk [conn cursor] + (let [rows (->> (db/exec! conn [sql:retrieve-touched-objects-chunk cursor]) + (mapv #(d/update-when % :metadata db/decode-transit-pgobject)))] + (when (seq rows) + [(-> rows peek :created-at) + ;; NOTE: we use the :file-media-object as default value for backward compatibility because when we + ;; deploy it we can have old backend instances running at the same time as the new one and we can + ;; still have storage-objects created without a reference value. And we know that if it does not + ;; have a value, it means :file-media-object. + (d/group-by' #(or (-> % :metadata :reference) :file-media-object) :id rows)]))) + + (retrieve-touched [conn] + (->> (d/iteration (fn [cursor] + (retrieve-touched-chunk conn cursor)) + :initk (dt/now) + :vf second + :kf first) + (sequence cat))) + + (process-objects! 
[conn pred-fn ids] + (loop [to-freeze #{} + to-delete #{} + ids (seq ids)] + (if-let [id (first ids)] + (if (pred-fn conn id) + (recur (conj to-freeze id) to-delete (rest ids)) + (recur to-freeze (conj to-delete id) (rest ids))) + + (do + (some->> (seq to-freeze) (mark-freeze-in-bulk conn)) + (some->> (seq to-delete) (mark-delete-in-bulk conn)) + [(count to-freeze) (count to-delete)])))) + ] (fn [_] (db/with-atomic [conn pool] - (loop [cntf 0 - cntd 0] - (if-let [{:keys [to-delete to-freeze]} (retrieve-touched conn)] + (loop [to-freeze 0 + to-delete 0 + groups (retrieve-touched conn)] + (if-let [[reference ids] (first groups)] + (let [[f d] (case reference + :file-media-object (process-objects! conn has-file-media-object-nrefs? ids) + :team-font-variant (process-objects! conn has-team-font-variant-nrefs? ids) + (ex/raise :type :internal :code :unexpected-unknown-reference))] + (recur (+ to-freeze f) + (+ to-delete d) + (rest groups))) (do - (when (seq to-delete) (mark-delete-in-bulk conn to-delete)) - (when (seq to-freeze) (mark-freeze-in-bulk conn to-freeze)) - (recur (+ cntf (count to-freeze)) - (+ cntd (count to-delete)))) - (do - (l/info :task "gc-touched" - :hint "mark freeze" - :count cntf) - (l/info :task "gc-touched" - :hint "mark for deletion" - :count cntd) - {:freeze cntf :delete cntd}))))))) + (l/info :task "gc-touched" :to-freeze to-freeze :to-delete to-delete) + {:freeze to-freeze :delete to-delete}))))))) -(def sql:retrieve-touched-objects - "select so.id, - ((select count(*) from file_media_object where media_id = so.id) + - (select count(*) from file_media_object where thumbnail_id = so.id)) as nrefs - from storage_object as so +(def sql:retrieve-touched-objects-chunk + "select so.* from storage_object as so where so.touched_at is not null - order by so.touched_at - limit 100;") + and so.created_at < ? 
+ order by so.created_at desc + limit 500;") -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -;; Recheck Stalled Task -;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(def sql:retrieve-file-media-object-nrefs + "select ((select count(*) from file_media_object where media_id = ?) + + (select count(*) from file_media_object where thumbnail_id = ?)) as nrefs") -;; Because the physical storage (filesystem, s3, ... except db) is not -;; transactional, in some situations we can found physical object -;; leakage. That situations happens when the transaction that writes -;; the file aborts, leaving the file written to the underlying storage -;; but the reference on the database is lost with the rollback. -;; -;; For this situations we need to write a "log" of inserted files that -;; are checked in some time in future. If physical file exists but the -;; database refence does not exists means that leaked file is found -;; and is immediately deleted. The responsibility of this task is -;; check that write log for possible leaked files. - -(def recheck-min-age (dt/duration {:hours 1})) - -(declare sql:retrieve-pending-to-recheck) -(declare sql:exists-storage-object) - -(defmethod ig/pre-init-spec ::recheck-task [_] - (s/keys :req-un [::storage ::db/pool])) - -(defmethod ig/init-key ::recheck-task - [_ {:keys [pool storage] :as cfg}] - (letfn [(group-results [rows] - (let [conj (fnil conj [])] - (reduce (fn [acc {:keys [id exist] :as row}] - (cond-> (update acc :all conj id) - (false? exist) - (update :to-delete conj (dissoc row :exist)))) - {} - rows))) - - (group-by-backend [rows] - (let [conj (fnil conj [])] - (reduce (fn [acc {:keys [id backend]}] - (update acc (keyword backend) conj id)) - {} - rows))) - - (retrieve-pending [conn] - (let [rows (db/exec! 
conn [sql:retrieve-pending-to-recheck (db/interval recheck-min-age)])] - (some-> (seq rows) (group-results)))) - - (delete-group [conn [backend ids]] - (let [backend (impl/resolve-backend storage backend) - backend (assoc backend :conn conn)] - (impl/del-objects-in-bulk backend ids))) - - (delete-all [conn ids] - (let [ids (db/create-array conn "uuid" (into-array java.util.UUID ids))] - (db/exec-one! conn ["delete from storage_pending where id = ANY(?)" ids])))] - - (fn [_] - (db/with-atomic [conn pool] - (loop [n 0 d 0] - (if-let [{:keys [all to-delete]} (retrieve-pending conn)] - (let [groups (group-by-backend to-delete)] - (run! (partial delete-group conn) groups) - (delete-all conn all) - (recur (+ n (count all)) - (+ d (count to-delete)))) - (do - (l/info :task "recheck" - :hint "recheck items" - :processed n - :deleted d) - {:processed n :deleted d}))))))) - -(def sql:retrieve-pending-to-recheck - "select sp.id, - sp.backend, - sp.created_at, - (case when count(so.id) > 0 then true - else false - end) as exist - from storage_pending as sp - left join storage_object as so - on (so.id = sp.id) - where sp.created_at < now() - ?::interval - group by 1,2,3 - order by sp.created_at asc - limit 100") +(def sql:retrieve-team-font-variant-nrefs + "select ((select count(*) from team_font_variant where woff1_file_id = ?) + + (select count(*) from team_font_variant where woff2_file_id = ?) + + (select count(*) from team_font_variant where otf_file_id = ?) 
+ + (select count(*) from team_font_variant where ttf_file_id = ?)) as nrefs") diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index 10c5710e0..22b3d88bd 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -56,9 +56,10 @@ (s/def ::region #{:eu-central-1}) (s/def ::bucket ::us/string) (s/def ::prefix ::us/string) +(s/def ::endpoint ::us/string) (defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt-un [::region ::bucket ::prefix])) + (s/keys :opt-un [::region ::bucket ::prefix ::endpoint])) (defmethod ig/prep-key ::backend [_ {:keys [prefix] :as cfg}] @@ -119,20 +120,31 @@ (defn- ^Region lookup-region [region] - (case region - :eu-central-1 Region/EU_CENTRAL_1)) + (Region/of (name region))) (defn build-s3-client - [{:keys [region]}] - (.. (S3Client/builder) - (region (lookup-region region)) - (build))) + [{:keys [region endpoint]}] + (if (string? endpoint) + (let [uri (java.net.URI. endpoint)] + (.. (S3Client/builder) + (endpointOverride uri) + (region (lookup-region region)) + (build))) + (.. (S3Client/builder) + (region (lookup-region region)) + (build)))) (defn build-s3-presigner - [{:keys [region]}] - (.. (S3Presigner/builder) - (region (lookup-region region)) - (build))) + [{:keys [region endpoint]}] + (if (string? endpoint) + (let [uri (java.net.URI. endpoint)] + (.. (S3Presigner/builder) + (endpointOverride uri) + (region (lookup-region region)) + (build))) + (.. (S3Presigner/builder) + (region (lookup-region region)) + (build)))) (defn put-object [{:keys [client bucket prefix]} {:keys [id] :as object} content] diff --git a/backend/test/app/services_files_test.clj b/backend/test/app/services_files_test.clj index 4c41ee557..dc96c2cba 100644 --- a/backend/test/app/services_files_test.clj +++ b/backend/test/app/services_files_test.clj @@ -174,6 +174,14 @@ :type :image :metadata {:id (:id fmo1)}}}]})] + + + ;; If we launch gc-touched-task, we should have 4 items to freeze. 
+ (let [task (:app.storage/gc-touched-task th/*system*) + res (task {})] + (t/is (= 4 (:freeze res))) + (t/is (= 0 (:delete res)))) + ;; run the task immediately (let [task (:app.tasks.file-media-gc/handler th/*system*) res (task {})] @@ -202,16 +210,22 @@ (t/is (some? (sto/get-object storage (:media-id fmo1)))) (t/is (some? (sto/get-object storage (:thumbnail-id fmo1)))) - ;; but if we pass the touched gc task two of them should disappear + ;; now, we have deleted the unused file-media-object, if we + ;; execute the touched-gc task, we should see that two of them + ;; are marked to be deleted. (let [task (:app.storage/gc-touched-task th/*system*) res (task {})] (t/is (= 0 (:freeze res))) - (t/is (= 2 (:delete res))) + (t/is (= 2 (:delete res)))) - (t/is (nil? (sto/get-object storage (:media-id fmo2)))) - (t/is (nil? (sto/get-object storage (:thumbnail-id fmo2)))) - (t/is (some? (sto/get-object storage (:media-id fmo1)))) - (t/is (some? (sto/get-object storage (:thumbnail-id fmo1))))) + + ;; Finally, check that some of the objects that are marked as + ;; deleted we are unable to retrieve them using standard storage + ;; public api. + (t/is (nil? (sto/get-object storage (:media-id fmo2)))) + (t/is (nil? (sto/get-object storage (:thumbnail-id fmo2)))) + (t/is (some? (sto/get-object storage (:media-id fmo1)))) + (t/is (some? 
(sto/get-object storage (:thumbnail-id fmo1)))) ))) diff --git a/backend/test/app/services_management_test.clj b/backend/test/app/services_management_test.clj index 7d237b5d1..eb9c28f73 100644 --- a/backend/test/app/services_management_test.clj +++ b/backend/test/app/services_management_test.clj @@ -11,6 +11,7 @@ [app.http :as http] [app.storage :as sto] [app.test-helpers :as th] + [app.storage-test :refer [configure-storage-backend]] [clojure.test :as t] [buddy.core.bytes :as b] [datoteka.core :as fs])) @@ -19,7 +20,9 @@ (t/use-fixtures :each th/database-reset) (t/deftest duplicate-file - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) + sobject (sto/put-object storage {:content (sto/content "content") :content-type "text/plain" :other "data"}) @@ -90,7 +93,8 @@ )))) (t/deftest duplicate-file-with-deleted-rels - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) sobject (sto/put-object storage {:content (sto/content "content") :content-type "text/plain" :other "data"}) @@ -151,7 +155,9 @@ )))) (t/deftest duplicate-project - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) + sobject (sto/put-object storage {:content (sto/content "content") :content-type "text/plain" :other "data"}) @@ -221,7 +227,8 @@ ))))) (t/deftest duplicate-project-with-deleted-files - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) sobject (sto/put-object storage {:content (sto/content "content") :content-type "text/plain" :other "data"}) diff --git a/backend/test/app/storage_test.clj b/backend/test/app/storage_test.clj index 23777215b..a7353a65d 100644 --- a/backend/test/app/storage_test.clj +++ b/backend/test/app/storage_test.clj @@ -7,6 +7,7 @@ (ns 
app.storage-test (:require [app.common.exceptions :as ex] + [app.common.uuid :as uuid] [app.db :as db] [app.storage :as sto] [app.test-helpers :as th] @@ -22,9 +23,19 @@ th/database-reset th/clean-storage)) +(defn configure-storage-backend + "Given storage map, returns a storage configured with the appropriate + backend for assets." + ([storage] + (assoc storage :backend :tmp)) + ([storage conn] + (-> storage + (assoc :conn conn) + (assoc :backend :tmp)))) (t/deftest put-and-retrieve-object - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) content (sto/content "content") object (sto/put-object storage {:content content :content-type "text/plain" @@ -39,9 +50,9 @@ (t/is (= "content" (slurp (sto/get-object-path storage object)))) )) - (t/deftest put-and-retrieve-expired-object - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) content (sto/content "content") object (sto/put-object storage {:content content :content-type "text/plain" @@ -59,7 +70,8 @@ )) (t/deftest put-and-delete-object - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) content (sto/content "content") object (sto/put-object storage {:content content :content-type "text/plain" @@ -79,7 +91,8 @@ )) (t/deftest test-deleted-gc-task - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) content (sto/content "content") object1 (sto/put-object storage {:content content :content-type "text/plain" @@ -96,14 +109,17 @@ (let [res (db/exec-one! 
th/*pool* ["select count(*) from storage_object;"])] (t/is (= 1 (:count res)))))) -(t/deftest test-touched-gc-task - (let [storage (:app.storage/storage th/*system*) +(t/deftest test-touched-gc-task-1 + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) prof (th/create-profile* 1) proj (th/create-project* 1 {:profile-id (:id prof) :team-id (:default-team-id prof)}) + file (th/create-file* 1 {:profile-id (:id prof) :project-id (:default-project-id prof) :is-shared false}) + mfile {:filename "sample.jpg" :tempfile (th/tempfile "app/test_files/sample.jpg") :content-type "image/jpeg" @@ -140,12 +156,12 @@ ;; now check if the storage objects are touched (let [res (db/exec-one! th/*pool* ["select count(*) from storage_object where touched_at is not null"])] - (t/is (= 2 (:count res)))) + (t/is (= 4 (:count res)))) ;; run the touched gc task (let [task (:app.storage/gc-touched-task th/*system*) res (task {})] - (t/is (= 0 (:freeze res))) + (t/is (= 2 (:freeze res))) (t/is (= 2 (:delete res)))) ;; now check that there are no touched objects @@ -157,8 +173,85 @@ (t/is (= 2 (:count res)))) ))) + +(t/deftest test-touched-gc-task-2 + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) + prof (th/create-profile* 1 {:is-active true}) + team-id (:default-team-id prof) + proj-id (:default-project-id prof) + font-id (uuid/custom 10 1) + + proj (th/create-project* 1 {:profile-id (:id prof) + :team-id team-id}) + + file (th/create-file* 1 {:profile-id (:id prof) + :project-id proj-id + :is-shared false}) + + ttfdata (-> (io/resource "app/test_files/font-1.ttf") + (fs/slurp-bytes)) + + mfile {:filename "sample.jpg" + :tempfile (th/tempfile "app/test_files/sample.jpg") + :content-type "image/jpeg" + :size 312043} + + params1 {::th/type :upload-file-media-object + :profile-id (:id prof) + :file-id (:id file) + :is-local true + :name "testfile" + :content mfile} + + params2 {::th/type :create-font-variant + :profile-id (:id 
prof) + :team-id team-id + :font-id font-id + :font-family "somefont" + :font-weight 400 + :font-style "normal" + :data {"font/ttf" ttfdata}} + + out1 (th/mutation! params1) + out2 (th/mutation! params2)] + + ;; (th/print-result! out) + + (t/is (nil? (:error out1))) + (t/is (nil? (:error out2))) + + ;; run the touched gc task + (let [task (:app.storage/gc-touched-task th/*system*) + res (task {})] + (t/is (= 6 (:freeze res))) + (t/is (= 0 (:delete res))) + + (let [result-1 (:result out1) + result-2 (:result out2)] + + ;; now we proceed to manually delete one team-font-variant + (db/exec-one! th/*pool* ["delete from team_font_variant where id = ?" (:id result-2)]) + + ;; revert touched state to all storage objects + (db/exec-one! th/*pool* ["update storage_object set touched_at=now()"]) + + ;; Run the task again + (let [res (task {})] + (t/is (= 2 (:freeze res))) + (t/is (= 4 (:delete res)))) + + ;; now check that there are no touched objects + (let [res (db/exec-one! th/*pool* ["select count(*) from storage_object where touched_at is not null"])] + (t/is (= 0 (:count res)))) + + ;; now check that all objects are marked to be deleted + (let [res (db/exec-one! th/*pool* ["select count(*) from storage_object where deleted_at is not null"])] + (t/is (= 4 (:count res)))))))) + (t/deftest test-touched-gc-task-without-delete - (let [storage (:app.storage/storage th/*system*) + (let [storage (-> (:app.storage/storage th/*system*) + (configure-storage-backend)) prof (th/create-profile* 1) proj (th/create-project* 1 {:profile-id (:id prof) :team-id (:default-team-id prof)}) @@ -198,72 +291,3 @@ ;; check that we have all object in the db (let [res (db/exec-one! th/*pool* ["select count(*) from storage_object where deleted_at is null"])] (t/is (= 4 (:count res))))))) - - -;; Recheck is the mechanism for delete leaked resources on -;; transaction failure. 
- -(t/deftest test-recheck - (let [storage (:app.storage/storage th/*system*) - content (sto/content "content") - object (sto/put-object storage {:content content - :content-type "text/plain"})] - ;; Sleep fo 50ms - (th/sleep 50) - - (let [rows (db/exec! th/*pool* ["select * from storage_pending"])] - (t/is (= 1 (count rows))) - (t/is (= (:id object) (:id (first rows))))) - - ;; Artificially make all storage_pending object 1 hour older. - (db/exec-one! th/*pool* ["update storage_pending set created_at = created_at - '1 hour'::interval"]) - - ;; Sleep fo 50ms - (th/sleep 50) - - ;; Run recheck task - (let [task (:app.storage/recheck-task th/*system*) - res (task {})] - (t/is (= 1 (:processed res))) - (t/is (= 0 (:deleted res)))) - - ;; After recheck task, storage-pending table should be empty - (let [rows (db/exec! th/*pool* ["select * from storage_pending"])] - (t/is (= 0 (count rows)))))) - -(t/deftest test-recheck-with-rollback - (let [storage (:app.storage/storage th/*system*) - content (sto/content "content")] - - ;; check with aborted transaction - (ex/ignoring - (db/with-atomic [conn th/*pool*] - (let [storage (assoc storage :conn conn)] ; make participate storage in the transaction - (sto/put-object storage {:content content - :content-type "text/plain"}) - (throw (ex-info "expected" {}))))) - - ;; let a 200ms window for recheck registration thread - ;; completion before proceed. - (th/sleep 200) - - ;; storage_pending table should have the object - ;; registered independently of the aborted transaction. - (let [rows (db/exec! th/*pool* ["select * from storage_pending"])] - (t/is (= 1 (count rows)))) - - ;; Artificially make all storage_pending object 1 hour older. - (db/exec-one! 
th/*pool* ["update storage_pending set created_at = created_at - '1 hour'::interval"]) - - ;; Sleep fo 50ms - (th/sleep 50) - - ;; Run recheck task - (let [task (:app.storage/recheck-task th/*system*) - res (task {})] - (t/is (= 1 (:processed res))) - (t/is (= 1 (:deleted res)))) - - ;; After recheck task, storage-pending table should be empty - (let [rows (db/exec! th/*pool* ["select * from storage_pending"])] - (t/is (= 0 (count rows)))))) diff --git a/backend/test/app/test_helpers.clj b/backend/test/app/test_helpers.clj index 9161296d1..2807e6f3b 100644 --- a/backend/test/app/test_helpers.clj +++ b/backend/test/app/test_helpers.clj @@ -52,7 +52,6 @@ (assoc-in [:app.db/pool :uri] (:database-uri config)) (assoc-in [:app.db/pool :username] (:database-username config)) (assoc-in [:app.db/pool :password] (:database-password config)) - (assoc-in [[:app.main/main :app.storage.fs/backend] :directory] "/tmp/app/storage") (dissoc :app.srepl/server :app.http/server :app.http/router @@ -65,8 +64,7 @@ :app.worker/scheduler :app.worker/worker) (d/deep-merge - {:app.storage/storage {:backend :tmp} - :app.tasks.file-media-gc/handler {:max-age (dt/duration 300)}})) + {:app.tasks.file-media-gc/handler {:max-age (dt/duration 300)}})) _ (ig/load-namespaces config) system (-> (ig/prep config) (ig/init))] diff --git a/common/src/app/common/data.cljc b/common/src/app/common/data.cljc index ad167a321..dcd0cf368 100644 --- a/common/src/app/common/data.cljc +++ b/common/src/app/common/data.cljc @@ -6,7 +6,7 @@ (ns app.common.data "Data manipulation and query helper functions." - (:refer-clojure :exclude [read-string hash-map merge name parse-double]) + (:refer-clojure :exclude [read-string hash-map merge name parse-double group-by]) #?(:cljs (:require-macros [app.common.data])) (:require @@ -609,3 +609,70 @@ (if (or (keyword? k) (string? 
k)) [(keyword (str/kebab (name k))) v] [k v]))))) + + +(defn group-by + ([kf coll] (group-by kf identity coll)) + ([kf vf coll] + (let [conj (fnil conj [])] + (reduce (fn [result item] + (update result (kf item) conj (vf item))) + {} + coll)))) + +(defn group-by' + "A variant of group-by that uses a set for collecting results." + ([kf coll] (group-by kf identity coll)) + ([kf vf coll] + (let [conj (fnil conj #{})] + (reduce (fn [result item] + (update result (kf item) conj (vf item))) + {} + coll)))) + +;; TEMPORAL COPY of clojure-1.11 iteration function, should be +;; replaced with the builtin on when stable version is released. + +#?(:clj + (defn iteration + "Creates a seqable/reducible via repeated calls to step, + a function of some (continuation token) 'k'. The first call to step + will be passed initk, returning 'ret'. Iff (somef ret) is true, + (vf ret) will be included in the iteration, else iteration will + terminate and vf/kf will not be called. If (kf ret) is non-nil it + will be passed to the next step call, else iteration will terminate. + This can be used e.g. to consume APIs that return paginated or batched data. + step - (possibly impure) fn of 'k' -> 'ret' + :somef - fn of 'ret' -> logical true/false, default 'some?' + :vf - fn of 'ret' -> 'v', a value produced by the iteration, default 'identity' + :kf - fn of 'ret' -> 'next-k' or nil (signaling 'do not continue'), default 'identity' + :initk - the first value passed to step, default 'nil' + It is presumed that step with non-initk is unreproducible/non-idempotent. + If step with initk is unreproducible it is on the consumer to not consume twice." + {:added "1.11"} + [step & {:keys [somef vf kf initk] + :or {vf identity + kf identity + somef some? 
+ initk nil}}] + (reify + clojure.lang.Seqable + (seq [_] + ((fn next [ret] + (when (somef ret) + (cons (vf ret) + (when-some [k (kf ret)] + (lazy-seq (next (step k))))))) + (step initk))) + clojure.lang.IReduceInit + (reduce [_ rf init] + (loop [acc init + ret (step initk)] + (if (somef ret) + (let [acc (rf acc (vf ret))] + (if (reduced? acc) + @acc + (if-some [k (kf ret)] + (recur acc (step k)) + acc))) + acc)))))) diff --git a/docker/devenv/docker-compose.yaml b/docker/devenv/docker-compose.yaml index ee6b1d7df..5b92d0ed4 100644 --- a/docker/devenv/docker-compose.yaml +++ b/docker/devenv/docker-compose.yaml @@ -10,6 +10,7 @@ networks: volumes: postgres_data: user_data: + minio_data: services: main: @@ -66,6 +67,22 @@ services: - PENPOT_LDAP_ATTRS_FULLNAME=cn - PENPOT_LDAP_ATTRS_PHOTO=jpegPhoto + minio: + profiles: ["full"] + image: "minio/minio:latest" + command: minio server /mnt/data --console-address ":9001" + + volumes: + - "minio_data:/mnt/data" + + environment: + - MINIO_ROOT_USER=minioadmin + - MINIO_ROOT_PASSWORD=minioadmin + + ports: + - 9000:9000 + - 9001:9001 + backend: profiles: ["backend"] privileged: true @@ -91,6 +108,7 @@ services: environment: - EXTERNAL_UID=${CURRENT_USER_ID} - PENPOT_SECRET_KEY=super-secret-devenv-key + # SMTP setup - PENPOT_SMTP_ENABLED=true - PENPOT_SMTP_DEFAULT_FROM=no-reply@example.com