From 760eb926bfdb6feeb654e3ecdb00803aa87901db Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 30 Dec 2020 14:38:00 +0100 Subject: [PATCH] :tada: Add plugable storages abstraction layer (with support for fs, s3 and db). --- backend/deps.edn | 2 + backend/dev/user.clj | 1 - backend/scripts/repl | 1 - backend/src/app/config.clj | 21 +- backend/src/app/db.clj | 46 ++++ backend/src/app/http.clj | 6 +- backend/src/app/main.clj | 31 ++- backend/src/app/migrations.clj | 3 + .../sql/0035-add-storage-tables.sql | 22 ++ backend/src/app/storage.clj | 243 ++++++++++++++++++ backend/src/app/storage/db.clj | 62 +++++ backend/src/app/storage/fs.clj | 84 ++++++ backend/src/app/storage/impl.clj | 181 +++++++++++++ backend/src/app/storage/s3.clj | 174 +++++++++++++ backend/src/app/tasks/file_media_gc.clj | 5 - docker/devenv/files/nginx.conf | 28 ++ 16 files changed, 893 insertions(+), 17 deletions(-) create mode 100644 backend/src/app/migrations/sql/0035-add-storage-tables.sql create mode 100644 backend/src/app/storage.clj create mode 100644 backend/src/app/storage/db.clj create mode 100644 backend/src/app/storage/fs.clj create mode 100644 backend/src/app/storage/impl.clj create mode 100644 backend/src/app/storage/s3.clj diff --git a/backend/deps.edn b/backend/deps.edn index 11c5ff7b7..026901fb6 100644 --- a/backend/deps.edn +++ b/backend/deps.edn @@ -59,6 +59,8 @@ puppetlabs/clj-ldap {:mvn/version"0.3.0"} integrant/integrant {:mvn/version "0.8.0"} + software.amazon.awssdk/s3 {:mvn/version "2.15.54"} + ;; exception printing io.aviso/pretty {:mvn/version "0.1.37"} environ/environ {:mvn/version "1.2.0"}} diff --git a/backend/dev/user.clj b/backend/dev/user.clj index 6d9d6239b..17a5d360c 100644 --- a/backend/dev/user.clj +++ b/backend/dev/user.clj @@ -31,7 +31,6 @@ (defonce system nil) - ;; --- Benchmarking Tools (defmacro run-quick-bench diff --git a/backend/scripts/repl b/backend/scripts/repl index cd687a65d..544c90d73 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -2,5 +2,4 @@ set -ex # clojure -Ojmx-remote -A:dev -e "(set! *warn-on-reflection* true)" -m rebel-readline.main -# clojure -Ojmx-remote -A:dev -J-XX:+UnlockExperimentalVMOptions -J-XX:+UseZGC -J-Xms128m -J-Xmx128m -m rebel-readline.main clojure -A:jmx-remote:dev -J-Xms256m -J-Xmx256m -M -m rebel-readline.main diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 8a955b085..a57ac070e 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -26,10 +26,12 @@ :secret-key "default" :enabled-asserts true - :media-directory "resources/public/media" :public-uri "http://localhost:3449/" :redis-uri "redis://localhost/0" - :media-uri "http://localhost:3449/media/" + + :storage-fs-directory "resources/public/media" + :storage-fs-uri "http://localhost:3449/media/" + :storage-s3-region :eu-central-1 :image-process-max-threads 2 @@ -76,6 +78,12 @@ (s/def ::database-password (s/nilable ::us/string)) (s/def ::database-uri ::us/string) (s/def ::redis-uri ::us/string) + +(s/def ::storage-fs-directory ::us/string) +(s/def ::storage-fs-uri ::us/string) +(s/def ::storage-s3-region ::us/keyword) +(s/def ::storage-s3-bucket ::us/string) + (s/def ::media-uri ::us/string) (s/def ::media-directory ::us/string) (s/def ::secret-key ::us/string) @@ -143,8 +151,10 @@ ::database-username ::database-password ::database-uri - ::media-directory - ::media-uri + ::storage-fs-directory + ::storage-fs-uri + ::storage-s3-bucket + ::storage-s3-region ::error-report-webhook ::secret-key ::smtp-default-from @@ -204,8 +214,7 @@ (assoc (read-config env) :redis-uri "redis://redis/1" :database-uri "postgresql://postgres/penpot_test" - :media-directory "/tmp/app/media" - :assets-directory "/tmp/app/static" + :storage-fs-directory "/tmp/app/storage" :migrations-verbose false)) (def version (v/parse "%version%")) diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index d62769988..09309f4a0 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -17,6 +17,7 @@ [app.util.migrations :as mg] [app.util.time :as dt] [app.util.transit :as t] + [clojure.java.io :as io] [clojure.spec.alpha :as s] [clojure.string :as str] [integrant.core :as ig] @@ -31,7 +32,10 @@ com.zaxxer.hikari.metrics.prometheus.PrometheusMetricsTrackerFactory java.sql.Connection java.sql.Savepoint + org.postgresql.PGConnection org.postgresql.geometric.PGpoint + org.postgresql.largeobject.LargeObject + org.postgresql.largeobject.LargeObjectManager org.postgresql.jdbc.PgArray org.postgresql.util.PGInterval org.postgresql.util.PGobject)) @@ -116,6 +120,48 @@ (jdbc-dt/read-as-instant) (HikariDataSource. dsc))) +(defn unwrap + [conn klass] + (.unwrap ^Connection conn klass)) + +(defn lobj-manager + [conn] + (let [conn (unwrap conn org.postgresql.PGConnection)] + (.getLargeObjectAPI ^PGConnection conn))) + +(defn lobj-create + [manager] + (.createLO ^LargeObjectManager manager LargeObjectManager/READWRITE)) + +(defn lobj-open + ([manager oid] + (lobj-open manager oid {})) + ([manager oid {:keys [mode] :or {mode :rw}}] + (let [mode (case mode + (:r :read) LargeObjectManager/READ + (:w :write) LargeObjectManager/WRITE + (:rw :read+write) LargeObjectManager/READWRITE)] + (.open ^LargeObjectManager manager (long oid) mode)))) + +(defn lobj-unlink + [manager oid] + (.unlink ^LargeObjectManager manager (long oid))) + +(extend-type LargeObject + io/IOFactory + (make-reader [lobj opts] + (let [^InputStream is (.getInputStream ^LargeObject lobj)] + (io/make-reader is opts))) + (make-writer [lobj opts] + (let [^OutputStream os (.getOutputStream ^LargeObject lobj)] + (io/make-writer os opts))) + (make-input-stream [lobj opts] + (let [^InputStream is (.getInputStream ^LargeObject lobj)] + (io/make-input-stream is opts))) + (make-output-stream [lobj opts] + (let [^OutputStream os (.getOutputStream ^LargeObject lobj)] + (io/make-output-stream os opts)))) + (defmacro with-atomic [& args] `(jdbc/with-transaction ~@args)) diff --git a/backend/src/app/http.clj b/backend/src/app/http.clj index 2a3e7164e..6c3b659eb 100644 --- a/backend/src/app/http.clj +++ b/backend/src/app/http.clj @@ -74,9 +74,10 @@ (s/def ::google-auth map?) (s/def ::gitlab-auth map?) (s/def ::ldap-auth fn?) +(s/def ::storage map?) (defmethod ig/pre-init-spec ::router [_] - (s/keys :req-un [::rpc ::session ::metrics ::google-auth ::gitlab-auth])) + (s/keys :req-un [::rpc ::session ::metrics ::google-auth ::gitlab-auth ::storage])) (defmethod ig/init-key ::router [_ cfg] @@ -87,9 +88,10 @@ (rr/create-default-handler)))) (defn- create-router - [{:keys [session rpc google-auth gitlab-auth metrics ldap-auth] :as cfg}] + [{:keys [session rpc google-auth gitlab-auth metrics ldap-auth storage] :as cfg}] (rr/router [["/metrics" {:get (:handler metrics)}] + ["/storage/:id" {:get (:handler storage)}] ["/api" {:middleware [[middleware/format-response-body] [middleware/parse-request-body] [middleware/errors errors/handle] diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index e0555a2e4..a22513ab9 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -58,6 +58,29 @@ {:media-directory (:media-directory config) :media-uri (:media-uri config)} + :app.storage/storage + {:pool (ig/ref :app.db/pool) + :backend (:storage-default-backend cfg/config :s3) + :backends {:s3 (ig/ref :app.storage.s3/backend) + :fs (ig/ref :app.storage.fs/backend) + :db (ig/ref :app.storage.db/backend)}} + + :app.storage/gc-task + {:pool (ig/ref :app.db/pool) + :storage (ig/ref :app.storage/storage)} + + :app.storage.fs/backend + {:directory (:storage-fs-directory cfg/config) + :uri (:storage-fs-uri cfg/config)} + + :app.storage.db/backend + {:pool (ig/ref :app.db/pool)} + + :app.storage.s3/backend + {:region (:storage-s3-region cfg/config) + :bucket (:storage-s3-bucket cfg/config)} + + :app.http.session/session {:pool (ig/ref :app.db/pool) :cookie-name "auth-token"} @@ -75,7 +98,8 @@ :metrics (ig/ref :app.metrics/metrics) :google-auth (ig/ref :app.http.auth/google) :gitlab-auth (ig/ref :app.http.auth/gitlab) - :ldap-auth (ig/ref :app.http.auth/ldap)} + :ldap-auth (ig/ref :app.http.auth/ldap) + :storage (ig/ref :app.storage/storage)} :app.rpc/rpc {:pool (ig/ref :app.db/pool) @@ -85,7 +109,6 @@ :storage (ig/ref :app.media-storage/storage) :redis (ig/ref :app.redis/redis)} - :app.notifications/handler {:redis (ig/ref :app.redis/redis) :pool (ig/ref :app.db/pool) @@ -143,6 +166,10 @@ :cron #app/cron "0 0 0 */1 * ?" ;; daily :fn (ig/ref :app.tasks.file-xlog-gc/handler)} + {:id "storage-gc" + :cron #app/cron "0 0 0 */1 * ?" ;; daily + :fn (ig/ref :app.storage/gc-task)} + {:id "tasks-gc" :cron #app/cron "0 0 0 */1 * ?" ;; daily :fn (ig/ref :app.tasks.tasks-gc/handler)} diff --git a/backend/src/app/migrations.clj b/backend/src/app/migrations.clj index 116770b5c..4b671cf4b 100644 --- a/backend/src/app/migrations.clj +++ b/backend/src/app/migrations.clj @@ -116,6 +116,9 @@ {:name "0034-mod-profile-table-add-props-field" :fn (mg/resource "app/migrations/sql/0034-mod-profile-table-add-props-field.sql")} + + {:name "0035-add-storage-tables" + :fn (mg/resource "app/migrations/sql/0035-add-storage-tables.sql")} ]) diff --git a/backend/src/app/migrations/sql/0035-add-storage-tables.sql b/backend/src/app/migrations/sql/0035-add-storage-tables.sql new file mode 100644 index 000000000..72e626e01 --- /dev/null +++ b/backend/src/app/migrations/sql/0035-add-storage-tables.sql @@ -0,0 +1,22 @@ +CREATE TABLE storage_object ( + id uuid PRIMARY KEY DEFAULT uuid_generate_v4(), + + created_at timestamptz NOT NULL DEFAULT now(), + deleted_at timestamptz NULL DEFAULT NULL, + + size bigint NOT NULL DEFAULT 0, + backend text NOT NULL, + + metadata jsonb NULL DEFAULT NULL +); + +CREATE TABLE storage_data ( + id uuid PRIMARY KEY REFERENCES storage_object (id) ON DELETE CASCADE, + data bytea NOT NULL +); + +CREATE INDEX storage_data__id__idx ON storage_data(id); +CREATE INDEX storage_object__id__deleted_at__idx + ON storage_object(id, deleted_at) + WHERE deleted_at IS NOT null; + diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj new file mode 100644 index 000000000..123a7be06 --- /dev/null +++ b/backend/src/app/storage.clj @@ -0,0 +1,243 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 UXBOX Labs SL + +(ns app.storage + "File Storage abstraction layer." + (:require + [app.common.data :as d] + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.common.uuid :as uuid] + [app.config :as cfg] + [app.db :as db] + [app.storage.fs :as sfs] + [app.storage.impl :as impl] + [app.storage.s3 :as ss3] + [app.storage.db :as sdb] + [app.util.time :as dt] + [lambdaisland.uri :as u] + [clojure.spec.alpha :as s] + [cuerdas.core :as str] + [integrant.core :as ig])) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Storage Module State +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare handler) + +(s/def ::backend ::us/keyword) +(s/def ::backends + (s/map-of ::us/keyword + (s/or :s3 ::ss3/backend + :fs ::sfs/backend + :db ::sdb/backend))) + +(defmethod ig/pre-init-spec ::storage [_] + (s/keys :req-un [::backend ::db/pool ::backends])) + +(defmethod ig/prep-key ::storage + [_ {:keys [backends] :as cfg}] + (assoc cfg :backends (d/without-nils backends))) + +(defmethod ig/init-key ::storage + [_ {:keys [backends] :as cfg}] + (assoc cfg :handler (partial handler cfg))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Database Objects +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(defrecord StorageObject [id size created-at backend]) + +(def ^:private + sql:insert-storage-object + "insert into storage_object (id, size, backend, metadata) + values (?, ?, ?, ?::jsonb) + returning *") + +(defn- create-database-object + [conn backend {:keys [content] :as object}] + (let [id (uuid/next) + mdata (dissoc object :content) + result (db/exec-one! conn [sql:insert-storage-object id + (count content) + (name backend) + (db/tjson mdata)])] + (StorageObject. (:id result) + (:size result) + (:created-at result) + backend + mdata + nil))) + +(def ^:private sql:retrieve-storage-object + "select * from storage_object where id = ? and deleted_at is null") + +(defn- retrieve-database-object + [conn id] + (when-let [res (db/exec-one! conn [sql:retrieve-storage-object id])] + (let [mdata (some-> (:metadata res) (db/decode-transit-pgobject))] + (StorageObject. (:id res) + (:size res) + (:created-at res) + (keyword (:backend res)) + mdata + nil)))) + +(def sql:delete-storage-object + "update storage_object set deleted_at=now() where id=? and deleted_at is null") + +(defn- delete-database-object + [conn id] + (let [result (db/exec-one! conn [sql:delete-storage-object id])] + (pos? (:next.jdbc/update-count result)))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; API +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(declare resolve-backend) + +(defn content-object + ([data] (impl/content-object data nil)) + ([data size] (impl/content-object data size))) + +(defn get-object + [{:keys [conn pool]} id] + (let [id (impl/coerce-id id)] + (retrieve-database-object (or conn pool) id))) + +(defn put-object + [{:keys [pool conn backend] :as storage} {:keys [content] :as object}] + (us/assert impl/content-object? content) + (let [conn (or conn pool) + object (create-database-object conn backend object)] + (-> (resolve-backend storage backend) + (assoc :conn conn) + (impl/put-object object content)) + object)) + +(defn get-object-data + [{:keys [pool conn] :as storage} object] + (-> (resolve-backend storage (:backend object)) + (assoc :conn (or conn pool)) + (impl/get-object object))) + +(defn get-object-url + ([storage object] + (get-object-url storage object nil)) + ([storage object options] + ;; As this operation does not need the database connection, the + ;; assoc of the conn to backend is ommited. + (-> (resolve-backend storage (:backend object)) + (impl/get-object-url object options)))) + +(defn del-object + [{:keys [conn pool]} id] + (let [conn (or conn pool)] + (delete-database-object conn id))) + +;; --- impl + +(defn- resolve-backend + [storage backend] + (let [backend* (get-in storage [:backends backend])] + (when-not backend* + (ex/raise :type :internal + :code :backend-not-configured + :hint (str/fmt "backend '%s' not configured" backend))) + backend*)) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; HTTP Handler +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +(def cache-max-age + (dt/duration {:hours 24})) + +(def signature-max-age + (dt/duration {:hours 24 :minutes 15})) + +(defn- handler + [storage request] + (let [id (get-in request [:path-params :id]) + obj (get-object storage id)] + (if obj + (let [mdata (meta obj) + backend (resolve-backend storage (:backend obj))] + (case (:type backend) + :db + {:status 200 + :headers {"content-type" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))} + :body (get-object-data storage obj)} + + :s3 + (let [url (get-object-url storage obj {:max-age signature-max-age})] + {:status 307 + :headers {"location" (str url) + "x-host" (:host url) + "cache-control" (str "max-age=" (inst-ms cache-max-age))} + :body ""}) + + :fs + (let [url (get-object-url storage obj)] + {:status 200 + :headers {"x-accel-redirect" (:path url) + "content-type" (:content-type mdata) + "cache-control" (str "max-age=" (inst-ms cache-max-age))} + :body ""}))) + {:status 404 + :body ""}))) + +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +;; Garbage Collection Task +;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; + +;; A task responsible to permanently delete already marked as deleted +;; storage files. + +(declare sql:retrieve-deleted-objects) + +(defmethod ig/pre-init-spec ::gc-task [_] + (s/keys :req-un [::storage ::db/pool])) + +(defmethod ig/init-key ::gc-task + [_ {:keys [pool storage] :as cfg}] + (letfn [(retrieve-deleted-objects [conn] + (when-let [result (seq (db/exec! conn [sql:retrieve-deleted-objects]))] + (as-> (group-by (comp keyword :backend) result) $ + (reduce-kv #(assoc %1 %2 (map :id %3)) $ $)))) + + (delete-in-bulk [conn backend ids] + (let [backend (resolve-backend storage backend) + backend (assoc backend :conn conn)] + (impl/del-objects-in-bulk backend ids)))] + + (fn [task] + (db/with-atomic [conn pool] + (loop [groups (retrieve-deleted-objects conn)] + (when groups + (doseq [[sid objects] groups] + (delete-in-bulk conn sid objects)) + (recur (retrieve-deleted-objects conn)))))))) + +(def sql:retrieve-deleted-objects + "with items_part as ( + select s.id from storage_object as s + where s.deleted_at is not null + order by s.deleted_at + limit 500 + ) + delete from storage_object + where id in (select id from items_part) + returning *;") + + diff --git a/backend/src/app/storage/db.clj b/backend/src/app/storage/db.clj new file mode 100644 index 000000000..62d764b2d --- /dev/null +++ b/backend/src/app/storage/db.clj @@ -0,0 +1,62 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020-2021 UXBOX Labs SL + +(ns app.storage.db + (:require + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.db :as db] + [app.storage.impl :as impl] + [clojure.java.io :as io] + [clojure.spec.alpha :as s] + [datoteka.core :as fs] + [lambdaisland.uri :as u] + [integrant.core :as ig]) + (:import + org.postgresql.largeobject.LargeObject + java.io.ByteArrayInputStream + java.io.ByteArrayOutputStream + java.io.InputStream + java.io.OutputStream)) + +;; --- BACKEND INIT + +(defmethod ig/pre-init-spec ::backend [_] + (s/keys :opt-un [::db/pool])) + +(defmethod ig/init-key ::backend + [_ cfg] + (assoc cfg :type :db)) + +(s/def ::type #{:db}) +(s/def ::backend + (s/keys :req-un [::type ::db/pool])) + +;; --- API IMPL + +(defmethod impl/put-object :db + [{:keys [conn] :as storage} {:keys [id] :as object} content] + (let [data (impl/slurp-bytes content)] + (db/insert! conn :storage-data {:id id :data data}) + object)) + +(defmethod impl/get-object :db + [{:keys [conn] :as backend} {:keys [id] :as object}] + (let [result (db/exec-one! conn ["select data from storage_data where id=?" id])] + (ByteArrayInputStream. (:data result)))) + +(defmethod impl/get-object-url :db + [backend {:keys [id] :as object}] + (throw (UnsupportedOperationException. "not supported"))) + +(defmethod impl/del-objects-in-bulk :db + [backend ids] + ;; NOOP: because delting the row already deletes the file data from + ;; the database. + nil) diff --git a/backend/src/app/storage/fs.clj b/backend/src/app/storage/fs.clj new file mode 100644 index 000000000..ba22b492f --- /dev/null +++ b/backend/src/app/storage/fs.clj @@ -0,0 +1,84 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020-2021 UXBOX Labs SL + +(ns app.storage.fs + (:require + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.db :as db] + [app.storage.impl :as impl] + [clojure.java.io :as io] + [clojure.spec.alpha :as s] + [datoteka.core :as fs] + [lambdaisland.uri :as u] + [integrant.core :as ig]) + (:import + java.io.InputStream + java.io.OutputStream + java.nio.file.Path + java.nio.file.Files)) + +;; --- BACKEND INIT + +(s/def ::directory ::us/string) +(s/def ::uri ::us/string) + +(defmethod ig/pre-init-spec ::backend [_] + (s/keys :opt-un [::directory ::uri])) + +(defmethod ig/init-key ::backend + [_ cfg] + ;; Return a valid backend data structure only if all optional + ;; parameters are provided. + (when (and (string? (:directory cfg)) + (string? (:uri cfg))) + (assoc cfg :type :fs))) + +(s/def ::type #{:fs}) +(s/def ::backend + (s/keys :req-un [::directory ::uri ::type])) + +;; --- API IMPL + +(defmethod impl/put-object :fs + [backend {:keys [id] :as object} content] + (let [^Path base (fs/path (:directory backend)) + ^Path path (fs/path (impl/id->path id)) + ^Path full (.resolve base path)] + (when-not (fs/exists? (.getParent full)) + (fs/create-dir (.getParent full))) + (with-open [^InputStream src (io/input-stream content) + ^OutputStream dst (io/output-stream full)] + (io/copy src dst)))) + +(defmethod impl/get-object :fs + [backend {:keys [id] :as object}] + (let [^Path base (fs/path (:directory backend)) + ^Path path (fs/path (impl/id->path id)) + ^Path full (.resolve base path)] + (when-not (fs/exists? full) + (ex/raise :type :internal + :code :filesystem-object-does-not-exists + :path (str full))) + (io/input-stream full))) + +(defmethod impl/get-object-url :fs + [backend {:keys [id] :as object} _] + (let [uri (u/uri (:uri backend))] + (update uri :path + (fn [existing] + (str existing (impl/id->path id)))))) + +(defmethod impl/del-objects-in-bulk :fs + [backend ids] + (let [base (fs/path (:directory backend))] + (doseq [id ids] + (let [path (fs/path (impl/id->path id)) + path (.resolve ^Path base ^Path path)] + (Files/deleteIfExists ^Path path))))) diff --git a/backend/src/app/storage/impl.clj b/backend/src/app/storage/impl.clj new file mode 100644 index 000000000..49c3a877a --- /dev/null +++ b/backend/src/app/storage/impl.clj @@ -0,0 +1,181 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020-2021 UXBOX Labs SL + +(ns app.storage.impl + "Storage backends abstraction layer." + (:require + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.common.uuid :as uuid] + [clojure.java.io :as io] + [buddy.core.codecs :as bc]) + (:import + java.nio.ByteBuffer + java.util.UUID + java.io.ByteArrayInputStream + java.io.InputStream + java.nio.file.Path + java.nio.file.Files)) + +;; --- API Definition + +(defmulti put-object (fn [cfg _ _] (:type cfg))) + +(defmethod put-object :default + [cfg _ _] + (ex/raise :type :internal + :code :invalid-storage-backend + :context cfg)) + +(defmulti get-object (fn [cfg _] (:type cfg))) + +(defmethod get-object :default + [cfg _] + (ex/raise :type :internal + :code :invalid-storage-backend + :context cfg)) + +(defmulti get-object-url (fn [cfg _ _] (:type cfg))) + +(defmethod get-object-url :default + [cfg _ _] + (ex/raise :type :internal + :code :invalid-storage-backend + :context cfg)) + + +(defmulti del-objects-in-bulk (fn [cfg _] (:type cfg))) + +(defmethod del-objects-in-bulk :default + [cfg _] + (ex/raise :type :internal + :code :invalid-storage-backend + :context cfg)) + + +;; --- HELPERS + +(defn uuid->hex + [^UUID v] + (let [buffer (ByteBuffer/allocate 16)] + (.putLong buffer (.getMostSignificantBits v)) + (.putLong buffer (.getLeastSignificantBits v)) + (bc/bytes->hex (.array buffer)))) + +(defn id->path + [id] + (let [tokens (->> (uuid->hex id) + (re-seq #"[\w\d]{2}")) + prefix (take 2 tokens) + suffix (drop 2 tokens)] + (str (apply str (interpose "/" prefix)) + "/" + (apply str suffix)))) + +(defn coerce-id + [id] + (cond + (string? id) (uuid/uuid id) + (uuid? id) id + :else (ex/raise :type :internal + :code :invalid-id-type + :hint "id should be string or uuid"))) + + +(defprotocol IContentObject) + +(defn- path->content-object + [path] + (let [size (Files/size path)] + (reify + IContentObject + io/IOFactory + (make-reader [_ opts] + (io/make-reader path opts)) + (make-writer [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + (make-input-stream [_ opts] + (io/make-input-stream path opts)) + (make-output-stream [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + clojure.lang.Counted + (count [_] size)))) + +(defn string->content-object + [^String v] + (let [data (.getBytes v "UTF-8") + bais (ByteArrayInputStream. ^bytes data)] + (reify + IContentObject + io/IOFactory + (make-reader [_ opts] + (io/make-reader bais opts)) + (make-writer [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + (make-input-stream [_ opts] + (io/make-input-stream bais opts)) + (make-output-stream [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + + clojure.lang.Counted + (count [_] + (alength data))))) + +(defn- input-stream->content-object + [^InputStream is size] + (reify + IContentObject + io/IOFactory + (make-reader [_ opts] + (io/make-reader is opts)) + (make-writer [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + (make-input-stream [_ opts] + (io/make-input-stream is opts)) + (make-output-stream [_ opts] + (throw (UnsupportedOperationException. "not implemented"))) + + clojure.lang.Counted + (count [_] size))) + +(defn content-object + ([data] (content-object data nil)) + ([data size] + (cond + (instance? java.nio.file.Path data) + (path->content-object data) + + (instance? java.io.File data) + (path->content-object (.toPath ^java.io.File data)) + + (instance? String data) + (string->content-object data) + + (instance? InputStream data) + (do + (when-not size + (throw (UnsupportedOperationException. "size should be provided on InputStream"))) + (input-stream->content-object data size)) + + :else + (throw (UnsupportedOperationException. "type not supported"))))) + +(defn content-object? + [v] + (satisfies? IContentObject v)) + +(defn slurp-bytes + [content] + (us/assert content-object? content) + (with-open [input (io/input-stream content) + output (java.io.ByteArrayOutputStream. (count content))] + (io/copy input output) + (.toByteArray output))) + + diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj new file mode 100644 index 000000000..307007827 --- /dev/null +++ b/backend/src/app/storage/s3.clj @@ -0,0 +1,174 @@ +;; This Source Code Form is subject to the terms of the Mozilla Public +;; License, v. 2.0. If a copy of the MPL was not distributed with this +;; file, You can obtain one at http://mozilla.org/MPL/2.0/. +;; +;; This Source Code Form is "Incompatible With Secondary Licenses", as +;; defined by the Mozilla Public License, v. 2.0. +;; +;; Copyright (c) 2020 UXBOX Labs SL + +(ns app.storage.s3 + "Storage backends abstraction layer." + (:require + [app.common.exceptions :as ex] + [app.common.spec :as us] + [app.db :as db] + [app.storage.impl :as impl] + [app.util.time :as dt] + [clojure.java.io :as io] + [clojure.spec.alpha :as s] + [lambdaisland.uri :as u] + [integrant.core :as ig]) + (:import + java.io.InputStream + java.io.OutputStream + java.nio.file.Path + software.amazon.awssdk.regions.Region + software.amazon.awssdk.services.s3.S3Client + software.amazon.awssdk.services.s3.S3ClientBuilder + software.amazon.awssdk.core.sync.RequestBody + software.amazon.awssdk.services.s3.model.PutObjectRequest + software.amazon.awssdk.services.s3.model.GetObjectRequest + software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest + software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest + software.amazon.awssdk.services.s3.presigner.S3Presigner + software.amazon.awssdk.services.s3.model.DeleteObjectsRequest + software.amazon.awssdk.services.s3.model.Delete + software.amazon.awssdk.services.s3.model.ObjectIdentifier + software.amazon.awssdk.services.s3.model.DeleteObjectsResponse)) + + +(declare put-object) +(declare get-object) +(declare get-object-url) +(declare del-object-in-bulk) +(declare build-s3-client) +(declare build-s3-presigner) + +;; --- BACKEND INIT + +(s/def ::region #{:eu-central-1}) +(s/def ::bucket ::us/string) + +(defmethod ig/pre-init-spec ::backend [_] + (s/keys :opt-un [::region ::bucket])) + +(defmethod ig/init-key ::backend + [_ cfg] + ;; Return a valid backend data structure only if all optional + ;; parameters are provided. + (when (and (contains? cfg :region) + (string? (:bucket cfg))) + (let [client (build-s3-client cfg) + presigner (build-s3-presigner cfg)] + (assoc cfg + :client client + :presigner presigner + :type :s3)))) + +(s/def ::type #{:s3}) +(s/def ::client #(instance? S3Client %)) +(s/def ::presigner #(instance? S3Presigner %)) +(s/def ::backend + (s/keys :req-un [::region ::bucket ::client ::type ::presigner])) + +;; --- API IMPL + +(defmethod impl/put-object :s3 + [backend object content] + (put-object backend object content)) + +(defmethod impl/get-object :s3 + [backend object] + (get-object backend object)) + +(defmethod impl/get-object-url :s3 + [backend object options] + (get-object-url backend object options)) + +(defmethod impl/del-objects-in-bulk :s3 + [backend ids] + (del-object-in-bulk backend ids)) + +;; --- HELPERS + +(defn- lookup-region + [region] + (case region + :eu-central-1 Region/EU_CENTRAL_1)) + +(defn- build-s3-client + [{:keys [region bucket]}] + (.. (S3Client/builder) + (region (lookup-region region)) + (build))) + +(defn- build-s3-presigner + [{:keys [region]}] + (.. (S3Presigner/builder) + (region (lookup-region region)) + (build))) + +(defn- put-object + [{:keys [client bucket]} {:keys [id] :as object} content] + (let [path (impl/id->path id) + mdata (meta object) + mtype (:content-type mdata "application/octet-stream") + request (.. (PutObjectRequest/builder) + (bucket bucket) + (contentType mtype) + (key path) + (build)) + content (RequestBody/fromInputStream (io/input-stream content) + (count content))] + (.putObject ^S3Client client + ^PutObjectRequest request + ^RequestBody content))) + +(defn- get-object + [{:keys [client bucket]} {:keys [id]}] + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (impl/id->path id)) + (build)) + obj (.getObject ^S3Client client gor)] + (io/input-stream obj))) + +(def default-max-age + (dt/duration {:minutes 10})) + +(defn- get-object-url + [{:keys [presigner bucket]} {:keys [id]} {:keys [max-age] :or {max-age default-max-age}}] + (us/assert dt/duration? max-age) + (let [gor (.. (GetObjectRequest/builder) + (bucket bucket) + (key (impl/id->path id)) + (build)) + gopr (.. (GetObjectPresignRequest/builder) + (signatureDuration max-age) + (getObjectRequest gor) + (build)) + pgor (.presignGetObject ^S3Presigner presigner gopr)] + (u/uri (str (.url ^PresignedGetObjectRequest pgor))))) + +(defn- del-object-in-bulk + [{:keys [bucket client]} ids] + (let [oids (map (fn [id] + (.. (ObjectIdentifier/builder) + (key (impl/id->path id)) + (build))) + ids) + delc (.. (Delete/builder) + (objects oids) + (build)) + dor (.. (DeleteObjectsRequest/builder) + (bucket bucket) + (delete ^Delete delc) + (build)) + dres (.deleteObjects ^S3Client client + ^DeleteObjectsRequest dor)] + (when (.hasErrors ^DeleteObjectsResponse dres) + (let [errors (seq (.errors ^DeleteObjectsResponse dres))] + (ex/raise :type :s3-error + :code :error-on-bulk-delete + :context errors))))) diff --git a/backend/src/app/tasks/file_media_gc.clj b/backend/src/app/tasks/file_media_gc.clj index e522036a7..d7996cb47 100644 --- a/backend/src/app/tasks/file_media_gc.clj +++ b/backend/src/app/tasks/file_media_gc.clj @@ -44,11 +44,6 @@ (run! (partial process-file conn) files) (recur)))))) -;; (mtx/instrument-with-summary! -;; {:var #'handler -;; :id "tasks__file_media_gc" -;; :help "Timing of task: file_media_gc"}) - (defn- decode-row [{:keys [data] :as row}] (cond-> row diff --git a/docker/devenv/files/nginx.conf b/docker/devenv/files/nginx.conf index cc33ec476..d4daa20d5 100644 --- a/docker/devenv/files/nginx.conf +++ b/docker/devenv/files/nginx.conf @@ -57,6 +57,9 @@ http { proxy_set_header X-Forwarded-Proto $scheme; proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + resolver 8.8.8.8; + + etag off; location / { @@ -68,6 +71,31 @@ http { proxy_pass http://127.0.0.1:6060/api; } + location /storage { + proxy_pass http://127.0.0.1:6060/storage; + recursive_error_pages on; + proxy_intercept_errors on; + error_page 301 302 307 = @handle_redirect; + } + + location @handle_redirect { + set $redirect_uri "$upstream_http_location"; + set $redirect_host "$upstream_http_x_host"; + set $redirect_cache_control "$upstream_http_cache_control"; + + proxy_buffering off; + + add_header x-internal-redirect "$redirect_uri"; + add_header cache-control "$redirect_cache_control"; + + proxy_set_header Host "$redirect_host"; + proxy_hide_header x-amz-id-2; + proxy_hide_header x-amz-request-id; + proxy_hide_header x-amz-meta-server-side-encryption; + proxy_hide_header x-amz-server-side-encryption; + proxy_pass $redirect_uri; + } + location /export { proxy_pass http://127.0.0.1:6061; }