mirror of
https://github.com/penpot/penpot.git
synced 2025-03-16 01:31:22 -05:00
🎉 Add team-id channel subscriptions.
This commit is contained in:
parent
60b241e867
commit
065fc157bf
4 changed files with 188 additions and 194 deletions
|
@ -27,14 +27,24 @@
|
|||
(s/def ::websocket-params
|
||||
(s/keys :req-un [::file-id ::session-id]))
|
||||
|
||||
(def sql:retrieve-file
|
||||
"select f.id as id,
|
||||
p.team_id as team_id
|
||||
from file as f
|
||||
join project as p on (p.id = f.project_id)
|
||||
where f.id = ?")
|
||||
|
||||
(defn retrieve-file
|
||||
[conn id]
|
||||
(db/exec-one! conn [sql:retrieve-file id]))
|
||||
|
||||
(defn websocket
|
||||
[{:keys [profile-id] :as req}]
|
||||
(let [params (us/conform ::websocket-params (:params req))
|
||||
file (db/get-by-id db/pool :file (:file-id params))
|
||||
file (retrieve-file db/pool (:file-id params))
|
||||
params (assoc params
|
||||
:profile-id profile-id
|
||||
:file file)]
|
||||
|
||||
:team-id (:team-id file))]
|
||||
(cond
|
||||
(not profile-id)
|
||||
{:error {:code 403 :message "Authentication required"}}
|
||||
|
|
|
@ -35,10 +35,8 @@
|
|||
;; --- API FORWARD
|
||||
|
||||
(defn subscribe
|
||||
([topic]
|
||||
(redis/subscribe client topic))
|
||||
([topic xf]
|
||||
(redis/subscribe client topic xf)))
|
||||
[opts]
|
||||
(redis/subscribe client opts))
|
||||
|
||||
(defn run!
|
||||
[cmd params]
|
||||
|
|
|
@ -15,173 +15,19 @@
|
|||
[app.db :as db]
|
||||
[app.metrics :as mtx]
|
||||
[app.redis :as redis]
|
||||
[app.util.async :as aa]
|
||||
[app.util.time :as dt]
|
||||
[app.util.transit :as t]
|
||||
[clojure.core.async :as a :refer [>! <!]]
|
||||
[clojure.core.async :as a]
|
||||
[clojure.tools.logging :as log]
|
||||
[promesa.core :as p]
|
||||
[ring.adapter.jetty9 :as jetty]))
|
||||
|
||||
(defmacro go-try
|
||||
[& body]
|
||||
`(a/go
|
||||
(try
|
||||
~@body
|
||||
(catch Throwable e# e#))))
|
||||
|
||||
(defmacro <?
|
||||
[ch]
|
||||
`(let [r# (a/<! ~ch)]
|
||||
(if (instance? Throwable r#)
|
||||
(throw r#)
|
||||
r#)))
|
||||
|
||||
(defmacro thread-try
|
||||
[& body]
|
||||
`(a/thread
|
||||
(try
|
||||
~@body
|
||||
(catch Throwable e#
|
||||
e#))))
|
||||
|
||||
(defn- publish
|
||||
[channel message]
|
||||
(go-try
|
||||
(let [message (t/encode-str message)]
|
||||
(<? (redis/run :publish {:channel (str channel)
|
||||
:message message})))))
|
||||
|
||||
(def ^:private
|
||||
sql:retrieve-presence
|
||||
"select * from presence
|
||||
where file_id=?
|
||||
and (clock_timestamp() - updated_at) < '5 min'::interval")
|
||||
|
||||
(defn- retrieve-presence
|
||||
[file-id]
|
||||
(thread-try
|
||||
(let [rows (db/exec! db/pool [sql:retrieve-presence file-id])]
|
||||
(mapv (juxt :session-id :profile-id) rows))))
|
||||
|
||||
(def ^:private
|
||||
sql:update-presence
|
||||
"insert into presence (file_id, session_id, profile_id, updated_at)
|
||||
values (?, ?, ?, clock_timestamp())
|
||||
on conflict (file_id, session_id, profile_id)
|
||||
do update set updated_at=clock_timestamp()")
|
||||
|
||||
(defn- update-presence
|
||||
[file-id session-id profile-id]
|
||||
(thread-try
|
||||
(let [now (dt/now)
|
||||
sql [sql:update-presence file-id session-id profile-id]]
|
||||
(db/exec-one! db/pool sql))))
|
||||
|
||||
(defn- delete-presence
|
||||
[file-id session-id profile-id]
|
||||
(thread-try
|
||||
(db/delete! db/pool :presence {:file-id file-id
|
||||
:profile-id profile-id
|
||||
:session-id session-id})))
|
||||
|
||||
;; --- WebSocket Messages Handling
|
||||
|
||||
(defmulti handle-message
|
||||
(fn [ws message] (:type message)))
|
||||
|
||||
;; TODO: check permissions for join a file-id channel (probably using
|
||||
;; single use token for avoid explicit database query).
|
||||
|
||||
(defmethod handle-message :connect
|
||||
[{:keys [file-id profile-id session-id output] :as ws} message]
|
||||
(log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
|
||||
(go-try
|
||||
(<? (update-presence file-id session-id profile-id))
|
||||
(let [members (<? (retrieve-presence file-id))]
|
||||
(<? (publish file-id {:type :presence :sessions members})))))
|
||||
|
||||
(defmethod handle-message :disconnect
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
|
||||
(go-try
|
||||
(<? (delete-presence file-id session-id profile-id))
|
||||
(let [members (<? (retrieve-presence file-id))]
|
||||
(<? (publish file-id {:type :presence :sessions members})))))
|
||||
|
||||
(defmethod handle-message :keepalive
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(update-presence file-id session-id profile-id))
|
||||
|
||||
(defmethod handle-message :pointer-update
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(let [message (assoc message
|
||||
:profile-id profile-id
|
||||
:session-id session-id)]
|
||||
(publish file-id message)))
|
||||
|
||||
(defmethod handle-message :default
|
||||
[ws message]
|
||||
(a/go
|
||||
(log/warnf "received unexpected message: " message)))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; WebSocket Handler
|
||||
;; WebSocket Http Handler
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
(defn- forward-message
|
||||
[{:keys [out session-id profile-id] :as ws} message]
|
||||
(go-try
|
||||
(when-not (= (:session-id message) session-id)
|
||||
(>! out message))))
|
||||
|
||||
(defn start-loop!
|
||||
[{:keys [in out sub] :as ws}]
|
||||
(go-try
|
||||
(loop []
|
||||
(let [timeout (a/timeout 30000)
|
||||
[val port] (a/alts! [in sub timeout])]
|
||||
;; (prn "alts" val "from" (cond (= port in) "input"
|
||||
;; (= port sub) "redis"
|
||||
;; :else "timeout"))
|
||||
|
||||
(cond
|
||||
;; Process message coming from connected client
|
||||
(and (= port in) (not (nil? val)))
|
||||
(do
|
||||
(<? (handle-message ws val))
|
||||
(recur))
|
||||
|
||||
;; Forward message to the websocket
|
||||
(and (= port sub) (not (nil? val)))
|
||||
(do
|
||||
(<? (forward-message ws val))
|
||||
(recur))
|
||||
|
||||
;; Timeout channel signaling
|
||||
(= port timeout)
|
||||
(do
|
||||
(>! out {:type :ping})
|
||||
(recur))
|
||||
|
||||
:else
|
||||
nil)))))
|
||||
|
||||
(defn disconnect!
|
||||
[conn]
|
||||
(let [session (.getSession conn)]
|
||||
(when session
|
||||
(.disconnect session))))
|
||||
|
||||
(defn- on-subscribed
|
||||
[{:keys [conn] :as ws}]
|
||||
(a/go
|
||||
(try
|
||||
(<? (handle-message ws {:type :connect}))
|
||||
(<? (start-loop! ws))
|
||||
(<? (handle-message ws {:type :disconnect}))
|
||||
(catch Throwable err
|
||||
(log/errorf err "Unexpected exception on websocket handler.")
|
||||
(disconnect! conn)))))
|
||||
(declare on-connect)
|
||||
|
||||
(defrecord WebSocket [conn in out sub])
|
||||
|
||||
|
@ -194,17 +40,17 @@
|
|||
:help "A total number of messages handled by the notifications service."}))
|
||||
|
||||
(defn websocket
|
||||
[{:keys [file-id profile-id] :as params}]
|
||||
[{:keys [file-id team-id profile-id] :as params}]
|
||||
(let [in (a/chan 32)
|
||||
out (a/chan 32)]
|
||||
{:on-connect
|
||||
(fn [conn]
|
||||
(metrics-active-connections :inc)
|
||||
(let [xf (map t/decode-str)
|
||||
sub (redis/subscribe (str file-id) xf)
|
||||
(let [sub (redis/subscribe {:xform (map t/decode-str)
|
||||
:topics [file-id team-id]})
|
||||
ws (WebSocket. conn in out sub nil params)]
|
||||
|
||||
;; RCV LOOP
|
||||
;; message forwarding loop
|
||||
(a/go-loop []
|
||||
(let [val (a/<! out)]
|
||||
(when-not (nil? val)
|
||||
|
@ -212,7 +58,7 @@
|
|||
(recur))))
|
||||
|
||||
(a/go
|
||||
(a/<! (on-subscribed ws))
|
||||
(a/<! (on-connect ws))
|
||||
(a/close! sub))))
|
||||
|
||||
:on-error
|
||||
|
@ -235,4 +81,135 @@
|
|||
:on-bytes
|
||||
(constantly nil)}))
|
||||
|
||||
(declare handle-message)
|
||||
(declare start-loop!)
|
||||
|
||||
(defn- on-connect
|
||||
[{:keys [conn] :as ws}]
|
||||
(a/go
|
||||
(try
|
||||
(aa/<? (handle-message ws {:type :connect}))
|
||||
(aa/<? (start-loop! ws))
|
||||
(aa/<? (handle-message ws {:type :disconnect}))
|
||||
(catch Throwable err
|
||||
(log/errorf err "Unexpected exception on websocket handler.")
|
||||
(let [session (.getSession conn)]
|
||||
(when session
|
||||
(.disconnect session)))))))
|
||||
|
||||
(defn- start-loop!
|
||||
[{:keys [in out sub session-id] :as ws}]
|
||||
(aa/go-try
|
||||
(loop []
|
||||
(let [timeout (a/timeout 30000)
|
||||
[val port] (a/alts! [in sub timeout])]
|
||||
;; (prn "alts" val "from" (cond (= port in) "input"
|
||||
;; (= port sub) "redis"
|
||||
;; :else "timeout"))
|
||||
|
||||
(cond
|
||||
;; Process message coming from connected client
|
||||
(and (= port in) (not (nil? val)))
|
||||
(do
|
||||
(aa/<? (handle-message ws val))
|
||||
(recur))
|
||||
|
||||
;; Forward message to the websocket
|
||||
(and (= port sub) (not (nil? val)))
|
||||
(do
|
||||
(when-not (= (:session-id val) session-id)
|
||||
(a/>! out val))
|
||||
(recur))
|
||||
|
||||
;; Timeout channel signaling
|
||||
(= port timeout)
|
||||
(do
|
||||
(a/>! out {:type :ping})
|
||||
(recur))
|
||||
|
||||
:else
|
||||
nil)))))
|
||||
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
;; Incoming Messages Handling
|
||||
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
|
||||
|
||||
;; --- Impl
|
||||
|
||||
(defn- publish
|
||||
[channel message]
|
||||
(aa/go-try
|
||||
(let [message (t/encode-str message)]
|
||||
(aa/<? (redis/run :publish {:channel (str channel)
|
||||
:message message})))))
|
||||
|
||||
(def ^:private
|
||||
sql:retrieve-presence
|
||||
"select * from presence
|
||||
where file_id=?
|
||||
and (clock_timestamp() - updated_at) < '5 min'::interval")
|
||||
|
||||
(defn- retrieve-presence
|
||||
[file-id]
|
||||
(aa/thread-try
|
||||
(let [rows (db/exec! db/pool [sql:retrieve-presence file-id])]
|
||||
(mapv (juxt :session-id :profile-id) rows))))
|
||||
|
||||
(def ^:private
|
||||
sql:update-presence
|
||||
"insert into presence (file_id, session_id, profile_id, updated_at)
|
||||
values (?, ?, ?, clock_timestamp())
|
||||
on conflict (file_id, session_id, profile_id)
|
||||
do update set updated_at=clock_timestamp()")
|
||||
|
||||
(defn- update-presence
|
||||
[file-id session-id profile-id]
|
||||
(aa/thread-try
|
||||
(let [now (dt/now)
|
||||
sql [sql:update-presence file-id session-id profile-id]]
|
||||
(db/exec-one! db/pool sql))))
|
||||
|
||||
(defn- delete-presence
|
||||
[file-id session-id profile-id]
|
||||
(aa/thread-try
|
||||
(db/delete! db/pool :presence {:file-id file-id
|
||||
:profile-id profile-id
|
||||
:session-id session-id})))
|
||||
|
||||
(defmulti handle-message
|
||||
(fn [ws message] (:type message)))
|
||||
|
||||
;; TODO: check permissions for join a file-id channel (probably using
|
||||
;; single use token for avoid explicit database query).
|
||||
|
||||
(defmethod handle-message :connect
|
||||
[{:keys [file-id profile-id session-id output] :as ws} message]
|
||||
(log/debugf "profile '%s' is connected to file '%s'" profile-id file-id)
|
||||
(aa/go-try
|
||||
(aa/<? (update-presence file-id session-id profile-id))
|
||||
(let [members (aa/<? (retrieve-presence file-id))]
|
||||
(aa/<? (publish file-id {:type :presence :sessions members})))))
|
||||
|
||||
(defmethod handle-message :disconnect
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(log/debugf "profile '%s' is disconnected from '%s'" profile-id file-id)
|
||||
(aa/go-try
|
||||
(aa/<? (delete-presence file-id session-id profile-id))
|
||||
(let [members (aa/<? (retrieve-presence file-id))]
|
||||
(aa/<? (publish file-id {:type :presence :sessions members})))))
|
||||
|
||||
(defmethod handle-message :keepalive
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(update-presence file-id session-id profile-id))
|
||||
|
||||
(defmethod handle-message :pointer-update
|
||||
[{:keys [profile-id file-id session-id] :as ws} message]
|
||||
(let [message (assoc message
|
||||
:profile-id profile-id
|
||||
:session-id session-id)]
|
||||
(publish file-id message)))
|
||||
|
||||
(defmethod handle-message :default
|
||||
[ws message]
|
||||
(a/go
|
||||
(log/warnf "received unexpected message: " message)))
|
||||
|
|
|
@ -22,32 +22,38 @@
|
|||
io.lettuce.core.pubsub.api.sync.RedisPubSubCommands
|
||||
))
|
||||
|
||||
(defrecord Client [client uri]
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(.shutdown ^RedisClient client)))
|
||||
(defrecord Client [^RedisClient inner
|
||||
^RedisURI uri]
|
||||
clojure.lang.IDeref
|
||||
(deref [_] inner)
|
||||
|
||||
(defrecord Connection [^RedisAsyncCommands cmd]
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(let [conn (.getStatefulConnection cmd)]
|
||||
(.close ^StatefulRedisConnection conn))))
|
||||
(.shutdown inner)))
|
||||
|
||||
(defrecord Connection [^StatefulRedisConnection inner
|
||||
^RedisAsyncCommands cmd]
|
||||
clojure.lang.IDeref
|
||||
(deref [_] inner)
|
||||
|
||||
java.lang.AutoCloseable
|
||||
(close [_]
|
||||
(.close ^StatefulRedisConnection inner)))
|
||||
|
||||
(defn client
|
||||
[uri]
|
||||
(->Client (RedisClient/create) (RedisURI/create uri)))
|
||||
(->Client (RedisClient/create)
|
||||
(RedisURI/create uri)))
|
||||
|
||||
(defn connect
|
||||
[client]
|
||||
(let [^RedisURI uri (:uri client)
|
||||
^RedisClient client (:client client)
|
||||
^StatefulRedisConnection conn (.connect client StringCodec/UTF8 uri)]
|
||||
(->Connection (.async conn))))
|
||||
[{:keys [uri] :as client}]
|
||||
(let [conn (.connect ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)]
|
||||
(->Connection conn (.async ^StatefulRedisConnection conn))))
|
||||
|
||||
(defn- impl-subscribe
|
||||
[^String topic xf ^StatefulRedisPubSubConnection conn]
|
||||
[topics xform ^StatefulRedisPubSubConnection conn]
|
||||
(let [cmd (.sync conn)
|
||||
output (a/chan 1 (comp (filter string?) xf))
|
||||
output (a/chan 1 (comp (filter string?) xform))
|
||||
buffer (a/chan (a/sliding-buffer 64))
|
||||
sub (reify RedisPubSubListener
|
||||
(message [it pattern channel message])
|
||||
|
@ -60,8 +66,8 @@
|
|||
(punsubscribed [it pattern count])
|
||||
(subscribed [it channel count])
|
||||
(unsubscribed [it channel count]))]
|
||||
(.addListener conn sub)
|
||||
|
||||
;; Start message event-loop (with keepalive mechanism)
|
||||
(a/go-loop []
|
||||
(let [[val port] (a/alts! [buffer (a/timeout 5000)])
|
||||
message (if (= port buffer) val ::keepalive)]
|
||||
|
@ -73,17 +79,20 @@
|
|||
(when (.isOpen conn)
|
||||
(.close conn))))))
|
||||
|
||||
(.subscribe ^RedisPubSubCommands cmd (into-array String [topic]))
|
||||
;; Synchronously subscribe to topics
|
||||
(.addListener conn sub)
|
||||
(.subscribe ^RedisPubSubCommands cmd topics)
|
||||
|
||||
;; Return the output channel
|
||||
output))
|
||||
|
||||
(defn subscribe
|
||||
([client topic]
|
||||
(subscribe client topic (map identity)))
|
||||
([client topic xf]
|
||||
(let [^RedisURI uri (:uri client)
|
||||
^RedisClient client (:client client)]
|
||||
(->> (.connectPubSub client StringCodec/UTF8 uri)
|
||||
(impl-subscribe topic xf)))))
|
||||
[{:keys [uri] :as client} {:keys [topic topics xform]}]
|
||||
(let [topics (if (vector? topics)
|
||||
(into-array String (map str topics))
|
||||
(into-array String [(str topics)]))]
|
||||
(->> (.connectPubSub ^RedisClient @client StringCodec/UTF8 ^RedisURI uri)
|
||||
(impl-subscribe topics xform))))
|
||||
|
||||
(defn- resolve-to-bool
|
||||
[v]
|
||||
|
|
Loading…
Add table
Reference in a new issue