From 1a7007140583f4da28111873b1dabc5968f5bfca Mon Sep 17 00:00:00 2001 From: "alonso.torres" Date: Thu, 27 May 2021 14:31:10 +0200 Subject: [PATCH] :sparkles: Adds support to rx streams on workers framework --- frontend/src/app/main/worker.cljs | 4 ++ frontend/src/app/util/worker.cljs | 35 ++++++++++---- frontend/src/app/worker.cljs | 61 ++++++++++++++----------- frontend/src/app/worker/thumbnails.cljs | 27 +++++------ 4 files changed, 76 insertions(+), 51 deletions(-) diff --git a/frontend/src/app/main/worker.cljs b/frontend/src/app/main/worker.cljs index a0985d97f..de564536d 100644 --- a/frontend/src/app/main/worker.cljs +++ b/frontend/src/app/main/worker.cljs @@ -26,3 +26,7 @@ (defn ask-buffered! [message] (uw/ask-buffered! instance message)) + +(defn ask-many! + [message] + (uw/ask-many! instance message)) diff --git a/frontend/src/app/util/worker.cljs b/frontend/src/app/util/worker.cljs index 4c07a7f08..8e9943e4f 100644 --- a/frontend/src/app/util/worker.cljs +++ b/frontend/src/app/util/worker.cljs @@ -14,14 +14,25 @@ (declare handle-response) (defrecord Worker [instance stream]) -(defn- send-message! [worker {sender-id :sender-id :as message}] - (let [data (t/encode message) - instance (:instance worker)] - (.postMessage instance data) - (->> (:stream worker) - (rx/filter #(= (:reply-to %) sender-id)) - (rx/take 1) - (rx/map handle-response)))) +(defn- send-message! + ([worker message] + (send-message! worker message nil)) + + ([worker {sender-id :sender-id :as message} {:keys [many?] :or {many? false}}] + (let [take-messages + (fn [ob] + (if many? + (rx/take-while #(not (:completed %)) ob) + (rx/take 1 ob))) + + data (t/encode message) + instance (:instance worker)] + + (.postMessage instance data) + (->> (:stream worker) + (rx/filter #(= (:reply-to %) sender-id)) + (take-messages) + (rx/map handle-response))))) (defn ask! [worker message] @@ -30,6 +41,14 @@ {:sender-id (uuid/next) :payload message})) +(defn ask-many! + [worker message] + (send-message! + worker + {:sender-id (uuid/next) + :payload message} + {:many? true})) + (defn ask-buffered! [worker message] (send-message! diff --git a/frontend/src/app/worker.cljs b/frontend/src/app/worker.cljs index 0c2193532..38b68a57d 100644 --- a/frontend/src/app/worker.cljs +++ b/frontend/src/app/worker.cljs @@ -43,35 +43,42 @@ "Process the message and returns to the client" [{:keys [sender-id payload] :as message}] (us/assert ::message message) - (try - (let [result (impl/handler payload)] - (cond - (p/promise? result) - (p/handle result - (fn [msg] - (.postMessage js/self (t/encode - {:reply-to sender-id - :payload msg}))) - (fn [err] - (.postMessage js/self (t/encode - {:reply-to sender-id - :error {:data (ex-data err) - :message (ex-message err)}})))) + (letfn [(post [msg] + (let [msg (-> msg (assoc :reply-to sender-id) (t/encode))] + (.postMessage js/self msg))) - (or (rx/observable? result) - (rx/subject? result)) - (throw (ex-info "not implemented" {})) + (reply [result] + (post {:payload result})) - :else - (.postMessage js/self (t/encode - {:reply-to sender-id - :payload result})))) - (catch :default e - (.error js/console "error" e) - (let [message {:reply-to sender-id - :error {:data (ex-data e) - :message (ex-message e)}}] - (.postMessage js/self (t/encode message)))))) + (reply-error [err] + (.error js/console "error" err) + (post {:error {:data (ex-data err) + :message (ex-message err)}})) + + (reply-completed + ([] (reply-completed nil)) + ([msg] (post {:payload msg + :completed true})))] + + (try + (let [result (impl/handler payload) + promise? (p/promise? result) + stream? (or (rx/observable? result) (rx/subject? result))] + + (cond + promise? + (-> result + (p/then reply-completed) + (p/catch reply-error)) + + stream? + (rx/subscribe result reply reply-error reply-completed) + + :else + (reply result))) + + (catch :default err + (reply-error err))))) (defn- drop-message "Sends to the client a notifiction that its messages have been dropped" diff --git a/frontend/src/app/worker/thumbnails.cljs b/frontend/src/app/worker/thumbnails.cljs index ddf65c6b1..bf010d56a 100644 --- a/frontend/src/app/worker/thumbnails.cljs +++ b/frontend/src/app/worker/thumbnails.cljs @@ -31,17 +31,12 @@ (defn- request-page [file-id page-id] (let [uri "/api/rpc/query/page"] - (p/create - (fn [resolve reject] - (->> (http/send! {:uri uri - :query {:file-id file-id :id page-id :strip-thumbnails true} - :method :get}) - (rx/map http/conditional-decode-transit) - (rx/mapcat handle-response) - (rx/subs (fn [body] - (resolve body)) - (fn [error] - (reject error)))))))) + (->> (http/send! + {:uri uri + :query {:file-id file-id :id page-id :strip-thumbnails true} + :method :get}) + (rx/map http/conditional-decode-transit) + (rx/mapcat handle-response)))) (defonce cache (atom {})) @@ -57,8 +52,8 @@ (defmethod impl/handler :thumbnails/generate [{:keys [file-id page-id] :as message}] - (p/then - (request-page file-id page-id) - (fn [data] - {:svg (render-page data #{file-id page-id}) - :fonts @fonts/loaded}))) + (->> (request-page file-id page-id) + (rx/map + (fn [data] + {:svg (render-page data #{file-id page-id}) + :fonts @fonts/loaded}))))