From c1463ebd127b0c829442c5ef6be183577fde9b18 Mon Sep 17 00:00:00 2001 From: Andrey Antukh Date: Wed, 12 Jun 2024 15:56:27 +0200 Subject: [PATCH] :bug: Fix many race conditions on thumbnail generation process --- frontend/src/app/config.cljs | 4 + .../app/main/data/workspace/libraries.cljs | 2 +- .../app/main/data/workspace/thumbnails.cljs | 136 ++++++--------- .../app/main/ui/workspace/shapes/frame.cljs | 5 +- frontend/src/app/util/queue.cljs | 156 ++++++++++-------- 5 files changed, 147 insertions(+), 156 deletions(-) diff --git a/frontend/src/app/config.cljs b/frontend/src/app/config.cljs index 51b244551..098dc1248 100644 --- a/frontend/src/app/config.cljs +++ b/frontend/src/app/config.cljs @@ -158,6 +158,10 @@ (avatars/generate {:name name}) (dm/str (u/join public-uri "assets/by-id/" photo-id)))) +(defn resolve-media + [id] + (dm/str (u/join public-uri "assets/by-id/" (str id)))) + (defn resolve-file-media ([media] (resolve-file-media media false)) diff --git a/frontend/src/app/main/data/workspace/libraries.cljs b/frontend/src/app/main/data/workspace/libraries.cljs index f49569557..c9be059e2 100644 --- a/frontend/src/app/main/data/workspace/libraries.cljs +++ b/frontend/src/app/main/data/workspace/libraries.cljs @@ -814,7 +814,7 @@ component (ctkl/get-component data component-id) page-id (:main-instance-page component) root-id (:main-instance-id component)] - (dwt/request-thumbnail file-id page-id root-id tag "update-component-thumbnail-sync"))) + (dwt/update-thumbnail file-id page-id root-id tag "update-component-thumbnail-sync"))) (defn update-component-sync ([shape-id file-id] (update-component-sync shape-id file-id nil)) diff --git a/frontend/src/app/main/data/workspace/thumbnails.cljs b/frontend/src/app/main/data/workspace/thumbnails.cljs index d131ffd9c..c4cde6e2d 100644 --- a/frontend/src/app/main/data/workspace/thumbnails.cljs +++ b/frontend/src/app/main/data/workspace/thumbnails.cljs @@ -10,6 +10,7 @@ [app.common.files.helpers :as cfh] [app.common.logging :as l] [app.common.thumbnails :as thc] + [app.config :as cf] [app.main.data.changes :as dch] [app.main.data.persistence :as-alias dps] [app.main.data.workspace.notifications :as-alias wnt] @@ -18,7 +19,6 @@ [app.main.refs :as refs] [app.main.render :as render] [app.main.repo :as rp] - [app.main.store :as st] [app.util.http :as http] [app.util.queue :as q] [app.util.time :as tp] @@ -30,55 +30,36 @@ (l/set-level! :warn) -(declare update-thumbnail) +(defn- find-request + [params item] + (and (= (unchecked-get params "file-id") + (unchecked-get item "file-id")) + (= (unchecked-get params "page-id") + (unchecked-get item "page-id")) + (= (unchecked-get params "shape-id") + (unchecked-get item "shape-id")) + (= (unchecked-get params "tag") + (unchecked-get item "tag")))) -(defn resolve-request - "Resolves the request to generate a thumbnail for the given ids." - [item] - (let [file-id (unchecked-get item "file-id") - page-id (unchecked-get item "page-id") - shape-id (unchecked-get item "shape-id") - tag (unchecked-get item "tag")] - (st/emit! (update-thumbnail file-id page-id shape-id tag)))) +(defn- create-request + "Creates a request to generate a thumbnail for the given ids." + [file-id page-id shape-id tag] + #js {:file-id file-id + :page-id page-id + :shape-id shape-id + :tag tag}) ;; Defines the thumbnail queue (defonce queue - (q/create resolve-request (/ 1000 30))) - -(defn create-request - "Creates a request to generate a thumbnail for the given ids." - [file-id page-id shape-id tag] - #js {:file-id file-id :page-id page-id :shape-id shape-id :tag tag}) - -(defn find-request - "Returns true if the given item matches the given ids." - [file-id page-id shape-id tag item] - (and (= file-id (unchecked-get item "file-id")) - (= page-id (unchecked-get item "page-id")) - (= shape-id (unchecked-get item "shape-id")) - (= tag (unchecked-get item "tag")))) - -(defn request-thumbnail - "Enqueues a request to generate a thumbnail for the given ids." - ([file-id page-id shape-id tag] - (request-thumbnail file-id page-id shape-id tag "unknown")) - ([file-id page-id shape-id tag requester] - (ptk/reify ::request-thumbnail - ptk/EffectEvent - (effect [_ _ _] - (l/dbg :hint "request thumbnail" :requester requester :file-id file-id :page-id page-id :shape-id shape-id :tag tag) - (q/enqueue-unique queue - (create-request file-id page-id shape-id tag) - (partial find-request file-id page-id shape-id tag)))))) + (q/create find-request (/ 1000 30))) ;; This function first renders the HTML calling `render/render-frame` that ;; returns HTML as a string, then we send that data to the iframe rasterizer ;; that returns the image as a Blob. Finally we create a URI for that blob. -(defn get-thumbnail +(defn- render-thumbnail "Returns the thumbnail for the given ids" - [state file-id page-id frame-id tag & {:keys [object-id]}] - - (let [object-id (or object-id (thc/fmt-object-id file-id page-id frame-id tag)) + [state file-id page-id frame-id tag] + (let [object-id (thc/fmt-object-id file-id page-id frame-id tag) tp (tp/tpoint-ms) objects (wsh/lookup-objects state file-id page-id) shape (get objects frame-id)] @@ -87,10 +68,15 @@ (rx/take 1) (rx/filter some?) (rx/mapcat thr/render) - (rx/map (fn [blob] (wapi/create-uri blob))) (rx/tap #(l/dbg :hint "thumbnail rendered" :elapsed (dm/str (tp) "ms")))))) +(defn- request-thumbnail + "Enqueues a request to generate a thumbnail for the given ids." + [state file-id page-id shape-id tag] + (let [request (create-request file-id page-id shape-id tag)] + (q/enqueue-unique queue request (partial render-thumbnail state file-id page-id shape-id tag)))) + (defn clear-thumbnail ([file-id page-id frame-id tag] (clear-thumbnail file-id (thc/fmt-object-id file-id page-id frame-id tag))) @@ -154,8 +140,7 @@ (defn update-thumbnail "Updates the thumbnail information for the given `id`" - - [file-id page-id frame-id tag] + [file-id page-id frame-id tag requester] (let [object-id (thc/fmt-object-id file-id page-id frame-id tag)] (ptk/reify ::update-thumbnail cljs.core/IDeref @@ -163,23 +148,25 @@ ptk/WatchEvent (watch [_ state stream] - (l/dbg :hint "update thumbnail" :object-id object-id :tag tag) + (l/dbg :hint "update thumbnail" :requester requester :object-id object-id :tag tag) ;; Send the update to the back-end - (->> (get-thumbnail state file-id page-id frame-id tag) - (rx/mapcat (fn [uri] - (rx/merge - (rx/of (assoc-thumbnail object-id uri)) - (->> (http/send! {:uri uri :response-type :blob :method :get}) - (rx/map :body) - (rx/mapcat (fn [blob] - ;; Send the data to backend - (let [params {:file-id file-id - :object-id object-id - :media blob - :tag (or tag "frame")}] - (rp/cmd! :create-file-object-thumbnail params)))) - (rx/catch rx/empty) - (rx/ignore))))) + (->> (request-thumbnail state file-id page-id frame-id tag) + (rx/mapcat (fn [blob] + ;; Send the data to backend + (let [params {:file-id file-id + :object-id object-id + :media blob + :tag (or tag "frame")}] + (rp/cmd! :create-file-object-thumbnail params)))) + + (rx/mapcat (fn [{:keys [object-id media-id]}] + (let [uri (cf/resolve-media media-id)] + ;; We perform this request just for + ;; populate the browser CACHE and avoid + ;; unnecesary image flickering + (->> (http/send! {:uri uri :method :get}) + (rx/map #(assoc-thumbnail object-id uri)))))) + (rx/catch (fn [cause] (.error js/console cause) (rx/empty))) @@ -260,31 +247,12 @@ (rx/observe-on :async) (rx/with-latest-from workspace-data-s) (rx/merge-map (partial extract-root-frame-changes page-id)) - (rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %))) + (rx/tap #(l/trc :hint "inconming change" :origin "all" :frame-id (dm/str %))) (rx/share)) - local-commits-s - (->> stream - (rx/filter dch/commit?) - (rx/map deref) - (rx/filter #(= :local (:source %))) - (rx/observe-on :async) - (rx/with-latest-from workspace-data-s) - (rx/merge-map (partial extract-root-frame-changes page-id)) - (rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %))) - (rx/share)) - - ;; BUFFER NOTIFIER: only on local changes, remote changes - ;; we expect to receive thumbnail uri once it is - ;; generated va notifications subsystem notifier-s (->> stream (rx/filter (ptk/type? ::dps/commit-persisted)) - (rx/map deref) - (rx/observe-on :async) - (rx/with-latest-from workspace-data-s) - (rx/merge-map (partial extract-root-frame-changes page-id)) - (rx/tap #(l/trc :hint "inconming change" :origin "local" :frame-id (dm/str %))) (rx/debounce 5000) (rx/tap #(l/trc :hint "buffer initialized")))] @@ -296,11 +264,11 @@ (rx/map (fn [frame-id] (clear-thumbnail file-id page-id frame-id "frame")))) - ;; Generate thumbnails in batchs, once user becomes - ;; inactive for some instant only for local changes - (->> local-commits-s + ;; Generate thumbnails in batches, once user becomes + ;; inactive for some instant. + (->> all-commits-s (rx/buffer-until notifier-s) (rx/mapcat #(into #{} %)) - (rx/map #(request-thumbnail file-id page-id % "frame" "watch-state-changes")))) + (rx/map #(update-thumbnail file-id page-id % "frame" "watch-state-changes")))) (rx/take-until stopper-s)))))) diff --git a/frontend/src/app/main/ui/workspace/shapes/frame.cljs b/frontend/src/app/main/ui/workspace/shapes/frame.cljs index f4bd1f9d3..be793c755 100644 --- a/frontend/src/app/main/ui/workspace/shapes/frame.cljs +++ b/frontend/src/app/main/ui/workspace/shapes/frame.cljs @@ -109,7 +109,7 @@ (fn [{:keys [width height]}] (when (or (not (mth/close? width fixed-width 5)) (not (mth/close? height fixed-height 5))) - (st/emit! (dwt/request-thumbnail file-id page-id frame-id "frame" "check-thumbnail-size")))))))) + (st/emit! (dwt/update-thumbnail file-id page-id frame-id "frame" "check-thumbnail-size")))))))) (defn root-frame-wrapper-factory [shape-wrapper] @@ -176,7 +176,8 @@ (mf/with-effect [] (when-not (some? thumbnail-uri) (tm/schedule-on-idle - #(st/emit! (dwt/request-thumbnail file-id page-id frame-id "frame" "root-frame")))) + #(st/emit! (dwt/update-thumbnail file-id page-id frame-id "frame" "root-frame")))) + #(when-let [task (mf/ref-val task-ref)] (d/close! task))) diff --git a/frontend/src/app/util/queue.cljs b/frontend/src/app/util/queue.cljs index 1c68763f1..564fd3cda 100644 --- a/frontend/src/app/util/queue.cljs +++ b/frontend/src/app/util/queue.cljs @@ -5,33 +5,26 @@ ;; Copyright (c) KALEIDOS INC (ns app.util.queue - (:require [app.common.logging :as l] - [app.common.math :as mth] - [app.util.time :as t])) + "Low-Level queuing mechanism, mainly used for process thumbnails" + (:require + [app.common.logging :as l] + [app.common.math :as mth] + [app.util.time :as t] + [beicon.v2.core :as rx])) (l/set-level! :info) (declare process) -(declare dequeue) - -(defrecord Queue [f items timeout time threshold max-iterations]) +(declare request-process) (defn create - [f threshold] - (Queue. f - #js [] - nil - 0 - threshold - ##Inf)) - -(defn- measure-fn - [f & args] - (let [tp (t/tpoint-ms) - _ (apply f args) - duration (tp)] - (l/dbg :hint "queue::measure-fn" :duration duration) - duration)) + [find-fn threshold] + #js {:find-fn find-fn + :items #js [] + :timeout nil + :time 0 + :threshold threshold + :max-iterations ##Inf}) (defn- next-process-time [queue] @@ -40,76 +33,101 @@ max-time 5000 min-time 1000 calc-time (mth/min (mth/max (* (- time threshold) 10) min-time) max-time)] - (l/dbg :hint "queue::next-process-time" :time time :threshold threshold :calc-time calc-time :max-time max-time :min-time min-time) + (l/dbg :hint "queue::next-process-time" + :time time + :threshold threshold + :calc-time calc-time + :max-time max-time + :min-time min-time) calc-time)) (defn- has-requested-process? [queue] - (not (nil? (unchecked-get queue "timeout")))) - -(defn- request-process - [queue time] - (l/dbg :hint "queue::request-process" :time time) - (unchecked-set queue "timeout" (js/setTimeout (fn [] (process queue)) time))) + (some? (unchecked-get queue "timeout"))) ;; NOTE: Right now there are no cases where we need to cancel a process ;; but if we do, we can use this function -#_(defn- cancel-process - [queue] - (l/dbg :hint "queue::cancel-process") - (let [timeout (unchecked-get queue "timeout")] - (when (some? timeout) - (js/clearTimeout timeout)) - (unchecked-set queue "timeout" nil))) +;; (defn- cancel-process +;; [queue] +;; (l/dbg :hint "queue::cancel-process") +;; (let [timeout (unchecked-get queue "timeout")] +;; (when (some? timeout) +;; (js/clearTimeout timeout)) +;; (unchecked-set queue "timeout" nil))) (defn- process - [queue] - (unchecked-set queue "timeout" nil) - (unchecked-set queue "time" 0) + [queue iterations] (let [threshold (unchecked-get queue "threshold") max-iterations (unchecked-get queue "max-iterations") - f (unchecked-get queue "f")] - (loop [item (dequeue queue) - iterations 0] - (l/dbg :hint "queue::process" :item item) - (when (some? item) - (let [duration (measure-fn f item) - time (unchecked-get queue "time") - time (unchecked-set queue "time" (+ time duration))] - (if (or (> time threshold) (>= iterations max-iterations)) - (request-process queue (next-process-time queue)) - (recur (dequeue queue) (inc iterations)))))))) + items (unchecked-get queue "items") + item (.shift ^js items)] -(defn- dequeue - [queue] - (let [items (unchecked-get queue "items")] - (.shift items))) + (when (some? item) + (let [tp (t/tpoint-ms) + f (unchecked-get item "f") + res (unchecked-get item "result")] + (rx/subscribe (f) + (fn [o] + (rx/push! res o)) + (fn [e] + (rx/error! res e)) + (fn [] + (rx/end! res) + (let [duration (tp) + time (unchecked-get queue "time") + time (+ time duration)] + (unchecked-set queue "time" time) + (if (or (> time threshold) (>= iterations max-iterations)) + (request-process queue 0 (next-process-time queue)) + (request-process queue (inc iterations) 0))))))))) -(defn enqueue-first +(defn- request-process + [queue iterations time] + (l/dbg :hint "queue::request-process" :time time) + (unchecked-set queue "timeout" + (js/setTimeout + (fn [] + (unchecked-set queue "timeout" nil) + (process queue iterations)) + time))) + +(defn- enqueue-first [queue item] - (assert (instance? Queue queue)) (let [items (unchecked-get queue "items")] - (.unshift items item) + (.unshift ^js items item) (when-not (has-requested-process? queue) - (request-process queue (next-process-time queue))))) + (request-process queue 0 (next-process-time queue))))) -(defn enqueue-last +(defn- enqueue-last [queue item] - (assert (instance? Queue queue)) (let [items (unchecked-get queue "items")] - (.push items item) + (.push ^js items item) (when-not (has-requested-process? queue) - (request-process queue (next-process-time queue))))) + (request-process queue 0 (next-process-time queue))))) (defn enqueue-unique - [queue item f] - (assert (instance? Queue queue)) - (let [items (unchecked-get queue "items")] + [queue request f] + (let [items (unchecked-get queue "items") + find-fn (unchecked-get queue "find-fn") + result (rx/subject)] + + (unchecked-set request "result" result) + (unchecked-set request "f" f) + ;; If tag is "frame", then they are added to the front of the queue ;; so that they are processed first, anything else is added to the ;; end of the queue. - (if (= (unchecked-get item "tag") "frame") - (when-not (.find ^js items f) - (enqueue-first queue item)) - (when-not (.findLast ^js items f) - (enqueue-last queue item))))) + (if (= (unchecked-get request "tag") "frame") + (let [item (.find ^js items find-fn)] + (if item + (let [other-result (unchecked-get item "result")] + (rx/subscribe other-result result)) + (enqueue-first queue request))) + + (let [item (.findLast ^js items find-fn)] + (if item + (let [other-result (unchecked-get item "result")] + (rx/subscribe other-result result)) + (enqueue-last queue request)))) + + (rx/to-observable result)))