;; This Source Code Form is subject to the terms of the Mozilla Public ;; License, v. 2.0. If a copy of the MPL was not distributed with this ;; file, You can obtain one at http://mozilla.org/MPL/2.0/. ;; ;; Copyright (c) UXBOX Labs SL (ns app.worker (:require [cljs.spec.alpha :as s] [promesa.core :as p] [beicon.core :as rx] [cuerdas.core :as str] [app.common.exceptions :as ex] [app.common.spec :as us] [app.common.uuid :as uuid] [app.worker.impl :as impl] [app.worker.selection] [app.worker.thumbnails] [app.worker.snaps] [app.util.object :as obj] [app.util.transit :as t] [app.util.worker :as w])) ;; --- Messages Handling (s/def ::cmd keyword?) (s/def ::payload (s/keys :req-un [::cmd])) (s/def ::sender-id uuid?) (s/def ::buffer? boolean?) (s/def ::message (s/keys :req-opt [::buffer?] :req-un [::payload ::sender-id])) (def buffer (rx/subject)) (defn- handle-message "Process the message and returns to the client" [{:keys [sender-id payload] :as message}] (us/assert ::message message) (try (let [result (impl/handler payload)] (cond (p/promise? result) (p/handle result (fn [msg] (.postMessage js/self (t/encode {:reply-to sender-id :payload msg}))) (fn [err] (.postMessage js/self (t/encode {:reply-to sender-id :error {:data (ex-data err) :message (ex-message err)}})))) (or (rx/observable? result) (rx/subject? result)) (throw (ex-info "not implemented" {})) :else (.postMessage js/self (t/encode {:reply-to sender-id :payload result})))) (catch :default e (.error js/console "error" e) (let [message {:reply-to sender-id :error {:data (ex-data e) :message (ex-message e)}}] (.postMessage js/self (t/encode message)))))) (defn- drop-message "Sends to the client a notifiction that its messages have been dropped" [{:keys [sender-id payload] :as message}] (us/assert ::message message) (.postMessage js/self (t/encode {:reply-to sender-id :dropped true}))) (defn subscribe-buffer-messages "Creates a subscription to process the buffer messages" [] (let [empty [{} [] ::clear]] (->> buffer ;; We want async processing to not block the main loop (rx/observe-on :async) ;; This scan will store the last message per type in `messages` ;; when a previous message is dropped is stored in `dropped` ;; we also store the last message processed in order to detect ;; posible infinite loops (rx/scan (fn [[messages dropped last] message] (let [cmd (get-in message [:payload :cmd]) ;; The previous message is dropped dropped (cond-> dropped (contains? messages cmd) (conj (get messages cmd))) ;; This is the new "head" for its type messages (assoc messages cmd message)] ;; When a "clear" message is detected we empty the buffer (if (= message ::clear) empty [messages dropped message]))) empty) ;; 1ms debounce, after 1ms without messages will process the buffer (rx/debounce 1) (rx/subs (fn [[messages dropped last]] ;; Send back the dropped messages replies (doseq [msg dropped] (drop-message msg)) ;; Process the message (doseq [msg (vals messages)] (handle-message msg)) ;; After process the buffer we send a clear (when-not (= last ::clear) (rx/push! buffer ::clear))))))) (defonce process-message-sub (subscribe-buffer-messages)) (defn- on-message [event] (when (nil? (.-source event)) (let [message (.-data event) message (t/decode message)] (if (:buffer? message) (rx/push! buffer message) (handle-message message))))) (.addEventListener js/self "message" on-message) (defn ^:dev/before-load stop [] (rx/-dispose process-message-sub) (.removeEventListener js/self "message" on-message)) (defn ^:dev/after-load start [] [] (set! process-message-sub (subscribe-buffer-messages)) (.addEventListener js/self "message" on-message))