0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-18 10:41:29 -05:00

Merge pull request #4958 from penpot/niwinz-fdata-storage-offload

🎉 Add file-data offload mechanism
This commit is contained in:
Alejandro 2024-08-12 10:59:29 +02:00 committed by GitHub
commit 280252d40e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
40 changed files with 737 additions and 388 deletions

View file

@ -27,6 +27,7 @@ export PENPOT_FLAGS="\
enable-file-snapshot \
enable-webhooks \
enable-access-tokens \
enable-tiered-file-data-storage \
enable-file-validation \
enable-file-schema-validation";
@ -62,9 +63,10 @@ mc mb penpot-s3/penpot -p -q
export AWS_ACCESS_KEY_ID=penpot-devenv
export AWS_SECRET_ACCESS_KEY=penpot-devenv
export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000
export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot
export PENPOT_OBJECTS_STORAGE_BACKEND=s3
export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000
export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot
export OPTIONS="
-A:jmx-remote -A:dev \

View file

@ -19,6 +19,7 @@ export PENPOT_FLAGS="\
enable-smtp \
enable-file-snapshot \
enable-access-tokens \
enable-tiered-file-data-storage \
enable-file-validation \
enable-file-schema-validation";
@ -56,9 +57,9 @@ mc mb penpot-s3/penpot -p -q
export AWS_ACCESS_KEY_ID=penpot-devenv
export AWS_SECRET_ACCESS_KEY=penpot-devenv
export PENPOT_ASSETS_STORAGE_BACKEND=assets-s3
export PENPOT_STORAGE_ASSETS_S3_ENDPOINT=http://minio:9000
export PENPOT_STORAGE_ASSETS_S3_BUCKET=penpot
export PENPOT_OBJECTS_STORAGE_BACKEND=s3
export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000
export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot
entrypoint=${1:-app.main};

View file

