mirror of
https://github.com/penpot/penpot.git
synced 2025-03-10 14:51:37 -05:00
🐛 Fix incorrect handling of EOF on s3 upload thread
This commit is contained in:
parent
b62cc9c8e9
commit
302ff92b31
2 changed files with 29 additions and 62 deletions
|
@ -475,7 +475,8 @@
|
|||
::sto.s3/bucket (or (cf/get :storage-assets-s3-bucket)
|
||||
(cf/get :objects-storage-s3-bucket))
|
||||
::sto.s3/io-threads (or (cf/get :storage-assets-s3-io-threads)
|
||||
(cf/get :objects-storage-s3-io-threads))}
|
||||
(cf/get :objects-storage-s3-io-threads))
|
||||
::wrk/executor (ig/ref ::wrk/executor)}
|
||||
|
||||
:app.storage.fs/backend
|
||||
{::sto.fs/directory (or (cf/get :storage-assets-fs-directory)
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
[app.storage.impl :as impl]
|
||||
[app.storage.tmp :as tmp]
|
||||
[app.util.time :as dt]
|
||||
[app.worker :as-alias wrk]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.spec.alpha :as s]
|
||||
[datoteka.fs :as fs]
|
||||
|
@ -27,17 +28,15 @@
|
|||
java.io.FilterInputStream
|
||||
java.io.InputStream
|
||||
java.net.URI
|
||||
java.nio.ByteBuffer
|
||||
java.nio.file.Path
|
||||
java.time.Duration
|
||||
java.util.Collection
|
||||
java.util.Optional
|
||||
java.util.concurrent.Semaphore
|
||||
org.reactivestreams.Subscriber
|
||||
org.reactivestreams.Subscription
|
||||
software.amazon.awssdk.core.ResponseBytes
|
||||
software.amazon.awssdk.core.async.AsyncRequestBody
|
||||
software.amazon.awssdk.core.async.AsyncResponseTransformer
|
||||
software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody
|
||||
software.amazon.awssdk.core.client.config.ClientAsyncConfiguration
|
||||
software.amazon.awssdk.core.client.config.SdkAdvancedAsyncClientOption
|
||||
software.amazon.awssdk.http.nio.netty.NettyNioAsyncHttpClient
|
||||
|
@ -84,7 +83,7 @@
|
|||
(s/def ::io-threads ::us/integer)
|
||||
|
||||
(defmethod ig/pre-init-spec ::backend [_]
|
||||
(s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads]))
|
||||
(s/keys :opt [::region ::bucket ::prefix ::endpoint ::io-threads ::wrk/executor]))
|
||||
|
||||
(defmethod ig/prep-key ::backend
|
||||
[_ {:keys [::prefix ::region] :as cfg}]
|
||||
|
@ -238,71 +237,38 @@
|
|||
(.serviceConfiguration ^S3Configuration config)
|
||||
(.build))))
|
||||
|
||||
(defn- upload-thread
|
||||
[id subscriber sem content]
|
||||
(px/thread
|
||||
{:name "penpot/s3/uploader"
|
||||
:virtual true
|
||||
:daemon true}
|
||||
(l/debug :hint "start upload thread"
|
||||
:object-id (str id)
|
||||
:size (impl/get-size content)
|
||||
::l/sync? true)
|
||||
|
||||
;; FIXME: improve buffer reusing
|
||||
(let [stream (io/input-stream content)
|
||||
bsize (* 1024 64)
|
||||
tpoint (dt/tpoint)]
|
||||
(try
|
||||
(loop []
|
||||
(.acquire ^Semaphore sem 1)
|
||||
(let [buffer (byte-array bsize)
|
||||
readed (.read ^InputStream stream buffer)]
|
||||
(when (pos? readed)
|
||||
(let [data (ByteBuffer/wrap ^bytes buffer 0 readed)]
|
||||
(.onNext ^Subscriber subscriber ^ByteBuffer data)
|
||||
(when (= readed bsize)
|
||||
(recur))))))
|
||||
(.onComplete ^Subscriber subscriber)
|
||||
(catch InterruptedException _
|
||||
(l/trace :hint "interrupted upload thread"
|
||||
:object-:id (str id)
|
||||
::l/sync? true)
|
||||
nil)
|
||||
(catch Throwable cause
|
||||
(.onError ^Subscriber subscriber cause))
|
||||
(finally
|
||||
(l/trace :hint "end upload thread"
|
||||
:object-id (str id)
|
||||
:elapsed (dt/format-duration (tpoint))
|
||||
::l/sync? true)
|
||||
(.close ^InputStream stream))))))
|
||||
(defn- write-input-stream
|
||||
[delegate input]
|
||||
(try
|
||||
(.writeInputStream ^BlockingInputStreamAsyncRequestBody delegate
|
||||
^InputStream input)
|
||||
(catch Throwable cause
|
||||
(l/error :hint "encountered error while writing input stream to service"
|
||||
:cause cause))
|
||||
(finally
|
||||
(.close ^InputStream input))))
|
||||
|
||||
(defn- make-request-body
|
||||
[id content]
|
||||
(reify
|
||||
AsyncRequestBody
|
||||
(contentLength [_]
|
||||
(Optional/of (long (impl/get-size content))))
|
||||
|
||||
(^void subscribe [_ ^Subscriber subscriber]
|
||||
(let [sem (Semaphore. 0)
|
||||
thr (upload-thread id subscriber sem content)]
|
||||
(.onSubscribe subscriber
|
||||
(reify Subscription
|
||||
(cancel [_]
|
||||
(px/interrupt! thr)
|
||||
(.release sem 1))
|
||||
(request [_ n]
|
||||
(.release sem (int n)))))))))
|
||||
[executor content]
|
||||
(let [size (impl/get-size content)]
|
||||
(reify
|
||||
AsyncRequestBody
|
||||
(contentLength [_]
|
||||
(Optional/of (long size)))
|
||||
|
||||
(^void subscribe [_ ^Subscriber subscriber]
|
||||
(let [delegate (AsyncRequestBody/forBlockingInputStream (long size))
|
||||
input (io/input-stream content)]
|
||||
(px/run! executor (partial write-input-stream delegate input))
|
||||
(.subscribe ^BlockingInputStreamAsyncRequestBody delegate
|
||||
^Subscriber subscriber))))))
|
||||
|
||||
(defn- put-object
|
||||
[{:keys [::client ::bucket ::prefix]} {:keys [id] :as object} content]
|
||||
[{:keys [::client ::bucket ::prefix ::wrk/executor]} {:keys [id] :as object} content]
|
||||
(let [path (dm/str prefix (impl/id->path id))
|
||||
mdata (meta object)
|
||||
mtype (:content-type mdata "application/octet-stream")
|
||||
rbody (make-request-body id content)
|
||||
rbody (make-request-body executor content)
|
||||
request (.. (PutObjectRequest/builder)
|
||||
(bucket bucket)
|
||||
(contentType mtype)
|
||||
|
|
Loading…
Add table
Reference in a new issue