0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-22 14:39:45 -05:00

♻️ Minor refactor on storages

Fix many issues on the FS & S3 backends; remove the unused and broken
DB backend. Normalize operations on bytes and byte streams in a
separate namespace: app.util.bytes
This commit is contained in:
Andrey Antukh 2022-06-22 11:34:36 +02:00
parent 3a55f07f45
commit ebcb385593
9 changed files with 302 additions and 179 deletions

View file

@ -28,7 +28,8 @@
metosin/reitit-core {:mvn/version "0.5.18"}
org.postgresql/postgresql {:mvn/version "42.4.0"}
com.zaxxer/HikariCP {:mvn/version "5.0.1"}
funcool/datoteka {:mvn/version "2.0.0"}
funcool/datoteka {:mvn/version "3.0.64"}
buddy/buddy-hashers {:mvn/version "1.8.158"}
buddy/buddy-sign {:mvn/version "3.4.333"}

View file

@ -71,6 +71,10 @@
:app.tokens/tokens
{:keys (ig/ref :app.setup/keys)}
:app.storage.tmp/cleaner
{:executor (ig/ref [::worker :app.worker/executor])
:scheduler (ig/ref :app.worker/scheduler)}
:app.storage/gc-deleted-task
{:pool (ig/ref :app.db/pool)
:storage (ig/ref :app.storage/storage)
@ -336,23 +340,12 @@
:backends
{:assets-s3 (ig/ref [::assets :app.storage.s3/backend])
:assets-db (ig/ref [::assets :app.storage.db/backend])
:assets-fs (ig/ref [::assets :app.storage.fs/backend])
:tmp (ig/ref [::tmp :app.storage.fs/backend])
:fdata-s3 (ig/ref [::fdata :app.storage.s3/backend])
;; keep this for backward compatibility
:s3 (ig/ref [::assets :app.storage.s3/backend])
:fs (ig/ref [::assets :app.storage.fs/backend])}}
[::fdata :app.storage.s3/backend]
{:region (cf/get :storage-fdata-s3-region)
:bucket (cf/get :storage-fdata-s3-bucket)
:endpoint (cf/get :storage-fdata-s3-endpoint)
:prefix (cf/get :storage-fdata-s3-prefix)
:executor (ig/ref [::default :app.worker/executor])}
[::assets :app.storage.s3/backend]
{:region (cf/get :storage-assets-s3-region)
:endpoint (cf/get :storage-assets-s3-endpoint)
@ -361,12 +354,7 @@
[::assets :app.storage.fs/backend]
{:directory (cf/get :storage-assets-fs-directory)}
[::tmp :app.storage.fs/backend]
{:directory "/tmp/penpot"}
[::assets :app.storage.db/backend]
{:pool (ig/ref :app.db/pool)}})
})
(def system nil)

View file

