diff --git a/backend/resources/migrations/0004.tasks.sql b/backend/resources/migrations/0004.tasks.sql index 058c836b0..caee1d868 100644 --- a/backend/resources/migrations/0004.tasks.sql +++ b/backend/resources/migrations/0004.tasks.sql @@ -24,8 +24,6 @@ CREATE TRIGGER task__modified_at__tgr BEFORE UPDATE ON task FOR EACH ROW EXECUTE PROCEDURE update_modified_at(); - - CREATE TABLE scheduled_task ( id text PRIMARY KEY, diff --git a/backend/resources/migrations/0006.presence.sql b/backend/resources/migrations/0006.presence.sql new file mode 100644 index 000000000..bb1c04256 --- /dev/null +++ b/backend/resources/migrations/0006.presence.sql @@ -0,0 +1,9 @@ +CREATE TABLE presence ( + file_id uuid NOT NULL REFERENCES file(id) ON DELETE CASCADE, + profile_id uuid NOT NULL REFERENCES profile(id) ON DELETE CASCADE, + session_id uuid NOT NULL, + + updated_at timestamptz NOT NULL DEFAULT clock_timestamp(), + + PRIMARY KEY (file_id, session_id, profile_id) +); diff --git a/backend/src/uxbox/migrations.clj b/backend/src/uxbox/migrations.clj index d5ad70049..b0b3cf65c 100644 --- a/backend/src/uxbox/migrations.clj +++ b/backend/src/uxbox/migrations.clj @@ -29,7 +29,10 @@ :fn (mg/resource "migrations/0004.tasks.sql")} {:desc "Initial libraries tables" :name "0005-libraries" - :fn (mg/resource "migrations/0005.libraries.sql")}]}) + :fn (mg/resource "migrations/0005.libraries.sql")} + {:desc "Initial presence tables" + :name "0006-presence" + :fn (mg/resource "migrations/0006.presence.sql")}]}) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Entry point diff --git a/backend/src/uxbox/services/notifications.clj b/backend/src/uxbox/services/notifications.clj index 9d5b295eb..27dcd80a9 100644 --- a/backend/src/uxbox/services/notifications.clj +++ b/backend/src/uxbox/services/notifications.clj @@ -14,7 +14,8 @@ [uxbox.common.exceptions :as ex] [uxbox.common.uuid :as uuid] [uxbox.redis :as redis] - [ring.util.codec :as codec] + [uxbox.db :as db] + [uxbox.util.time :as dt] [uxbox.util.transit :as t])) (defmacro go-try @@ -31,47 +32,55 @@ (throw r#) r#))) -(defn- decode-message - [message] - (->> (t/str->bytes message) - (t/decode))) - -(defn- encode-message - [message] - (->> (t/encode message) - (t/bytes->str))) +(defmacro thread-try + [& body] + `(a/thread + (try + ~@body + (catch Throwable e# + e#)))) ;; --- Redis Interactions (defn- publish [channel message] (go-try - (let [message (encode-message message)] + (let [message (t/encode-str message)] (!! in message))) diff --git a/backend/src/uxbox/util/migrations.clj b/backend/src/uxbox/util/migrations.clj index 29c58d0cf..2f2fdf652 100644 --- a/backend/src/uxbox/util/migrations.clj +++ b/backend/src/uxbox/util/migrations.clj @@ -72,7 +72,7 @@ (defn migrate! "Main entry point for apply a migration." ([conn migrations] - (migrate! conn migrations nil)) + (impl-migrate conn migrations nil)) ([conn migrations options] (impl-migrate conn migrations options))) diff --git a/backend/src/uxbox/util/time.clj b/backend/src/uxbox/util/time.clj index 1bcc13779..994177427 100644 --- a/backend/src/uxbox/util/time.clj +++ b/backend/src/uxbox/util/time.clj @@ -54,9 +54,16 @@ (integer? ms-or-obj) (Duration/ofMillis ms-or-obj) + (string? ms-or-obj) + (Duration/parse ms-or-obj) + :else (obj->duration ms-or-obj))) +(defn duration-between + [t1 t2] + (Duration/between t1 t2)) + (defn parse-duration [s] (assert (string? s)) diff --git a/backend/src/uxbox/util/transit.clj b/backend/src/uxbox/util/transit.clj index 0ce8a012c..be4699456 100644 --- a/backend/src/uxbox/util/transit.clj +++ b/backend/src/uxbox/util/transit.clj @@ -52,6 +52,9 @@ ;; --- High-Level Api +(declare str->bytes) +(declare bytes->str) + (defn decode ([data] (decode data nil)) @@ -68,6 +71,16 @@ (write! w data) (.toByteArray out))))) +(defn decode-str + [message] + (->> (str->bytes message) + (decode))) + +(defn encode-str + [message] + (->> (encode message) + (bytes->str))) + ;; --- Helpers (defn str->bytes @@ -83,4 +96,3 @@ (bytes->str data "UTF-8")) ([^bytes data, ^String encoding] (String. data encoding))) - diff --git a/frontend/src/uxbox/main/data/workspace/notifications.cljs b/frontend/src/uxbox/main/data/workspace/notifications.cljs index bdb1e2d30..c30aea24f 100644 --- a/frontend/src/uxbox/main/data/workspace/notifications.cljs +++ b/frontend/src/uxbox/main/data/workspace/notifications.cljs @@ -28,6 +28,7 @@ (declare handle-pointer-update) (declare handle-page-change) (declare handle-pointer-send) +(declare send-keepalive) (s/def ::type keyword?) (s/def ::message @@ -46,8 +47,11 @@ ptk/WatchEvent (watch [_ state stream] (let [wsession (get-in state [:ws file-id]) - stoper (rx/filter #(= ::finalize %) stream)] + stoper (rx/filter #(= ::finalize %) stream) + interval (* 1000 60)] (->> (rx/merge + (->> (rx/timer interval interval) + (rx/map #(send-keepalive file-id))) (->> (ws/-stream wsession) (rx/filter #(= :message (:type %))) (rx/map (comp t/decode :payload)) @@ -66,6 +70,15 @@ (rx/take-until stoper)))))) +(defn send-keepalive + [file-id] + (ptk/reify ::send-keepalive + ptk/EffectEvent + (effect [_ state stream] + (prn "send-keepalive" file-id) + (when-let [ws (get-in state [:ws file-id])] + (ws/-send ws (t/encode {:type :keepalive})))))) + ;; --- Finalize Websocket (defn finalize