0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-01-24 07:29:08 -05:00

Adds support to rx streams on workers framework

This commit is contained in:
alonso.torres 2021-05-27 14:31:10 +02:00
parent b648fb7446
commit 1a70071405
4 changed files with 76 additions and 51 deletions

View file

@ -26,3 +26,7 @@
(defn ask-buffered! (defn ask-buffered!
[message] [message]
(uw/ask-buffered! instance message)) (uw/ask-buffered! instance message))
(defn ask-many!
[message]
(uw/ask-many! instance message))

View file

@ -14,14 +14,25 @@
(declare handle-response) (declare handle-response)
(defrecord Worker [instance stream]) (defrecord Worker [instance stream])
(defn- send-message! [worker {sender-id :sender-id :as message}] (defn- send-message!
(let [data (t/encode message) ([worker message]
instance (:instance worker)] (send-message! worker message nil))
(.postMessage instance data)
(->> (:stream worker) ([worker {sender-id :sender-id :as message} {:keys [many?] :or {many? false}}]
(rx/filter #(= (:reply-to %) sender-id)) (let [take-messages
(rx/take 1) (fn [ob]
(rx/map handle-response)))) (if many?
(rx/take-while #(not (:completed %)) ob)
(rx/take 1 ob)))
data (t/encode message)
instance (:instance worker)]
(.postMessage instance data)
(->> (:stream worker)
(rx/filter #(= (:reply-to %) sender-id))
(take-messages)
(rx/map handle-response)))))
(defn ask! (defn ask!
[worker message] [worker message]
@ -30,6 +41,14 @@
{:sender-id (uuid/next) {:sender-id (uuid/next)
:payload message})) :payload message}))
(defn ask-many!
[worker message]
(send-message!
worker
{:sender-id (uuid/next)
:payload message}
{:many? true}))
(defn ask-buffered! (defn ask-buffered!
[worker message] [worker message]
(send-message! (send-message!

View file

@ -43,35 +43,42 @@
"Process the message and returns to the client" "Process the message and returns to the client"
[{:keys [sender-id payload] :as message}] [{:keys [sender-id payload] :as message}]
(us/assert ::message message) (us/assert ::message message)
(try (letfn [(post [msg]
(let [result (impl/handler payload)] (let [msg (-> msg (assoc :reply-to sender-id) (t/encode))]
(cond (.postMessage js/self msg)))
(p/promise? result)
(p/handle result
(fn [msg]
(.postMessage js/self (t/encode
{:reply-to sender-id
:payload msg})))
(fn [err]
(.postMessage js/self (t/encode
{:reply-to sender-id
:error {:data (ex-data err)
:message (ex-message err)}}))))
(or (rx/observable? result) (reply [result]
(rx/subject? result)) (post {:payload result}))
(throw (ex-info "not implemented" {}))
:else (reply-error [err]
(.postMessage js/self (t/encode (.error js/console "error" err)
{:reply-to sender-id (post {:error {:data (ex-data err)
:payload result})))) :message (ex-message err)}}))
(catch :default e
(.error js/console "error" e) (reply-completed
(let [message {:reply-to sender-id ([] (reply-completed nil))
:error {:data (ex-data e) ([msg] (post {:payload msg
:message (ex-message e)}}] :completed true})))]
(.postMessage js/self (t/encode message))))))
(try
(let [result (impl/handler payload)
promise? (p/promise? result)
stream? (or (rx/observable? result) (rx/subject? result))]
(cond
promise?
(-> result
(p/then reply-completed)
(p/catch reply-error))
stream?
(rx/subscribe result reply reply-error reply-completed)
:else
(reply result)))
(catch :default err
(reply-error err)))))
(defn- drop-message (defn- drop-message
"Sends to the client a notifiction that its messages have been dropped" "Sends to the client a notifiction that its messages have been dropped"

View file

@ -31,17 +31,12 @@
(defn- request-page (defn- request-page
[file-id page-id] [file-id page-id]
(let [uri "/api/rpc/query/page"] (let [uri "/api/rpc/query/page"]
(p/create (->> (http/send!
(fn [resolve reject] {:uri uri
(->> (http/send! {:uri uri :query {:file-id file-id :id page-id :strip-thumbnails true}
:query {:file-id file-id :id page-id :strip-thumbnails true} :method :get})
:method :get}) (rx/map http/conditional-decode-transit)
(rx/map http/conditional-decode-transit) (rx/mapcat handle-response))))
(rx/mapcat handle-response)
(rx/subs (fn [body]
(resolve body))
(fn [error]
(reject error))))))))
(defonce cache (atom {})) (defonce cache (atom {}))
@ -57,8 +52,8 @@
(defmethod impl/handler :thumbnails/generate (defmethod impl/handler :thumbnails/generate
[{:keys [file-id page-id] :as message}] [{:keys [file-id page-id] :as message}]
(p/then (->> (request-page file-id page-id)
(request-page file-id page-id) (rx/map
(fn [data] (fn [data]
{:svg (render-page data #{file-id page-id}) {:svg (render-page data #{file-id page-id})
:fonts @fonts/loaded}))) :fonts @fonts/loaded}))))