@ -14,7 +14,6 @@
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.db :as db]
[app.storage.db :as sdb]
[app.storage.fs :as sfs]
[app.storage.impl :as impl]
[app.storage.s3 :as ss3]
@ -32,14 +31,12 @@
(s/def ::s3 ::ss3/backend)
(s/def ::fs ::sfs/backend)
(s/def ::db ::sdb/backend)
(s/def ::backends
(s/map-of ::us/keyword
(s/nilable
(s/or :s3 ::ss3/backend
:fs ::sfs/backend
:db ::sdb/backend))))
:fs ::sfs/backend))))
(defmethod ig/pre-init-spec ::storage [_]
(s/keys :req-un [::db/pool ::wrk/executor ::backends]))
@ -109,7 +106,7 @@
result (or result
(-> (db/insert! conn :storage-object
{:id id
:size (count content)
:size (impl/get-size content)
:backend (name backend)
:metadata (db/tjson mdata)
:deleted-at expired-at
@ -263,7 +260,8 @@
;; A task responsible to permanently delete already marked as deleted
;; storage files. The storage objects are practically never marked to
;; be deleted directly by the api call. The touched-gc is responsible
;; of collecting the usage of the object and mark it as deleted.
;; of collecting the usage of the object and mark it as deleted. Only
;; the TMP files are created with an expiration date in the future.
(declare sql:retrieve-deleted-objects-chunk)

View file

@ -1,67 +0,0 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.storage.db
(:require
[app.common.spec :as us]
[app.db :as db]
[app.storage.impl :as impl]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.exec :as px])
(:import
java.io.ByteArrayInputStream))
;; --- BACKEND INIT
(defmethod ig/pre-init-spec ::backend [_]
(s/keys :opt-un [::db/pool]))
(defmethod ig/init-key ::backend
[_ cfg]
(assoc cfg :type :db))
(s/def ::type ::us/keyword)
(s/def ::backend
(s/keys :req-un [::type ::db/pool]))
;; --- API IMPL
(defmethod impl/put-object :db
[{:keys [conn executor] :as storage} {:keys [id] :as object} content]
(px/with-dispatch executor
(let [data (impl/slurp-bytes content)]
(db/insert! conn :storage-data {:id id :data data})
object)))
(defmethod impl/get-object-data :db
[{:keys [conn executor] :as backend} {:keys [id] :as object}]
(px/with-dispatch executor
(let [result (db/exec-one! conn ["select data from storage_data where id=?" id])]
(ByteArrayInputStream. (:data result)))))
(defmethod impl/get-object-bytes :db
[{:keys [conn executor] :as backend} {:keys [id] :as object}]
(px/with-dispatch executor
(let [result (db/exec-one! conn ["select data from storage_data where id=?" id])]
(:data result))))
(defmethod impl/get-object-url :db
[_ _]
(throw (UnsupportedOperationException. "not supported")))
(defmethod impl/del-object :db
[_ _]
;; NOOP: because deleting the row already deletes the file data from
;; the database.
nil)
(defmethod impl/del-objects-in-bulk :db
[_ _]
;; NOOP: because deleting the row already deletes the file data from
;; the database.
nil)

View file

@ -10,11 +10,13 @@
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage.impl :as impl]
[app.util.bytes :as bs]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.core :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px])
(:import
java.io.InputStream
@ -72,9 +74,10 @@
(io/input-stream full))))
(defmethod impl/get-object-bytes :fs
[{:keys [executor] :as backend} object]
(px/with-dispatch executor
(fs/slurp-bytes (impl/get-object-data backend object))))
[backend object]
(p/let [input (impl/get-object-data backend object)]
(ex/with-always (bs/close! input)
(bs/read-as-bytes input))))
(defmethod impl/get-object-url :fs
[{:keys [uri executor] :as backend} {:keys [id] :as object} _]

View file

@ -9,18 +9,15 @@
(:require
[app.common.data.macros :as dm]
[app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.util.bytes :as bs]
[buddy.core.codecs :as bc]
[buddy.core.hash :as bh]
[clojure.java.io :as io])
(:import
java.nio.ByteBuffer
java.util.UUID
java.io.ByteArrayInputStream
java.io.InputStream
java.nio.file.Files
org.apache.commons.io.input.BoundedInputStream
))
java.nio.file.Path
java.util.UUID))
;; --- API Definition
@ -95,23 +92,23 @@
(defn coerce-id
[id]
(cond
(string? id) (uuid/uuid id)
(string? id) (parse-uuid id)
(uuid? id) id
:else (ex/raise :type :internal
:code :invalid-id-type
:hint "id should be string or uuid")))
(defprotocol IContentObject
(size [_] "get object size"))
(get-size [_] "get object size"))
(defprotocol IContentHash
(get-hash [_] "get precalculated hash"))
(defn- make-content
[^InputStream is ^long size]
(defn- path->content
[^Path path ^long size]
(reify
IContentObject
(size [_] size)
(get-size [_] size)
io/IOFactory
(make-reader [this opts]
@ -119,47 +116,53 @@
(make-writer [_ _]
(throw (UnsupportedOperationException. "not implemented")))
(make-input-stream [_ _]
(doto (BoundedInputStream. is size)
(.setPropagateClose false)))
(-> (io/input-stream path)
(bs/bounded-input-stream size)))
(make-output-stream [_ _]
(throw (UnsupportedOperationException. "not implemented")))))
(defn- bytes->content
[^bytes data ^long size]
(reify
IContentObject
(get-size [_] size)
io/IOFactory
(make-reader [this opts]
(io/make-reader this opts))
(make-writer [_ _]
(throw (UnsupportedOperationException. "not implemented")))
clojure.lang.Counted
(count [_] size)
java.lang.AutoCloseable
(close [_]
(.close is))))
(make-input-stream [_ _]
(-> (bs/bytes-input-stream data)
(bs/bounded-input-stream size)))
(make-output-stream [_ _]
(throw (UnsupportedOperationException. "not implemented")))))
(defn content
([data] (content data nil))
([data size]
(cond
(instance? java.nio.file.Path data)
(make-content (io/input-stream data)
(Files/size data))
(path->content data (or size (Files/size data)))
(instance? java.io.File data)
(content (.toPath ^java.io.File data) nil)
(content (.toPath ^java.io.File data) size)
(instance? String data)
(let [data (.getBytes data "UTF-8")
bais (ByteArrayInputStream. ^bytes data)]
(make-content bais (alength data)))
(let [data (.getBytes data "UTF-8")]
(bytes->content data (alength data)))
(bytes? data)
(let [size (alength ^bytes data)
bais (ByteArrayInputStream. ^bytes data)]
(make-content bais size))
(bytes->content data (or size (alength ^bytes data)))
(instance? InputStream data)
(do
(when-not size
(throw (UnsupportedOperationException. "size should be provided on InputStream")))
(make-content data size))
;; (instance? InputStream data)
;; (do
;; (when-not size
;; (throw (UnsupportedOperationException. "size should be provided on InputStream")))
;; (make-content data size))
:else
(throw (UnsupportedOperationException. "type not supported")))))
(throw (IllegalArgumentException. "invalid argument type")))))
(defn wrap-with-hash
[content ^String hash]
@ -171,7 +174,7 @@
(reify
IContentObject
(size [_] (size content))
(get-size [_] (get-size content))
IContentHash
(get-hash [_] hash)
@ -184,43 +187,17 @@
(make-input-stream [_ opts]
(io/make-input-stream content opts))
(make-output-stream [_ opts]
(io/make-output-stream content opts))
clojure.lang.Counted
(count [_] (count content))
java.lang.AutoCloseable
(close [_]
(.close ^java.lang.AutoCloseable content))))
(io/make-output-stream content opts))))
(defn content?
[v]
(satisfies? IContentObject v))
(defn slurp-bytes
[content]
(with-open [input (io/input-stream content)
output (java.io.ByteArrayOutputStream. (count content))]
(io/copy input output)
(.toByteArray output)))
(defn calculate-hash
[path-or-stream]
(let [result (cond
(instance? InputStream path-or-stream)
(let [result (-> (bh/blake2b-256 path-or-stream)
(bc/bytes->hex))]
(.reset path-or-stream)
result)
(string? path-or-stream)
(-> (bh/blake2b-256 path-or-stream)
(bc/bytes->hex))
:else
(with-open [is (io/input-stream path-or-stream)]
(-> (bh/blake2b-256 is)
(bc/bytes->hex))))]
[resource]
(let [result (with-open [input (io/input-stream resource)]
(-> (bh/blake2b-256 input)
(bc/bytes->hex)))]
(str "blake2b:" result)))
(defn resolve-backend

View file

@ -12,14 +12,17 @@
[app.common.spec :as us]
[app.common.uri :as u]
[app.storage.impl :as impl]
[app.storage.tmp :as tmp]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[datoteka.core :as fs]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px])
(:import
java.io.FilterInputStream
java.io.InputStream
java.nio.ByteBuffer
java.time.Duration
@ -30,6 +33,7 @@
org.reactivestreams.Subscription
software.amazon.awssdk.core.ResponseBytes
software.amazon.awssdk.core.async.AsyncRequestBody
software.amazon.awssdk.core.async.AsyncResponseTransformer
software.amazon.awssdk.core.client.config.ClientAsyncConfiguration
software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption
software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
@ -107,7 +111,16 @@
(defmethod impl/get-object-data :s3
[backend object]
(get-object-data backend object))
(letfn [(no-such-key? [cause]
(instance? software.amazon.awssdk.services.s3.model.NoSuchKeyException cause))
(handle-not-found [cause]
(ex/raise :type :not-found
:code :object-not-found
:hint "s3 object not found"
:cause cause))]
(-> (get-object-data backend object)
(p/catch no-such-key? handle-not-found))))
(defmethod impl/get-object-bytes :s3
[backend object]
@ -204,7 +217,7 @@
(reify
AsyncRequestBody
(contentLength [_]
(Optional/of (long (count content))))
(Optional/of (long (impl/get-size content))))
(^void subscribe [_ ^Subscriber s]
(let [thread (Thread. #(writer-fn s))]
@ -216,7 +229,6 @@
(cancel [_]
(.interrupt thread)
(.release sem 1))
(request [_ n]
(.release sem (int n))))))))))
@ -238,16 +250,31 @@
^AsyncRequestBody content))))
(defn get-object-data
[{:keys [client bucket prefix]} {:keys [id]}]
(p/let [gor (.. (GetObjectRequest/builder)
[{:keys [client bucket prefix]} {:keys [id size]}]
(let [gor (.. (GetObjectRequest/builder)
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
obj (.getObject ^S3AsyncClient client ^GetObjectRequest gor)
;; rsp (.response ^ResponseInputStream obj)
;; len (.contentLength ^GetObjectResponse rsp)
]
(io/input-stream obj)))
(build))]
;; If the file size is greater than 2MiB, stream the content to the
;; filesystem and then read it back with a buffered input stream; if
;; not, read the content into memory using byte arrays.
(if (> size (* 1024 1024 2))
(p/let [path (tmp/tempfile :prefix "penpot.storage.s3.")
rxf (AsyncResponseTransformer/toFile path)
_ (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(proxy [FilterInputStream] [(io/input-stream path)]
(close []
(fs/delete path)
(proxy-super close))))
(p/let [rxf (AsyncResponseTransformer/toBytes)
obj (.getObject ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(.asInputStream ^ResponseBytes obj)))))
(defn get-object-bytes
[{:keys [client bucket prefix]} {:keys [id]}]
@ -255,7 +282,10 @@
(bucket bucket)
(key (str prefix (impl/id->path id)))
(build))
obj (.getObjectAsBytes ^S3AsyncClient client ^GetObjectRequest gor)]
rxf (AsyncResponseTransformer/toBytes)
obj (.getObjectAsBytes ^S3AsyncClient client
^GetObjectRequest gor
^AsyncResponseTransformer rxf)]
(.asByteArray ^ResponseBytes obj)))
(def default-max-age

View file

@ -0,0 +1,83 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.storage.tmp
"Temporary-files service. All created files are scheduled for deletion
after a configurable min-age (30 minutes by default). This is best
effort: if this process fails, the operating-system cleanup task
should be responsible for permanently deleting these files (see
systemd-tmpfiles)."
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.core.async :as a]
[clojure.spec.alpha :as s]
[datoteka.core :as fs]
[integrant.core :as ig]
[promesa.exec :as px]))
;; Forward declaration: referenced by the cleaner loop before its definition.
(declare remove-temp-file)

