0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-08 07:50:43 -05:00

🎉 Add file offloading to external storage mechanism.

This commit is contained in:
Andrey Antukh 2021-06-14 11:50:26 +02:00
parent 0c49ed1fec
commit 0c97a44a2a
18 changed files with 334 additions and 125 deletions

View file

@ -8,6 +8,8 @@
- Allow to ungroup assets [Taiga #1719](https://tree.taiga.io/project/penpot/us/1719)
- Allow to rename assets groups [Taiga #1721](https://tree.taiga.io/project/penpot/us/1721)
- Memorize collapse state of assets in panel [Taiga #1718](https://tree.taiga.io/project/penpot/us/1718)
- Add the ability to offload file data to a cheaper storage when file
becomes inactive.
### :bug: Bugs fixed
### :arrow_up: Deps updates

View file

@ -58,11 +58,8 @@
:srepl-host "127.0.0.1"
:srepl-port 6062
:storage-backend :fs
:storage-fs-directory "assets"
:storage-s3-region :eu-central-1
:storage-s3-bucket "penpot-devenv-assets-pre"
:assets-storage-backend :fs
:storage-assets-fs-directory "assets"
:feedback-destination "info@example.com"
:feedback-enabled false
@ -175,10 +172,14 @@
(s/def ::smtp-username (s/nilable ::us/string))
(s/def ::srepl-host ::us/string)
(s/def ::srepl-port ::us/integer)
(s/def ::storage-backend ::us/keyword)
(s/def ::storage-fs-directory ::us/string)
(s/def ::storage-s3-bucket ::us/string)
(s/def ::storage-s3-region ::us/keyword)
(s/def ::assets-storage-backend ::us/keyword)
(s/def ::fdata-storage-backend ::us/keyword)
(s/def ::storage-assets-fs-directory ::us/string)
(s/def ::storage-assets-s3-bucket ::us/string)
(s/def ::storage-assets-s3-region ::us/keyword)
(s/def ::storage-fdata-s3-bucket ::us/string)
(s/def ::storage-fdata-s3-region ::us/keyword)
(s/def ::storage-fdata-s3-prefix ::us/string)
(s/def ::telemetry-enabled ::us/boolean)
(s/def ::telemetry-uri ::us/string)
(s/def ::telemetry-with-taiga ::us/boolean)
@ -257,12 +258,20 @@
::smtp-ssl
::smtp-tls
::smtp-username
::srepl-host
::srepl-port
::storage-backend
::storage-fs-directory
::storage-s3-bucket
::storage-s3-region
::assets-storage-backend
::storage-assets-fs-directory
::storage-assets-s3-bucket
::storage-assets-s3-region
::fdata-storage-backend
::storage-fdata-s3-bucket
::storage-fdata-s3-region
::storage-fdata-s3-prefix
::telemetry-enabled
::telemetry-uri
::telemetry-referer

View file

@ -49,7 +49,7 @@
{:status 200
:headers {"content-type" (:content-type mdata)
"cache-control" (str "max-age=" (inst-ms cache-max-age))}
:body (sto/get-object-data storage obj)}
:body (sto/get-object-bytes storage obj)}
:s3
(let [url (sto/get-object-url storage obj {:max-age signature-max-age})]

View file

@ -166,7 +166,7 @@
:tasks (ig/ref :app.worker/registry)
:pool (ig/ref :app.db/pool)
:schedule
[{:cron #app/cron "0 0 0 * * ? *" ;; daily
[{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :file-media-gc}
{:cron #app/cron "0 0 * * * ?" ;; hourly
@ -190,6 +190,10 @@
{:cron #app/cron "0 0 0 * * ?" ;; daily
:task :tasks-gc}
(when (cf/get :fdata-storage-backed)
{:cron #app/cron "0 0 * * * ?" ;; hourly
:task :file-offload})
(when (cf/get :audit-archive-enabled)
{:cron #app/cron "0 0 * * * ?" ;; every 1h
:task :audit-archive})
@ -217,6 +221,7 @@
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
:file-offload (ig/ref :app.tasks.file-offload/handler)
:audit-archive (ig/ref :app.loggers.audit/archive-task)
:audit-archive-gc (ig/ref :app.loggers.audit/archive-gc-task)}}
@ -256,6 +261,12 @@
{:pool (ig/ref :app.db/pool)
:max-age (dt/duration {:hours 72})}
:app.tasks.file-offload/handler
{:pool (ig/ref :app.db/pool)
:max-age (dt/duration {:seconds 5})
:storage (ig/ref :app.storage/storage)
:backend (cf/get :fdata-storage-backed :fdata-s3)}
:app.tasks.telemetry/handler
{:pool (ig/ref :app.db/pool)
:version (:full cf/version)
@ -306,23 +317,32 @@
:app.storage/storage
{:pool (ig/ref :app.db/pool)
:executor (ig/ref :app.worker/executor)
:backend (cf/get :storage-backend :fs)
:backends {:s3 (ig/ref [::main :app.storage.s3/backend])
:db (ig/ref [::main :app.storage.db/backend])
:fs (ig/ref [::main :app.storage.fs/backend])
:tmp (ig/ref [::tmp :app.storage.fs/backend])}}
:backend (cf/get :assets-storage-backend :assets-fs)
:backends {:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-db (ig/ref [::assets :app.storage.db/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])
:s3 (ig/ref [::assets :app.storage.s3/backend])
:db (ig/ref [::assets :app.storage.db/backend])
:fs (ig/ref [::assets :app.storage.fs/backend])
:tmp (ig/ref [::tmp :app.storage.fs/backend])
:fdata-s3 (ig/ref [::fdata :app.storage.s3/backend])}}
[::main :app.storage.s3/backend]
{:region (cf/get :storage-s3-region)
:bucket (cf/get :storage-s3-bucket)}
[::fdata :app.storage.s3/backend]
{:region (cf/get :storage-fdata-s3-region)
:bucket (cf/get :storage-fdata-s3-bucket)
:prefix (cf/get :storage-fdata-s3-prefix)}
[::main :app.storage.fs/backend]
{:directory (cf/get :storage-fs-directory)}
[::assets :app.storage.s3/backend]
{:region (cf/get :storage-assets-s3-region)
:bucket (cf/get :storage-assets-s3-bucket)}
[::assets :app.storage.fs/backend]
{:directory (cf/get :storage-assets-fs-directory)}
[::tmp :app.storage.fs/backend]
{:directory "/tmp/penpot"}
[::main :app.storage.db/backend]
[::assets :app.storage.db/backend]
{:pool (ig/ref :app.db/pool)}})
(def system nil)

View file

@ -210,9 +210,15 @@
([a b]
(mobj :inc)
(origf a b))
([a b & more]
([a b c]
(mobj :inc)
(apply origf a b more)))
(origf a b c))
([a b c d]
(mobj :inc)
(origf a b c d))
([a b c d & more]
(mobj :inc)
(apply origf a b c d more)))
(assoc mdata ::original origf))))
([rootf mobj labels]
(let [mdata (meta rootf)

View file

@ -190,6 +190,9 @@
{:name "0060-mod-file-change-table"
:fn (mg/resource "app/migrations/sql/0060-mod-file-change-table.sql")}
{:name "0061-mod-file-table"
:fn (mg/resource "app/migrations/sql/0061-mod-file-table.sql")}
])

View file

@ -0,0 +1,10 @@
CREATE INDEX IF NOT EXISTS file__modified_at__with__data__idx
ON file (modified_at, id)
WHERE data IS NOT NULL;
ALTER TABLE file
ADD COLUMN data_backend text NULL,
ALTER COLUMN data_backend SET STORAGE EXTERNAL;
DROP TRIGGER file_on_update_tgr ON file;
DROP FUNCTION handle_file_update ();

View file

@ -22,6 +22,8 @@
[clojure.spec.alpha :as s]))
(declare create-file)
;; --- Helpers & Specs
(s/def ::id ::us/uuid)
@ -32,8 +34,6 @@
;; --- Mutation: Create File
(declare create-file)
(s/def ::is-shared ::us/boolean)
(s/def ::create-file
(s/keys :req-un [::profile-id ::name ::project-id]
@ -67,10 +67,11 @@
:is-shared is-shared
:data (blob/encode data)
:deleted-at deleted-at})]
(->> (assoc params :file-id id :role :owner)
(create-file-role conn))
(assoc file :data data)))
(assoc file :data data)))
;; --- Mutation: Rename File
@ -202,7 +203,6 @@
{:file-id file-id
:library-file-id library-id}))
;; --- Mutation: Ignore updates in linked files
(declare ignore-sync)
@ -303,7 +303,8 @@
(mapcat :changes changes-with-metadata)
changes)
file (-> file
ts (dt/now)
file (-> (files/retrieve-data cfg file)
(update :revn inc)
(update :data (fn [data]
(-> data
@ -317,6 +318,7 @@
{:id (uuid/next)
:session-id session-id
:profile-id profile-id
:created-at ts
:file-id (:id file)
:revn (:revn file)
:data (when (take-snapshot? file)
@ -327,11 +329,16 @@
(db/update! conn :file
{:revn (:revn file)
:data (:data file)
:data-backend nil
:modified-at ts
:has-media-trimmed false}
{:id (:id file)})
(let [params (-> params (assoc :file file
:changes changes))]
(db/update! conn :project
{:modified-at ts}
{:id (:project-id file)})
(let [params (assoc params :file file :changes changes)]
;; Send asynchronous notifications
(send-notifications cfg params)

View file

@ -9,10 +9,12 @@
[app.common.pages.migrations :as pmg]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.rpc.permissions :as perms]
[app.rpc.queries.projects :as projects]
[app.rpc.queries.teams :as teams]
[app.storage.impl :as simpl]
[app.util.blob :as blob]
[app.util.services :as sv]
[clojure.spec.alpha :as s]))
@ -171,11 +173,23 @@
;; --- Query: File (By ID)
(defn- retrieve-data*
[{:keys [storage] :as cfg} file]
(when-let [backend (simpl/resolve-backend storage (cf/get :fdata-storage-backend))]
(simpl/get-object-bytes backend file)))
(defn retrieve-data
[cfg file]
(if (bytes? (:data file))
file
(assoc file :data (retrieve-data* cfg file))))
(defn retrieve-file
[conn id]
(-> (db/get-by-id conn :file id)
(decode-row)
(pmg/migrate-file)))
[{:keys [conn] :as cfg} id]
(->> (db/get-by-id conn :file id)
(retrieve-data cfg)
(decode-row)
(pmg/migrate-file)))
(s/def ::file
(s/keys :req-un [::profile-id ::id]))
@ -183,8 +197,9 @@
(sv/defmethod ::file
[{:keys [pool] :as cfg} {:keys [profile-id id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(retrieve-file conn id)))
(let [cfg (assoc cfg :conn conn)]
(check-edition-permissions! conn profile-id id)
(retrieve-file cfg id))))
(s/def ::page
(s/keys :req-un [::profile-id ::file-id]))
@ -218,10 +233,10 @@
(sv/defmethod ::page
[{:keys [pool] :as cfg} {:keys [profile-id file-id id strip-thumbnails]}]
[{:keys [pool] :as cfg} {:keys [profile-id file-id]}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(let [file (retrieve-file conn file-id)
(let [cfg (assoc cfg :conn conn)
file (retrieve-file cfg file-id)
page-id (get-in file [:data :pages 0])]
(cond-> (get-in file [:data :pages-index page-id])
strip-thumbnails
@ -277,23 +292,36 @@
;; --- Query: File Libraries used by a File
(def ^:private sql:file-libraries
"select fl.*,
flr.synced_at as synced_at
from file as fl
inner join file_library_rel as flr on (flr.library_file_id = fl.id)
where flr.file_id = ?
and fl.deleted_at is null")
"WITH RECURSIVE libs AS (
SELECT fl.*, flr.synced_at
FROM file AS fl
JOIN file_library_rel AS flr ON (flr.library_file_id = fl.id)
WHERE flr.file_id = ?::uuid
UNION
SELECT fl.*, flr.synced_at
FROM file AS fl
JOIN file_library_rel AS flr ON (flr.library_file_id = fl.id)
JOIN libs AS l ON (flr.file_id = l.id)
)
SELECT l.id,
l.data,
l.project_id,
l.created_at,
l.modified_at,
l.deleted_at,
l.name,
l.revn,
l.synced_at
FROM libs AS l
WHERE l.deleted_at IS NULL OR l.deleted_at > now();")
(defn retrieve-file-libraries
[conn is-indirect file-id]
(let [libraries (->> (db/exec! conn [sql:file-libraries file-id])
(map #(assoc % :is-indirect is-indirect))
(into #{} decode-row-xf))]
(reduce #(into %1 (retrieve-file-libraries conn true %2))
libraries
(map :id libraries))))
[{:keys [conn] :as cfg} is-indirect file-id]
(let [xform (comp
(map #(assoc % :is-indirect is-indirect))
(map #(retrieve-data cfg %))
(map decode-row))]
(into #{} xform (db/exec! conn [sql:file-libraries file-id]))))
(s/def ::file-libraries
(s/keys :req-un [::profile-id ::file-id]))
@ -301,8 +329,9 @@
(sv/defmethod ::file-libraries
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(retrieve-file-libraries conn false file-id)))
(let [cfg (assoc cfg :conn conn)]
(check-edition-permissions! conn profile-id file-id)
(retrieve-file-libraries cfg false file-id))))
;; --- QUERY: team-recent-files
@ -334,7 +363,6 @@
(teams/check-read-permissions! conn profile-id team-id)
(db/exec! conn [sql:team-recent-files team-id])))
;; --- Helpers
(defn decode-row

View file

@ -42,12 +42,13 @@
(sv/defmethod ::viewer-bundle {:auth false}
[{:keys [pool] :as cfg} {:keys [profile-id file-id page-id token] :as params}]
(db/with-atomic [conn pool]
(let [file (files/retrieve-file conn file-id)
(let [cfg (assoc cfg :conn conn)
file (files/retrieve-file cfg file-id)
project (retrieve-project conn (:project-id file))
page (get-in file [:data :pages-index page-id])
file (merge (dissoc file :data)
(select-keys (:data file) [:colors :media :typographies]))
libs (files/retrieve-file-libraries conn false file-id)
libs (files/retrieve-file-libraries cfg false file-id)
users (teams/retrieve-users conn (:team-id project))
fonts (db/query conn :team-font-variant

View file

@ -5,7 +5,7 @@
;; Copyright (c) UXBOX Labs SL
(ns app.storage
"File Storage abstraction layer."
"Objects storage abstraction layer."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
@ -20,13 +20,9 @@
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig]
[promesa.exec :as px])
(:import
java.io.InputStream))
[promesa.exec :as px]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Storage Module State
@ -39,7 +35,11 @@
(s/def ::db ::sdb/backend)
(s/def ::backends
(s/keys :opt-un [::s3 ::fs ::db]))
(s/map-of ::us/keyword
(s/nilable
(s/or :s3 ::ss3/backend
:fs ::sfs/backend
:db ::sdb/backend))))
(defmethod ig/pre-init-spec ::storage [_]
(s/keys :req-un [::backend ::wrk/executor ::db/pool ::backends]))
@ -50,8 +50,9 @@
(assoc :backends (d/without-nils backends))))
(defmethod ig/init-key ::storage
[_ cfg]
cfg)
[_ {:keys [backends] :as cfg}]
(-> (d/without-nils cfg)
(assoc :backends (d/without-nils backends))))
(s/def ::storage
(s/keys :req-un [::backends ::wrk/executor ::db/pool ::backend]))
@ -151,8 +152,6 @@
;; API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare resolve-backend)
(defn object->relative-path
[{:keys [id] :as obj}]
(impl/id->path id))
@ -185,7 +184,7 @@
(px/run! executor #(register-recheck storage backend (:id object)))
;; Store the data finally on the underlying storage subsystem.
(-> (resolve-backend storage backend)
(-> (impl/resolve-backend storage backend)
(impl/put-object object content))
object))
@ -201,28 +200,37 @@
;; if the source and destination backends are the same, we
;; proceed to use the fast path with specific copy
;; implementation on backend.
(-> (resolve-backend storage (:backend storage))
(-> (impl/resolve-backend storage (:backend storage))
(impl/copy-object object object*))
;; if the source and destination backends are different, we just
;; need to obtain the streams and proceed full copy of the data
(with-open [^InputStream input
(-> (resolve-backend storage (:backend object))
(impl/get-object-data object))]
(-> (resolve-backend storage (:backend storage))
(impl/put-object object* (impl/content input (:size object))))))
(with-open [is (-> (impl/resolve-backend storage (:backend object))
(impl/get-object-data object))]
(-> (impl/resolve-backend storage (:backend storage))
(impl/put-object object* (impl/content is (:size object))))))
object*))
(defn get-object-data
"Return an input stream instance of the object content."
[{:keys [pool conn] :as storage} object]
(us/assert ::storage storage)
(when (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now)))
(-> (assoc storage :conn (or conn pool))
(resolve-backend (:backend object))
(impl/resolve-backend (:backend object))
(impl/get-object-data object))))
(defn get-object-bytes
"Returns a byte array of object content."
[{:keys [pool conn] :as storage} object]
(us/assert ::storage storage)
(when (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now)))
(-> (assoc storage :conn (or conn pool))
(impl/resolve-backend (:backend object))
(impl/get-object-bytes object))))
(defn get-object-url
([storage object]
(get-object-url storage object nil))
@ -231,14 +239,14 @@
(when (or (nil? (:expired-at object))
(dt/is-after? (:expired-at object) (dt/now)))
(-> (assoc storage :conn (or conn pool))
(resolve-backend (:backend object))
(impl/resolve-backend (:backend object))
(impl/get-object-url object options)))))
(defn get-object-path
"Get the Path to the object. Only works with `:fs` type of
storages."
[storage object]
(let [backend (resolve-backend storage (:backend object))]
(let [backend (impl/resolve-backend storage (:backend object))]
(when (not= :fs (:type backend))
(ex/raise :type :internal
:code :operation-not-allowed
@ -254,16 +262,7 @@
(-> (assoc storage :conn (or conn pool))
(delete-database-object (if (uuid? id-or-obj) id-or-obj (:id id-or-obj)))))
;; --- impl
(defn resolve-backend
[{:keys [conn pool] :as storage} backend-id]
(let [backend (get-in storage [:backends backend-id])]
(when-not backend
(ex/raise :type :internal
:code :backend-not-configured
:hint (str/fmt "backend '%s' not configured" backend-id)))
(assoc backend :conn (or conn pool))))
(d/export impl/resolve-backend)
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Garbage Collection: Permanently delete objects
@ -295,7 +294,7 @@
(some-> (seq rows) (group-by-backend))))
(delete-in-bulk [conn [backend ids]]
(let [backend (resolve-backend storage backend)
(let [backend (impl/resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))]
@ -445,7 +444,7 @@
(some-> (seq rows) (group-results))))
(delete-group [conn [backend ids]]
(let [backend (resolve-backend storage backend)
(let [backend (impl/resolve-backend storage backend)
backend (assoc backend :conn conn)]
(impl/del-objects-in-bulk backend ids)))

View file

@ -46,6 +46,11 @@
(let [result (db/exec-one! conn ["select data from storage_data where id=?" id])]
(ByteArrayInputStream. (:data result))))
(defmethod impl/get-object-bytes :db
[{:keys [conn] :as backend} {:keys [id] :as object}]
(let [result (db/exec-one! conn ["select data from storage_data where id=?" id])]
(:data result)))
(defmethod impl/get-object-url :db
[_ _]
(throw (UnsupportedOperationException. "not supported")))

View file

@ -79,6 +79,10 @@
:path (str full)))
(io/input-stream full)))
(defmethod impl/get-object-bytes :fs
[backend object]
(fs/slurp-bytes (impl/get-object-data backend object)))
(defmethod impl/get-object-url :fs
[{:keys [uri] :as backend} {:keys [id] :as object} _]
(update uri :path

View file

@ -8,10 +8,10 @@
"Storage backends abstraction layer."
(:require
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[buddy.core.codecs :as bc]
[clojure.java.io :as io])
[clojure.java.io :as io]
[cuerdas.core :as str])
(:import
java.nio.ByteBuffer
java.util.UUID
@ -45,6 +45,14 @@
:code :invalid-storage-backend
:context cfg))
(defmulti get-object-bytes (fn [cfg _] (:type cfg)))
(defmethod get-object-bytes :default
[cfg _]
(ex/raise :type :internal
:code :invalid-storage-backend
:context cfg))
(defmulti get-object-url (fn [cfg _ _] (:type cfg)))
(defmethod get-object-url :default
@ -109,7 +117,10 @@
(make-output-stream [_ opts]
(throw (UnsupportedOperationException. "not implemented")))
clojure.lang.Counted
(count [_] size))))
(count [_] size)
java.lang.AutoCloseable
(close [_]))))
(defn string->content
[^String v]
@ -129,7 +140,10 @@
clojure.lang.Counted
(count [_]
(alength data)))))
(alength data))
java.lang.AutoCloseable
(close [_]))))
(defn- input-stream->content
[^InputStream is size]
@ -137,7 +151,7 @@
IContentObject
io/IOFactory
(make-reader [_ opts]
(io/make-reader is opts))
(io/make-reader is opts))
(make-writer [_ opts]
(throw (UnsupportedOperationException. "not implemented")))
(make-input-stream [_ opts]
@ -146,7 +160,11 @@
(throw (UnsupportedOperationException. "not implemented")))
clojure.lang.Counted
(count [_] size)))
(count [_] size)
java.lang.AutoCloseable
(close [_]
(.close is))))
(defn content
([data] (content data nil))
@ -179,10 +197,20 @@
(defn slurp-bytes
[content]
(us/assert content? content)
(with-open [input (io/input-stream content)
output (java.io.ByteArrayOutputStream. (count content))]
(io/copy input output)
(.toByteArray output)))
(defn resolve-backend
[{:keys [conn pool] :as storage} backend-id]
(when backend-id
(let [backend (get-in storage [:backends backend-id])]
(when-not backend
(ex/raise :type :internal
:code :backend-not-configured
:hint (str/fmt "backend '%s' not configured" backend-id)))
(assoc backend
:conn (or conn pool)
:id backend-id))))

View file

@ -5,7 +5,7 @@
;; Copyright (c) UXBOX Labs SL
(ns app.storage.s3
"Storage backends abstraction layer."
"S3 Storage backend implementation."
(:require
[app.common.data :as d]
[app.common.exceptions :as ex]
@ -18,8 +18,11 @@
[integrant.core :as ig])
(:import
java.time.Duration
java.io.InputStream
java.util.Collection
software.amazon.awssdk.core.sync.RequestBody
software.amazon.awssdk.core.ResponseBytes
;; software.amazon.awssdk.core.ResponseInputStream
software.amazon.awssdk.regions.Region
software.amazon.awssdk.services.s3.S3Client
software.amazon.awssdk.services.s3.model.Delete
@ -29,13 +32,17 @@
software.amazon.awssdk.services.s3.model.GetObjectRequest
software.amazon.awssdk.services.s3.model.ObjectIdentifier
software.amazon.awssdk.services.s3.model.PutObjectRequest
;; software.amazon.awssdk.services.s3.model.GetObjectResponse
software.amazon.awssdk.services.s3.presigner.S3Presigner
software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest
software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest))
software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest
))
(declare put-object)
(declare copy-object)
(declare get-object)
(declare get-object-bytes)
(declare get-object-data)
(declare get-object-url)
(declare del-object-in-bulk)
(declare build-s3-client)
@ -87,7 +94,11 @@
(defmethod impl/get-object-data :s3
[backend object]
(get-object backend object))
(get-object-data backend object))
(defmethod impl/get-object-bytes :s3
[backend object]
(get-object-bytes backend object))
(defmethod impl/get-object-url :s3
[backend object options]
@ -104,19 +115,19 @@
(case region
:eu-central-1 Region/EU_CENTRAL_1))
(defn- build-s3-client
(defn build-s3-client
[{:keys [region]}]
(.. (S3Client/builder)
(region (lookup-region region))
(build)))
(defn- build-s3-presigner
(defn build-s3-presigner
[{:keys [region]}]
(.. (S3Presigner/builder)
(region (lookup-region region))
(build)))
(defn- put-object
(defn put-object
[{:keys [client bucket prefix]} {:keys [id] :as object} content]
(let [path (str prefix (impl/id->path id))
mdata (meta object)
@ -125,14 +136,15 @@
(bucket bucket)
(contentType mtype)
(key path)
(build))
content (RequestBody/fromInputStream (io/input-stream content)
(count content))]
(.putObject ^S3Client client
^PutObjectRequest request
^RequestBody content)))
(build))]
(defn- copy-object
(with-open [^InputStream is (io/input-stream content)]
(let [content (RequestBody/fromInputStream is (count content))]
(.putObject ^S3Client client
^PutObjectRequest request
^RequestBody content)))))
(defn copy-object
[{:keys [client bucket prefix]} src-object dst-object]
(let [source-path (str prefix (impl/id->path (:id src-object)))
source-mdata (meta src-object)
@ -146,22 +158,33 @@
(contentType source-mtype)
(build))]
(.copyObject ^S3Client client
^CopyObjectRequest request)))
(.copyObject ^S3Client client ^CopyObjectRequest request)))
(defn- get-object
(defn get-object-data
[{:keys [client bucket prefix]} {:keys [id]}]
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
obj (.getObject ^S3Client client ^GetObjectRequest gor)]
obj (.getObject ^S3Client client ^GetObjectRequest gor)
;; rsp (.response ^ResponseInputStream obj)
;; len (.contentLength ^GetObjectResponse rsp)
]
(io/input-stream obj)))
(defn get-object-bytes
[{:keys [client bucket prefix]} {:keys [id]}]
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
obj (.getObjectAsBytes ^S3Client client ^GetObjectRequest gor)]
(.asByteArray ^ResponseBytes obj)))
(def default-max-age
(dt/duration {:minutes 10}))
(defn- get-object-url
(defn get-object-url
[{:keys [presigner bucket prefix]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}]
(us/assert dt/duration? max-age)
(let [gor (.. (GetObjectRequest/builder)
@ -175,7 +198,7 @@
pgor (.presignGetObject ^S3Presigner presigner ^GetObjectPresignRequest gopr)]
(u/uri (str (.url ^PresignedGetObjectRequest pgor)))))
(defn- del-object-in-bulk
(defn del-object-in-bulk
[{:keys [bucket client prefix]} ids]
(let [oids (map (fn [id]
(.. (ObjectIdentifier/builder)

View file

@ -0,0 +1,64 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.tasks.file-offload
"A maintenance task that offloads file data to an external storage (S3)."
(:require
[app.common.spec :as us]
[app.db :as db]
[app.storage :as sto]
[app.storage.impl :as simpl]
[app.util.logging :as l]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
[integrant.core :as ig]))
(def sql:offload-candidates-chunk
"select f.id, f.data from file as f
where f.data is not null
and f.modified_at < now() - ?::interval
order by f.modified_at
limit 10")
(defn- retrieve-candidates
[{:keys [conn max-age]}]
(db/exec! conn [sql:offload-candidates-chunk max-age]))
(defn- offload-candidate
[{:keys [storage conn backend] :as cfg} {:keys [id data] :as file}]
(l/debug :action "offload file data" :id id)
(let [backend (simpl/resolve-backend storage backend)]
(->> (simpl/content data)
(simpl/put-object backend file))
(db/update! conn :file
{:data nil
:data-backend (name (:id backend))}
{:id id})))
;; ---- STATE INIT
(s/def ::max-age ::dt/duration)
(s/def ::backend ::us/keyword)
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::db/pool ::max-age ::sto/storage ::backend]))
(defmethod ig/init-key ::handler
[_ {:keys [pool max-age] :as cfg}]
(fn [_]
(db/with-atomic [conn pool]
(let [max-age (db/interval max-age)
cfg (-> cfg
(assoc :conn conn)
(assoc :max-age max-age))]
(loop [n 0]
(let [candidates (retrieve-candidates cfg)]
(if (seq candidates)
(do
(run! (partial offload-candidate cfg) candidates)
(recur (+ n (count candidates))))
(l/debug :hint "offload summary" :count n))))))))

View file

@ -22,7 +22,6 @@
th/database-reset
th/clean-storage))
;; TODO: add specific tests for DB backend.
(t/deftest put-and-retrieve-object
(let [storage (:app.storage/storage th/*system*)

View file

@ -747,6 +747,7 @@
(st/emit! dm/hide))
do-dismiss #(do (st/emit! ignore-sync)
(st/emit! dm/hide))]
(rx/of (dm/info-dialog
(tr "workspace.updates.there-are-updates")
:inline-actions