@ -22,7 +22,6 @@
[app.db :as db]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
[app.rpc :as-alias rpc]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
@ -403,9 +402,9 @@
(write-obj! output rels)))
(defmethod write-section :v1/sobjects
[{:keys [::sto/storage ::output]}]
[{:keys [::output] :as cfg}]
(let [sids (-> bfc/*state* deref :sids)
storage (media/configure-assets-storage storage)]
storage (sto/resolve cfg)]
(l/dbg :hint "found sobjects"
:items (count sids)
@ -620,8 +619,8 @@
::l/sync? true))))))
(defmethod read-section :v1/sobjects
[{:keys [::sto/storage ::db/conn ::input ::bfc/overwrite ::bfc/timestamp]}]
(let [storage (media/configure-assets-storage storage)
[{:keys [::db/conn ::input ::bfc/overwrite ::bfc/timestamp] :as cfg}]
(let [storage (sto/resolve cfg)
ids (read-obj! input)
thumb? (into #{} (map :media-id) (:thumbnails @bfc/*state*))]

View file

@ -20,7 +20,6 @@
[app.db.sql :as sql]
[app.loggers.audit :as-alias audit]
[app.loggers.webhooks :as-alias webhooks]
[app.media :as media]
[app.storage :as sto]
[app.storage.tmp :as tmp]
[app.util.events :as events]
@ -347,9 +346,7 @@
[cfg team-id]
(let [id (uuid/next)
tp (dt/tpoint)
cfg (-> (create-database cfg)
(update ::sto/storage media/configure-assets-storage))]
cfg (create-database cfg)]
(l/inf :hint "start"
:operation "export"
@ -390,7 +387,6 @@
tp (dt/tpoint)
cfg (-> (create-database cfg path)
(update ::sto/storage media/configure-assets-storage)
(assoc ::bfc/timestamp (dt/now)))]
(l/inf :hint "start"

View file

@ -52,8 +52,8 @@
:redis-uri "redis://redis/0"
:assets-storage-backend :assets-fs
:storage-assets-fs-directory "assets"
:objects-storage-backend "fs"
:objects-storage-fs-directory "assets"
:assets-path "/internal/assets/"
:smtp-default-reply-to "Penpot <no-reply@example.com>"
@ -207,16 +207,24 @@
[:prepl-host {:optional true} :string]
[:prepl-port {:optional true} :int]
[:assets-storage-backend {:optional true} :keyword]
[:media-directory {:optional true} :string] ;; REVIEW
[:media-uri {:optional true} :string]
[:assets-path {:optional true} :string]
;; Legacy, will be removed in 2.5
[:assets-storage-backend {:optional true} :keyword]
[:storage-assets-fs-directory {:optional true} :string]
[:storage-assets-s3-bucket {:optional true} :string]
[:storage-assets-s3-region {:optional true} :keyword]
[:storage-assets-s3-endpoint {:optional true} :string]
[:storage-assets-s3-io-threads {:optional true} :int]]))
[:storage-assets-s3-io-threads {:optional true} :int]
[:objects-storage-backend {:optional true} :keyword]
[:objects-storage-fs-directory {:optional true} :string]
[:objects-storage-s3-bucket {:optional true} :string]
[:objects-storage-s3-region {:optional true} :keyword]
[:objects-storage-s3-endpoint {:optional true} :string]
[:objects-storage-s3-io-threads {:optional true} :int]]))
(def default-flags
[:enable-backend-api-doc

View file

@ -153,7 +153,7 @@
(s/def ::conn some?)
(s/def ::nilable-pool (s/nilable ::pool))
(s/def ::pool pool?)
(s/def ::pool-or-conn some?)
(s/def ::connectable some?)
(defn closed?
[pool]

View file

@ -62,6 +62,7 @@
[datoteka.io :as io]
[promesa.util :as pu]))
(def ^:dynamic *stats*
"A dynamic var for setting up state for collect stats globally."
nil)
@ -1742,7 +1743,7 @@
:validate validate?
:skip-on-graphic-error skip-on-graphic-error?)
(db/tx-run! (update system ::sto/storage media/configure-assets-storage)
(db/tx-run! system
(fn [system]
(binding [*system* system]
(when (string? label)

View file

@ -12,6 +12,7 @@
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.objects-map :as omap]
[app.util.pointer-map :as pmap]))
@ -55,12 +56,28 @@
;; POINTER-MAP
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn get-file-data
  "Return the stored data of the given file (or file fragment) row.

  When the row's `data-backend` is the string `objects-storage`, the
  content is read from the objects storage using `data-ref-id` (the
  storage is resolved reusing the connection of `system`). For any
  other backend value the `:data` already present on the row is
  returned as-is.

  Accepts the optional keyword argument `:touch`; when truthy, the
  referenced storage object is also marked as touched before its
  bytes are read."
  [system {:keys [data-backend data-ref-id] :as file} & {:keys [touch]}]
  (if-not (= "objects-storage" data-backend)
    ;; Data is kept inline on the row
    (:data file)
    ;; Data was moved to the objects storage
    (let [storage (sto/resolve system ::db/reuse-conn true)
          object  (sto/get-object storage data-ref-id)]
      (when touch
        (sto/touch-object! storage data-ref-id))
      (sto/get-object-bytes storage object))))
(defn resolve-file-data
  "Return `file` with its `:data` key replaced by the value obtained
  from `get-file-data`. The optional keyword arguments are forwarded
  unchanged to `get-file-data`."
  [system file & {:as opts}]
  (assoc file :data (get-file-data system file opts)))
(defn load-pointer
"A database loader pointer helper"
[system file-id id]
(let [fragment (db/get* system :file-data-fragment
{:id id :file-id file-id}
{::sql/columns [:data]})]
{::sql/columns [:data :data-backend :data-ref-id :id]})]
(l/trc :hint "load pointer"
:file-id (str file-id)
@ -74,7 +91,9 @@
:file-id file-id
:fragment-id id))
(blob/decode (:data fragment))))
(let [data (get-file-data system fragment)]
;; FIXME: conditional thread scheduling for decoding big objects
(blob/decode data))))
(defn persist-pointers!
"Persist all currently tracked pointer objects"

View file

@ -57,11 +57,10 @@
(defn- serve-object
"Helper function that returns the appropriate response depending on
the storage object backend type."
[{:keys [::sto/storage] :as cfg} {:keys [backend] :as obj}]
(let [backend (sto/resolve-backend storage backend)]
(case (::sto/type backend)
:s3 (serve-object-from-s3 cfg obj)
:fs (serve-object-from-fs cfg obj))))
[cfg {:keys [backend] :as obj}]
(case backend
(:s3 :assets-s3) (serve-object-from-s3 cfg obj)
(:fs :assets-fs) (serve-object-from-fs cfg obj)))
(defn objects-handler
"Handler that serves storage objects by id."

View file

@ -344,6 +344,8 @@
{:sendmail (ig/ref ::email/handler)
:objects-gc (ig/ref :app.tasks.objects-gc/handler)
:file-gc (ig/ref :app.tasks.file-gc/handler)
:file-gc-scheduler (ig/ref :app.tasks.file-gc-scheduler/handler)
:offload-file-data (ig/ref :app.tasks.offload-file-data/handler)
:file-xlog-gc (ig/ref :app.tasks.file-xlog-gc/handler)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
@ -394,6 +396,13 @@
{::db/pool (ig/ref ::db/pool)
::sto/storage (ig/ref ::sto/storage)}
:app.tasks.file-gc-scheduler/handler
{::db/pool (ig/ref ::db/pool)}
:app.tasks.offload-file-data/handler
{::db/pool (ig/ref ::db/pool)
::sto/storage (ig/ref ::sto/storage)}
:app.tasks.file-xlog-gc/handler
{::db/pool (ig/ref ::db/pool)}
@ -448,17 +457,28 @@
::sto/storage
{::db/pool (ig/ref ::db/pool)
::sto/backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])}}
{:s3 (ig/ref :app.storage.s3/backend)
:fs (ig/ref :app.storage.fs/backend)
[::assets :app.storage.s3/backend]
{::sto.s3/region (cf/get :storage-assets-s3-region)
::sto.s3/endpoint (cf/get :storage-assets-s3-endpoint)
::sto.s3/bucket (cf/get :storage-assets-s3-bucket)
::sto.s3/io-threads (cf/get :storage-assets-s3-io-threads)}
;; LEGACY (should not be removed, can only be removed after an
;; explicit migration because the database objects/rows will
;; still reference the old names).
:assets-s3 (ig/ref :app.storage.s3/backend)
:assets-fs (ig/ref :app.storage.fs/backend)}}
[::assets :app.storage.fs/backend]
{::sto.fs/directory (cf/get :storage-assets-fs-directory)}})
:app.storage.s3/backend
{::sto.s3/region (or (cf/get :storage-assets-s3-region)
(cf/get :objects-storage-s3-region))
::sto.s3/endpoint (or (cf/get :storage-assets-s3-endpoint)
(cf/get :objects-storage-s3-endpoint))
::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket)
(cf/get :objects-storage-s3-bucket))
::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads)
(cf/get :objects-storage-s3-io-threads))}
:app.storage.fs/backend
{::sto.fs/directory (or (cf/get :storage-assets-fs-directory)
(cf/get :objects-storage-fs-directory))}})
(def worker-config
@ -485,7 +505,7 @@
:task :tasks-gc}
{:cron #app/cron "0 0 2 * * ?" ;; daily
:task :file-gc}
:task :file-gc-scheduler}
{:cron #app/cron "0 30 */3,23 * * ?"
:task :telemetry}

View file

@ -313,17 +313,3 @@
(= stype :ttf)
(-> (assoc "font/otf" (ttf->otf sfnt))
(assoc "font/ttf" sfnt)))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Utility functions
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn configure-assets-storage
"Given storage map, returns a storage configured with the appropriate
backend for assets and optional connection attached."
([storage]
(assoc storage ::sto/backend (cf/get :assets-storage-backend :assets-fs)))
([storage pool-or-conn]
(-> (configure-assets-storage storage)
(assoc ::db/pool-or-conn pool-or-conn))))

View file

@ -382,7 +382,22 @@
:fn (mg/resource "app/migrations/sql/0120-mod-audit-log-table.sql")}
{:name "0121-mod-file-data-fragment-table"
:fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")}])
:fn (mg/resource "app/migrations/sql/0121-mod-file-data-fragment-table.sql")}
{:name "0122-mod-file-table"
:fn (mg/resource "app/migrations/sql/0122-mod-file-table.sql")}
{:name "0122-mod-file-data-fragment-table"
:fn (mg/resource "app/migrations/sql/0122-mod-file-data-fragment-table.sql")}
{:name "0123-mod-file-change-table"
:fn (mg/resource "app/migrations/sql/0123-mod-file-change-table.sql")}
{:name "0124-mod-profile-table"
:fn (mg/resource "app/migrations/sql/0124-mod-profile-table.sql")}
{:name "0125-mod-file-table"
:fn (mg/resource "app/migrations/sql/0125-mod-file-table.sql")}])
(defn apply-migrations!
[pool name migrations]

View file

@ -0,0 +1,6 @@
-- Add the columns that describe where a fragment's data is stored:
-- data_backend (text) and data_ref_id (uuid). Both are nullable, so
-- existing rows simply get NULL in the new columns.
ALTER TABLE file_data_fragment
ADD COLUMN data_backend text NULL,
ADD COLUMN data_ref_id uuid NULL;
-- Index data_ref_id so rows can be found by the id they reference.
CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx
ON file_data_fragment (data_ref_id);

View file

@ -0,0 +1,6 @@
-- Add the columns that describe where a fragment's data is stored:
-- data_backend (text) and data_ref_id (uuid). Both are nullable, so
-- existing rows simply get NULL in the new columns.
ALTER TABLE file_data_fragment
ADD COLUMN data_backend text NULL,
ADD COLUMN data_ref_id uuid NULL;
-- Index data_ref_id so rows can be found by the id they reference.
CREATE INDEX IF NOT EXISTS file_data_fragment__data_ref_id__idx
ON file_data_fragment (data_ref_id);

View file

@ -0,0 +1,4 @@
-- Add a nullable data_ref_id (uuid) column to the file table.
ALTER TABLE file ADD COLUMN data_ref_id uuid NULL;
-- Index data_ref_id so files can be found by the id they reference.
CREATE INDEX IF NOT EXISTS file__data_ref_id__idx
ON file (data_ref_id);

View file

@ -0,0 +1,2 @@
-- Composite index on file_change covering (created_at, label).
-- NOTE(review): presumably added for queries that filter file changes
-- by age and label; confirm against the callers.
CREATE INDEX IF NOT EXISTS file_change__created_at__label__idx
ON file_change (created_at, label);

View file

@ -0,0 +1,2 @@
-- Partial indexes over profile.email, each restricted to the profiles
-- whose props have the corresponding newsletter flag set to 'true'.
-- IF NOT EXISTS keeps the migration re-runnable, matching the other
-- index migrations of this change set.
CREATE INDEX IF NOT EXISTS profile__props__newsletter1__idx
    ON profile (email)
 WHERE props->>'~:newsletter-news' = 'true';
CREATE INDEX IF NOT EXISTS profile__props__newsletter2__idx
    ON profile (email)
 WHERE props->>'~:newsletter-updates' = 'true';

View file

@ -0,0 +1,3 @@
--- Lower the fill factor of the file table to optimize it for a heavy
--- write workload, leaving free space on each page for HOT updates.
ALTER TABLE file SET (FILLFACTOR=50);

View file

@ -522,7 +522,6 @@
(create-recovery-token)
(send-email-notification conn)))))))
(def schema:request-profile-recovery
[:map {:title "request-profile-recovery"}
[:email ::sm/email]])

View file

@ -68,6 +68,9 @@
:max-version fmg/version))
file))
;; --- FILE DATA
;; --- FILE PERMISSIONS
(def ^:private sql:file-permissions
@ -258,11 +261,12 @@
(let [params (merge {:id id}
(when (some? project-id)
{:project-id project-id}))
file (-> (db/get conn :file params
{::db/check-deleted (not include-deleted?)
::db/remove-deleted (not include-deleted?)
::sql/for-update lock-for-update?})
(decode-row))]
file (->> (db/get conn :file params
{::db/check-deleted (not include-deleted?)
::db/remove-deleted (not include-deleted?)
::sql/for-update lock-for-update?})
(feat.fdata/resolve-file-data cfg)
(decode-row))]
(if (and migrate? (fmg/need-migration? file))
(migrate-file cfg file)
file)))
@ -328,8 +332,10 @@
(defn- get-file-fragment
[cfg file-id fragment-id]
(some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id})
(update :data blob/decode)))
(let [resolve-file-data (partial feat.fdata/resolve-file-data cfg)]
(some-> (db/get cfg :file-data-fragment {:file-id file-id :id fragment-id})
(resolve-file-data)
(update :data blob/decode))))
(sv/defmethod ::get-file-fragment
"Retrieve a file fragment by its ID. Only authenticated users."
@ -802,7 +808,8 @@
(db/update! cfg :file
{:revn (inc (:revn file))
:data (blob/encode (:data file))
:modified-at (dt/now)}
:modified-at (dt/now)
:has-media-trimmed false}
{:id file-id})
(feat.fdata/persist-pointers! cfg file-id))))