;; Bounded channel of tempfile paths pending scheduled deletion; when
;; full, offers are dropped (cleanup is best effort).
(defonce queue (a/chan 128))

(s/def ::min-age ::dt/duration)

(defmethod ig/pre-init-spec ::cleaner [_]
  (s/keys :req-un [::min-age ::wrk/scheduler ::wrk/executor]))
(defmethod ig/prep-key ::cleaner
  [_ cfg]
  ;; Default min-age is 30 minutes; explicit non-nil config values take
  ;; precedence over the default.
  (merge {:min-age (dt/duration {:minutes 30})}
         (d/without-nils cfg)))
(defmethod ig/init-key ::cleaner
  [_ {:keys [scheduler executor min-age] :as cfg}]
  ;; Starts a background loop that consumes tempfile paths from `queue`
  ;; and schedules their deletion once `min-age` has elapsed. Returns
  ;; the close channel; closing it (see halt-key!) stops the loop.
  (l/info :hint "starting tempfile cleaner service")
  (let [cch (a/chan)]
    (a/go-loop []
      (let [[path port] (a/alts! [queue cch])]
        ;; A take from (or closure of) `cch` yields port == cch and
        ;; terminates the loop; anything else is a path to schedule.
        (when (not= port cch)
          (l/trace :hint "schedule tempfile deletion" :path path
                   :expires-at (dt/plus (dt/now) min-age))
          (px/schedule! scheduler
                        (inst-ms min-age)
                        (partial remove-temp-file executor path))
          (recur))))
    cch))
