diff --git a/backend/resources/app/templates/debug.tmpl b/backend/resources/app/templates/debug.tmpl index d408d5d9d..8ba83a241 100644 --- a/backend/resources/app/templates/debug.tmpl +++ b/backend/resources/app/templates/debug.tmpl @@ -7,7 +7,7 @@ Debug Main Page {% block content %}
diff --git a/backend/scripts/repl b/backend/scripts/repl index 7170dbee4..eec5ba5aa 100755 --- a/backend/scripts/repl +++ b/backend/scripts/repl @@ -68,6 +68,7 @@ export AWS_SECRET_ACCESS_KEY=penpot-devenv export PENPOT_OBJECTS_STORAGE_BACKEND=s3 export PENPOT_OBJECTS_STORAGE_S3_ENDPOINT=http://minio:9000 export PENPOT_OBJECTS_STORAGE_S3_BUCKET=penpot +export PENPOT_OBJECTS_STORAGE_FS_DIRECTORY="assets" export OPTIONS=" -A:jmx-remote -A:dev \ diff --git a/backend/src/app/http/debug.clj b/backend/src/app/http/debug.clj index c62202572..c9174813f 100644 --- a/backend/src/app/http/debug.clj +++ b/backend/src/app/http/debug.clj @@ -47,7 +47,7 @@ {::rres/status 200 ::rres/headers {"content-type" "text/html"} ::rres/body (-> (io/resource "app/templates/debug.tmpl") - (tmpl/render {}))}) + (tmpl/render {:version (:full cf/version)}))}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; FILE CHANGES diff --git a/backend/src/app/loggers/database.clj b/backend/src/app/loggers/database.clj index e2892d13f..bf9e9e3f9 100644 --- a/backend/src/app/loggers/database.clj +++ b/backend/src/app/loggers/database.clj @@ -63,7 +63,7 @@ (ex/format-throwable cause :data? false :explain? false :header? false :summary? false))} (when-let [params (or (:request/params context) (:params context))] - {:params (pp/pprint-str params :length 30 :level 12)}) + {:params (pp/pprint-str params :length 30 :level 13)}) (when-let [value (:value context)] {:value (pp/pprint-str value :length 30 :level 12)}) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index 314732d9f..f66193646 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -475,7 +475,8 @@ ::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket) (cf/get :objects-storage-s3-bucket)) ::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads) - (cf/get :objects-storage-s3-io-threads))} + (cf/get :objects-storage-s3-io-threads)) + ::wrk/executor (ig/ref ::wrk/executor)} :app.storage.fs/backend {::sto.fs/directory (or (cf/get :storage-assets-fs-directory) diff --git a/backend/src/app/storage.clj b/backend/src/app/storage.clj index 861730e33..426ff2ca2 100644 --- a/backend/src/app/storage.clj +++ b/backend/src/app/storage.clj @@ -10,6 +10,7 @@ (:require [app.common.data :as d] [app.common.data.macros :as dm] + [app.common.logging :as l] [app.common.spec :as us] [app.common.uuid :as uuid] [app.config :as cf] @@ -19,6 +20,7 @@ [app.storage.s3 :as ss3] [app.util.time :as dt] [clojure.spec.alpha :as s] + [cuerdas.core :as str] [datoteka.fs :as fs] [integrant.core :as ig]) (:import @@ -30,7 +32,7 @@ (case name :assets-fs :fs :assets-s3 :s3 - :fs))) + nil))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Storage Module State @@ -52,11 +54,19 @@ (defmethod ig/init-key ::storage [_ {:keys [::backends ::db/pool] :as cfg}] - (-> (d/without-nils cfg) - (assoc ::backends (d/without-nils backends)) - (assoc ::backend (or (get-legacy-backend) - (cf/get :objects-storage-backend :fs))) - (assoc ::db/connectable pool))) + (let [backend (or (get-legacy-backend) + (cf/get :objects-storage-backend) + :fs) + backends (d/without-nils backends)] + + (l/dbg :hint "initialize" + :default (d/name backend) + :available (str/join "," (map d/name (keys backends)))) + + (-> (d/without-nils cfg) + (assoc ::backends backends) + (assoc ::backend backend) + (assoc ::db/connectable pool)))) (s/def ::backend keyword?) (s/def ::storage diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index 1bbb38b16..2adde671f 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -17,6 +17,7 @@ [app.storage.impl :as impl] [app.storage.tmp :as tmp] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.java.io :as io] [clojure.spec.alpha :as s] [datoteka.fs :as fs] @@ -27,17 +28,15 @@ java.io.FilterInputStream java.io.InputStream java.net.URI - java.nio.ByteBuffer java.nio.file.Path java.time.Duration java.util.Collection java.util.Optional - java.util.concurrent.Semaphore org.reactivestreams.Subscriber - org.reactivestreams.Subscription software.amazon.awssdk.core.ResponseBytes software.amazon.awssdk.core.async.AsyncRequestBody software.amazon.awssdk.core.async.AsyncResponseTransformer + software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody software.amazon.awssdk.core.client.config.ClientAsyncConfiguration software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient @@ -59,6 +58,20 @@ software.amazon.awssdk.services.s3.presigner.model.GetObjectPresignRequest software.amazon.awssdk.services.s3.presigner.model.PresignedGetObjectRequest)) +(def ^:private max-retries + "A maximum number of retries on internal operations" + 3) + +(def ^:private max-concurrency + "Maximum concurrent request to S3 service" + 128) + +(def ^:private max-pending-connection-acquires + 20000) + +(def default-timeout + (dt/duration {:seconds 30})) + (declare put-object) (declare get-object-bytes) (declare get-object-data) @@ -80,7 +93,7 @@ (s/def ::io-threads ::us/integer) (defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads])) + (s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads ::wrk/executor])) (defmethod ig/prep-key ::backend [_ {:keys [::prefix ::region] :as cfg}] @@ -128,18 +141,29 @@ [backend object] (us/assert! ::backend backend) - (let [result (p/await (get-object-data backend object))] - (if (ex/exception? result) - (cond - (ex/instance? NoSuchKeyException result) - (ex/raise :type :not-found - :code :object-not-found - :hint "s3 object not found" - :cause result) - :else - (throw result)) + (loop [result (get-object-data backend object) + retryn 0] - result))) + (let [result (p/await result)] + (if (ex/exception? result) + (cond + (ex/instance? NoSuchKeyException result) + (ex/raise :type :not-found + :code :object-not-found + :hint "s3 object not found" + :object-id (:id object) + :object-path (impl/id->path (:id object)) + :cause result) + + (and (ex/instance? java.nio.file.FileAlreadyExistsException result) + (< retryn max-retries)) + (recur (get-object-data backend object) + (inc retryn)) + + :else + (throw result)) + + result)))) (defmethod impl/get-object-bytes :s3 [backend object] @@ -163,18 +187,14 @@ ;; --- HELPERS -(def default-timeout - (dt/duration {:seconds 30})) - (defn- lookup-region ^Region [region] (Region/of (name region))) (defn- build-s3-client - [{:keys [::region ::endpoint ::io-threads]}] - (let [executor (px/resolve-executor :virtual) - aconfig (-> (ClientAsyncConfiguration/builder) + [{:keys [::region ::endpoint ::io-threads ::wrk/executor]}] + (let [aconfig (-> (ClientAsyncConfiguration/builder) (.advancedOption SdkAdvancedAsyncClientOption/FUTURE_COMPLETION_EXECUTOR executor) (.build)) @@ -190,6 +210,8 @@ (.connectionTimeout default-timeout) (.readTimeout default-timeout) (.writeTimeout default-timeout) + (.maxConcurrency (int max-concurrency)) + (.maxPendingConnectionAcquires (int max-pending-connection-acquires)) (.build)) client (let [builder (S3AsyncClient/builder) @@ -223,69 +245,38 @@ (.serviceConfiguration ^S3Configuration config) (.build)))) -(defn- upload-thread - [id subscriber sem content] - (px/thread - {:name "penpot/s3/uploader" - :virtual true - :daemon true} - (l/trace :hint "start upload thread" - :object-id (str id) - :size (impl/get-size content) - ::l/sync? true) - (let [stream (io/input-stream content) - bsize (* 1024 64) - tpoint (dt/tpoint)] - (try - (loop [] - (.acquire ^Semaphore sem 1) - (let [buffer (byte-array bsize) - readed (.read ^InputStream stream buffer)] - (when (pos? readed) - (let [data (ByteBuffer/wrap ^bytes buffer 0 readed)] - (.onNext ^Subscriber subscriber ^ByteBuffer data) - (when (= readed bsize) - (recur)))))) - (.onComplete ^Subscriber subscriber) - (catch InterruptedException _ - (l/trace :hint "interrupted upload thread" - :object-:id (str id) - ::l/sync? true) - nil) - (catch Throwable cause - (.onError ^Subscriber subscriber cause)) - (finally - (l/trace :hint "end upload thread" - :object-id (str id) - :elapsed (dt/format-duration (tpoint)) - ::l/sync? true) - (.close ^InputStream stream)))))) +(defn- write-input-stream + [delegate input] + (try + (.writeInputStream ^BlockingInputStreamAsyncRequestBody delegate + ^InputStream input) + (catch Throwable cause + (l/error :hint "encountered error while writing input stream to service" + :cause cause)) + (finally + (.close ^InputStream input)))) (defn- make-request-body - [id content] - (reify - AsyncRequestBody - (contentLength [_] - (Optional/of (long (impl/get-size content)))) - - (^void subscribe [_ ^Subscriber subscriber] - (let [sem (Semaphore. 0) - thr (upload-thread id subscriber sem content)] - (.onSubscribe subscriber - (reify Subscription - (cancel [_] - (px/interrupt! thr) - (.release sem 1)) - (request [_ n] - (.release sem (int n))))))))) + [executor content] + (let [size (impl/get-size content)] + (reify + AsyncRequestBody + (contentLength [_] + (Optional/of (long size))) + (^void subscribe [_ ^Subscriber subscriber] + (let [delegate (AsyncRequestBody/forBlockingInputStream (long size)) + input (io/input-stream content)] + (px/run! executor (partial write-input-stream delegate input)) + (.subscribe ^BlockingInputStreamAsyncRequestBody delegate + ^Subscriber subscriber)))))) (defn- put-object - [{:keys [::client ::bucket ::prefix]} {:keys [id] :as object} content] + [{:keys [::client ::bucket ::prefix ::wrk/executor]} {:keys [id] :as object} content] (let [path (dm/str prefix (impl/id->path id)) mdata (meta object) mtype (:content-type mdata "application/octet-stream") - rbody (make-request-body id content) + rbody (make-request-body executor content) request (.. (PutObjectRequest/builder) (bucket bucket) (contentType mtype) diff --git a/backend/src/app/storage/tmp.clj b/backend/src/app/storage/tmp.clj index 92cda29eb..376c6ae8b 100644 --- a/backend/src/app/storage/tmp.clj +++ b/backend/src/app/storage/tmp.clj @@ -11,13 +11,16 @@ permanently delete these files (look at systemd-tempfiles)." (:require [app.common.logging :as l] + [app.common.uuid :as uuid] [app.util.time :as dt] [app.worker :as wrk] [clojure.spec.alpha :as s] [datoteka.fs :as fs] [integrant.core :as ig] [promesa.exec :as px] - [promesa.exec.csp :as sp])) + [promesa.exec.csp :as sp]) + (:import + java.nio.file.Files)) (def default-tmp-dir "/tmp/penpot") @@ -76,11 +79,9 @@ [& {:keys [suffix prefix min-age] :or {prefix "penpot." suffix ".tmp"}}] - (let [path (fs/create-tempfile - :perms "rw-r--r--" - :dir default-tmp-dir - :suffix suffix - :prefix prefix)] + (let [attrs (fs/make-permissions "rw-r--r--") + path (fs/join default-tmp-dir (str prefix (uuid/next) suffix)) + path (Files/createFile path attrs)] (fs/delete-on-exit! path) (sp/offer! queue [path (some-> min-age dt/duration)]) path)) diff --git a/backend/src/app/worker/executor.clj b/backend/src/app/worker/executor.clj index c1d10122c..b712c6769 100644 --- a/backend/src/app/worker/executor.clj +++ b/backend/src/app/worker/executor.clj @@ -17,12 +17,11 @@ [integrant.core :as ig] [promesa.exec :as px]) (:import - java.util.concurrent.Executor java.util.concurrent.ThreadPoolExecutor)) (set! *warn-on-reflection* true) -(s/def ::wrk/executor #(instance? Executor %)) +(s/def ::wrk/executor #(instance? ThreadPoolExecutor %)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; EXECUTOR @@ -36,30 +35,22 @@ (let [factory (px/thread-factory :prefix "penpot/default/") executor (px/cached-executor :factory factory :keepalive 60000)] (l/inf :hint "executor started") - (reify - java.lang.AutoCloseable - (close [_] - (l/inf :hint "stoping executor") - (px/shutdown! executor)) - - clojure.lang.IDeref - (deref [_] - {:active (.getPoolSize ^ThreadPoolExecutor executor) - :running (.getActiveCount ^ThreadPoolExecutor executor) - :completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)}) - - Executor - (execute [_ runnable] - (.execute ^Executor executor ^Runnable runnable))))) + executor)) (defmethod ig/halt-key! ::wrk/executor [_ instance] - (.close ^java.lang.AutoCloseable instance)) + (px/shutdown! instance)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; MONITOR ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; +(defn- get-stats + [^ThreadPoolExecutor executor] + {:active (.getPoolSize ^ThreadPoolExecutor executor) + :running (.getActiveCount ^ThreadPoolExecutor executor) + :completed (.getCompletedTaskCount ^ThreadPoolExecutor executor)}) + (s/def ::name ::us/keyword) (defmethod ig/pre-init-spec ::wrk/monitor [_] @@ -74,7 +65,7 @@ [_ {:keys [::wrk/executor ::mtx/metrics ::interval ::wrk/name]}] (letfn [(monitor! [executor prev-completed] (let [labels (into-array String [(d/name name)]) - stats (deref executor) + stats (get-stats executor) completed (:completed stats) completed-inc (- completed prev-completed) diff --git a/common/src/app/common/files/changes.cljc b/common/src/app/common/files/changes.cljc index 33a273121..3c097b5d0 100644 --- a/common/src/app/common/files/changes.cljc +++ b/common/src/app/common/files/changes.cljc @@ -418,7 +418,8 @@ (cts/shape? shape-new)) (ex/raise :type :assertion :code :data-validation - :hint "invalid shape found after applying changes")))))] + :hint "invalid shape found after applying changes" + ::sm/explain (cts/explain-shape shape-new))))))] (->> (into #{} (map :page-id) items) (mapcat (fn [page-id] diff --git a/common/src/app/common/schema.cljc b/common/src/app/common/schema.cljc index c0c933266..25e802f8b 100644 --- a/common/src/app/common/schema.cljc +++ b/common/src/app/common/schema.cljc @@ -686,8 +686,8 @@ pred) pred (if (some? max) (fn [v] - (and (>= max v) - (pred v))) + (and (pred v) + (>= max v))) pred)] {:pred pred @@ -724,8 +724,8 @@ pred) pred (if (some? max) (fn [v] - (and (>= max v) - (pred v))) + (and (pred v) + (>= max v))) pred)] {:pred pred @@ -754,8 +754,8 @@ pred) pred (if (some? max) (fn [v] - (and (>= max v) - (pred v))) + (and (pred v) + (>= max v))) pred) gen (sg/one-of diff --git a/common/src/app/common/types/shape.cljc b/common/src/app/common/types/shape.cljc index 5dfa9a016..1fb69eb29 100644 --- a/common/src/app/common/types/shape.cljc +++ b/common/src/app/common/types/shape.cljc @@ -361,6 +361,9 @@ (def valid-shape? (sm/lazy-validator schema:shape)) +(def explain-shape + (sm/lazy-explainer schema:shape)) + (defn has-images? [{:keys [fills strokes]}] (or (some :fill-image fills)