View file

@ -14,7 +14,6 @@
[app.db :as db]
[app.db.sql :as-alias sql]
[app.main :as-alias main]
[app.media :as media]
[app.rpc :as-alias rpc]
[app.rpc.commands.files :as files]
[app.rpc.commands.profile :as profile]
@ -63,8 +62,8 @@
(db/run! cfg get-file-snapshots params))
(defn restore-file-snapshot!
[{:keys [::db/conn ::sto/storage] :as cfg} {:keys [file-id id]}]
(let [storage (media/configure-assets-storage storage conn)
[{:keys [::db/conn] :as cfg} {:keys [file-id id]}]
(let [storage (sto/resolve cfg {::db/reuse-conn true})
file (files/get-minimal-file conn file-id {::db/for-update true})
snapshot (db/get* conn :file-change
{:file-id file-id

View file

@ -295,8 +295,7 @@
(db/run! cfg files/check-edition-permissions! profile-id file-id)
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(create-file-object-thumbnail! cfg file-id object-id media (or tag "frame"))))
(create-file-object-thumbnail! cfg file-id object-id media (or tag "frame")))
;; --- MUTATION COMMAND: delete-file-object-thumbnail
@ -327,7 +326,7 @@
(files/check-edition-permissions! cfg profile-id file-id)
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(-> cfg
(update ::sto/storage media/configure-assets-storage conn)
(update ::sto/storage sto/configure conn)
(delete-file-object-thumbnail! file-id object-id))
nil)))
@ -405,7 +404,6 @@
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(files/check-edition-permissions! conn profile-id file-id)
(when-not (db/read-only? conn)
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)
media (create-file-thumbnail! cfg params)]
(let [media (create-file-thumbnail! cfg params)]
{:uri (files/resolve-public-uri (:id media))
:id (:id media)})))))

View file

@ -227,8 +227,12 @@
(defn- update-file*
[{:keys [::db/conn ::wrk/executor] :as cfg}
{:keys [profile-id file changes session-id ::created-at skip-validate] :as params}]
(let [;; Process the file data on separated thread for avoid to do
(let [;; Retrieve the file data
file (feat.fdata/resolve-file-data cfg file {:touch true})
;; Process the file data on a separate thread to avoid doing
;; the CPU-intensive operation on a vthread.
file (px/invoke! executor (partial update-file-data cfg file changes skip-validate))
features (db/create-array conn "text" (:features file))]
@ -254,6 +258,7 @@
:version (:version file)
:features features
:data-backend nil
:data-ref-id nil
:modified-at created-at
:has-media-trimmed false}
{:id (:id file)})

View file

@ -95,12 +95,11 @@
[cfg {:keys [::rpc/profile-id team-id] :as params}]
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(teams/check-edition-permissions! conn profile-id team-id)
(quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team
::quotes/profile-id profile-id
::quotes/team-id team-id})
(create-font-variant cfg (assoc params :profile-id profile-id))))))
(teams/check-edition-permissions! conn profile-id team-id)
(quotes/check-quote! conn {::quotes/id ::quotes/font-variants-per-team
::quotes/profile-id profile-id
::quotes/team-id team-id})
(create-font-variant cfg (assoc params :profile-id profile-id)))))
(defn create-font-variant
[{:keys [::sto/storage ::db/conn ::wrk/executor]} {:keys [data] :as params}]
@ -203,14 +202,13 @@
::sm/params schema:delete-font}
[cfg {:keys [::rpc/profile-id id team-id]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn ::sto/storage] :as cfg}]
(fn [{:keys [::db/conn] :as cfg}]
(teams/check-edition-permissions! conn profile-id team-id)
(let [fonts (db/query conn :team-font-variant
{:team-id team-id
:font-id id
:deleted-at nil}
{::sql/for-update true})
storage (media/configure-assets-storage storage conn)
tnow (dt/now)]
(when-not (seq fonts)
@ -220,11 +218,7 @@
(doseq [font fonts]
(db/update! conn :team-font-variant
{:deleted-at tnow}
{:id (:id font)})
(some->> (:woff1-file-id font) (sto/touch-object! storage))
(some->> (:woff2-file-id font) (sto/touch-object! storage))
(some->> (:ttf-file-id font) (sto/touch-object! storage))
(some->> (:otf-file-id font) (sto/touch-object! storage)))
{:id (:id font)}))
(rph/with-meta (rph/wrap)
{::audit/props {:id id
@ -245,22 +239,16 @@
::sm/params schema:delete-font-variant}
[cfg {:keys [::rpc/profile-id id team-id]}]
(db/tx-run! cfg
(fn [{:keys [::db/conn ::sto/storage] :as cfg}]
(fn [{:keys [::db/conn] :as cfg}]
(teams/check-edition-permissions! conn profile-id team-id)
(let [variant (db/get conn :team-font-variant
{:id id :team-id team-id}
{::sql/for-update true})
storage (media/configure-assets-storage storage conn)]
{::sql/for-update true})]
(db/update! conn :team-font-variant
{:deleted-at (dt/now)}
{:id (:id variant)})
(some->> (:woff1-file-id variant) (sto/touch-object! storage))
(some->> (:woff2-file-id variant) (sto/touch-object! storage))
(some->> (:ttf-file-id variant) (sto/touch-object! storage))
(some->> (:otf-file-id variant) (sto/touch-object! storage))
(rph/with-meta (rph/wrap)
{::audit/props {:font-family (:font-family variant)
:font-id (:font-id variant)}})))))

View file

@ -56,21 +56,19 @@
::climit/id [[:process-image/by-profile ::rpc/profile-id]
[:process-image/global]]}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content)
(media/validate-media-size! content)
(files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content)
(media/validate-media-size! content)
(db/run! cfg (fn [cfg]
(let [object (create-file-media-object cfg params)
props {:name (:name params)
:file-id file-id
:is-local (:is-local params)
:size (:size content)
:mtype (:mtype content)}]
(with-meta object
{::audit/replace-props props}))))))
(db/run! cfg (fn [cfg]
(let [object (create-file-media-object cfg params)
props {:name (:name params)
:file-id file-id
:is-local (:is-local params)
:size (:size content)
:mtype (:mtype content)}]
(with-meta object
{::audit/replace-props props})))))
(defn- big-enough-for-thumbnail?
"Checks if the provided image info is big enough for
@ -183,9 +181,8 @@
{::doc/added "1.17"
::sm/params schema:create-file-media-object-from-url}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(create-file-media-object-from-url cfg (assoc params :profile-id profile-id))))
(files/check-edition-permissions! pool profile-id file-id)
(create-file-media-object-from-url cfg (assoc params :profile-id profile-id)))
(defn download-image
[{:keys [::http/client]} uri]

View file

@ -210,8 +210,7 @@
[cfg {:keys [::rpc/profile-id file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(update-profile-photo cfg (assoc params :profile-id profile-id))))
(update-profile-photo cfg (assoc params :profile-id profile-id)))
(defn update-profile-photo
[{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id file] :as params}]

View file

@ -674,8 +674,7 @@
[cfg {:keys [::rpc/profile-id file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(update-team-photo cfg (assoc params :profile-id profile-id))))
(update-team-photo cfg (assoc params :profile-id profile-id)))
(defn update-team-photo
[{:keys [::db/pool ::sto/storage] :as cfg} {:keys [profile-id team-id] :as params}]

View file

@ -6,11 +6,13 @@
(ns app.storage
"Objects storage abstraction layer."
(:refer-clojure :exclude [resolve])
(:require
[app.common.data :as d]
[app.common.data.macros :as dm]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.storage.fs :as sfs]
[app.storage.impl :as impl]
@ -18,16 +20,23 @@
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[datoteka.fs :as fs]
[integrant.core :as ig]
[promesa.core :as p])
[integrant.core :as ig])
(:import
java.io.InputStream))
(defn get-legacy-backend
  "Translate the legacy `:assets-storage-backend` configuration value
  into the current backend id (`:assets-fs` -> `:fs`, `:assets-s3` ->
  `:s3`).

  Returns nil when the legacy option is not set (or holds an
  unrecognized value). Returning nil instead of a hard-coded `:fs` is
  required so callers written as
  `(or (get-legacy-backend) (cf/get :objects-storage-backend :fs))`
  can actually fall back to the new `:objects-storage-backend` option;
  the final `:fs` default is applied there."
  []
  (case (cf/get :assets-storage-backend)
    :assets-fs :fs
    :assets-s3 :s3
    nil))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Storage Module State
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::id #{:assets-fs :assets-s3})
(s/def ::id #{:assets-fs :assets-s3 :fs :s3})
(s/def ::s3 ::ss3/backend)
(s/def ::fs ::sfs/backend)
(s/def ::type #{:fs :s3})
@ -45,11 +54,13 @@
[_ {:keys [::backends ::db/pool] :as cfg}]
(-> (d/without-nils cfg)
(assoc ::backends (d/without-nils backends))
(assoc ::db/pool-or-conn pool)))
(assoc ::backend (or (get-legacy-backend)
(cf/get :objects-storage-backend :fs)))
(assoc ::db/connectable pool)))
(s/def ::backend keyword?)
(s/def ::storage
(s/keys :req [::backends ::db/pool ::db/pool-or-conn]
(s/keys :req [::backends ::db/pool ::db/connectable]
:opt [::backend]))
(s/def ::storage-with-backend
@ -61,23 +72,26 @@
(defn get-metadata
[params]
(into {}
(remove (fn [[k _]] (qualified-keyword? k)))
params))
(reduce-kv (fn [res k _]
(if (qualified-keyword? k)
(dissoc res k)
res))
params
params))
(defn- get-database-object-by-hash
[pool-or-conn backend bucket hash]
[connectable backend bucket hash]
(let [sql (str "select * from storage_object "
" where (metadata->>'~:hash') = ? "
" and (metadata->>'~:bucket') = ? "
" and backend = ?"
" and deleted_at is null"
" limit 1")]
(some-> (db/exec-one! pool-or-conn [sql hash bucket (name backend)])
(some-> (db/exec-one! connectable [sql hash bucket (name backend)])
(update :metadata db/decode-transit-pgobject))))
(defn- create-database-object
[{:keys [::backend ::db/pool-or-conn]} {:keys [::content ::expired-at ::touched-at] :as params}]
[{:keys [::backend ::db/connectable]} {:keys [::content ::expired-at ::touched-at ::touch] :as params}]
(let [id (or (:id params) (uuid/random))
mdata (cond-> (get-metadata params)
(satisfies? impl/IContentHash content)
@ -86,7 +100,9 @@
:always
(dissoc :id))
;; FIXME: touch object on deduplicated put operation ??
touched-at (if touch
(or touched-at (dt/now))
touched-at)
;; NOTE: for now we don't reuse the deleted objects, but in
;; future we can consider reusing deleted objects if we
@ -95,10 +111,20 @@
result (when (and (::deduplicate? params)
(:hash mdata)
(:bucket mdata))
(get-database-object-by-hash pool-or-conn backend (:bucket mdata) (:hash mdata)))
(let [result (get-database-object-by-hash connectable backend
                                          (:bucket mdata)
                                          (:hash mdata))]
  ;; Only touch when a duplicate row really exists. With no match
  ;; `result` is nil and has to stay nil, otherwise `assoc` on nil
  ;; would produce a truthy map and the insert below would be
  ;; skipped, leaving the object without a database row.
  (if (and touch result)
    (do
      (db/update! connectable :storage-object
                  {:touched-at touched-at}
                  {:id (:id result)}
                  {::db/return-keys false})
      ;; Keep the in-memory row in sync with the value just written;
      ;; the key is :touched-at, the same one used in the update.
      (assoc result :touched-at touched-at))
    result)))
result (or result
(-> (db/insert! pool-or-conn :storage-object
(-> (db/insert! connectable :storage-object
{:id id
:size (impl/get-size content)
:backend (name backend)
@ -154,9 +180,9 @@
(dm/export impl/object?)
(defn get-object
[{:keys [::db/pool-or-conn] :as storage} id]
[{:keys [::db/connectable] :as storage} id]
(us/assert! ::storage storage)
(retrieve-database-object pool-or-conn id))
(retrieve-database-object connectable id))
(defn put-object!
"Creates a new object with the provided content."
@ -172,10 +198,10 @@
(defn touch-object!
"Mark object as touched."
[{:keys [::db/pool-or-conn] :as storage} object-or-id]
[{:keys [::db/connectable] :as storage} object-or-id]
(us/assert! ::storage storage)
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)]
(-> (db/update! pool-or-conn :storage-object
(-> (db/update! connectable :storage-object
{:touched-at (dt/now)}
{:id id})
(db/get-update-count)
@ -195,11 +221,10 @@
"Returns a byte array of object content."
[storage object]
(us/assert! ::storage storage)
(if (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now)))
(when (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now)))
(-> (impl/resolve-backend storage (:backend object))
(impl/get-object-bytes object))
(p/resolved nil)))
(impl/get-object-bytes object))))
(defn get-object-url
([storage object]
@ -223,13 +248,26 @@
(-> (impl/get-object-url backend object nil) file-url->path))))
(defn del-object!
[{:keys [::db/pool-or-conn] :as storage} object-or-id]
[{:keys [::db/connectable] :as storage} object-or-id]
(us/assert! ::storage storage)
(let [id (if (impl/object? object-or-id) (:id object-or-id) object-or-id)
res (db/update! pool-or-conn :storage-object
res (db/update! connectable :storage-object
{:deleted-at (dt/now)}
{:id id})]
(pos? (db/get-update-count res))))
(dm/export impl/resolve-backend)
(dm/export impl/calculate-hash)
(defn configure
  "Return `storage` with `connectable` attached under the
  `::db/connectable` key, which is the key the storage operations
  (`get-object`, `touch-object!`, `del-object!`) read to reach the
  database."
  [storage connectable]
  (-> storage
      (assoc ::db/connectable connectable)))
(defn resolve
  "Return the storage instance found under `::storage` in `cfg`.

  Accepts the optional keyword argument `::db/reuse-conn` (default
  false); when truthy, the returned storage is configured to use the
  connectable obtained from `cfg` instead of the one it already
  carries."
  [cfg & {:as opts}]
  (let [storage (::storage cfg)]
    (cond-> storage
      (::db/reuse-conn opts false)
      (configure (db/get-connectable cfg)))))

View file

@ -121,5 +121,3 @@
:total total)
{:deleted total}))))))