(defmethod ig/halt-key! ::cleaner
  [_ close-ch]
  ;; Stops the cleaner go-loop by closing the channel returned from
  ;; init-key. Idempotent: some-> tolerates a nil handle.
  (l/info :hint "stopping tempfile cleaner service")
  (some-> close-ch a/close!))
(defn- remove-temp-file
  "Permanently delete tempfile, dispatching the filesystem work on the
  provided executor. Files already removed are silently ignored."
  [executor path]
  (px/with-dispatch executor
    (l/trace :hint "permanently delete tempfile" :path path)
    (when (fs/exists? path)
      (fs/delete path))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; API
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn tempfile
  "Returns a tmpfile candidate (without creating it). The path is
  registered with the cleaner service for best-effort deletion after
  the configured min-age."
  [& {:keys [suffix prefix]
      :or {prefix "penpot."
           suffix ".tmp"}}]
  (let [candidate (fs/tempfile :suffix suffix :prefix prefix)]
    ;; offer! drops the path when the queue is full; the OS-level
    ;; tmpfiles cleanup remains as a fallback in that case.
    (a/offer! queue candidate)
    candidate))
(defn create-tempfile
  "Creates a temporary file on the filesystem and registers it with the
  cleaner service for best-effort deletion after the configured min-age."
  [& {:keys [suffix prefix]
      :or {prefix "penpot."
           suffix ".tmp"}}]
  (let [path (fs/create-tempfile :suffix suffix :prefix prefix)]
    (a/offer! queue path)
    path))

View file

@ -0,0 +1,110 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) UXBOX Labs SL
(ns app.util.bytes
"Bytes & Byte Streams helpers"
(:require
[clojure.java.io :as io]
[datoteka.core :as fs]
[yetti.adapter :as yt])
(:import
com.github.luben.zstd.ZstdInputStream
com.github.luben.zstd.ZstdOutputStream
java.io.ByteArrayInputStream
java.io.ByteArrayOutputStream
java.io.DataInputStream
java.io.DataOutputStream
java.io.OutputStream
java.io.InputStream
java.lang.AutoCloseable
org.apache.commons.io.IOUtils
org.apache.commons.io.input.BoundedInputStream))
(set! *warn-on-reflection* true)

