From 0c97a44a2aff5c36b1a8e041a5bdb7cb21b36730 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Mon, 14 Jun 2021 11:50:26 +0200 Subject: [PATCH] :tada: Add file offloading to external storage mechanism. --- CHANGES.md | 2 + backend/src/app/config.clj | 35 +++++--- backend/src/app/http/assets.clj | 2 +- backend/src/app/main.clj | 44 +++++++--- backend/src/app/metrics.clj | 10 ++- backend/src/app/migrations.clj | 3 + .../migrations/sql/0061-mod-file-table.sql | 10 +++ backend/src/app/rpc/mutations/files.clj | 21 +++-- backend/src/app/rpc/queries/files.clj | 80 +++++++++++++------ backend/src/app/rpc/queries/viewer.clj | 5 +- backend/src/app/storage.clj | 67 ++++++++-------- backend/src/app/storage/db.clj | 5 ++ backend/src/app/storage/fs.clj | 4 + backend/src/app/storage/impl.clj | 42 ++++++++-- backend/src/app/storage/s3.clj | 63 ++++++++++----- backend/src/app/tasks/file_offload.clj | 64 +++++++++++++++ backend/test/app/storage_test.clj | 1 - .../app/main/data/workspace/libraries.cljs | 1 + 18 files changed, 334 insertions(+), 125 deletions(-) create mode 100644 backend/src/app/migrations/sql/0061-mod-file-table.sql create mode 100644 backend/src/app/tasks/file_offload.clj diff --git a/CHANGES.md b/CHANGES.md index acebd9e0d..7459428f5 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -8,6 +8,8 @@ - Allow to ungroup assets [Taiga #1719](https://tree.taiga.io/project/penpot/us/1719) - Allow to rename assets groups [Taiga #1721](https://tree.taiga.io/project/penpot/us/1721) - Memorize collapse state of assets in panel [Taiga #1718](https://tree.taiga.io/project/penpot/us/1718) +- Add the ability to offload file data to a cheaper storage when file + becomes inactive. ### :bug: Bugs fixed ### :arrow_up: Deps updates diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 30ca83d38..f7c587d48 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -58,11 +58,8 @@ :srepl-host "127.0.0.1" :srepl-port 6062 - :storage-backend :fs - - :storage-fs-directory "assets" - :storage-s3-region :eu-central-1 - :storage-s3-bucket "penpot-devenv-assets-pre" + :assets-storage-backend :fs + :storage-assets-fs-directory "assets" :feedback-destination "info@example.com" :feedback-enabled false @@ -175,10 +172,14 @@ (s/def ::smtp-username (s/nilable ::us/string)) (s/def ::srepl-host ::us/string) (s/def ::srepl-port ::us/integer) -(s/def ::storage-backend ::us/keyword) -(s/def ::storage-fs-directory ::us/string) -(s/def ::storage-s3-bucket ::us/string) -(s/def ::storage-s3-region ::us/keyword) +(s/def ::assets-storage-backend ::us/keyword) +(s/def ::fdata-storage-backend ::us/keyword) +(s/def ::storage-assets-fs-directory ::us/string) +(s/def ::storage-assets-s3-bucket ::us/string) +(s/def ::storage-assets-s3-region ::us/keyword) +(s/def ::storage-fdata-s3-bucket ::us/string) +(s/def ::storage-fdata-s3-region ::us/keyword) +(s/def ::storage-fdata-s3-prefix ::us/string) (s/def ::telemetry-enabled ::us/boolean) (s/def ::telemetry-uri ::us/string) (s/def ::telemetry-with-taiga ::us/boolean) @@ -257,12 +258,20 @@ ::smtp-ssl ::smtp-tls ::smtp-username + ::srepl-host ::srepl-port - ::storage-backend - ::storage-fs-directory - ::storage-s3-bucket - ::storage-s3-region + + ::assets-storage-backend + ::storage-assets-fs-directory + ::storage-assets-s3-bucket + ::storage-assets-s3-region + + ::fdata-storage-backend + ::storage-fdata-s3-bucket + ::storage-fdata-s3-region + ::storage-fdata-s3-prefix + ::telemetry-enabled ::telemetry-uri ::telemetry-referer diff --git 
a/backend/src/app/http/assets.clj b/backend/src/app/http/assets.clj index 678c4bddd..18c14462e 100644 --- a/backend/src/app/http/assets.clj +++ b/backend/src/app/http/assets.clj @@ -49,7 +49,7 @@ {:status 200 :headers {"content-type" (:content-type mdata) "cache-control" (str "max-age=" (inst-ms cache-max-age))} - :body (sto/get-object-data storage obj)} + :body (sto/get-object-bytes storage obj)} :s3 (let [url (sto/get-object-url storage obj {:max-age signature-max-age})] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 218f318e9..a7593ee4d 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -166,7 +166,7 @@ :tasks (ig/ref :app.worker/registry) :pool (ig/ref :app.db/pool) :schedule - [{:cron #app/cron "0 0 0 * * ? *" ;; daily + [{:cron #app/cron "0 0 0 * * ?" ;; daily :task :file-media-gc} {:cron #app/cron "0 0 * * * ?" ;; hourly @@ -190,6 +190,10 @@ {:cron #app/cron "0 0 0 * * ?" ;; daily :task :tasks-gc} + (when (cf/get :fdata-storage-backend) + {:cron #app/cron "0 0 * * * ?" ;; hourly + :task :file-offload}) + (when (cf/get :audit-archive-enabled) {:cron #app/cron "0 0 * * * ?" ;; every 1h :task :audit-archive}) @@ -217,6 +221,7 @@ :tasks-gc (ig/ref :app.tasks.tasks-gc/handler) :telemetry (ig/ref :app.tasks.telemetry/handler) :session-gc (ig/ref :app.http.session/gc-task) + :file-offload (ig/ref :app.tasks.file-offload/handler) :audit-archive (ig/ref :app.loggers.audit/archive-task) :audit-archive-gc (ig/ref :app.loggers.audit/archive-gc-task)}} @@ -256,6 +261,12 @@ {:pool (ig/ref :app.db/pool) :max-age (dt/duration {:hours 72})} + :app.tasks.file-offload/handler + {:pool (ig/ref :app.db/pool) + :max-age (dt/duration {:seconds 5}) + :storage (ig/ref :app.storage/storage) + :backend (cf/get :fdata-storage-backend :fdata-s3)} + :app.tasks.telemetry/handler {:pool (ig/ref :app.db/pool) :version (:full cf/version) @@ -306,23 +317,32 @@ :app.storage/storage {:pool (ig/ref :app.db/pool) :executor (ig/ref :app.worker/executor) - :backend (cf/get :storage-backend :fs) - :backends {:s3 (ig/ref [::main :app.storage.s3/backend]) - :db (ig/ref [::main :app.storage.db/backend]) - :fs (ig/ref [::main :app.storage.fs/backend]) - :tmp (ig/ref [::tmp :app.storage.fs/backend])}} + :backend (cf/get :assets-storage-backend :assets-fs) + :backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend]) + :assets-db (ig/ref [::assets :app.storage.db/backend]) + :assets-fs (ig/ref [::assets :app.storage.fs/backend]) + :s3 (ig/ref [::assets :app.storage.s3/backend]) + :db (ig/ref [::assets :app.storage.db/backend]) + :fs (ig/ref [::assets :app.storage.fs/backend]) + :tmp (ig/ref [::tmp :app.storage.fs/backend]) + :fdata-s3 (ig/ref [::fdata :app.storage.s3/backend])}} - [::main :app.storage.s3/backend] - {:region (cf/get :storage-s3-region) - :bucket (cf/get :storage-s3-bucket)} + [::fdata :app.storage.s3/backend] + {:region (cf/get :storage-fdata-s3-region) + :bucket (cf/get :storage-fdata-s3-bucket) + :prefix (cf/get :storage-fdata-s3-prefix)} - [::main :app.storage.fs/backend] - {:directory (cf/get :storage-fs-directory)} + [::assets :app.storage.s3/backend] + {:region (cf/get :storage-assets-s3-region) + :bucket (cf/get :storage-assets-s3-bucket)} + + [::assets :app.storage.fs/backend] + {:directory (cf/get :storage-assets-fs-directory)} [::tmp :app.storage.fs/backend] {:directory "/tmp/penpot"} - [::main :app.storage.db/backend] + [::assets :app.storage.db/backend] {:pool (ig/ref :app.db/pool)}}) (def system nil) diff --git a/backend/src/app/metrics.clj
b/backend/src/app/metrics.clj index 52567eb82..74fb318ac 100644 --- a/backend/src/app/metrics.clj +++ b/backend/src/app/metrics.clj @@ -210,9 +210,15 @@ ([a b] (mobj :inc) (origf a b)) - ([a b & more] + ([a b c] (mobj :inc) - (apply origf a b more))) + (origf a b c)) + ([a b c d] + (mobj :inc) + (origf a b c d)) + ([a b c d & more] + (mobj :inc) + (apply origf a b c d more))) (assoc mdata ::original origf)))) ([rootf mobj labels] (let [mdata (meta rootf) diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 070f6cae8..c0f62cf3f 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -190,6 +190,9 @@ {:name "0060-mod-file-change-table" :fn (mg/resource "app/migrations/sql/0060-mod-file-change-table.sql")} + + {:name "0061-mod-file-table" + :fn (mg/resource "app/migrations/sql/0061-mod-file-table.sql")} ]) diff --git a/backend/src/app/migrations/sql/0061-mod-file-table.sql b/backend/src/app/migrations/sql/0061-mod-file-table.sql new file mode 100644 index 000000000..8be8c3bc1 --- /dev/null +++ b/backend/src/app/migrations/sql/0061-mod-file-table.sql @@ -0,0 +1,10 @@ +CREATE INDEX IF NOT EXISTS file__modified_at__with__data__idx + ON file (modified_at, id) + WHERE data IS NOT NULL; + +ALTER TABLE file + ADD COLUMN data_backend text NULL, +ALTER COLUMN data_backend SET STORAGE EXTERNAL; + +DROP TRIGGER file_on_update_tgr ON file; +DROP FUNCTION handle_file_update (); diff --git a/backend/src/app/rpc/mutations/files.clj b/backend/src/app/rpc/mutations/files.clj index 822c39d3d..a08da44f8 100644 --- a/backend/src/app/rpc/mutations/files.clj +++ b/backend/src/app/rpc/mutations/files.clj @@ -22,6 +22,8 @@ [clojure.spec.alpha :as s])) +(declare create-file) + ;; --- Helpers & Specs (s/def ::id ::us/uuid) @@ -32,8 +34,6 @@ ;; --- Mutation: Create File -(declare create-file) - (s/def ::is-shared ::us/boolean) (s/def ::create-file (s/keys :req-un [::profile-id ::name ::project-id] @@ -67,10 +67,11 @@ :is-shared is-shared :data (blob/encode data) :deleted-at deleted-at})] + (->> (assoc params :file-id id :role :owner) (create-file-role conn)) - (assoc file :data data))) + (assoc file :data data))) ;; --- Mutation: Rename File @@ -202,7 +203,6 @@ {:file-id file-id :library-file-id library-id})) - ;; --- Mutation: Ignore updates in linked files (declare ignore-sync) @@ -303,7 +303,8 @@ (mapcat :changes changes-with-metadata) changes) - file (-> file + ts (dt/now) + file (-> (files/retrieve-data cfg file) (update :revn inc) (update :data (fn [data] (-> data @@ -317,6 +318,7 @@ {:id (uuid/next) :session-id session-id :profile-id profile-id + :created-at ts :file-id (:id file) :revn (:revn file) :data (when (take-snapshot? file) @@ -327,11 +329,16 @@ (db/update! conn :file {:revn (:revn file) :data (:data file) + :data-backend nil + :modified-at ts :has-media-trimmed false} {:id (:id file)}) - (let [params (-> params (assoc :file file - :changes changes))] + (db/update! 
conn :project + {:modified-at ts} + {:id (:project-id file)}) + + (let [params (assoc params :file file :changes changes)] ;; Send asynchronous notifications (send-notifications cfg params) diff --git a/backend/src/app/rpc/queries/files.clj b/backend/src/app/rpc/queries/files.clj index dd36c6413..730efe8d5 100644 --- a/backend/src/app/rpc/queries/files.clj +++ b/backend/src/app/rpc/queries/files.clj @@ -9,10 +9,12 @@ [app.common.pages.migrations :as pmg] [app.common.spec :as us] [app.common.uuid :as uuid] + [app.config :as cf] [app.db :as db] [app.rpc.permissions :as perms] [app.rpc.queries.projects :as projects] [app.rpc.queries.teams :as teams] + [app.storage.impl :as simpl] [app.util.blob :as blob] [app.util.services :as sv] [clojure.spec.alpha :as s])) @@ -171,11 +173,23 @@ ;; --- Query: File (By ID) +(defn- retrieve-data* + [{:keys [storage] :as cfg} file] + (when-let [backend (simpl/resolve-backend storage (cf/get :fdata-storage-backend))] + (simpl/get-object-bytes backend file))) + +(defn retrieve-data + [cfg file] + (if (bytes? (:data file)) + file + (assoc file :data (retrieve-data* cfg file)))) + (defn retrieve-file - [conn id] - (-> (db/get-by-id conn :file id) - (decode-row) - (pmg/migrate-file))) + [{:keys [conn] :as cfg} id] + (->> (db/get-by-id conn :file id) + (retrieve-data cfg) + (decode-row) + (pmg/migrate-file))) (s/def ::file (s/keys :req-un [::profile-id ::id])) @@ -183,8 +197,9 @@ (sv/defmethod ::file [{:keys [pool] :as cfg} {:keys [profile-id id] :as params}] (db/with-atomic [conn pool] - (check-edition-permissions! conn profile-id id) - (retrieve-file conn id))) + (let [cfg (assoc cfg :conn conn)] + (check-edition-permissions! conn profile-id id) + (retrieve-file cfg id)))) (s/def ::page (s/keys :req-un [::profile-id ::file-id])) @@ -218,10 +233,10 @@ (sv/defmethod ::page [{:keys [pool] :as cfg} {:keys [profile-id file-id id strip-thumbnails]}] - [{:keys [pool] :as cfg} {:keys [profile-id file-id]}] (db/with-atomic [conn pool] (check-edition-permissions! conn profile-id file-id) - (let [file (retrieve-file conn file-id) + (let [cfg (assoc cfg :conn conn) + file (retrieve-file cfg file-id) page-id (get-in file [:data :pages 0])] (cond-> (get-in file [:data :pages-index page-id]) strip-thumbnails @@ -277,23 +292,36 @@ ;; --- Query: File Libraries used by a File (def ^:private sql:file-libraries - "select fl.*, - - flr.synced_at as synced_at - from file as fl - inner join file_library_rel as flr on (flr.library_file_id = fl.id) - where flr.file_id = ? - and fl.deleted_at is null") + "WITH RECURSIVE libs AS ( + SELECT fl.*, flr.synced_at + FROM file AS fl + JOIN file_library_rel AS flr ON (flr.library_file_id = fl.id) + WHERE flr.file_id = ?::uuid + UNION + SELECT fl.*, flr.synced_at + FROM file AS fl + JOIN file_library_rel AS flr ON (flr.library_file_id = fl.id) + JOIN libs AS l ON (flr.file_id = l.id) + ) + SELECT l.id, + l.data, + l.project_id, + l.created_at, + l.modified_at, + l.deleted_at, + l.name, + l.revn, + l.synced_at + FROM libs AS l + WHERE l.deleted_at IS NULL OR l.deleted_at > now();") (defn retrieve-file-libraries - [conn is-indirect file-id] - (let [libraries (->> (db/exec! 
conn [sql:file-libraries file-id]) - (map #(assoc % :is-indirect is-indirect)) - (into #{} decode-row-xf))] - (reduce #(into %1 (retrieve-file-libraries conn true %2)) - libraries - (map :id libraries)))) - + [{:keys [conn] :as cfg} is-indirect file-id] + (let [xform (comp + (map #(assoc % :is-indirect is-indirect)) + (map #(retrieve-data cfg %)) + (map decode-row))] + (into #{} xform (db/exec! conn [sql:file-libraries file-id])))) (s/def ::file-libraries (s/keys :req-un [::profile-id ::file-id])) @@ -301,8 +329,9 @@ (sv/defmethod ::file-libraries [{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}] (db/with-atomic [conn pool] - (check-edition-permissions! conn profile-id file-id) - (retrieve-file-libraries conn false file-id))) + (let [cfg (assoc cfg :conn conn)] + (check-edition-permissions! conn profile-id file-id) + (retrieve-file-libraries cfg false file-id)))) ;; --- QUERY: team-recent-files @@ -334,7 +363,6 @@ (teams/check-read-permissions! conn profile-id team-id) (db/exec! conn [sql:team-recent-files team-id]))) - ;; --- Helpers (defn decode-row diff --git a/backend/src/app/rpc/queries/viewer.clj b/backend/src/app/rpc/queries/viewer.clj index f346e53ef..dfe95314c 100644 --- a/backend/src/app/rpc/queries/viewer.clj +++ b/backend/src/app/rpc/queries/viewer.clj @@ -42,12 +42,13 @@ (sv/defmethod ::viewer-bundle {:auth false} [{:keys [pool] :as cfg} {:keys [profile-id file-id page-id token] :as params}] (db/with-atomic [conn pool] - (let [file (files/retrieve-file conn file-id) + (let [cfg (assoc cfg :conn conn) + file (files/retrieve-file cfg file-id) project (retrieve-project conn (:project-id file)) page (get-in file [:data :pages-index page-id]) file (merge (dissoc file :data) (select-keys (:data file) [:colors :media :typographies])) - libs (files/retrieve-file-libraries conn false file-id) + libs (files/retrieve-file-libraries cfg false file-id) users (teams/retrieve-users conn (:team-id project)) fonts (db/query conn :team-font-variant diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 0b1c1ffe1..f8a923a2d 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -5,7 +5,7 @@ ;; Copyright (c) UXBOX Labs SL (ns app.storage - "File Storage abstraction layer." + "Objects storage abstraction layer." 
(:require [app.common.data :as d] [app.common.exceptions :as ex] @@ -20,13 +20,9 @@ [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] - [cuerdas.core :as str] [datoteka.core :as fs] [integrant.core :as ig] - [promesa.exec :as px]) - (:import - java.io.InputStream)) - + [promesa.exec :as px])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Storage Module State @@ -39,7 +35,11 @@ (s/def ::db ::sdb/backend) (s/def ::backends - (s/keys :opt-un [::s3 ::fs ::db])) + (s/map-of ::us/keyword + (s/nilable + (s/or :s3 ::ss3/backend + :fs ::sfs/backend + :db ::sdb/backend)))) (defmethod ig/pre-init-spec ::storage [_] (s/keys :req-un [::backend ::wrk/executor ::db/pool ::backends])) @@ -50,8 +50,9 @@ (assoc :backends (d/without-nils backends)))) (defmethod ig/init-key ::storage - [_ cfg] - cfg) + [_ {:keys [backends] :as cfg}] + (-> (d/without-nils cfg) + (assoc :backends (d/without-nils backends)))) (s/def ::storage (s/keys :req-un [::backends ::wrk/executor ::db/pool ::backend])) @@ -151,8 +152,6 @@ ;; API ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; -(declare resolve-backend) - (defn object->relative-path [{:keys [id] :as obj}] (impl/id->path id)) @@ -185,7 +184,7 @@ (px/run! executor #(register-recheck storage backend (:id object))) ;; Store the data finally on the underlying storage subsystem. - (-> (resolve-backend storage backend) + (-> (impl/resolve-backend storage backend) (impl/put-object object content)) object)) @@ -201,28 +200,37 @@ ;; if the source and destination backends are the same, we ;; proceed to use the fast path with specific copy ;; implementation on backend. - (-> (resolve-backend storage (:backend storage)) + (-> (impl/resolve-backend storage (:backend storage)) (impl/copy-object object object*)) ;; if the source and destination backends are different, we just ;; need to obtain the streams and proceed full copy of the data - (with-open [^InputStream input - (-> (resolve-backend storage (:backend object)) - (impl/get-object-data object))] - (-> (resolve-backend storage (:backend storage)) - (impl/put-object object* (impl/content input (:size object)))))) - + (with-open [is (-> (impl/resolve-backend storage (:backend object)) + (impl/get-object-data object))] + (-> (impl/resolve-backend storage (:backend storage)) + (impl/put-object object* (impl/content is (:size object)))))) object*)) (defn get-object-data + "Return an input stream instance of the object content." [{:keys [pool conn] :as storage} object] (us/assert ::storage storage) (when (or (nil? (:expired-at object)) (dt/is-after? (:expired-at object) (dt/now))) (-> (assoc storage :conn (or conn pool)) - (resolve-backend (:backend object)) + (impl/resolve-backend (:backend object)) (impl/get-object-data object)))) +(defn get-object-bytes + "Returns a byte array of object content." + [{:keys [pool conn] :as storage} object] + (us/assert ::storage storage) + (when (or (nil? (:expired-at object)) + (dt/is-after? (:expired-at object) (dt/now))) + (-> (assoc storage :conn (or conn pool)) + (impl/resolve-backend (:backend object)) + (impl/get-object-bytes object)))) + (defn get-object-url ([storage object] (get-object-url storage object nil)) @@ -231,14 +239,14 @@ (when (or (nil? (:expired-at object)) (dt/is-after? (:expired-at object) (dt/now))) (-> (assoc storage :conn (or conn pool)) - (resolve-backend (:backend object)) + (impl/resolve-backend (:backend object)) (impl/get-object-url object options))))) (defn get-object-path "Get the Path to the object. 
Only works with `:fs` type of storages." [storage object] - (let [backend (resolve-backend storage (:backend object))] + (let [backend (impl/resolve-backend storage (:backend object))] (when (not= :fs (:type backend)) (ex/raise :type :internal :code :operation-not-allowed @@ -254,16 +262,7 @@ (-> (assoc storage :conn (or conn pool)) (delete-database-object (if (uuid? id-or-obj) id-or-obj (:id id-or-obj))))) -;; --- impl - -(defn resolve-backend - [{:keys [conn pool] :as storage} backend-id] - (let [backend (get-in storage [:backends backend-id])] - (when-not backend - (ex/raise :type :internal - :code :backend-not-configured - :hint (str/fmt "backend '%s' not configured" backend-id))) - (assoc backend :conn (or conn pool)))) +(d/export impl/resolve-backend) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Garbage Collection: Permanently delete objects @@ -295,7 +294,7 @@ (some-> (seq rows) (group-by-backend)))) (delete-in-bulk [conn [backend ids]] - (let [backend (resolve-backend storage backend) + (let [backend (impl/resolve-backend storage backend) backend (assoc backend :conn conn)] (impl/del-objects-in-bulk backend ids)))] @@ -445,7 +444,7 @@ (some-> (seq rows) (group-results)))) (delete-group [conn [backend ids]] - (let [backend (resolve-backend storage backend) + (let [backend (impl/resolve-backend storage backend) backend (assoc backend :conn conn)] (impl/del-objects-in-bulk backend ids))) diff --git a/backend/src/app/storage/db.clj b/backend/src/app/storage/db.clj index a7ed7adbc..e5814c850 100644 --- a/backend/src/app/storage/db.clj +++ b/backend/src/app/storage/db.clj @@ -46,6 +46,11 @@ (let [result (db/exec-one! conn ["select data from storage_data where id=?" id])] (ByteArrayInputStream. (:data result)))) +(defmethod impl/get-object-bytes :db + [{:keys [conn] :as backend} {:keys [id] :as object}] + (let [result (db/exec-one! conn ["select data from storage_data where id=?" id])] + (:data result))) + (defmethod impl/get-object-url :db [_ _] (throw (UnsupportedOperationException. "not supported"))) diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj index 4f9d05969..e467bd168 100644 --- a/backend/src/app/storage/fs.clj +++ b/backend/src/app/storage/fs.clj @@ -79,6 +79,10 @@ :path (str full))) (io/input-stream full))) +(defmethod impl/get-object-bytes :fs + [backend object] + (fs/slurp-bytes (impl/get-object-data backend object))) + (defmethod impl/get-object-url :fs [{:keys [uri] :as backend} {:keys [id] :as object} _] (update uri :path diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj index a28184df5..e39aaa7f9 100644 --- a/backend/src/app/storage/impl.clj +++ b/backend/src/app/storage/impl.clj @@ -8,10 +8,10 @@ "Storage backends abstraction layer." (:require [app.common.exceptions :as ex] - [app.common.spec :as us] [app.common.uuid :as uuid] [buddy.core.codecs :as bc] - [clojure.java.io :as io]) + [clojure.java.io :as io] + [cuerdas.core :as str]) (:import java.nio.ByteBuffer java.util.UUID @@ -45,6 +45,14 @@ :code :invalid-storage-backend :context cfg)) +(defmulti get-object-bytes (fn [cfg _] (:type cfg))) + +(defmethod get-object-bytes :default + [cfg _] + (ex/raise :type :internal + :code :invalid-storage-backend + :context cfg)) + (defmulti get-object-url (fn [cfg _ _] (:type cfg))) (defmethod get-object-url :default @@ -109,7 +117,10 @@ (make-output-stream [_ opts] (throw (UnsupportedOperationException. 
"not implemented"))) clojure.lang.Counted - (count [_] size)))) + (count [_] size) + + java.lang.AutoCloseable + (close [_])))) (defn string->content [^String v] @@ -129,7 +140,10 @@ clojure.lang.Counted (count [_] - (alength data))))) + (alength data)) + + java.lang.AutoCloseable + (close [_])))) (defn- input-stream->content [^InputStream is size] @@ -137,7 +151,7 @@ IContentObject io/IOFactory (make-reader [_ opts] - (io/make-reader is opts)) + (io/make-reader is opts)) (make-writer [_ opts] (throw (UnsupportedOperationException. "not implemented"))) (make-input-stream [_ opts] @@ -146,7 +160,11 @@ (throw (UnsupportedOperationException. "not implemented"))) clojure.lang.Counted - (count [_] size))) + (count [_] size) + + java.lang.AutoCloseable + (close [_] + (.close is)))) (defn content ([data] (content data nil)) @@ -179,10 +197,20 @@ (defn slurp-bytes [content] - (us/assert content? content) (with-open [input (io/input-stream content) output (java.io.ByteArrayOutputStream. (count content))] (io/copy input output) (.toByteArray output))) +(defn resolve-backend + [{:keys [conn pool] :as storage} backend-id] + (when backend-id + (let [backend (get-in storage [:backends backend-id])] + (when-not backend + (ex/raise :type :internal + :code :backend-not-configured + :hint (str/fmt "backend '%s' not configured" backend-id))) + (assoc backend + :conn (or conn pool) + :id backend-id)))) diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index da7928bdf..335b284a7 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -5,7 +5,7 @@ ;; Copyright (c) UXBOX Labs SL (ns app.storage.s3 - "Storage backends abstraction layer." + "S3 Storage backend implementation." (:require [app.common.data :as d] [app.common.exceptions :as ex] @@ -18,8 +18,11 @@ [integrant.core :as ig]) (:import java.time.Duration + java.io.InputStream java.util.Collection software.amazon.awssdk.core.sync.RequestBody + software.amazon.awssdk.core.ResponseBytes + ;; software.amazon.awssdk.core.ResponseInputStream software.amazon.awssdk.regions.Region software.amazon.awssdk.services.s3.S3Client software.amazon.awssdk.services.s3.model.Delete @@ -29,13 +32,17 @@ software.amazon.awssdk.services.s3.model.GetObjectRequest software.amazon.awssdk.services.s3.model.ObjectIdentifier software.amazon.awssdk.services.s3.model.PutObjectRequest + ;; software.amazon.awssdk.services.s3.model.GetObjectResponse software.amazon.awssdk.services.s3.presigner.S3Presigner software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest - software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest)) + software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest + + )) (declare put-object) (declare copy-object) -(declare get-object) +(declare get-object-bytes) +(declare get-object-data) (declare get-object-url) (declare del-object-in-bulk) (declare build-s3-client) @@ -87,7 +94,11 @@ (defmethod impl/get-object-data :s3 [backend object] - (get-object backend object)) + (get-object-data backend object)) + +(defmethod impl/get-object-bytes :s3 + [backend object] + (get-object-bytes backend object)) (defmethod impl/get-object-url :s3 [backend object options] @@ -104,19 +115,19 @@ (case region :eu-central-1 Region/EU_CENTRAL_1)) -(defn- build-s3-client +(defn build-s3-client [{:keys [region]}] (.. (S3Client/builder) (region (lookup-region region)) (build))) -(defn- build-s3-presigner +(defn build-s3-presigner [{:keys [region]}] (.. 
(S3Presigner/builder) (region (lookup-region region)) (build))) -(defn- put-object +(defn put-object [{:keys [client bucket prefix]} {:keys [id] :as object} content] (let [path (str prefix (impl/id->path id)) mdata (meta object) @@ -125,14 +136,15 @@ (bucket bucket) (contentType mtype) (key path) - (build)) - content (RequestBody/fromInputStream (io/input-stream content) - (count content))] - (.putObject ^S3Client client - ^PutObjectRequest request - ^RequestBody content))) + (build))] -(defn- copy-object + (with-open [^InputStream is (io/input-stream content)] + (let [content (RequestBody/fromInputStream is (count content))] + (.putObject ^S3Client client + ^PutObjectRequest request + ^RequestBody content))))) + +(defn copy-object [{:keys [client bucket prefix]} src-object dst-object] (let [source-path (str prefix (impl/id->path (:id src-object))) source-mdata (meta src-object) @@ -146,22 +158,33 @@ (contentType source-mtype) (build))] - (.copyObject ^S3Client client - ^CopyObjectRequest request))) + (.copyObject ^S3Client client ^CopyObjectRequest request))) -(defn- get-object +(defn get-object-data [{:keys [client bucket prefix]} {:keys [id]}] (let [gor (.. (GetObjectRequest/builder) (bucket bucket) (key (str prefix (impl/id->path id))) (build)) - obj (.getObject ^S3Client client ^GetObjectRequest gor)] + obj (.getObject ^S3Client client ^GetObjectRequest gor) + ;; rsp (.response ^ResponseInputStream obj) + ;; len (.contentLength ^GetObjectResponse rsp) + ] (io/input-stream obj))) +(defn get-object-bytes + [{:keys [client bucket prefix]} {:keys [id]}] + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (str prefix (impl/id->path id))) + (build)) + obj (.getObjectAsBytes ^S3Client client ^GetObjectRequest gor)] + (.asByteArray ^ResponseBytes obj))) + (def default-max-age (dt/duration {:minutes 10})) -(defn- get-object-url +(defn get-object-url [{:keys [presigner bucket prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}] (us/assert dt/duration? max-age) (let [gor (.. (GetObjectRequest/builder) @@ -175,7 +198,7 @@ pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)] (u/uri (str (.url ^PresignedGetObjectRequest pgor))))) -(defn- del-object-in-bulk +(defn del-object-in-bulk [{:keys [bucket client prefix]} ids] (let [oids (map (fn [id] (.. (ObjectIdentifier/builder) diff --git a/backend/src/app/tasks/file_offload.clj b/backend/src/app/tasks/file_offload.clj new file mode 100644 index 000000000..5c5abd7bf --- /dev/null +++ b/backend/src/app/tasks/file_offload.clj @@ -0,0 +1,64 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; Copyright (c) UXBOX Labs SL + +(ns app.tasks.file-offload + "A maintenance task that offloads file data to an external storage (S3)." + (:require + [app.common.spec :as us] + [app.db :as db] + [app.storage :as sto] + [app.storage.impl :as simpl] + [app.util.logging :as l] + [app.util.time :as dt] + [clojure.spec.alpha :as s] + [integrant.core :as ig])) + +(def sql:offload-candidates-chunk + "select f.id, f.data from file as f + where f.data is not null + and f.modified_at < now() - ?::interval + order by f.modified_at + limit 10") + +(defn- retrieve-candidates + [{:keys [conn max-age]}] + (db/exec! 
conn [sql:offload-candidates-chunk max-age])) + +(defn- offload-candidate + [{:keys [storage conn backend] :as cfg} {:keys [id data] :as file}] + (l/debug :action "offload file data" :id id) + (let [backend (simpl/resolve-backend storage backend)] + (->> (simpl/content data) + (simpl/put-object backend file)) + (db/update! conn :file + {:data nil + :data-backend (name (:id backend))} + {:id id}))) + +;; ---- STATE INIT + +(s/def ::max-age ::dt/duration) +(s/def ::backend ::us/keyword) + + +(defmethod ig/pre-init-spec ::handler [_] + (s/keys :req-un [::db/pool ::max-age ::sto/storage ::backend])) + +(defmethod ig/init-key ::handler + [_ {:keys [pool max-age] :as cfg}] + (fn [_] + (db/with-atomic [conn pool] + (let [max-age (db/interval max-age) + cfg (-> cfg + (assoc :conn conn) + (assoc :max-age max-age))] + (loop [n 0] + (let [candidates (retrieve-candidates cfg)] + (if (seq candidates) + (do + (run! (partial offload-candidate cfg) candidates) + (recur (+ n (count candidates)))) + (l/debug :hint "offload summary" :count n)))))))) diff --git a/backend/test/app/storage_test.clj b/backend/test/app/storage_test.clj index d35dae02f..05e471854 100644 --- a/backend/test/app/storage_test.clj +++ b/backend/test/app/storage_test.clj @@ -22,7 +22,6 @@ th/database-reset th/clean-storage)) -;; TODO: add specific tests for DB backend. (t/deftest put-and-retrieve-object (let [storage (:app.storage/storage th/*system*) diff --git a/frontend/src/app/main/data/workspace/libraries.cljs b/frontend/src/app/main/data/workspace/libraries.cljs index 588da49bf..f4f177274 100644 --- a/frontend/src/app/main/data/workspace/libraries.cljs +++ b/frontend/src/app/main/data/workspace/libraries.cljs @@ -747,6 +747,7 @@ (st/emit! dm/hide)) do-dismiss #(do (st/emit! ignore-sync) (st/emit! dm/hide))] + (rx/of (dm/info-dialog (tr "workspace.updates.there-are-updates") :inline-actions