View file

@ -28,58 +28,71 @@
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(def ^:private sql:get-team-font-variant-nrefs
"SELECT ((SELECT count(*) FROM team_font_variant WHERE woff1_file_id = ?) +
(SELECT count(*) FROM team_font_variant WHERE woff2_file_id = ?) +
(SELECT count(*) FROM team_font_variant WHERE otf_file_id = ?) +
(SELECT count(*) FROM team_font_variant WHERE ttf_file_id = ?)) AS nrefs")
(def ^:private sql:has-team-font-variant-refs
"SELECT ((SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff1_file_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE woff2_file_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE otf_file_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM team_font_variant WHERE ttf_file_id = ?))) AS has_refs")
(defn- get-team-font-variant-nrefs
(defn- has-team-font-variant-refs?
[conn id]
(-> (db/exec-one! conn [sql:get-team-font-variant-nrefs id id id id])
(get :nrefs)))
(-> (db/exec-one! conn [sql:has-team-font-variant-refs id id id id])
(get :has-refs)))
(def ^:private
sql:get-file-media-object-nrefs
"SELECT ((SELECT count(*) FROM file_media_object WHERE media_id = ?) +
(SELECT count(*) FROM file_media_object WHERE thumbnail_id = ?)) AS nrefs")
sql:has-file-media-object-refs
"SELECT ((SELECT EXISTS (SELECT 1 FROM file_media_object WHERE media_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM file_media_object WHERE thumbnail_id = ?))) AS has_refs")
(defn- get-file-media-object-nrefs
(defn- has-file-media-object-refs?
[conn id]
(-> (db/exec-one! conn [sql:get-file-media-object-nrefs id id])
(get :nrefs)))
(-> (db/exec-one! conn [sql:has-file-media-object-refs id id])
(get :has-refs)))
(def ^:private sql:has-profile-refs
"SELECT ((SELECT EXISTS (SELECT 1 FROM profile WHERE photo_id = ?)) OR
(SELECT EXISTS (SELECT 1 FROM team WHERE photo_id = ?))) AS has_refs")
(def ^:private sql:get-profile-nrefs
"SELECT ((SELECT count(*) FROM profile WHERE photo_id = ?) +
(SELECT count(*) FROM team WHERE photo_id = ?)) AS nrefs")
(defn- get-profile-nrefs
(defn- has-profile-refs?
[conn id]
(-> (db/exec-one! conn [sql:get-profile-nrefs id id])
(get :nrefs)))
(-> (db/exec-one! conn [sql:has-profile-refs id id])
(get :has-refs)))
(def ^:private
sql:get-file-object-thumbnail-nrefs
"SELECT (SELECT count(*) FROM file_tagged_object_thumbnail WHERE media_id = ?) AS nrefs")
sql:has-file-object-thumbnail-refs
"SELECT EXISTS (SELECT 1 FROM file_tagged_object_thumbnail WHERE media_id = ?) AS has_refs")
(defn- get-file-object-thumbnails
(defn- has-file-object-thumbnails-refs?
[conn id]
(-> (db/exec-one! conn [sql:get-file-object-thumbnail-nrefs id])
(get :nrefs)))
(-> (db/exec-one! conn [sql:has-file-object-thumbnail-refs id])
(get :has-refs)))
(def ^:private
sql:get-file-thumbnail-nrefs
"SELECT (SELECT count(*) FROM file_thumbnail WHERE media_id = ?) AS nrefs")
sql:has-file-thumbnail-refs
"SELECT EXISTS (SELECT 1 FROM file_thumbnail WHERE media_id = ?) AS has_refs")
(defn- get-file-thumbnails
(defn- has-file-thumbnails-refs?
[conn id]
(-> (db/exec-one! conn [sql:get-file-thumbnail-nrefs id])
(get :nrefs)))
(-> (db/exec-one! conn [sql:has-file-thumbnail-refs id])
(get :has-refs)))
;; Checks whether at least one row of `file` references the given
;; storage object id through its data_ref_id column.
(def ^:private sql:has-file-data-refs
  "SELECT EXISTS (SELECT 1 FROM file WHERE data_ref_id = ?) AS has_refs")

