(ns uxbox.impl.routing (:require [clojure.core.async :as a] [cats.monad.exception :as exc] [cats.core :as m] [promissum.core :as p] [catacumba.core :as ct] [catacumba.serializers :as sz] [catacumba.impl.websocket :as ws] [catacumba.impl.handlers :as hs])) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Helpers ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn encode [data] (sz/bytes->str (sz/encode data :transit+json))) (defn decode [data] (sz/decode (sz/str->bytes data) :transit+json)) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Protocol Definition ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defprotocol IHandlerResponse (-handle-response [_ context frameid options] "Handle the response.")) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Implementation ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (declare response) (defn- generic-error-handler [context error] (if (instance? clojure.lang.ExceptionInfo error) (response :error {:message (.getMessage error) :data (ex-data error)}) (response :error {:message (str error)}))) (defn- handle-error [error frameid context options] (let [on-error (::on-error options generic-error-handler) response (on-error context error)] (-handle-response response context frameid options))) (defmulti handle-frame (fn [frame handler context options] (:cmd frame))) (defmethod handle-frame :default [frame handler context options] (let [frameid (:id frame) response (exc/try-on (handler context frame))] (if (exc/success? response) (-handle-response @response context frameid options) (let [error (m/extract response)] (handle-error error frameid context options))))) (defmethod handle-frame :pong [frame _ context options] (let [state (:state context)] (defn- send-decode-error [{:keys [out]}] (let [frame {:cmd :error :id nil :data "Error on deserializing frame."}] (a/go (a/>! out (encode frame))))) (defn- initialize [{:keys [in out ctrl] :as context} options] (a/go (let [received (a/! out (encode {:cmd :hello})) (do (a/! out (encode frame)) (let [[v ch] (a/alts! [in (a/timeout 1000)])] (if (= ch in) (do (a/ response (p/then on-resolve) (p/catch on-reject))))) (defrecord Response [data cmd] IHandlerResponse (-handle-response [this context frameid options] (let [frame (into {:id frameid} this) output (:out context)] (a/put! output (encode frame))))) ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; ;; Public Api ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (defn response ([data] (response :response data)) ([type data] (map->Response {:data data :cmd type}))) (defn on-close [context handler] {:pre [(fn? handler) (::on-close-handlers context)]} (let [container (::on-close-handlers context)] (swap! container conj handler))) (defn router ([handler] (router handler {})) ([handler options] (fn [context] (let [context (assoc context ::on-close-handlers (atom []) :state (atom {}))] (ws/websocket context #(dispatcher % handler options))))))