diff --git a/backend/src/app/config.clj b/backend/src/app/config.clj index 71ee82fbe..dbd234dde 100644 --- a/backend/src/app/config.clj +++ b/backend/src/app/config.clj @@ -112,6 +112,7 @@ (s/def ::database-password (s/nilable ::us/string)) (s/def ::database-uri ::us/string) (s/def ::database-username (s/nilable ::us/string)) +(s/def ::database-readonly ::us/boolean) (s/def ::default-blob-version ::us/integer) (s/def ::error-report-webhook ::us/string) (s/def ::user-feedback-destination ::us/string) @@ -201,6 +202,7 @@ ::database-password ::database-uri ::database-username + ::database-readonly ::default-blob-version ::error-report-webhook ::file-change-snapshot-every diff --git a/backend/src/app/db.clj b/backend/src/app/db.clj index a4976b47b..d6eb7fa72 100644 --- a/backend/src/app/db.clj +++ b/backend/src/app/db.clj @@ -62,12 +62,13 @@ :opt-un [::migrations ::mtx/metrics ::read-only])) (defmethod ig/init-key ::pool - [_ {:keys [migrations metrics name] :as cfg}] + [_ {:keys [migrations metrics name read-only] :as cfg}] (l/info :action "initialize connection pool" :name (d/name name) :uri (:uri cfg)) (some-> metrics :registry instrument-jdbc!) (let [pool (create-pool cfg)] - (some->> (seq migrations) (apply-migrations! pool)) + (when-not read-only + (some->> (seq migrations) (apply-migrations! pool))) pool)) (defmethod ig/halt-key! ::pool @@ -136,10 +137,14 @@ (s/def ::pool pool?) -(defn pool-closed? +(defn closed? [pool] (.isClosed ^HikariDataSource pool)) +(defn read-only? + [pool] + (.isReadOnly ^HikariDataSource pool)) + (defn create-pool [cfg] (let [dsc (create-datasource-config cfg)] diff --git a/backend/src/app/http/session.clj b/backend/src/app/http/session.clj index 90e3d217d..08abfe6e0 100644 --- a/backend/src/app/http/session.clj +++ b/backend/src/app/http/session.clj @@ -11,74 +11,114 @@ [app.common.logging :as l] [app.config :as cfg] [app.db :as db] + [app.db.sql :as sql] [app.metrics :as mtx] [app.util.async :as aa] [app.util.time :as dt] [app.worker :as wrk] [clojure.core.async :as a] [clojure.spec.alpha :as s] - [integrant.core :as ig])) + [integrant.core :as ig] + [ring.middleware.session.store :as rss])) ;; A default cookie name for storing the session. We don't allow ;; configure it. (def cookie-name "auth-token") +(deftype DatabaseStore [pool tokens] + rss/SessionStore + (read-session [_ token] + (db/exec-one! pool (sql/select :http-session {:id token}))) + + (write-session [_ _ data] + (let [profile-id (:profile-id data) + user-agent (:user-agent data) + token (tokens :generate {:iss "authentication" + :iat (dt/now) + :uid profile-id}) + params {:user-agent user-agent + :profile-id profile-id + :id token}] + (db/insert! pool :http-session params) + token)) + + (delete-session [_ token] + (db/delete! pool :http-session {:id token}) + nil)) + +(deftype MemoryStore [cache tokens] + rss/SessionStore + (read-session [_ token] + (get @cache token)) + + (write-session [_ _ data] + (let [profile-id (:profile-id data) + user-agent (:user-agent data) + token (tokens :generate {:iss "authentication" + :iat (dt/now) + :uid profile-id}) + params {:user-agent user-agent + :profile-id profile-id + :id token}] + + (swap! cache assoc token params) + token)) + + (delete-session [_ token] + (swap! cache dissoc token) + nil)) + ;; --- IMPL (defn- create-session - [{:keys [conn tokens] :as cfg} {:keys [profile-id headers] :as request}] - (let [token (tokens :generate {:iss "authentication" - :iat (dt/now) - :uid profile-id}) - params {:user-agent (get headers "user-agent") - :profile-id profile-id - :id token}] - (db/insert! conn :http-session params))) + [store request profile-id] + (let [params {:user-agent (get-in request [:headers "user-agent"]) + :profile-id profile-id}] + (rss/write-session store nil params))) (defn- delete-session - [{:keys [conn] :as cfg} {:keys [cookies] :as request}] + [store {:keys [cookies] :as request}] (when-let [token (get-in cookies [cookie-name :value])] - (db/delete! conn :http-session {:id token})) - nil) + (rss/delete-session store token))) (defn- retrieve-session - [{:keys [conn] :as cfg} id] - (when id - (db/exec-one! conn ["select id, profile_id from http_session where id = ?" id]))) + [store token] + (when token + (rss/read-session store token))) (defn- retrieve-from-request - [cfg {:keys [cookies] :as request}] + [store {:keys [cookies] :as request}] (->> (get-in cookies [cookie-name :value]) - (retrieve-session cfg))) + (retrieve-session store))) (defn- add-cookies - [response {:keys [id] :as session}] + [response token] (let [cors? (contains? cfg/flags :cors) secure? (contains? cfg/flags :secure-session-cookies)] (assoc response :cookies {cookie-name {:path "/" :http-only true - :value id + :value token :same-site (if cors? :none :lax) :secure secure?}}))) - (defn- clear-cookies [response] (assoc response :cookies {cookie-name {:value "" :max-age -1}})) (defn- middleware - [cfg handler] + [events-ch store handler] (fn [request] - (if-let [{:keys [id profile-id] :as session} (retrieve-from-request cfg request)] + (if-let [{:keys [id profile-id] :as session} (retrieve-from-request store request)] (do - (a/>!! (::events-ch cfg) id) + (a/>!! events-ch id) (l/set-context! {:profile-id profile-id}) (handler (assoc request :profile-id profile-id :session-id id))) (handler request)))) ;; --- STATE INIT: SESSION +(s/def ::tokens fn?) (defmethod ig/pre-init-spec ::session [_] - (s/keys :req-un [::db/pool])) + (s/keys :req-un [::db/pool ::tokens])) (defmethod ig/prep-key ::session [_ cfg] @@ -86,20 +126,24 @@ (d/without-nils cfg))) (defmethod ig/init-key ::session - [_ {:keys [pool] :as cfg}] - (let [events (a/chan (a/dropping-buffer (:buffer-size cfg))) - cfg (-> cfg - (assoc :conn pool) - (assoc ::events-ch events))] + [_ {:keys [pool tokens] :as cfg}] + (let [events-ch (a/chan (a/dropping-buffer (:buffer-size cfg))) + store (if (db/read-only? pool) + (->MemoryStore (atom {}) tokens) + (->DatabaseStore pool tokens))] + + (when (db/read-only? pool) + (l/warn :hint "sessions module initialized with in-memory store")) + (-> cfg - (assoc :middleware #(middleware cfg %)) + (assoc ::events-ch events-ch) + (assoc :middleware #(middleware events-ch store %)) (assoc :create (fn [profile-id] (fn [request response] - (let [request (assoc request :profile-id profile-id) - session (create-session cfg request)] - (add-cookies response session))))) + (let [token (create-session store request profile-id)] + (add-cookies response token))))) (assoc :delete (fn [request response] - (delete-session cfg request) + (delete-session store request) (-> response (assoc :status 204) (assoc :body "") diff --git a/backend/src/app/loggers/audit.clj b/backend/src/app/loggers/audit.clj index d8c5d33f7..b212b24c4 100644 --- a/backend/src/app/loggers/audit.clj +++ b/backend/src/app/loggers/audit.clj @@ -89,19 +89,24 @@ (s/def ::events (s/every ::event)) (defmethod ig/init-key ::http-handler - [_ {:keys [executor] :as cfg}] - (fn [{:keys [params profile-id] :as request}] - (when (contains? cf/flags :audit-log) - (let [events (->> (:events params) - (remove #(not= profile-id (:profile-id %))) - (us/conform ::events)) - ip-addr (parse-client-ip request) - cfg (-> cfg - (assoc :source "frontend") - (assoc :events events) - (assoc :ip-addr ip-addr))] - (px/run! executor #(persist-http-events cfg)))) - {:status 204 :body ""})) + [_ {:keys [executor pool] :as cfg}] + (if (db/read-only? pool) + (do + (l/warn :hint "audit log http handler disabled, db is read-only") + (constantly {:status 204 :body ""})) + (fn [{:keys [params profile-id] :as request}] + (when (contains? cf/flags :audit-log) + (let [events (->> (:events params) + (remove #(not= profile-id (:profile-id %))) + (us/conform ::events)) + ip-addr (parse-client-ip request) + cfg (-> cfg + (assoc :source "frontend") + (assoc :events events) + (assoc :ip-addr ip-addr))] + + (px/run! executor #(persist-http-events cfg)))) + {:status 204 :body ""}))) (defn- persist-http-events [{:keys [pool events ip-addr source] :as cfg}] @@ -148,13 +153,25 @@ (map clean-props))) (defmethod ig/init-key ::collector - [_ cfg] - (when (contains? cf/flags :audit-log) - (l/info :msg "initializing audit log collector") + [_ {:keys [pool] :as cfg}] + (cond + (not (contains? cf/flags :audit-log)) + (do + (l/info :hint "audit log collection disabled") + (constantly nil)) + + (db/read-only? pool) + (do + (l/warn :hint "audit log collection disabled, db is read-only") + (constantly nil)) + + :else (let [input (a/chan 512 event-xform) buffer (aa/batch input {:max-batch-size 100 :max-batch-age (* 10 1000) ; 10s :init []})] + + (l/info :hint "audit log collector initialized") (a/go-loop [] (when-let [[_type events] (a/ (bn/random-bytes 64) + (bc/bytes->b64u) + (bc/bytes->str))) + +(defn- retrieve-all + [conn] + (->> (db/query conn :server-prop {:preload true}) + (filter #(not= "secret-key" (:id %))) + (map (fn [row] + [(keyword (:id row)) + (db/decode-transit-pgobject (:content row))])) + (into {}))) + +(defn- handle-instance-id + [instance-id conn read-only?] + (or instance-id + (let [instance-id (uuid/random)] + (when-not read-only? + (try + (db/insert! conn :server-prop + {:id "instance-id" + :preload true + :content (db/tjson instance-id)}) + (catch Throwable cause + (l/warn :hint "unable to persist instance-id" + :instance-id instance-id + :cause cause)))) + instance-id))) (defmethod ig/pre-init-spec ::props [_] (s/keys :req-un [::db/pool])) (defmethod ig/init-key ::props - [_ {:keys [pool] :as cfg}] + [_ {:keys [pool key] :as cfg}] (db/with-atomic [conn pool] - (let [cfg (assoc cfg :conn conn)] - (initialize-secret-key! cfg) - (initialize-instance-id! cfg) - (retrieve-all cfg)))) + (db/xact-lock! conn 0) + (when-not key + (l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate " + "all sessions on each restart, it is hightly recommeded setting up the " + "PENPOT_SECRET_KEY environment variable"))) -(def sql:upsert-secret-key - "insert into server_prop (id, preload, content) - values ('secret-key', true, ?::jsonb) - on conflict (id) do update set content = ?::jsonb") - -(def sql:insert-secret-key - "insert into server_prop (id, preload, content) - values ('secret-key', true, ?::jsonb) - on conflict (id) do nothing") - -(defn- initialize-secret-key! - [{:keys [conn key] :as cfg}] - (if key - (let [key (db/tjson key)] - (db/exec-one! conn [sql:upsert-secret-key key key])) - (let [key (-> (bn/random-bytes 64) - (bc/bytes->b64u) - (bc/bytes->str)) - key (db/tjson key)] - (db/exec-one! conn [sql:insert-secret-key key])))) - -(defn- initialize-instance-id! - [{:keys [conn] :as cfg}] - (let [iid (uuid/random)] - - (db/insert! conn :server-prop - {:id "instance-id" - :preload true - :content (db/tjson iid)} - {:on-conflict-do-nothing true}))) - -(defn- retrieve-all - [{:keys [conn] :as cfg}] - (reduce (fn [acc row] - (assoc acc (keyword (:id row)) (db/decode-transit-pgobject (:content row)))) - {} - (db/query conn :server-prop {:preload true}))) + (let [stored (-> (retrieve-all conn) + (assoc :secret-key (or key (generate-random-key))))] + (update stored :instance-id handle-instance-id conn (db/read-only? pool))))) diff --git a/backend/src/app/worker.clj b/backend/src/app/worker.clj index 7d532f5e0..72a1ba16e 100644 --- a/backend/src/app/worker.clj +++ b/backend/src/app/worker.clj @@ -59,6 +59,7 @@ ;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;; (declare event-loop-fn) +(declare event-loop) (declare instrument-tasks) (s/def ::queue keyword?) @@ -85,13 +86,10 @@ :queue :default} (d/without-nils cfg))) -(defmethod ig/init-key ::worker - [_ {:keys [pool poll-interval name queue] :as cfg}] - (l/info :action "start worker" - :name (d/name name) - :queue (d/name queue)) - (let [close-ch (a/chan 1) - poll-ms (inst-ms poll-interval)] +(defn- event-loop + "Main, worker eventloop" + [{:keys [pool poll-interval close-ch] :as cfg}] + (let [poll-ms (inst-ms poll-interval)] (a/go-loop [] (let [[val port] (a/alts! [close-ch (event-loop-fn cfg)] :priority true)] (cond @@ -100,7 +98,7 @@ (or (= port close-ch) (nil? val)) (l/debug :hint "stop condition found") - (db/pool-closed? pool) + (db/closed? pool) (do (l/debug :hint "eventloop aborted because pool is closed") (a/close! close-ch)) @@ -132,14 +130,27 @@ (= ::empty val) (do (a/> schedule - (filter some?) - ;; If id is not defined, use the task as id. - (map (fn [{:keys [id task] :as item}] - (if (some? id) - (assoc item :id (d/name id)) - (assoc item :id (d/name task))))) - (map (fn [{:keys [task] :as item}] - (let [f (get tasks task)] - (when-not f - (ex/raise :type :internal - :code :task-not-found - :hint (str/fmt "task %s not configured" task))) - (-> item - (dissoc :task) - (assoc :fn f)))))) - cfg (assoc cfg - :scheduler scheduler - :schedule schedule)] + [_ {:keys [schedule tasks pool] :as cfg}] + (let [scheduler (Executors/newScheduledThreadPool (int 1))] + (if (db/read-only? pool) + (l/warn :hint "scheduler not started, db is read-only") + (let [schedule (->> schedule + (filter some?) + ;; If id is not defined, use the task as id. + (map (fn [{:keys [id task] :as item}] + (if (some? id) + (assoc item :id (d/name id)) + (assoc item :id (d/name task))))) + (map (fn [{:keys [task] :as item}] + (let [f (get tasks task)] + (when-not f + (ex/raise :type :internal + :code :task-not-found + :hint (str/fmt "task %s not configured" task))) + (-> item + (dissoc :task) + (assoc :fn f)))))) + cfg (assoc cfg + :scheduler scheduler + :schedule schedule)] + (l/info :hint "scheduler started" + :registred-tasks (count schedule)) - (synchronize-schedule cfg) - (run! (partial schedule-task cfg) - (filter some? schedule)) + (synchronize-schedule cfg) + (run! (partial schedule-task cfg) + (filter some? schedule)))) (reify java.lang.AutoCloseable