From 9329c2ebd93655ce1f0d51d45dcfa4265a69f39e Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Fri, 25 Oct 2024 16:13:06 +0200 Subject: [PATCH] :bug: Fix incorrect handling of EOF on s3 upload thread --- backend/src/app/main.clj | 3 +- backend/src/app/storage/s3.clj | 88 +++++++++++----------------------- 2 files changed, 29 insertions(+), 62 deletions(-) diff --git a/backend/src/app/main.clj b/backend/src/app/main.clj index bda16bb3b..1e6ae5644 100644 --- a/backend/src/app/main.clj +++ b/backend/src/app/main.clj @@ -471,7 +471,8 @@ ::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket) (cf/get :objects-storage-s3-bucket)) ::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads) - (cf/get :objects-storage-s3-io-threads))} + (cf/get :objects-storage-s3-io-threads)) + ::wrk/executor (ig/ref ::wrk/executor)} :app.storage.fs/backend {::sto.fs/directory (or (cf/get :storage-assets-fs-directory) diff --git a/backend/src/app/storage/s3.clj b/backend/src/app/storage/s3.clj index e7e176463..93a18aa11 100644 --- a/backend/src/app/storage/s3.clj +++ b/backend/src/app/storage/s3.clj @@ -17,6 +17,7 @@ [app.storage.impl :as impl] [app.storage.tmp :as tmp] [app.util.time :as dt] + [app.worker :as-alias wrk] [clojure.java.io :as io] [clojure.spec.alpha :as s] [datoteka.fs :as fs] @@ -27,17 +28,15 @@ java.io.FilterInputStream java.io.InputStream java.net.URI - java.nio.ByteBuffer java.nio.file.Path java.time.Duration java.util.Collection java.util.Optional - java.util.concurrent.Semaphore org.reactivestreams.Subscriber - org.reactivestreams.Subscription software.amazon.awssdk.core.ResponseBytes software.amazon.awssdk.core.async.AsyncRequestBody software.amazon.awssdk.core.async.AsyncResponseTransformer + software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody software.amazon.awssdk.core.client.config.ClientAsyncConfiguration software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient @@ -84,7 +83,7 @@ (s/def ::io-threads ::us/integer) (defmethod ig/pre-init-spec ::backend [_] - (s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads])) + (s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads ::wrk/executor])) (defmethod ig/prep-key ::backend [_ {:keys [::prefix ::region] :as cfg}] @@ -238,71 +237,38 @@ (.serviceConfiguration ^S3Configuration config) (.build)))) -(defn- upload-thread - [id subscriber sem content] - (px/thread - {:name "penpot/s3/uploader" - :virtual true - :daemon true} - (l/debug :hint "start upload thread" - :object-id (str id) - :size (impl/get-size content) - ::l/sync? true) - - ;; FIXME: improve buffer reusing - (let [stream (io/input-stream content) - bsize (* 1024 64) - tpoint (dt/tpoint)] - (try - (loop [] - (.acquire ^Semaphore sem 1) - (let [buffer (byte-array bsize) - readed (.read ^InputStream stream buffer)] - (when (pos? readed) - (let [data (ByteBuffer/wrap ^bytes buffer 0 readed)] - (.onNext ^Subscriber subscriber ^ByteBuffer data) - (when (= readed bsize) - (recur)))))) - (.onComplete ^Subscriber subscriber) - (catch InterruptedException _ - (l/trace :hint "interrupted upload thread" - :object-:id (str id) - ::l/sync? true) - nil) - (catch Throwable cause - (.onError ^Subscriber subscriber cause)) - (finally - (l/trace :hint "end upload thread" - :object-id (str id) - :elapsed (dt/format-duration (tpoint)) - ::l/sync? true) - (.close ^InputStream stream)))))) +(defn- write-input-stream + [delegate input] + (try + (.writeInputStream ^BlockingInputStreamAsyncRequestBody delegate + ^InputStream input) + (catch Throwable cause + (l/error :hint "encountered error while writing input stream to service" + :cause cause)) + (finally + (.close ^InputStream input)))) (defn- make-request-body - [id content] - (reify - AsyncRequestBody - (contentLength [_] - (Optional/of (long (impl/get-size content)))) - - (^void subscribe [_ ^Subscriber subscriber] - (let [sem (Semaphore. 0) - thr (upload-thread id subscriber sem content)] - (.onSubscribe subscriber - (reify Subscription - (cancel [_] - (px/interrupt! thr) - (.release sem 1)) - (request [_ n] - (.release sem (int n))))))))) + [executor content] + (let [size (impl/get-size content)] + (reify + AsyncRequestBody + (contentLength [_] + (Optional/of (long size))) + (^void subscribe [_ ^Subscriber subscriber] + (let [delegate (AsyncRequestBody/forBlockingInputStream (long size)) + input (io/input-stream content)] + (px/run! executor (partial write-input-stream delegate input)) + (.subscribe ^BlockingInputStreamAsyncRequestBody delegate + ^Subscriber subscriber)))))) (defn- put-object - [{:keys [::client ::bucket ::prefix]} {:keys [id] :as object} content] + [{:keys [::client ::bucket ::prefix ::wrk/executor]} {:keys [id] :as object} content] (let [path (dm/str prefix (impl/id->path id)) mdata (meta object) mtype (:content-type mdata "application/octet-stream") - rbody (make-request-body id content) + rbody (make-request-body executor content) request (.. (PutObjectRequest/builder) (bucket bucket) (contentType mtype)