;; Default copy buffer size, taken from the yetti adapter defaults so
;; IO buffering stays consistent with the HTTP server configuration.
(def ^:const default-buffer-size
  (:xnio/buffer-size yt/defaults))
(defn copy!
  "Copy bytes from the `src` input stream to the `dst` output stream.
  When `size` is provided, copies exactly `size` bytes starting at
  `offset`; otherwise copies until EOF. Returns the number of bytes
  copied."
  [src dst & {:keys [offset size buffer-size]
              :or {offset 0 buffer-size default-buffer-size}}]
  (let [^bytes buffer (byte-array buffer-size)]
    (if (some? size)
      (IOUtils/copyLarge ^InputStream src ^OutputStream dst (long offset) (long size) buffer)
      (IOUtils/copyLarge ^InputStream src ^OutputStream dst buffer))))
(defn write-to-file!
  "Write `src` (a byte array or an InputStream) to the file or stream
  denoted by `dst`. When `size` is provided, only that many bytes are
  written. Returns the number of bytes written."
  [src dst & {:keys [size]}]
  (with-open [^OutputStream output (io/output-stream dst)]
    (cond
      (bytes? src)
      (if size
        ;; Bound the in-memory stream to `size` so a partial write is
        ;; honored even when the array is larger. (The previous version
        ;; used `(or size …)` inside this branch, where `size` is always
        ;; truthy, and opened two nested shadowed streams.)
        (with-open [^InputStream input (-> (ByteArrayInputStream. ^bytes src)
                                           (bounded-input-stream size))]
          (copy! input output :size size))
        (do
          (IOUtils/writeChunked ^bytes src output)
          (.flush ^OutputStream output)
          (alength ^bytes src)))

      (instance? InputStream src)
      (copy! src output :size size)

      :else
      (throw (IllegalArgumentException. "invalid arguments")))))