(defn- has-file-data-refs?
  "Run `sql:has-file-data-refs` for `id` on `conn` and return the value
  of the `:has-refs` column of the resulting row."
  [conn id]
  (let [row (db/exec-one! conn [sql:has-file-data-refs id])]
    (get row :has-refs)))
;; Checks whether at least one row of `file_data_fragment` references
;; the given storage object id through its data_ref_id column.
(def ^:private sql:has-file-data-fragment-refs
  "SELECT EXISTS (SELECT 1 FROM file_data_fragment WHERE data_ref_id = ?) AS has_refs")

(defn- has-file-data-fragment-refs?
  "Run `sql:has-file-data-fragment-refs` for `id` on `conn` and return
  the value of the `:has-refs` column of the resulting row."
  [conn id]
  (let [row (db/exec-one! conn [sql:has-file-data-fragment-refs id])]
    (get row :has-refs)))
(def ^:private sql:mark-freeze-in-bulk
"UPDATE storage_object
@ -91,7 +104,6 @@
(let [ids (db/create-array conn "uuid" ids)]
(db/exec-one! conn [sql:mark-freeze-in-bulk ids])))
(def ^:private sql:mark-delete-in-bulk
"UPDATE storage_object
SET deleted_at = now(),
@ -123,25 +135,24 @@
"file-media-object"))
(defn- process-objects!
[conn get-fn ids bucket]
[conn has-refs? ids bucket]
(loop [to-freeze #{}
to-delete #{}
ids (seq ids)]
(if-let [id (first ids)]
(let [nrefs (get-fn conn id)]
(if (pos? nrefs)
(do
(l/debug :hint "processing object"
:id (str id)
:status "freeze"
:bucket bucket :refs nrefs)
(recur (conj to-freeze id) to-delete (rest ids)))
(do
(l/debug :hint "processing object"
:id (str id)
:status "delete"
:bucket bucket :refs nrefs)
(recur to-freeze (conj to-delete id) (rest ids)))))
(if (has-refs? conn id)
(do
(l/debug :hint "processing object"
:id (str id)
:status "freeze"
:bucket bucket)
(recur (conj to-freeze id) to-delete (rest ids)))
(do
(l/debug :hint "processing object"
:id (str id)
:status "delete"
:bucket bucket)
(recur to-freeze (conj to-delete id) (rest ids))))
(do
(some->> (seq to-freeze) (mark-freeze-in-bulk! conn))
(some->> (seq to-delete) (mark-delete-in-bulk! conn))
@ -150,15 +161,25 @@
(defn- process-bucket!
[conn bucket ids]
(case bucket
"file-media-object" (process-objects! conn get-file-media-object-nrefs ids bucket)
"team-font-variant" (process-objects! conn get-team-font-variant-nrefs ids bucket)
"file-object-thumbnail" (process-objects! conn get-file-object-thumbnails ids bucket)
"file-thumbnail" (process-objects! conn get-file-thumbnails ids bucket)
"profile" (process-objects! conn get-profile-nrefs ids bucket)
"file-media-object" (process-objects! conn has-file-media-object-refs? ids bucket)
"team-font-variant" (process-objects! conn has-team-font-variant-refs? ids bucket)
"file-object-thumbnail" (process-objects! conn has-file-object-thumbnails-refs? ids bucket)
"file-thumbnail" (process-objects! conn has-file-thumbnails-refs? ids bucket)
"profile" (process-objects! conn has-profile-refs? ids bucket)
"file-data" (process-objects! conn has-file-data-refs? ids bucket)
"file-data-fragment" (process-objects! conn has-file-data-fragment-refs? ids bucket)
(ex/raise :type :internal
:code :unexpected-unknown-reference
:hint (dm/fmt "unknown reference %" bucket))))
:hint (dm/fmt "unknown reference '%'" bucket))))
(defn process-chunk!
  "Groups the rows of `chunk` by bucket (collecting their ids into
  sets), processes every bucket with `process-bucket!` and returns a
  two element vector with the accumulated sums of the first and second
  values returned for each bucket."
  [{:keys [::db/conn]} chunk]
  (let [grouped (d/group-by lookup-bucket :id #{} chunk)
        step    (fn [[freezed deleted] bucket ids]
                  (let [[nfo ndo] (process-bucket! conn bucket ids)]
                    [(+ freezed nfo)
                     (+ deleted ndo)]))]
    (reduce-kv step [0 0] grouped)))
(def ^:private
sql:get-touched-storage-objects
@ -167,29 +188,22 @@
WHERE so.touched_at IS NOT NULL
ORDER BY touched_at ASC
FOR UPDATE
SKIP LOCKED")
SKIP LOCKED
LIMIT 10")
(defn- group-by-bucket
[row]
(d/group-by lookup-bucket :id #{} row))
(defn- get-buckets
(defn get-chunk
[conn]
(sequence
(comp (map impl/decode-row)
(partition-all 25)
(mapcat group-by-bucket))
(db/cursor conn sql:get-touched-storage-objects)))
(->> (db/exec! conn [sql:get-touched-storage-objects])
(map impl/decode-row)
(not-empty)))
(defn- process-touched!
[{:keys [::db/conn]}]
(loop [buckets (get-buckets conn)
freezed 0
[{:keys [::db/pool] :as cfg}]
(loop [freezed 0
deleted 0]
(if-let [[bucket ids] (first buckets)]
(let [[nfo ndo] (process-bucket! conn bucket ids)]
(recur (rest buckets)
(+ freezed nfo)
(if-let [chunk (get-chunk pool)]
(let [[nfo ndo] (db/tx-run! cfg process-chunk! chunk)]
(recur (+ freezed nfo)
(+ deleted ndo)))
(do
(l/inf :hint "task finished"
@ -198,11 +212,14 @@
{:freeze freezed :delete deleted}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [_]
(db/tx-run! cfg process-touched!)))
(fn [_] (process-touched! cfg)))

View file

@ -207,15 +207,13 @@
(str "blake2b:" result)))
(defn resolve-backend
[{:keys [::db/pool] :as storage} backend-id]
[storage backend-id]
(let [backend (get-in storage [::sto/backends backend-id])]
(when-not backend
(ex/raise :type :internal
:code :backend-not-configured
:hint (dm/fmt "backend '%' not configured" backend-id)))
(-> backend
(assoc ::sto/id backend-id)
(assoc ::db/pool pool))))
(assoc backend ::sto/id backend-id)))
(defrecord StorageObject [id size created-at expired-at touched-at backend])

View file

@ -21,78 +21,18 @@
[app.config :as cf]
[app.db :as db]
[app.features.fdata :as feat.fdata]
[app.media :as media]
[app.storage :as sto]
[app.util.blob :as blob]
[app.util.pointer-map :as pmap]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.set :as set]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(declare ^:private clean-file!)
(defn- decode-file
[cfg {:keys [id] :as file}]
(binding [pmap/*load-fn* (partial feat.fdata/load-pointer cfg id)]
(-> file
(update :features db/decode-pgarray #{})
(update :data blob/decode)
(update :data feat.fdata/process-pointers deref)
(update :data feat.fdata/process-objects (partial into {}))
(update :data assoc :id id)
(fmg/migrate-file))))
(defn- update-file!
[{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
(let [file (if (contains? (:features file) "fdata/objects-map")
(feat.fdata/enable-objects-map file)
file)
file (if (contains? (:features file) "fdata/pointer-map")
(binding [pmap/*tracked* (pmap/create-tracked)]
(let [file (feat.fdata/enable-pointer-map file)]
(feat.fdata/persist-pointers! cfg id)
file))
file)
file (-> file
(update :features db/encode-pgarray conn "text")
(update :data blob/encode))]
(db/update! conn :file
{:has-media-trimmed true
:features (:features file)
:version (:version file)
:data (:data file)}
{:id id}
{::db/return-keys true})))
(def ^:private
sql:get-candidates
"SELECT f.id,
f.data,
f.revn,
f.version,
f.features,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
SKIP LOCKED")
(defn- get-candidates
[{:keys [::db/conn ::min-age ::file-id]}]
(if (uuid? file-id)
(do
(l/warn :hint "explicit file id passed on params" :file-id (str file-id))
(db/query conn :file {:id file-id}))
(let [min-age (db/interval min-age)]
(db/cursor conn [sql:get-candidates min-age] {:chunk-size 1}))))
(declare ^:private get-file)
(declare ^:private decode-file)
(declare ^:private persist-file!)
(def ^:private sql:mark-file-media-object-deleted
"UPDATE file_media_object
@ -172,7 +112,6 @@
file))
(def ^:private sql:get-files-for-library
"SELECT f.id, f.data, f.modified_at, f.features, f.version
FROM file AS f
@ -274,17 +213,74 @@
(cfv/validate-file-schema! file)
file))
;; Selects (and locks with FOR UPDATE, skipping rows already locked by
;; other transactions) a single file by id, but only if it is still
;; eligible: not yet media-trimmed, not deleted and not modified within
;; the interval passed as first parameter.
(def ^:private sql:get-file
"SELECT f.id,
f.data,
f.revn,
f.version,
f.features,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
AND f.id = ?
FOR UPDATE
SKIP LOCKED")
(defn- get-file
  "Executes `sql:get-file` with the configured `::min-age` and
  `::file-id` and returns the first resulting row, or nil when the
  query returns no rows."
  [{:keys [::db/conn ::min-age ::file-id]}]
  (let [rows (db/exec! conn [sql:get-file min-age file-id])]
    (first rows)))
(defn- decode-file
  "Decodes a raw file row: the features pgarray is decoded into a set,
  the data blob is decoded, its pointers are dereferenced, its objects
  are poured into plain maps and the file id is attached to the data;
  finally the file migrations are applied. Everything runs with
  `pmap/*load-fn*` bound to a loader for this file."
  [cfg {:keys [id] :as file}]
  (let [load-fn     (partial feat.fdata/load-pointer cfg id)
        decode-data (fn [data]
                      (-> (blob/decode data)
                          (feat.fdata/process-pointers deref)
                          (feat.fdata/process-objects (partial into {}))
                          (assoc :id id)))]
    (binding [pmap/*load-fn* load-fn]
      (-> file
          (update :features db/decode-pgarray #{})
          (update :data decode-data)
          (fmg/migrate-file)))))
(defn- persist-file!
  "Re-applies the `fdata/objects-map` and `fdata/pointer-map` features
  when the file declares them (persisting the tracked pointers for the
  latter), encodes features and data and updates the `file` row,
  marking it as media-trimmed. Returns the result of `db/update!`."
  [{:keys [::db/conn] :as cfg} {:keys [id] :as file}]
  (let [has-feature? (fn [f feature]
                       (contains? (:features f) feature))

        file (cond-> file
               (has-feature? file "fdata/objects-map")
               (feat.fdata/enable-objects-map))

        file (if-not (has-feature? file "fdata/pointer-map")
               file
               (binding [pmap/*tracked* (pmap/create-tracked)]
                 (let [result (feat.fdata/enable-pointer-map file)]
                   (feat.fdata/persist-pointers! cfg id)
                   result)))

        {:keys [features version data]}
        (-> file
            (update :features db/encode-pgarray conn "text")
            (update :data blob/encode))]

    (db/update! conn :file
                {:has-media-trimmed true
                 :features features
                 :version version
                 :data data}
                {:id id}
                {::db/return-keys true})))
(defn- process-file!
[cfg file]
(try
[cfg]
(if-let [file (get-file cfg)]
(let [file (decode-file cfg file)
file (clean-media! cfg file)
file (update-file! cfg file)]
(clean-data-fragments! cfg file))
(catch Throwable cause
(l/err :hint "error on cleaning file (skiping)"
:file-id (str (:id file))
:cause cause))))
file (persist-file! cfg file)]
(clean-data-fragments! cfg file)
true)
(do
(l/dbg :hint "skip" :file-id (str (::file-id cfg)))
false)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
@ -293,33 +289,30 @@
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::sto/storage]))
(defmethod ig/prep-key ::handler
[_ cfg]
(assoc cfg ::min-age (cf/get-deletion-delay)))
(defmethod ig/init-key ::handler
[_ cfg]
(fn [{:keys [props] :as task}]
(db/tx-run! cfg
(fn [{:keys [::db/conn] :as cfg}]
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))
cfg (-> cfg
(update ::sto/storage media/configure-assets-storage conn)
(assoc ::file-id (:file-id props))
(assoc ::min-age min-age))
(let [min-age (dt/duration (or (:min-age props)
(cf/get-deletion-delay)))
cfg (-> cfg
(assoc ::db/rollback (:rollback? props))
(assoc ::file-id (:file-id props))
(assoc ::min-age (db/interval min-age)))]
total (reduce (fn [total file]
(process-file! cfg file)
(inc total))
0
(get-candidates cfg))]
(try
(db/tx-run! cfg (fn [{:keys [::db/conn] :as cfg}]
(let [cfg (update cfg ::sto/storage sto/configure conn)
res (process-file! cfg)]
(l/inf :hint "finished"
:min-age (dt/format-duration min-age)
:processed total)
(when (contains? cf/flags :tiered-file-data-storage)
(wrk/submit! (-> cfg
(assoc ::wrk/task :offload-file-data)
(assoc ::wrk/params props)
(assoc ::wrk/priority 10)
(assoc ::wrk/delay 1000))))
res)))
;; Allow optional rollback passed by params
(when (:rollback? props)
(db/rollback! conn))
{:processed total})))))
(catch Throwable cause
(l/err :hint "error on cleaning file"
:file-id (str (:file-id props))
:cause cause))))))

View file

@ -0,0 +1,64 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.file-gc-scheduler
"A maintenance task that is responsible of properly scheduling the
file-gc task for all files that matches the eligibility threshold."
(:require
[app.config :as cf]
[app.db :as db]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
;; Selects (and locks with FOR UPDATE, skipping already locked rows)
;; the id and modification date of every non-deleted file that has not
;; been media-trimmed yet and has not been modified within the interval
;; passed as parameter; most recently modified files come first.
(def ^:private
sql:get-candidates
"SELECT f.id,
f.modified_at
FROM file AS f
WHERE f.has_media_trimmed IS false
AND f.modified_at < now() - ?::interval
AND f.deleted_at IS NULL
ORDER BY f.modified_at DESC
FOR UPDATE
SKIP LOCKED")
(defn- get-candidates
  "Returns a database cursor (fetching in chunks of 10) over the rows
  produced by `sql:get-candidates` for the configured `::min-age`."
  [{:keys [::db/conn ::min-age] :as cfg}]
  (db/cursor conn
             [sql:get-candidates (db/interval min-age)]
             {:chunk-size 10}))
(defn- schedule!
  "Submits one worker task per candidate file, passing the file id and
  the `::min-age` as task params, and returns a map with the number of
  submitted tasks under `:processed`."
  [{:keys [::min-age] :as cfg}]
  (let [submit-one! (fn [scheduled {:keys [id]}]
                      (wrk/submit! (assoc cfg ::wrk/params {:file-id id :min-age min-age}))
                      (inc scheduled))]
    {:processed (reduce submit-one! 0 (get-candidates cfg))}))
;; The handler configuration must provide a database pool.
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool]))
;; Sets `::min-age` on the handler config to the value returned by
;; `cf/get-deletion-delay`.
(defmethod ig/prep-key ::handler
  [_ cfg]
  (let [default-min-age (cf/get-deletion-delay)]
    (assoc cfg ::min-age default-min-age)))
;; Builds the task handler: on every run it resolves the effective
;; min-age (task props take precedence over the configured value),
;; prepares the options used for the submitted `:file-gc` tasks and
;; runs `schedule!` inside a transaction.
(defmethod ig/init-key ::handler
  [_ cfg]
  (fn [{:keys [props] :as task}]
    (let [min-age  (dt/duration (or (:min-age props) (::min-age cfg)))
          task-cfg (assoc cfg
                          ::db/rollback (:rollback? props)
                          ::min-age min-age
                          ::wrk/task :file-gc
                          ::wrk/priority 10
                          ::wrk/mark-retries 0
                          ::wrk/delay 1000)]
      (db/tx-run! task-cfg schedule!))))

View file

@ -11,7 +11,6 @@
[app.common.logging :as l]
[app.config :as cf]
[app.db :as db]
[app.media :as media]
[app.storage :as sto]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@ -126,7 +125,7 @@
0)))
(def ^:private sql:get-files
"SELECT id, deleted_at, project_id
"SELECT id, deleted_at, project_id, data_ref_id
FROM file
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
@ -136,15 +135,17 @@
SKIP LOCKED")
(defn- delete-files!
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-files min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [id deleted-at project-id]}]
(reduce (fn [total {:keys [id deleted-at project-id data-ref-id]}]
(l/trc :hint "permanently delete"
:rel "file"
:id (str id)
:project-id (str project-id)
:deleted-at (dt/format-instant deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
;; And finally, permanently delete the file.
(db/delete! conn :file {:id id})
@ -210,7 +211,7 @@
0)))
(def ^:private sql:get-file-data-fragments
"SELECT file_id, id, deleted_at
"SELECT file_id, id, deleted_at, data_ref_id
FROM file_data_fragment
WHERE deleted_at IS NOT NULL
AND deleted_at < now() - ?::interval
@ -220,15 +221,16 @@
SKIP LOCKED")
(defn- delete-file-data-fragments!
[{:keys [::db/conn ::min-age ::chunk-size] :as cfg}]
[{:keys [::db/conn ::sto/storage ::min-age ::chunk-size] :as cfg}]
(->> (db/cursor conn [sql:get-file-data-fragments min-age chunk-size] {:chunk-size 1})
(reduce (fn [total {:keys [file-id id deleted-at]}]
(reduce (fn [total {:keys [file-id id deleted-at data-ref-id]}]
(l/trc :hint "permanently delete"
:rel "file-data-fragment"
:id (str id)
:file-id (str file-id)
:deleted-at (dt/format-instant deleted-at))
(some->> data-ref-id (sto/touch-object! storage))
(db/delete! conn :file-data-fragment {:file-id file-id :id id})
(inc total))
@ -299,9 +301,7 @@
[_ cfg]
(fn [{:keys [props] :as task}]
(let [min-age (dt/duration (or (:min-age props) (::min-age cfg)))
cfg (-> cfg
(assoc ::min-age (db/interval min-age))
(update ::sto/storage media/configure-assets-storage))]
cfg (assoc cfg ::min-age (db/interval min-age))]
(loop [procs (map deref deletion-proc-vars)
total 0]

View file

@ -0,0 +1,85 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.tasks.offload-file-data
"A maintenance task responsible of moving file data from hot
storage (the database row) to a cold storage (fs or s3)."
(:require
[app.common.logging :as l]
[app.db :as db]
[app.db.sql :as-alias sql]
[app.storage :as sto]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(defn- offload-file-data!
  "Moves the data blob of the file identified by `::file-id` from the
  `file` row to the objects storage (bucket \"file-data\") and replaces
  it in the row with a reference to the created storage object.

  The row is fetched with a FOR UPDATE lock. When the row holds no
  inline data (e.g. the file has already been offloaded and the task is
  scheduled again) nothing is stored or updated and nil is returned;
  otherwise returns the result of `db/update!`."
  [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
  (let [file (db/get conn :file {:id file-id}
                     {::sql/for-update true})]
    (if (nil? (:data file))
      ;; The offload task can be scheduled for files whose data is no
      ;; longer stored in the row; there is nothing to move in that
      ;; case, so skip instead of building content from nil.
      (l/dbg :hint "skip file data offload, no inline data"
             :file-id (str file-id))

      (let [data (sto/content (:data file))
            sobj (sto/put-object! storage
                                  {::sto/content data
                                   ::sto/touch true
                                   :bucket "file-data"
                                   :content-type "application/octet-stream"
                                   :file-id file-id})]

        (l/trc :hint "offload file data"
               :file-id (str file-id)
               :storage-id (str (:id sobj)))

        (db/update! conn :file
                    {:data-backend "objects-storage"
                     :data-ref-id (:id sobj)
                     :data nil}
                    {:id file-id}
                    {::db/return-keys false})))))
(defn- offload-file-data-fragments!
  "Moves the data blob of every non-deleted, not yet offloaded fragment
  of the file identified by `::file-id` to the objects storage (bucket
  \"file-data-fragment\") and replaces it in the row with a reference
  to the created storage object."
  [{:keys [::db/conn ::sto/storage ::file-id] :as cfg}]
  ;; NOTE(review): the lock option uses the same `::sql/for-update` key
  ;; as `offload-file-data!`; confirm this is the key honored by
  ;; `db/query`.
  (doseq [fragment (db/query conn :file-data-fragment
                             {:file-id file-id
                              :deleted-at nil
                              :data-backend nil}
                             {::sql/for-update true})]
    (let [data (sto/content (:data fragment))
          sobj (sto/put-object! storage
                                {::sto/content data
                                 ::sto/touch true
                                 :bucket "file-data-fragment"
                                 :content-type "application/octet-stream"
                                 :file-id file-id
                                 :file-fragment-id (:id fragment)})]

      (l/trc :hint "offload file data fragment"
             :file-id (str file-id)
             :file-fragment-id (str (:id fragment))
             :storage-id (str (:id sobj)))

      ;; Restrict the update to this file's fragment, the same way the
      ;; fragment rows are addressed elsewhere (file-id + id).
      (db/update! conn :file-data-fragment
                  {:data-backend "objects-storage"
                   :data-ref-id (:id sobj)
                   :data nil}
                  {:file-id file-id
                   :id (:id fragment)}
                  {::db/return-keys false}))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; The handler configuration must provide a database pool and a
;; storage instance.
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req [::db/pool ::sto/storage]))
;; Builds the task handler: for the file id received in the task props
;; it offloads, inside a single transaction, first the file data and
;; then the file data fragments.
(defmethod ig/init-key ::handler
  [_ cfg]
  (fn [{:keys [props] :as task}]
    (let [task-cfg (assoc cfg
                          ::db/rollback (:rollback? props)
                          ::file-id (:file-id props))
          offload! (fn [cfg]
                     (offload-file-data! cfg)
                     (offload-file-data-fragments! cfg))]
      (db/tx-run! task-cfg offload!))))

View file

@ -62,19 +62,25 @@
[conn]
(-> (db/exec-one! conn ["SELECT count(*) AS count FROM file"]) :count))
;; Counts the file changes created during the current day. The day is
;; expressed as the half-open range [today 00:00, tomorrow 00:00) so
;; that rows created exactly at midnight are counted too.
(def ^:private sql:num-file-changes
  "SELECT count(*) AS count
     FROM file_change
    WHERE created_at < date_trunc('day', now()) + '24 hours'::interval
      AND created_at >= date_trunc('day', now())")
(defn- get-num-file-changes
[conn]
(let [sql (str "SELECT count(*) AS count "
" FROM file_change "
" where date_trunc('day', created_at) = date_trunc('day', now())")]
(-> (db/exec-one! conn [sql]) :count)))
(-> (db/exec-one! conn [sql:num-file-changes]) :count))
;; Counts the distinct files that received changes during the current
;; day. The day is expressed as the half-open range
;; [today 00:00, tomorrow 00:00) so that rows created exactly at
;; midnight are counted too.
(def ^:private sql:num-touched-files
  "SELECT count(distinct file_id) AS count
     FROM file_change
    WHERE created_at < date_trunc('day', now()) + '24 hours'::interval
      AND created_at >= date_trunc('day', now())")
(defn- get-num-touched-files
[conn]
(let [sql (str "SELECT count(distinct file_id) AS count "
" FROM file_change "
" where date_trunc('day', created_at) = date_trunc('day', now())")]
(-> (db/exec-one! conn [sql]) :count)))
(-> (db/exec-one! conn [sql:num-touched-files]) :count))
(defn- get-num-users
[conn]

View file

@ -149,8 +149,7 @@
shape-id (uuid/random)]
;; Preventive file-gc
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Check the number of fragments before adding the page
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -171,8 +170,7 @@
(t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -210,15 +208,13 @@
(t/is (= 3 (count rows))))
;; The file-gc should mark for remove unused fragments
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed res))))
;; Check the number of fragments; should be 3 because changes
;; are also holding pointers to fragments;
;; Check the number of fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)
:deleted-at nil})]
(t/is (= 2 (count rows))))
@ -231,8 +227,7 @@
;; The file-gc should remove fragments related to changes
;; snapshots previously deleted.
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Check the number of fragments;
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
@ -325,12 +320,10 @@
(t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)]
(t/is (= 0 (:processed res))))
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})]
@ -367,8 +360,7 @@
;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted.
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed res))))
@ -490,12 +482,10 @@
:strokes [{:opacity 1 :stroke-image {:id (:id fmo5) :width 100 :height 100 :mtype "image/jpeg"}}]})}])
;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)]
(t/is (= 0 (:processed res))))
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed res))))
@ -534,9 +524,7 @@
;; Now, we have deleted the usage of pointers to the
;; file-media-objects, if we paste file-gc, they should be marked
;; as deleted.
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 7 (:processed res))))
@ -659,12 +647,10 @@
(t/is (= 0 (:delete res))))
;; run the file-gc task immediately without forced min-age
(let [res (th/run-task! :file-gc)]
(t/is (= 0 (:processed res))))
(t/is (false? (th/run-task! :file-gc {:file-id (:id file)})))
;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; retrieve file and check trimmed attribute
(let [row (th/db-get :file {:id (:id file)})]
@ -693,8 +679,7 @@
:page-id page-id
:id frame-id-2}])
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 2 (count rows)))
@ -727,8 +712,7 @@
:page-id page-id
:id frame-id-1}])
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id file-id})]
(t/is (= 1 (count rows)))
@ -1127,8 +1111,7 @@
(th/sleep 300)
;; run the task
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; check that object thumbnails are still here
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})]
@ -1157,8 +1140,7 @@
(t/is (= 2 (count rows))))
;; run the task again
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; check that we have all object thumbnails
(let [rows (th/db-query :file-tagged-object-thumbnail {:file-id (:id file)})]
@ -1220,8 +1202,7 @@
(t/is (= 2 (count rows)))))
(t/testing "gc task"
(let [res (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed res))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [rows (th/db-query :file-thumbnail {:file-id (:id file)})]
(t/is (= 2 (count rows)))
@ -1232,3 +1213,113 @@
(let [rows (th/db-query :file-thumbnail {:file-id (:id file)})]
(t/is (= 1 (count rows)))))))
(defn- update-file!
  "Test helper: executes the `:update-file` command with the given
  changes (using a random session id and `revn` defaulting to 0),
  asserts that the response carries no error and returns its result."
  [& {:keys [profile-id file-id changes revn] :or {revn 0}}]
  (let [response (th/command! {::th/type :update-file
                               ::rpc/profile-id profile-id
                               :id file-id
                               :session-id (uuid/random)
                               :revn revn
                               :features cfeat/supported-features
                               :changes changes})]
    ;; (th/print-result! response)
    (t/is (nil? (:error response)))
    (:result response)))
;; Exercises the tiered file data storage: after a file-gc run with the
;; `:tiered-file-data-storage` flag enabled, the file data and its
;; fragments are expected to be referenced from the objects storage
;; instead of being stored inline; a later update plus GC cycle is
;; expected to leave the fragments stored inline again.
(t/deftest file-tiered-storage
(let [profile (th/create-profile* 1)
file (th/create-file* 1 {:profile-id (:id profile)
:project-id (:default-project-id profile)
:is-shared false})
page-id (uuid/random)
shape-id (uuid/random)]
;; Preventive file-gc
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; Preventive objects-gc
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
;; Check the number of fragments before adding the page; the data
;; of every fragment is still stored inline
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 1 (count rows)))
(t/is (every? #(some? (:data %)) rows)))
;; Mark the file eligible again for GC
(th/db-update! :file
{:has-media-trimmed false}
{:id (:id file)})
;; Run FileGC again, with tiered storage activated
(with-redefs [app.config/flags (conj app.config/flags :tiered-file-data-storage)]
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The FileGC task will schedule an inner task; run it
(th/run-pending-tasks!))
;; Clean objects after file-gc
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
;; Check that the remaining fragment has been offloaded: no inline
;; data and a reference to the objects storage
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 1 (count rows)))
(t/is (every? #(nil? (:data %)) rows))
(t/is (every? #(uuid? (:data-ref-id %)) rows))
(t/is (every? #(= "objects-storage" (:data-backend %)) rows)))
;; Check that the file row has been offloaded too and that the
;; referenced storage object carries the expected metadata
(let [file (th/db-get :file {:id (:id file)})
storage (sto/resolve th/*system*)]
(t/is (= "objects-storage" (:data-backend file)))
(t/is (nil? (:data file)))
(t/is (uuid? (:data-ref-id file)))
(let [sobj (sto/get-object storage (:data-ref-id file))]
(t/is (= "file-data" (:bucket (meta sobj))))
(t/is (= (:id file) (:file-id (meta sobj))))))
;; Add a page to the file; this update should load the data from
;; cold storage again into the hot storage (db)
(update-file!
:file-id (:id file)
:profile-id (:id profile)
:revn 0
:changes
[{:type :add-page
:name "test"
:id page-id}])
;; Check the number of fragments
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 2 (count rows))))
;; Check the non-deleted fragments: the oldest one is still
;; offloaded and the newest one holds its data inline
(let [[row1 row2 :as rows]
(th/db-query :file-data-fragment
{:file-id (:id file)
:deleted-at nil}
{:order-by [:created-at]})]
;; (pp/pprint rows)
(t/is (= 2 (count rows)))
(t/is (nil? (:data row1)))
(t/is (= "objects-storage" (:data-backend row1)))
(t/is (bytes? (:data row2)))
(t/is (nil? (:data-backend row2))))
;; The file-gc should mark for remove unused fragments
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
;; The objects-gc should remove unused fragments
(let [res (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed res))))
;; Check the fragments after the GC run: all of them should hold
;; their data inline, without references to the objects storage
(let [rows (th/db-query :file-data-fragment {:file-id (:id file)})]
(t/is (= 2 (count rows)))
(t/is (every? #(bytes? (:data %)) rows))
(t/is (every? #(nil? (:data-ref-id %)) rows))
(t/is (every? #(nil? (:data-backend %)) rows)))))

View file

@ -114,8 +114,7 @@
;; Run the File GC task that should remove unused file object
;; thumbnails
(let [result (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
(th/run-task! :file-gc {:min-age 0 :file-id (:id file)})
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 3 (:processed result))))
@ -134,7 +133,7 @@
(t/is (some? (sto/get-object storage (:media-id row2))))
;; run the task again
(let [res (th/run-task! "storage-gc-touched" {:min-age 0})]
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 1 (:delete res)))
(t/is (= 0 (:freeze res))))
@ -217,8 +216,7 @@
;; Run the File GC task that should remove unused file object
;; thumbnails
(let [result (th/run-task! :file-gc {:min-age 0})]
(t/is (= 1 (:processed result))))
(t/is (true? (th/run-task! :file-gc {:min-age 0 :file-id (:id file)})))
(let [result (th/run-task! :objects-gc {:min-age 0})]
(t/is (= 2 (:processed result))))

View file

@ -145,7 +145,7 @@
(t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 6 (:freeze res)))
(t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})]
@ -207,7 +207,7 @@
(t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 3 (:freeze res)))
(t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})]
@ -268,7 +268,7 @@
(t/is (nil? (:result out))))
(let [res (th/run-task! :storage-gc-touched {:min-age 0})]
(t/is (= 3 (:freeze res)))
(t/is (= 0 (:freeze res)))
(t/is (= 0 (:delete res))))
(let [res (th/run-task! :objects-gc {:min-age 0})]

View file

@ -53,14 +53,15 @@
(.replace js/location redirect-uri)
(log/error :hint "unexpected response from OIDC method"
:resp (pr-str rsp))))
(fn [{:keys [type code] :as error}]
(cond
(and (= type :restriction)
(= code :provider-not-configured))
(st/emit! (ntf/error (tr "errors.auth-provider-not-configured")))
(fn [cause]
(let [{:keys [type code] :as error} (ex-data cause)]
(cond
(and (= type :restriction)
(= code :provider-not-configured))
(st/emit! (ntf/error (tr "errors.auth-provider-not-configured")))
:else
(st/emit! (ntf/error (tr "errors.generic"))))))))
:else
(st/emit! (ntf/error (tr "errors.generic")))))))))
(def ^:private schema:login-form
[:map {:title "LoginForm"}