(defn read-as-bytes
  "Read an input stream or filesystem path fully into a byte array.
  When `size` is provided, reads exactly that many bytes; otherwise the
  stream's currently-available count is used only as an initial buffer
  size hint."
  [input & {:keys [size]}]
  (cond
    (instance? InputStream input)
    (with-open [output (ByteArrayOutputStream. (or size (.available ^InputStream input)))]
      (copy! input output :size size)
      (.toByteArray output))

    (fs/path? input)
    ;; Open the path and delegate to the InputStream branch above,
    ;; removing the previously duplicated copy logic.
    (with-open [input (io/input-stream input)]
      (read-as-bytes input :size size))

    :else
    (throw (IllegalArgumentException. "invalid arguments"))))
(defn bytes-input-stream
  "Creates an instance of ByteArrayInputStream over the given byte array."
  [^bytes data]
  (ByteArrayInputStream. data))
(defn bounded-input-stream
  "Wraps `input` so that at most `size` bytes can be read from it. When
  `close?` is true (the default), closing the wrapper also closes the
  underlying stream."
  [input size & {:keys [close?] :or {close? true}}]
  (doto (BoundedInputStream. ^InputStream input ^long size)
    (.setPropagateClose close?)))
(defn zstd-input-stream
  "Wraps `input` with a zstandard decompression stream."
  ^InputStream
  [input]
  (ZstdInputStream. ^InputStream input))
(defn zstd-output-stream
  "Wraps `output` with a zstandard compression stream using the given
  compression `level` (0 by default)."
  ^OutputStream
  [output & {:keys [level] :or {level 0}}]
  (ZstdOutputStream. ^OutputStream output (int level)))
(defn data-input-stream
  "Wraps `input` with a DataInputStream for reading primitive values."
  ^DataInputStream
  [input]
  (DataInputStream. ^InputStream input))
(defn data-output-stream
  "Wraps `output` with a DataOutputStream for writing primitive values."
  ^DataOutputStream
  [output]
  (DataOutputStream. ^OutputStream output))
(defn close!
  "Closes the given AutoCloseable resource, releasing its underlying
  resources."
  [^AutoCloseable stream]
  (.close stream))