0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-03-25 06:01:46 -05:00

♻️ Refactor profile and session handling

- makes the profile access more efficient (replace in-app joins to a
  simple select query on profile table
- add partial support for access-tokens (still missing some RPC methods)
- move router definitions to specific modules and simplify the main http
  module definitions to simple includes
- simplifiy authentication code related to access-tokens and sessions
- normalize db parameters with proper namespaced props
- more work on convert all modules initialization to use proper specs
  with fully-qualified keyword config props
This commit is contained in:
Andrey Antukh 2023-01-02 22:56:24 +01:00
parent a7ec9d7d1f
commit db689d151e
58 changed files with 1285 additions and 963 deletions

View file

@ -3,8 +3,8 @@
{:default
[[:default :window "200000/h"]]
#{:query/teams}
#{:command/get-teams}
[[:burst :bucket "5/1/5s"]]
#{:query/profile}
[[:burst :bucket "100/60/1m"]]}
#{:command/get-profile}
[[:burst :bucket "60/60/1m"]]}

View file

@ -349,7 +349,7 @@
::fullname
::props]))
(defn retrieve-info
(defn get-info
[{:keys [provider] :as cfg} {:keys [params] :as request}]
(letfn [(validate-oidc [info]
;; If the provider is OIDC, we can proceed to check
@ -396,14 +396,12 @@
(p/then' validate-oidc)
(p/then' (partial post-process state))))))
(defn- retrieve-profile
(defn- get-profile
[{:keys [::db/pool ::wrk/executor] :as cfg} info]
(px/with-dispatch executor
(with-open [conn (db/open pool)]
(some->> (:email info)
(profile/retrieve-profile-data-by-email conn)
(profile/populate-additional-data conn)
(profile/decode-profile-row)))))
(profile/get-profile-by-email conn)))))
(defn- redirect-response
[uri]
@ -417,9 +415,9 @@
(redirect-response uri)))
(defn- generate-redirect
[{:keys [::session/session] :as cfg} request info profile]
[cfg request info profile]
(if profile
(let [sxf (session/create-fn session (:id profile))
(let [sxf (session/create-fn cfg (:id profile))
token (or (:invitation-token info)
(tokens/generate (::main/props cfg)
{:iss :auth
@ -436,7 +434,7 @@
(when-let [collector (::audit/collector cfg)]
(audit/submit! collector {:type "command"
:name "login"
:name "login-with-password"
:profile-id (:id profile)
:ip-addr (audit/parse-client-ip request)
:props (audit/profile->props profile)}))
@ -471,8 +469,8 @@
(defn- callback-handler
[cfg request]
(letfn [(process-request []
(p/let [info (retrieve-info cfg request)
profile (retrieve-profile cfg info)]
(p/let [info (get-info cfg request)
profile (get-profile cfg info)]
(generate-redirect cfg request info profile)))
(handle-error [cause]
@ -524,23 +522,24 @@
(s/def ::providers (s/map-of ::us/keyword (s/nilable ::provider)))
(s/def ::routes vector?)
(defmethod ig/pre-init-spec ::routes
[_]
(s/keys :req [::http/client
(s/keys :req [::session/manager
::http/client
::wrk/executor
::main/props
::db/pool
::providers
::session/session]))
::providers]))
(defmethod ig/init-key ::routes
[_ {:keys [::wrk/executor ::session/session] :as cfg}]
[_ {:keys [::wrk/executor] :as cfg}]
(let [cfg (update cfg :provider d/without-nils)]
["" {:middleware [[(:middleware session)]
["" {:middleware [[session/authz cfg]
[hmw/with-dispatch executor]
[hmw/with-config cfg]
[provider-lookup]
]}
[provider-lookup]]}
["/auth/oauth"
["/:provider"
{:handler auth-handler
@ -548,4 +547,3 @@
["/:provider/callback"
{:handler callback-handler
:allowed-methods #{:get}}]]]))

View file

@ -10,9 +10,9 @@
[app.common.logging :as l]
[app.db :as db]
[app.main :as main]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.commands.auth :as auth]
[app.rpc.mutations.profile :as profile]
[app.rpc.queries.profile :refer [retrieve-profile-data-by-email]]
[app.rpc.queries.profile :refer [get-profile-by-email]]
[clojure.string :as str]
[clojure.tools.cli :refer [parse-opts]]
[integrant.core :as ig])
@ -55,16 +55,17 @@
:type :password}))]
(try
(db/with-atomic [conn (:app.db/pool system)]
(->> (cmd.auth/create-profile conn
{:fullname fullname
:email email
:password password
:is-active true
:is-demo false})
(cmd.auth/create-profile-relations conn)))
(->> (auth/create-profile! conn
{:fullname fullname
:email email
:password password
:is-active true
:is-demo false})
(auth/create-profile-rels! conn)))
(when (pos? (:verbosity options))
(println "User created successfully."))
(System/exit 0)
(catch Exception _e
@ -79,7 +80,7 @@
(db/with-atomic [conn (:app.db/pool system)]
(let [email (or (:email options)
(read-from-console {:label "Email:"}))
profile (retrieve-profile-data-by-email conn email)]
profile (get-profile-by-email conn email)]
(when-not profile
(when (pos? (:verbosity options))
(println "Profile does not exists."))

View file

@ -128,6 +128,7 @@
(s/def ::database-max-pool-size ::us/integer)
(s/def ::quotes-teams-per-profile ::us/integer)
(s/def ::quotes-access-tokens-per-profile ::us/integer)
(s/def ::quotes-projects-per-team ::us/integer)
(s/def ::quotes-invitations-per-team ::us/integer)
(s/def ::quotes-profiles-per-team ::us/integer)
@ -281,6 +282,7 @@
::public-uri
::quotes-teams-per-profile
::quotes-access-tokens-per-profile
::quotes-projects-per-team
::quotes-invitations-per-team
::quotes-profiles-per-team

View file

@ -233,44 +233,46 @@
[pool]
(jdbc/get-connection pool))
(def ^:private default-opts
{:builder-fn sql/as-kebab-maps})
(defn exec!
([ds sv]
(exec! ds sv {}))
(jdbc/execute! ds sv default-opts))
([ds sv opts]
(jdbc/execute! ds sv (assoc opts :builder-fn sql/as-kebab-maps))))
(jdbc/execute! ds sv (merge default-opts opts))))
(defn exec-one!
([ds sv] (exec-one! ds sv {}))
([ds sv]
(jdbc/execute-one! ds sv default-opts))
([ds sv opts]
(jdbc/execute-one! ds sv (assoc opts :builder-fn sql/as-kebab-maps))))
(jdbc/execute-one! ds sv
(-> (merge default-opts opts)
(assoc :return-keys (::return-keys? opts false))))))
(defn insert!
([ds table params] (insert! ds table params nil))
([ds table params opts]
(exec-one! ds
(sql/insert table params opts)
(merge {:return-keys true} opts))))
[ds table params & {:as opts}]
(exec-one! ds
(sql/insert table params opts)
(merge {::return-keys? true} opts)))
(defn insert-multi!
([ds table cols rows] (insert-multi! ds table cols rows nil))
([ds table cols rows opts]
(exec! ds
(sql/insert-multi table cols rows opts)
(merge {:return-keys true} opts))))
[ds table cols rows & {:as opts}]
(exec! ds
(sql/insert-multi table cols rows opts)
(merge {::return-keys? true} opts)))
(defn update!
([ds table params where] (update! ds table params where nil))
([ds table params where opts]
(exec-one! ds
(sql/update table params where opts)
(merge {:return-keys true} opts))))
[ds table params where & {:as opts}]
(exec-one! ds
(sql/update table params where opts)
(merge {::return-keys? true} opts)))
(defn delete!
([ds table params] (delete! ds table params nil))
([ds table params opts]
(exec-one! ds
(sql/delete table params opts)
(assoc opts :return-keys true))))
[ds table params & {:as opts}]
(exec-one! ds
(sql/delete table params opts)
(merge {::return-keys? true} opts)))
(defn is-row-deleted?
[{:keys [deleted-at]}]
@ -279,56 +281,34 @@
(inst-ms (dt/now)))))
(defn get*
"Internal function for retrieve a single row from database that
matches a simple filters."
([ds table params]
(get* ds table params nil))
([ds table params {:keys [check-deleted?] :or {check-deleted? true} :as opts}]
(let [rows (exec! ds (sql/select table params opts))
rows (cond->> rows
check-deleted?
(remove is-row-deleted?))]
(first rows))))
"Retrieve a single row from database that matches a simple filters. Do
not raises exceptions."
[ds table params & {:as opts}]
(let [rows (exec! ds (sql/select table params opts))
rows (cond->> rows
(::remove-deleted? opts true)
(remove is-row-deleted?))]
(first rows)))
(defn get
([ds table params]
(get ds table params nil))
([ds table params {:keys [check-deleted?] :or {check-deleted? true} :as opts}]
(let [row (get* ds table params opts)]
(when (and (not row) check-deleted?)
(ex/raise :type :not-found
:code :object-not-found
:table table
:hint "database object not found"))
row)))
(defn get-by-params
"DEPRECATED"
([ds table params]
(get-by-params ds table params nil))
([ds table params {:keys [check-not-found] :or {check-not-found true} :as opts}]
(let [row (get* ds table params (assoc opts :check-deleted? check-not-found))]
(when (and (not row) check-not-found)
(ex/raise :type :not-found
:code :object-not-found
:table table
:hint "database object not found"))
row)))
"Retrieve a single row from database that matches a simple
filters. Raises :not-found exception if no object is found."
[ds table params & {:as opts}]
(let [row (get* ds table params opts)]
(when (and (not row) (::check-deleted? opts true))
(ex/raise :type :not-found
:code :object-not-found
:table table
:hint "database object not found"))
row))
(defn get-by-id
([ds table id]
(get ds table {:id id} nil))
([ds table id opts]
(let [opts (cond-> opts
(contains? opts :check-not-found)
(assoc :check-deleted? (:check-not-found opts)))]
(get ds table {:id id} opts))))
[ds table id & {:as opts}]
(get ds table {:id id} opts))
(defn query
([ds table params]
(query ds table params nil))
([ds table params opts]
(exec! ds (sql/select table params opts))))
[ds table params & {:as opts}]
(exec! ds (sql/select table params opts)))
(defn pgobject?
([v]

View file

@ -7,6 +7,7 @@
(ns app.db.sql
(:refer-clojure :exclude [update])
(:require
[app.db :as-alias db]
[clojure.string :as str]
[next.jdbc.optional :as jdbc-opt]
[next.jdbc.sql.builder :as sql]))
@ -43,8 +44,10 @@
([table where-params opts]
(let [opts (merge default-opts opts)
opts (cond-> opts
(:for-update opts) (assoc :suffix "FOR UPDATE")
(:for-key-share opts) (assoc :suffix "FOR KEY SHARE"))]
(::db/for-update? opts) (assoc :suffix "FOR UPDATE")
(::db/for-share? opts) (assoc :suffix "FOR KEY SHARE")
(:for-update opts) (assoc :suffix "FOR UPDATE")
(:for-key-share opts) (assoc :suffix "FOR KEY SHARE"))]
(sql/for-query table where-params opts))))
(defn update

View file

@ -6,13 +6,22 @@
(ns app.http
(:require
[app.auth.oidc :as-alias oidc]
[app.common.data :as d]
[app.common.logging :as l]
[app.common.transit :as t]
[app.db :as-alias db]
[app.http.access-token :as actoken]
[app.http.assets :as-alias assets]
[app.http.awsns :as-alias awsns]
[app.http.debug :as-alias debug]
[app.http.errors :as errors]
[app.http.middleware :as mw]
[app.http.session :as session]
[app.http.websocket :as-alias ws]
[app.metrics :as mtx]
[app.rpc :as-alias rpc]
[app.rpc.doc :as-alias rpc.doc]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
@ -64,7 +73,6 @@
:http/max-body-size (:max-body-size cfg)
:http/max-multipart-body-size (:max-multipart-body-size cfg)
:xnio/io-threads (:io-threads cfg)
:xnio/worker-threads (:worker-threads cfg)
:xnio/dispatch (:executor cfg)
:ring/async true}
@ -113,64 +121,41 @@
;; HTTP ROUTER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::assets map?)
(s/def ::awsns-handler fn?)
(s/def ::debug-routes (s/nilable vector?))
(s/def ::doc-routes (s/nilable vector?))
(s/def ::feedback fn?)
(s/def ::oauth map?)
(s/def ::oidc-routes (s/nilable vector?))
(s/def ::rpc-routes (s/nilable vector?))
(s/def ::session ::session/session)
(s/def ::storage map?)
(s/def ::ws fn?)
(defmethod ig/pre-init-spec ::router [_]
(s/keys :req-un [::mtx/metrics
::ws
::storage
::assets
::session
::feedback
::awsns-handler
::debug-routes
::oidc-routes
::rpc-routes
::doc-routes]))
(s/keys :req [::session/manager
::actoken/manager
::ws/routes
::rpc/routes
::rpc.doc/routes
::oidc/routes
::assets/routes
::debug/routes
::db/pool
::mtx/routes
::awsns/routes]))
(defmethod ig/init-key ::router
[_ {:keys [ws session metrics assets feedback] :as cfg}]
[_ cfg]
(rr/router
[["" {:middleware [[mw/server-timing]
[mw/format-response]
[mw/params]
[mw/parse-request]
[session/middleware-1 session]
[session/soft-auth cfg]
[actoken/soft-auth cfg]
[mw/errors errors/handle]
[mw/restrict-methods]]}
["/metrics" {:handler (::mtx/handler metrics)
:allowed-methods #{:get}}]
["/assets" {:middleware [[session/middleware-2 session]]}
["/by-id/:id" {:handler (:objects-handler assets)}]
["/by-file-media-id/:id" {:handler (:file-objects-handler assets)}]
["/by-file-media-id/:id/thumbnail" {:handler (:file-thumbnails-handler assets)}]]
(:debug-routes cfg)
(::mtx/routes cfg)
(::assets/routes cfg)
(::debug/routes cfg)
["/webhooks"
["/sns" {:handler (:awsns-handler cfg)
:allowed-methods #{:post}}]]
(::awsns/routes cfg)]
["/ws/notifications" {:middleware [[session/middleware-2 session]]
:handler ws
:allowed-methods #{:get}}]
(::ws/routes cfg)
["/api" {:middleware [[mw/cors]
[session/middleware-2 session]]}
["/feedback" {:handler feedback
:allowed-methods #{:post}}]
(:doc-routes cfg)
(:oidc-routes cfg)
(:rpc-routes cfg)]]]))
["/api" {:middleware [[mw/cors]]}
(::oidc/routes cfg)
(::rpc.doc/routes cfg)
(::rpc/routes cfg)]]]))

View file

@ -0,0 +1,96 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.http.access-token
(:require
[app.common.logging :as l]
[app.common.spec :as us]
[app.config :as cf]
[app.db :as db]
[app.main :as-alias main]
[app.tokens :as tokens]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]
[yetti.request :as yrq]))
(s/def ::manager
(s/keys :req [::db/pool ::wrk/executor ::main/props]))
(defmethod ig/pre-init-spec ::manager [_] ::manager)
(defmethod ig/init-key ::manager [_ cfg] cfg)
(defmethod ig/halt-key! ::manager [_ _])
(def header-re #"^Token\s+(.*)")
(defn- get-token
[request]
(some->> (yrq/get-header request "authorization")
(re-matches header-re)
(second)))
(defn- decode-token
[props token]
(when token
(tokens/verify props {:token token :iss "access-token"})))
(defn- get-token-perms
[pool token-id]
(when-not (db/read-only? pool)
(when-let [token (db/get* pool :access-token {:id token-id} {:columns [:perms]})]
(some-> (:perms token)
(db/decode-pgarray #{})))))
(defn- wrap-soft-auth
[handler {:keys [::manager]}]
(us/assert! ::manager manager)
(let [{:keys [::wrk/executor ::main/props]} manager]
(fn [request respond raise]
(let [token (get-token request)]
(->> (px/submit! executor (partial decode-token props token))
(p/fnly (fn [claims cause]
(when cause
(l/trace :hint "exception on decoding malformed token" :cause cause))
(let [request (cond-> request
(map? claims)
(assoc ::id (:tid claims)))]
(handler request respond raise)))))))))
(defn- wrap-authz
[handler {:keys [::manager]}]
(us/assert! ::manager manager)
(let [{:keys [::wrk/executor ::db/pool]} manager]
(fn [request respond raise]
(if-let [token-id (::id request)]
(->> (px/submit! executor (partial get-token-perms pool token-id))
(p/fnly (fn [perms cause]
(cond
(some? cause)
(raise cause)
(nil? perms)
(handler request respond raise)
:else
(let [request (assoc request ::perms perms)]
(handler request respond raise))))))
(handler request respond raise)))))
(def soft-auth
{:name ::soft-auth
:compile (fn [& _]
(when (contains? cf/flags :access-tokens)
wrap-soft-auth))})
(def authz
{:name ::authz
:compile (fn [& _]
(when (contains? cf/flags :access-tokens)
wrap-authz))})

View file

@ -115,7 +115,10 @@
(s/def ::cache-max-age ::dt/duration)
(s/def ::signature-max-age ::dt/duration)
(defmethod ig/pre-init-spec ::handlers [_]
(s/def ::routes vector?)
;; FIXME: namespace qualified params
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req-un [::storage
::wrk/executor
::mtx/metrics
@ -123,9 +126,9 @@
::cache-max-age
::signature-max-age]))
(defmethod ig/init-key ::handlers
(defmethod ig/init-key ::routes
[_ cfg]
{:objects-handler (partial objects-handler cfg)
:file-objects-handler (partial file-objects-handler cfg)
:file-thumbnails-handler (partial file-thumbnails-handler cfg)})
["/assets"
["/by-id/:id" {:handler (partial objects-handler cfg)}]
["/by-file-media-id/:id" {:handler (partial file-objects-handler cfg)}]
["/by-file-media-id/:id/thumbnail" {:handler (partial file-thumbnails-handler cfg)}]])

View file

@ -28,18 +28,20 @@
(declare parse-notification)
(declare process-report)
(defmethod ig/pre-init-spec ::handler [_]
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::http/client
::main/props
::db/pool
::wrk/executor]))
(defmethod ig/init-key ::handler
(defmethod ig/init-key ::routes
[_ {:keys [::wrk/executor] :as cfg}]
(fn [request respond _]
(let [data (-> request yrq/body slurp)]
(px/run! executor #(handle-request cfg data)))
(respond (yrs/response 200))))
(letfn [(handler [request respond _]
(let [data (-> request yrq/body slurp)]
(px/run! executor #(handle-request cfg data)))
(respond (yrs/response 200)))]
["/sns" {:handler handler
:allowed-methods #{:post}}]))
(defn handle-request
[cfg data]
@ -105,8 +107,7 @@
[cfg headers]
(let [tdata (get headers "x-penpot-data")]
(when-not (str/empty? tdata)
(let [sprops (::main/props cfg)
result (tokens/verify sprops {:token tdata :iss :profile-identity})]
(let [result (tokens/verify (::main/props cfg) {:token tdata :iss :profile-identity})]
(:profile-id result)))))
(defn- parse-notification

View file

@ -39,9 +39,9 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn authorized?
[pool {:keys [profile-id]}]
[pool {:keys [::session/profile-id]}]
(or (= "devenv" (cf/get :host))
(let [profile (ex/ignoring (profile/retrieve-profile-data pool profile-id))
(let [profile (ex/ignoring (profile/get-profile pool profile-id))
admins (or (cf/get :admins) #{})]
(contains? admins (:email profile)))))
@ -61,7 +61,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn index-handler
[{:keys [pool]} request]
[{:keys [::db/pool]} request]
(when-not (authorized? pool request)
(ex/raise :type :authentication
:code :only-admins-allowed))
@ -81,7 +81,7 @@
"select revn, changes, data from file_change where file_id=? and revn = ?")
(defn- retrieve-file-data
[{:keys [pool]} {:keys [params profile-id] :as request}]
[{:keys [::db/pool]} {:keys [params ::session/profile-id] :as request}]
(when-not (authorized? pool request)
(ex/raise :type :authentication
:code :only-admins-allowed))
@ -107,8 +107,9 @@
(prepare-download-response data filename)
(contains? params :clone)
(let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id)
data (some-> data blob/decode)]
(let [profile (profile/get-profile pool profile-id)
project-id (:default-project-id profile)
data (blob/decode data)]
(create-file pool {:id (uuid/next)
:name (str "Cloned file: " filename)
:project-id project-id
@ -117,7 +118,7 @@
(yrs/response 201 "OK CREATED"))
:else
(prepare-response (some-> data blob/decode))))))
(prepare-response (blob/decode data))))))
(defn- is-file-exists?
[pool id]
@ -125,8 +126,9 @@
(-> (db/exec-one! pool [sql id]) :exists)))
(defn- upload-file-data
[{:keys [pool]} {:keys [profile-id params] :as request}]
(let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id)
[{:keys [::db/pool]} {:keys [::session/profile-id params] :as request}]
(let [profile (profile/get-profile pool profile-id)
project-id (:default-project-id profile)
data (some-> params :file :path io/read-as-bytes blob/decode)]
(if (and data project-id)
@ -162,7 +164,7 @@
:code :method-not-found)))
(defn file-changes-handler
[{:keys [pool]} {:keys [params] :as request}]
[{:keys [::db/pool]} {:keys [params] :as request}]
(when-not (authorized? pool request)
(ex/raise :type :authentication
:code :only-admins-allowed))
@ -202,7 +204,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn error-handler
[{:keys [pool]} request]
[{:keys [::db/pool]} request]
(letfn [(parse-id [request]
(let [id (get-in request [:path-params :id])
id (parse-uuid id)]
@ -251,7 +253,7 @@
LIMIT 100")
(defn error-list-handler
[{:keys [pool]} request]
[{:keys [::db/pool]} request]
(when-not (authorized? pool request)
(ex/raise :type :authentication
:code :only-admins-allowed))
@ -268,7 +270,7 @@
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(defn export-handler
[{:keys [pool] :as cfg} {:keys [params profile-id] :as request}]
[{:keys [::db/pool] :as cfg} {:keys [params ::session/profile-id] :as request}]
(let [file-ids (->> (:file-ids params)
(remove empty?)
@ -287,7 +289,8 @@
(assoc ::binf/include-libraries? libs?)
(binf/export-to-tmpfile!))]
(if clone?
(let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id)]
(let [profile (profile/get-profile pool profile-id)
project-id (:default-project-id profile)]
(binf/import!
(assoc cfg
::binf/input path
@ -309,15 +312,16 @@
(defn import-handler
[{:keys [pool] :as cfg} {:keys [params profile-id] :as request}]
[{:keys [::db/pool] :as cfg} {:keys [params ::session/profile-id] :as request}]
(when-not (contains? params :file)
(ex/raise :type :validation
:code :missing-upload-file
:hint "missing upload file"))
(let [project-id (some-> (profile/retrieve-additional-data pool profile-id) :default-project-id)
(let [profile (profile/get-profile pool profile-id)
project-id (:default-project-id profile)
overwrite? (contains? params :overwrite)
migrate? (contains? params :migrate)
migrate? (contains? params :migrate)
ignore-index-errors? (contains? params :ignore-index-errors)]
(when-not project-id
@ -381,16 +385,17 @@
(raise (ex/error :type :authentication
:code :only-admins-allowed))))))})
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req-un [::db/pool ::wrk/executor ::session/session]))
(s/keys :req [::db/pool
::wrk/executor
::session/manager]))
(defmethod ig/init-key ::routes
[_ {:keys [session pool executor] :as cfg}]
[_ {:keys [::db/pool ::wrk/executor] :as cfg}]
[["/readyz" {:middleware [[mw/with-dispatch executor]
[mw/with-config cfg]]
:handler health-handler}]
["/dbg" {:middleware [[session/middleware-2 session]
["/dbg" {:middleware [[session/authz cfg]
[with-authorization pool]
[mw/with-dispatch executor]
[mw/with-config cfg]]}

View file

@ -11,6 +11,8 @@
[app.common.exceptions :as ex]
[app.common.logging :as l]
[app.http :as-alias http]
[app.http.access-token :as-alias actoken]
[app.http.session :as-alias session]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[yetti.request :as yrq]
@ -26,7 +28,9 @@
(defn get-context
[request]
(let [claims (:session-token-claims request)]
(let [claims (-> {}
(into (::session/token-claims request))
(into (::actoken/token-claims request)))]
(merge
*context*
{:path (:path request)
@ -49,6 +53,10 @@
[err _]
(yrs/response 401 (ex-data err)))
(defmethod handle-exception :authorization
[err _]
(yrs/response 403 (ex-data err)))
(defmethod handle-exception :restriction
[err _]
(yrs/response 400 (ex-data err)))

View file

@ -13,6 +13,7 @@
[app.config :as cf]
[app.db :as db]
[app.emails :as eml]
[app.http.session :as-alias session]
[app.rpc.queries.profile :as profile]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
@ -42,7 +43,7 @@
:hint "feedback module is disabled"))))))
(defn- handler
[{:keys [pool] :as cfg} {:keys [profile-id] :as request}]
[{:keys [pool] :as cfg} {:keys [::session/profile-id] :as request}]
(let [ftoken (cf/get :feedback-token ::no-token)
token (yrq/get-header request "x-feedback-token")
params (d/merge (:params request)

View file

@ -9,14 +9,17 @@
(:require
[app.common.data :as d]
[app.common.logging :as l]
[app.common.spec :as us]
[app.config :as cf]
[app.db :as db]
[app.db.sql :as sql]
[app.http.session.tasks :as-alias tasks]
[app.main :as-alias main]
[app.tokens :as tokens]
[app.util.time :as dt]
[app.worker :as wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[integrant.core :as ig]
[promesa.core :as p]
[promesa.exec :as px]
@ -45,55 +48,55 @@
(defprotocol ISessionManager
(read [_ key])
(decode [_ key])
(write! [_ key data])
(update! [_ data])
(delete! [_ key]))
(s/def ::session #(satisfies? ISessionManager %))
(s/def ::manager #(satisfies? ISessionManager %))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; STORAGE IMPL
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::session-params
(s/keys :req-un [::user-agent
::profile-id
::created-at]))
(defn- prepare-session-params
[props data]
(let [profile-id (:profile-id data)
user-agent (:user-agent data)
created-at (or (:created-at data) (dt/now))
token (tokens/generate props {:iss "authentication"
:iat created-at
:uid profile-id})]
{:user-agent user-agent
:profile-id profile-id
:created-at created-at
:updated-at created-at
:id token}))
[key params]
(us/assert! ::us/not-empty-string key)
(us/assert! ::session-params params)
{:user-agent (:user-agent params)
:profile-id (:profile-id params)
:created-at (:created-at params)
:updated-at (:created-at params)
:id key})
(defn- database-manager
[{:keys [::db/pool ::wrk/executor ::main/props]}]
^{::wrk/executor executor
::db/pool pool
::main/props props}
(reify ISessionManager
(read [_ token]
(px/with-dispatch executor
(db/exec-one! pool (sql/select :http-session {:id token}))))
(decode [_ token]
(write! [_ key params]
(px/with-dispatch executor
(tokens/verify props {:token token :iss "authentication"})))
(write! [_ _ data]
(px/with-dispatch executor
(let [params (prepare-session-params props data)]
(let [params (prepare-session-params key params)]
(db/insert! pool :http-session params)
params)))
(update! [_ data]
(update! [_ params]
(let [updated-at (dt/now)]
(px/with-dispatch executor
(db/update! pool :http-session
{:updated-at updated-at}
{:id (:id data)})
(assoc data :updated-at updated-at))))
{:id (:id params)})
(assoc params :updated-at updated-at))))
(delete! [_ token]
(px/with-dispatch executor
@ -101,27 +104,26 @@
nil))))
(defn inmemory-manager
[{:keys [::wrk/executor ::main/props]}]
[{:keys [::db/pool ::wrk/executor ::main/props]}]
(let [cache (atom {})]
^{::main/props props
::wrk/executor executor
::db/pool pool}
(reify ISessionManager
(read [_ token]
(p/do (get @cache token)))
(decode [_ token]
(px/with-dispatch executor
(tokens/verify props {:token token :iss "authentication"})))
(write! [_ _ data]
(write! [_ key params]
(p/do
(let [{:keys [token] :as params} (prepare-session-params props data)]
(swap! cache assoc token params)
(let [params (prepare-session-params key params)]
(swap! cache assoc key params)
params)))
(update! [_ data]
(update! [_ params]
(p/do
(let [updated-at (dt/now)]
(swap! cache update (:id data) assoc :updated-at updated-at)
(assoc data :updated-at updated-at))))
(swap! cache update (:id params) assoc :updated-at updated-at)
(assoc params :updated-at updated-at))))
(delete! [_ token]
(p/do
@ -144,25 +146,34 @@
;; MANAGER IMPL
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare assign-auth-token-cookie)
(declare assign-authenticated-cookie)
(declare clear-auth-token-cookie)
(declare clear-authenticated-cookie)
(declare ^:private assign-auth-token-cookie)
(declare ^:private assign-authenticated-cookie)
(declare ^:private clear-auth-token-cookie)
(declare ^:private clear-authenticated-cookie)
(declare ^:private gen-token)
(defn create-fn
[manager profile-id]
(fn [request response]
(let [uagent (yrq/get-header request "user-agent")
params {:profile-id profile-id
:user-agent uagent}]
(-> (write! manager nil params)
(p/then (fn [session]
(l/trace :hint "create" :profile-id profile-id)
(-> response
(assign-auth-token-cookie session)
(assign-authenticated-cookie session))))))))
[{:keys [::manager]} profile-id]
(us/assert! ::manager manager)
(us/assert! ::us/uuid profile-id)
(let [props (-> manager meta ::main/props)]
(fn [request response]
(let [uagent (yrq/get-header request "user-agent")
params {:profile-id profile-id
:user-agent uagent
:created-at (dt/now)}
token (gen-token props params)]
(->> (write! manager token params)
(p/fmap (fn [session]
(l/trace :hint "create" :profile-id profile-id)
(-> response
(assign-auth-token-cookie session)
(assign-authenticated-cookie session)))))))))
(defn delete-fn
[manager]
[{:keys [::manager]}]
(us/assert! ::manager manager)
(letfn [(delete [{:keys [profile-id] :as request}]
(let [cname (cf/get :auth-token-cookie-name default-auth-token-cookie-name)
cookie (yrq/get-cookie request cname)]
@ -177,68 +188,92 @@
(clear-auth-token-cookie)
(clear-authenticated-cookie))))))
(def middleware-1
(letfn [(decode-cookie [manager cookie]
(if-let [value (:value cookie)]
(decode manager value)
(p/resolved nil)))
(defn- gen-token
[props {:keys [profile-id created-at]}]
(tokens/generate props {:iss "authentication"
:iat created-at
:uid profile-id}))
(defn- decode-token
[props token]
(when token
(tokens/verify props {:token token :iss "authentication"})))
(wrap-handler [manager handler request respond raise]
(let [cookie (some->> (cf/get :auth-token-cookie-name default-auth-token-cookie-name)
(yrq/get-cookie request))]
(->> (decode-cookie manager cookie)
(p/fnly (fn [claims _]
(cond-> request
(some? claims) (assoc :session-token-claims claims)
:always (handler respond raise)))))))]
{:name :session-1
:compile (fn [& _]
(fn [handler manager]
(partial wrap-handler manager handler)))}))
(defn- get-token
[request]
(let [cname (cf/get :auth-token-cookie-name default-auth-token-cookie-name)
cookie (some-> (yrq/get-cookie request cname) :value)]
(when-not (str/empty? cookie)
cookie)))
(def middleware-2
(letfn [(wrap-handler [manager handler request respond raise]
(-> (retrieve-session manager request)
(p/finally (fn [session cause]
(cond
(some? cause)
(raise cause)
(defn- get-session
[manager token]
(some->> token (read manager)))
(nil? session)
(handler request respond raise)
(defn- renew-session?
[{:keys [updated-at] :as session}]
(and (dt/instant? updated-at)
(let [elapsed (dt/diff updated-at (dt/now))]
(neg? (compare default-renewal-max-age elapsed)))))
:else
(let [request (-> request
(assoc :profile-id (:profile-id session))
(assoc :session-id (:id session)))
respond (cond-> respond
(renew-session? session)
(wrap-respond manager session))]
(handler request respond raise)))))))
(defn- wrap-reneval
[respond manager session]
(fn [response]
(p/let [session (update! manager session)]
(-> response
(assign-auth-token-cookie session)
(assign-authenticated-cookie session)
(respond)))))
(retrieve-session [manager request]
(let [cname (cf/get :auth-token-cookie-name default-auth-token-cookie-name)
cookie (yrq/get-cookie request cname)]
(some->> (:value cookie) (read manager))))
(defn- wrap-soft-auth
[handler {:keys [::manager]}]
(us/assert! ::manager manager)
(renew-session? [{:keys [updated-at] :as session}]
(and (dt/instant? updated-at)
(let [elapsed (dt/diff updated-at (dt/now))]
(neg? (compare default-renewal-max-age elapsed)))))
(let [{:keys [::wrk/executor ::main/props]} (meta manager)]
(fn [request respond raise]
(let [token (get-token request)]
(->> (px/submit! executor (partial decode-token props token))
(p/fnly (fn [claims cause]
(when cause
(l/trace :hint "exception on decoding malformed token" :cause cause))
;; Wrap respond with session renewal code
(wrap-respond [respond manager session]
(fn [response]
(p/let [session (update! manager session)]
(-> response
(assign-auth-token-cookie session)
(assign-authenticated-cookie session)
(respond)))))]
(let [request (cond-> request
(map? claims)
(-> (assoc ::token-claims claims)
(assoc ::token token)))]
(handler request respond raise)))))))))
{:name :session-2
:compile (fn [& _]
(fn [handler manager]
(partial wrap-handler manager handler)))}))
(defn- wrap-authz
[handler {:keys [::manager]}]
(us/assert! ::manager manager)
(fn [request respond raise]
(if-let [token (::token request)]
(->> (get-session manager token)
(p/fnly (fn [session cause]
(cond
(some? cause)
(raise cause)
(nil? session)
(handler request respond raise)
:else
(let [request (-> request
(assoc ::profile-id (:profile-id session))
(assoc ::id (:id session)))
respond (cond-> respond
(renew-session? session)
(wrap-reneval manager session))]
(handler request respond raise))))))
(handler request respond raise))))
(def soft-auth
{:name ::soft-auth
:compile (constantly wrap-soft-auth)})
(def authz
{:name ::authz
:compile (constantly wrap-authz)})
;; --- IMPL
@ -300,21 +335,26 @@
;; TASK: SESSION GC
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(declare sql:delete-expired)
(s/def ::tasks/max-age ::dt/duration)
(s/def ::max-age ::dt/duration)
(defmethod ig/pre-init-spec ::tasks/gc [_]
(s/keys :req [::db/pool]
:opt [::tasks/max-age]))
(defmethod ig/pre-init-spec ::gc-task [_]
(s/keys :req-un [::db/pool]
:opt-un [::max-age]))
(defmethod ig/prep-key ::gc-task
(defmethod ig/prep-key ::tasks/gc
[_ cfg]
(merge {:max-age default-cookie-max-age}
(d/without-nils cfg)))
(let [max-age (cf/get :auth-token-cookie-max-age default-cookie-max-age)]
(merge {::tasks/max-age max-age} (d/without-nils cfg))))
(defmethod ig/init-key ::gc-task
[_ {:keys [pool max-age] :as cfg}]
(def ^:private
sql:delete-expired
"delete from http_session
where updated_at < now() - ?::interval
or (updated_at is null and
created_at < now() - ?::interval)")
(defmethod ig/init-key ::tasks/gc
[_ {:keys [::db/pool ::tasks/max-age] :as cfg}]
(l/debug :hint "initializing session gc task" :max-age max-age)
(fn [_]
(db/with-atomic [conn pool]
@ -326,9 +366,3 @@
:deleted result)
result))))
(def ^:private
sql:delete-expired
"delete from http_session
where updated_at < now() - ?::interval
or (updated_at is null and
created_at < now() - ?::interval)")

View file

@ -12,6 +12,7 @@
[app.common.pprint :as pp]
[app.common.spec :as us]
[app.db :as db]
[app.http.session :as session]
[app.metrics :as mtx]
[app.msgbus :as mbus]
[app.util.time :as dt]
@ -34,7 +35,7 @@
(def state (atom {}))
(defn- on-connect
[{:keys [metrics]} wsp]
[{:keys [::mtx/metrics]} wsp]
(let [created-at (dt/now)]
(swap! state assoc (::ws/id @wsp) wsp)
(mtx/run! metrics
@ -48,7 +49,7 @@
:val (/ (inst-ms (dt/diff created-at (dt/now))) 1000.0)))))
(defn- on-rcv-message
[{:keys [metrics]} _ message]
[{:keys [::mtx/metrics]} _ message]
(mtx/run! metrics
:id :websocket-messages-total
:labels recv-labels
@ -56,7 +57,7 @@
message)
(defn- on-snd-message
[{:keys [metrics]} _ message]
[{:keys [::mtx/metrics]} _ message]
(mtx/run! metrics
:id :websocket-messages-total
:labels send-labels
@ -95,7 +96,6 @@
:user-agent (::ws/user-agent @wsp)
:ip-addr (::ws/remote-addr @wsp)
:last-activity-at (::ws/last-activity-at @wsp)
:http-session-id (::ws/http-session-id @wsp)
:subscribed-file (-> wsp deref ::file-subscription :file-id)
:subscribed-team (-> wsp deref ::team-subscription :team-id)}))
@ -120,7 +120,7 @@
(defmethod handle-message :connect
[cfg wsp _]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -139,7 +139,7 @@
(defmethod handle-message :disconnect
[cfg wsp _]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -173,7 +173,7 @@
(defmethod handle-message :subscribe-team
[cfg wsp {:keys [team-id] :as params}]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
output-ch (::ws/output-ch @wsp)
@ -205,7 +205,7 @@
(defmethod handle-message :subscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
@ -258,7 +258,7 @@
(defmethod handle-message :unsubscribe-file
[cfg wsp {:keys [file-id] :as params}]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
conn-id (::ws/id @wsp)
session-id (::session-id @wsp)
profile-id (::profile-id @wsp)
@ -288,7 +288,7 @@
(defmethod handle-message :pointer-update
[cfg wsp {:keys [file-id] :as message}]
(let [msgbus (:msgbus cfg)
(let [msgbus (::mbus/msgbus cfg)
profile-id (::profile-id @wsp)
session-id (::session-id @wsp)
subs (::file-subscription @wsp)
@ -313,39 +313,47 @@
;; HTTP HANDLER
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(s/def ::msgbus ::mbus/msgbus)
(s/def ::session-id ::us/uuid)
(s/def ::handler-params
(s/keys :req-un [::session-id]))
(defmethod ig/pre-init-spec ::handler [_]
(s/keys :req-un [::msgbus ::db/pool ::mtx/metrics]))
(defn- http-handler
[cfg {:keys [params ::session/profile-id] :as request} respond raise]
(let [{:keys [session-id]} (us/conform ::handler-params params)]
(cond
(not profile-id)
(raise (ex/error :type :authentication
:hint "Authentication required."))
(defmethod ig/init-key ::handler
(not (yws/upgrade-request? request))
(raise (ex/error :type :validation
:code :websocket-request-expected
:hint "this endpoint only accepts websocket connections"))
:else
(do
(l/trace :hint "websocket request" :profile-id profile-id :session-id session-id)
(->> (ws/handler
::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg)
::profile-id profile-id
::session-id session-id)
(yws/upgrade request)
(respond))))))
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::mbus/msgbus
::mtx/metrics
::db/pool
::session/manager]))
(s/def ::routes vector?)
(defmethod ig/init-key ::routes
[_ cfg]
(fn [{:keys [profile-id params] :as req} respond raise]
(let [{:keys [session-id]} (us/conform ::handler-params params)]
(cond
(not profile-id)
(raise (ex/error :type :authentication
:hint "Authentication required."))
(not (yws/upgrade-request? req))
(raise (ex/error :type :validation
:code :websocket-request-expected
:hint "this endpoint only accepts websocket connections"))
:else
(do
(l/trace :hint "websocket request" :profile-id profile-id :session-id session-id)
(->> (ws/handler
::ws/on-rcv-message (partial on-rcv-message cfg)
::ws/on-snd-message (partial on-snd-message cfg)
::ws/on-connect (partial on-connect cfg)
::ws/handler (partial handle-message cfg)
::profile-id profile-id
::session-id session-id)
(yws/upgrade req)
(respond)))))))
["/ws/notifications" {:middleware [[session/authz cfg]]
:handler (partial http-handler cfg)
:allowed-methods #{:get}}])

View file

@ -159,7 +159,7 @@
;; this case we just retry the operation.
(rtry/with-retry {::rtry/when rtry/conflict-exception?
::rtry/max-retries 6
::rtry/label "persist-audit-log-event"}
::rtry/label "persist-audit-log"}
(let [now (dt/now)]
(db/insert! pool :audit-log
(-> params

View file

@ -111,7 +111,7 @@
" where id=?")
err
(:id whook)]
res (db/exec-one! pool sql {:return-keys true})]
res (db/exec-one! pool sql {::db/return-keys? true})]
(when (>= (:error-count res) max-errors)
(db/update! pool :webhook {:is-active false} {:id (:id whook)})))

View file

@ -12,15 +12,24 @@
[app.common.logging :as l]
[app.config :as cf]
[app.db :as-alias db]
[app.http.access-token :as-alias actoken]
[app.http.assets :as-alias http.assets]
[app.http.awsns :as http.awsns]
[app.http.client :as-alias http.client]
[app.http.session :as-alias http.session]
[app.http.debug :as-alias http.debug]
[app.http.session :as-alias session]
[app.http.session.tasks :as-alias session.tasks]
[app.http.websocket :as http.ws]
[app.loggers.audit :as-alias audit]
[app.loggers.audit.tasks :as-alias audit.tasks]
[app.loggers.webhooks :as-alias webhooks]
[app.loggers.zmq :as-alias lzmq]
[app.metrics :as-alias mtx]
[app.metrics.definition :as-alias mdef]
[app.msgbus :as-alias mbus]
[app.redis :as-alias rds]
[app.rpc :as-alias rpc]
[app.rpc.doc :as-alias rpc.doc]
[app.storage :as-alias sto]
[app.util.time :as dt]
[app.worker :as-alias wrk]
@ -180,6 +189,9 @@
::mtx/metrics
{:default default-metrics}
::mtx/routes
{::mtx/metrics (ig/ref ::mtx/metrics)}
:app.migrations/all
{:main (ig/ref :app.migrations/migrations)}
@ -187,7 +199,7 @@
{::rds/uri (cf/get :redis-uri)
::mtx/metrics (ig/ref ::mtx/metrics)}
:app.msgbus/msgbus
::mbus/msgbus
{:backend (cf/get :msgbus-backend :redis)
:executor (ig/ref ::wrk/executor)
:redis (ig/ref ::rds/redis)}
@ -207,16 +219,20 @@
::http.client/client
{::wrk/executor (ig/ref ::wrk/executor)}
:app.http.session/manager
::session/manager
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::props (ig/ref :app.setup/props)}
:app.http.session/gc-task
{:pool (ig/ref ::db/pool)
:max-age (cf/get :auth-token-cookie-max-age)}
::actoken/manager
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::props (ig/ref :app.setup/props)}
:app.http.awsns/handler
::session.tasks/gc
{::db/pool (ig/ref ::db/pool)}
::http.awsns/routes
{::props (ig/ref :app.setup/props)
::db/pool (ig/ref ::db/pool)
::http.client/client (ig/ref ::http.client/client)
@ -259,50 +275,44 @@
{::http.client/client (ig/ref ::http.client/client)}
::oidc/routes
{::http.client/client (ig/ref ::http.client/client)
::db/pool (ig/ref ::db/pool)
::props (ig/ref :app.setup/props)
::wrk/executor (ig/ref ::wrk/executor)
::oidc/providers {:google (ig/ref ::oidc.providers/google)
:github (ig/ref ::oidc.providers/github)
:gitlab (ig/ref ::oidc.providers/gitlab)
:oidc (ig/ref ::oidc.providers/generic)}
::audit/collector (ig/ref ::audit/collector)
::http.session/session (ig/ref :app.http.session/manager)}
{::http.client/client (ig/ref ::http.client/client)
::db/pool (ig/ref ::db/pool)
::props (ig/ref :app.setup/props)
::wrk/executor (ig/ref ::wrk/executor)
::oidc/providers {:google (ig/ref ::oidc.providers/google)
:github (ig/ref ::oidc.providers/github)
:gitlab (ig/ref ::oidc.providers/gitlab)
:oidc (ig/ref ::oidc.providers/generic)}
::audit/collector (ig/ref ::audit/collector)
::session/manager (ig/ref ::session/manager)}
;; TODO: revisit the dependencies of this service, looks they are too much unused of them
:app.http/router
{:assets (ig/ref :app.http.assets/handlers)
:feedback (ig/ref :app.http.feedback/handler)
:session (ig/ref :app.http.session/manager)
:awsns-handler (ig/ref :app.http.awsns/handler)
:debug-routes (ig/ref :app.http.debug/routes)
:oidc-routes (ig/ref ::oidc/routes)
:ws (ig/ref :app.http.websocket/handler)
:metrics (ig/ref ::mtx/metrics)
:public-uri (cf/get :public-uri)
:storage (ig/ref ::sto/storage)
:rpc-routes (ig/ref :app.rpc/routes)
:doc-routes (ig/ref :app.rpc.doc/routes)
:executor (ig/ref ::wrk/executor)}
{::session/manager (ig/ref ::session/manager)
::actoken/manager (ig/ref ::actoken/manager)
::wrk/executor (ig/ref ::wrk/executor)
::db/pool (ig/ref ::db/pool)
::rpc/routes (ig/ref ::rpc/routes)
::rpc.doc/routes (ig/ref ::rpc.doc/routes)
::props (ig/ref :app.setup/props)
::mtx/routes (ig/ref ::mtx/routes)
::oidc/routes (ig/ref ::oidc/routes)
::http.debug/routes (ig/ref ::http.debug/routes)
::http.assets/routes (ig/ref ::http.assets/routes)
::http.ws/routes (ig/ref ::http.ws/routes)
::http.awsns/routes (ig/ref ::http.awsns/routes)}
:app.http.debug/routes
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)
:storage (ig/ref ::sto/storage)
:session (ig/ref :app.http.session/manager)
{::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager)}
::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::sto/storage (ig/ref ::sto/storage)}
:app.http.websocket/routes
{::db/pool (ig/ref ::db/pool)
::mtx/metrics (ig/ref ::mtx/metrics)
::mbus/msgbus (ig/ref :app.msgbus/msgbus)
::session/manager (ig/ref ::session/manager)}
:app.http.websocket/handler
{:pool (ig/ref ::db/pool)
:metrics (ig/ref ::mtx/metrics)
:msgbus (ig/ref :app.msgbus/msgbus)}
:app.http.assets/handlers
:app.http.assets/routes
{:metrics (ig/ref ::mtx/metrics)
:assets-path (cf/get :assets-path)
:storage (ig/ref ::sto/storage)
@ -310,37 +320,32 @@
:cache-max-age (dt/duration {:hours 24})
:signature-max-age (dt/duration {:hours 24 :minutes 5})}
:app.http.feedback/handler
{:pool (ig/ref ::db/pool)
:executor (ig/ref ::wrk/executor)}
:app.rpc/climit
{:metrics (ig/ref ::mtx/metrics)
:executor (ig/ref ::wrk/executor)}
{::mtx/metrics (ig/ref ::mtx/metrics)
::wrk/executor (ig/ref ::wrk/executor)}
:app.rpc/rlimit
{:executor (ig/ref ::wrk/executor)
:scheduled-executor (ig/ref ::wrk/scheduled-executor)}
{::wrk/executor (ig/ref ::wrk/executor)
::wrk/scheduled-executor (ig/ref ::wrk/scheduled-executor)}
:app.rpc/methods
{::audit/collector (ig/ref ::audit/collector)
::http.client/client (ig/ref ::http.client/client)
::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::props (ig/ref :app.setup/props)
::session/manager (ig/ref ::session/manager)
::ldap/provider (ig/ref ::ldap/provider)
::sto/storage (ig/ref ::sto/storage)
::mtx/metrics (ig/ref ::mtx/metrics)
::mbus/msgbus (ig/ref ::mbus/msgbus)
::rds/redis (ig/ref ::rds/redis)
::rpc/climit (ig/ref ::rpc/climit)
::rpc/rlimit (ig/ref ::rpc/rlimit)
::props (ig/ref :app.setup/props)
:pool (ig/ref ::db/pool)
:session (ig/ref :app.http.session/manager)
:sprops (ig/ref :app.setup/props)
:metrics (ig/ref ::mtx/metrics)
:storage (ig/ref ::sto/storage)
:msgbus (ig/ref :app.msgbus/msgbus)
:public-uri (cf/get :public-uri)
:redis (ig/ref ::rds/redis)
:http-client (ig/ref ::http.client/client)
:climit (ig/ref :app.rpc/climit)
:rlimit (ig/ref :app.rpc/rlimit)
:executor (ig/ref ::wrk/executor)
:templates (ig/ref :app.setup/builtin-templates)
}
@ -348,7 +353,12 @@
{:methods (ig/ref :app.rpc/methods)}
:app.rpc/routes
{:methods (ig/ref :app.rpc/methods)}
{::rpc/methods (ig/ref :app.rpc/methods)
::db/pool (ig/ref ::db/pool)
::wrk/executor (ig/ref ::wrk/executor)
::session/manager (ig/ref ::session/manager)
::actoken/manager (ig/ref ::actoken/manager)
::props (ig/ref :app.setup/props)}
::wrk/registry
{:metrics (ig/ref ::mtx/metrics)
@ -361,7 +371,7 @@
:storage-gc-touched (ig/ref ::sto/gc-touched-task)
:tasks-gc (ig/ref :app.tasks.tasks-gc/handler)
:telemetry (ig/ref :app.tasks.telemetry/handler)
:session-gc (ig/ref :app.http.session/gc-task)
:session-gc (ig/ref ::session.tasks/gc)
:audit-log-archive (ig/ref ::audit.tasks/archive)
:audit-log-gc (ig/ref ::audit.tasks/gc)

View file

@ -87,6 +87,7 @@
::definitions definitions
::registry registry}))
(defn- handler
[registry _ respond _]
(let [samples (.metricFamilySamples ^CollectorRegistry registry)
@ -95,6 +96,18 @@
(respond {:headers {"content-type" TextFormat/CONTENT_TYPE_004}
:body (.toString writer)})))
(s/def ::routes vector?)
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req [::metrics]))
(defmethod ig/init-key ::routes
[_ {:keys [::metrics]}]
(let [registry (::registry metrics)]
["/metrics" {:handler (partial handler registry)
:allowed-methods #{:get}}]))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; Implementation
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -302,6 +302,12 @@
{:name "0098-add-quotes-table"
:fn (mg/resource "app/migrations/sql/0098-add-quotes-table.sql")}
{:name "0099-add-access-token-table"
:fn (mg/resource "app/migrations/sql/0099-add-access-token-table.sql")}
{:name "0100-mod-profile-indexes"
:fn (mg/resource "app/migrations/sql/0100-mod-profile-indexes.sql")}
])

View file

@ -0,0 +1,19 @@
DROP TABLE IF EXISTS access_token;
CREATE TABLE access_token (
id uuid NOT NULL DEFAULT uuid_generate_v4() PRIMARY KEY,
profile_id uuid NOT NULL REFERENCES profile(id) ON DELETE CASCADE DEFERRABLE,
created_at timestamptz NOT NULL DEFAULT now(),
updated_at timestamptz NOT NULL DEFAULT now(),
name text NOT NULL,
token text NOT NULL,
perms text[] NULL
);
ALTER TABLE access_token
ALTER COLUMN name SET STORAGE external,
ALTER COLUMN token SET STORAGE external,
ALTER COLUMN perms SET STORAGE external;
CREATE INDEX access_token__profile_id__idx ON access_token(profile_id);

View file

@ -0,0 +1,34 @@
DROP INDEX profile__email__idx;
CREATE INDEX profile__email__idx ON profile(email);
ALTER TABLE profile
ADD COLUMN default_project_id uuid NULL REFERENCES project(id) ON DELETE SET NULL DEFERRABLE,
ADD COLUMN default_team_id uuid NULL REFERENCES team(id) ON DELETE SET NULL DEFERRABLE;
CREATE INDEX profile__default_project__idx ON profile(default_project_id);
CREATE INDEX profile__default_team__idx ON profile(default_team_id);
with profiles as (
select p.id,
tpr.team_id as default_team_id,
ppr.project_id as default_project_id
from profile as p
join team_profile_rel as tpr
on (tpr.profile_id = p.id and
tpr.is_owner is true)
join project_profile_rel as ppr
on (ppr.profile_id = p.id and
ppr.is_owner is true)
join project as pj
on (pj.id = ppr.project_id)
join team as tm
on (tm.id = tpr.team_id)
where pj.is_default is true
and tm.is_default is true
and pj.team_id = tm.id
)
update profile
set default_team_id = p.default_team_id,
default_project_id = p.default_project_id
from profiles as p
where profile.id = p.id;

View file

@ -193,6 +193,7 @@
(defn get-or-connect
[{:keys [::cache] :as state} key options]
(us/assert! ::redis state)
(-> state
(assoc ::connection
(or (get @cache key)
@ -205,7 +206,6 @@
(defn add-listener!
[{:keys [::connection] :as conn} listener]
(us/assert! ::connection-holder conn)
(us/assert! ::pubsub-connection connection)
(us/assert! ::pubsub-listener listener)
(.addListener ^StatefulRedisPubSubConnection @connection
@ -213,10 +213,9 @@
conn)
(defn publish!
[{:keys [::connection] :as conn} topic message]
[{:keys [::connection]} topic message]
(us/assert! ::us/string topic)
(us/assert! ::us/bytes message)
(us/assert! ::connection-holder conn)
(us/assert! ::default-connection connection)
(let [pcomm (.async ^StatefulRedisConnection @connection)]
@ -224,8 +223,7 @@
(defn subscribe!
"Blocking operation, intended to be used on a thread/agent thread."
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
[{:keys [::connection]} & topics]
(us/assert! ::pubsub-connection connection)
(try
(let [topics (into-array String (map str topics))
@ -236,8 +234,7 @@
(defn unsubscribe!
"Blocking operation, intended to be used on a thread/agent thread."
[{:keys [::connection] :as conn} & topics]
(us/assert! ::connection-holder conn)
[{:keys [::connection]} & topics]
(us/assert! ::pubsub-connection connection)
(try
(let [topics (into-array String (map str topics))
@ -247,8 +244,8 @@
(throw (InterruptedException. (ex-message cause))))))
(defn rpush!
[{:keys [::connection] :as conn} key payload]
(us/assert! ::connection-holder conn)
[{:keys [::connection]} key payload]
(us/assert! ::default-connection connection)
(us/assert! (or (and (vector? payload)
(every? bytes? payload))
(bytes? payload)))
@ -270,8 +267,8 @@
(throw (InterruptedException. (ex-message cause))))))
(defn blpop!
[{:keys [::connection] :as conn} timeout & keys]
(us/assert! ::connection-holder conn)
[{:keys [::connection]} timeout & keys]
(us/assert! ::default-connection connection)
(try
(let [keys (into-array Object (map str keys))
cmd (.sync ^StatefulRedisConnection @connection)
@ -286,8 +283,7 @@
(throw (InterruptedException. (ex-message cause))))))
(defn open?
[{:keys [::connection] :as conn}]
(us/assert! ::connection-holder conn)
[{:keys [::connection]}]
(us/assert! ::pubsub-connection connection)
(.isOpen ^StatefulConnection @connection))
@ -335,7 +331,7 @@
(defn eval!
[{:keys [::mtx/metrics ::connection] :as state} script]
(us/assert! ::redis state)
(us/assert! ::connection-holder state)
(us/assert! ::default-connection connection)
(us/assert! ::rscript/script script)
(let [cmd (.async ^StatefulRedisConnection @connection)

View file

@ -12,12 +12,15 @@
[app.common.logging :as l]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.http :as-alias http]
[app.http.access-token :as-alias actoken]
[app.http.client :as-alias http.client]
[app.http.session :as-alias http.session]
[app.http.session :as-alias session]
[app.loggers.audit :as audit]
[app.loggers.webhooks :as-alias webhooks]
[app.main :as-alias main]
[app.metrics :as mtx]
[app.msgbus :as-alias mbus]
[app.rpc.climit :as climit]
@ -71,71 +74,77 @@
(defn- rpc-query-handler
"Ring handler that dispatches query requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id path-params params] :as request} respond raise]
(let [type (keyword (:type path-params))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request))
data (if profile-id
(-> data
(assoc :profile-id profile-id)
(assoc ::profile-id profile-id)
(assoc ::session-id session-id))
(dissoc data :profile-id ::profile-id))
method (get methods type default-handler)]
[methods {:keys [params path-params] :as request} respond raise]
(let [type (keyword (:type path-params))
profile-id (or (::session/profile-id request)
(::actoken/profile-id request))
(-> (method data)
(p/then (partial handle-response request))
(p/then respond)
(p/catch (fn [cause]
(let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request))
data (if profile-id
(-> data
(assoc :profile-id profile-id)
(assoc ::profile-id profile-id))
(dissoc data :profile-id ::profile-id))
method (get methods type default-handler)]
(->> (method data)
(p/mcat (partial handle-response request))
(p/fnly (fn [response cause]
(if cause
(raise (ex/wrap-with-context cause {:profile-id profile-id}))
(respond response)))))))
(defn- rpc-mutation-handler
"Ring handler that dispatches mutation requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id path-params params] :as request} respond raise]
(let [type (keyword (:type path-params))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request))
data (if profile-id
(-> data
(assoc :profile-id profile-id)
(assoc ::profile-id profile-id)
(assoc ::session-id session-id))
(dissoc data :profile-id ::profile-id))
method (get methods type default-handler)]
(-> (method data)
(p/then (partial handle-response request))
(p/then respond)
(p/catch (fn [cause]
(let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context))))))))
[methods {:keys [params path-params] :as request} respond raise]
(let [type (keyword (:type path-params))
profile-id (or (::session/profile-id request)
(::actoken/profile-id request))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request))
data (if profile-id
(-> data
(assoc :profile-id profile-id)
(assoc ::profile-id profile-id))
(dissoc data :profile-id))
method (get methods type default-handler)]
(->> (method data)
(p/mcat (partial handle-response request))
(p/fnly (fn [response cause]
(if cause
(raise (ex/wrap-with-context cause {:profile-id profile-id}))
(respond response)))))))
(defn- rpc-command-handler
"Ring handler that dispatches cmd requests and convert between
internal async flow into ring async flow."
[methods {:keys [profile-id session-id path-params params] :as request} respond raise]
(let [cmd (keyword (:type path-params))
etag (yrq/get-header request "if-none-match")
[methods {:keys [params path-params] :as request} respond raise]
(let [type (keyword (:type path-params))
etag (yrq/get-header request "if-none-match")
profile-id (or (::session/profile-id request)
(::actoken/profile-id request))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request)
(assoc ::cond/key etag)
(cond-> (uuid? profile-id)
(-> (assoc ::profile-id profile-id)
(assoc ::session-id session-id))))
data (-> params
(assoc ::request-at (dt/now))
(assoc ::http/request request)
(assoc ::cond/key etag)
(cond-> (uuid? profile-id)
(assoc ::profile-id profile-id)))
method (get methods type default-handler)]
method (get methods cmd default-handler)]
(binding [cond/*enabled* true]
(-> (method data)
(p/then (partial handle-response request))
(p/then respond)
(p/catch (fn [cause]
(let [context {:profile-id profile-id}]
(raise (ex/wrap-with-context cause context)))))))))
(->> (method data)
(p/mcat (partial handle-response request))
(p/fnly (fn [response cause]
(if cause
(raise (ex/wrap-with-context cause {:profile-id profile-id}))
(respond response))))))))
(defn- wrap-metrics
"Wrap service method with metrics measurement."
@ -143,18 +152,46 @@
(let [labels (into-array String [(::sv/name mdata)])]
(fn [cfg params]
(let [tp (dt/tpoint)]
(p/finally
(f cfg params)
(fn [_ _]
(mtx/run! metrics
:id metrics-id
:val (inst-ms (tp))
:labels labels)))))))
(->> (f cfg params)
(p/fnly (fn [_ _]
(mtx/run! metrics
:id metrics-id
:val (inst-ms (tp))
:labels labels))))))))
(defn- wrap-authentication
[_ f {:keys [::auth] :as mdata}]
(fn [cfg params]
(let [profile-id (::profile-id params)]
(if (and auth (not (uuid? profile-id)))
(p/rejected
(ex/error :type :authentication
:code :authentication-required
:hint "authentication required for this endpoint"))
(f cfg params)))))
(defn- wrap-access-token
"Wraps service method with access token validation."
[_ f {:keys [::sv/name] :as mdata}]
(if (contains? cf/flags :access-tokens)
(fn [cfg params]
(let [request (::http/request params)]
(if (contains? request ::actoken/id)
(let [perms (::actoken/perms request #{})]
(if (contains? perms name)
(f cfg params)
(p/rejected
(ex/error :type :authorization
:code :operation-not-allowed
:allowed perms))))
(f cfg params))))
f))
(defn- wrap-dispatch
"Wraps service method into async flow, with the ability to dispatching
it to a preconfigured executor service."
[{:keys [executor] :as cfg} f mdata]
[{:keys [::wrk/executor] :as cfg} f mdata]
(with-meta
(fn [cfg params]
(->> (px/submit! executor (px/wrap-bindings #(f cfg params)))
@ -222,37 +259,34 @@
f))
f))
(defn- wrap-spec-conform
[_ f mdata]
(let [spec (or (::sv/spec mdata) (s/spec any?))]
(fn [cfg params]
(let [params (ex/try! (us/conform spec params))]
(if (ex/exception? params)
(p/rejected params)
(f cfg params))))))
(defn- wrap-all
[cfg f mdata]
(as-> f $
(wrap-dispatch cfg $ mdata)
(wrap-metrics cfg $ mdata)
(cond/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
(climit/wrap cfg $ mdata)
(rlimit/wrap cfg $ mdata)
(wrap-audit cfg $ mdata)
(wrap-spec-conform cfg $ mdata)
(wrap-authentication cfg $ mdata)
(wrap-access-token cfg $ mdata)))
(defn- wrap
[cfg f mdata]
(let [f (as-> f $
(wrap-dispatch cfg $ mdata)
(cond/wrap cfg $ mdata)
(retry/wrap-retry cfg $ mdata)
(wrap-metrics cfg $ mdata)
(climit/wrap cfg $ mdata)
(rlimit/wrap cfg $ mdata)
(wrap-audit cfg $ mdata))
spec (or (::sv/spec mdata) (s/spec any?))
auth? (::auth mdata true)]
(l/debug :hint "register method" :name (::sv/name mdata))
(with-meta
(fn [params]
;; Raise authentication error when rpc method requires auth but
;; no profile-id is found in the request.
(let [profile-id (if (= "command" (::type cfg))
(::profile-id params)
(:profile-id params))]
(p/do!
(if (and auth? (not (uuid? profile-id)))
(ex/raise :type :authentication
:code :authentication-required
:hint "authentication required for this endpoint")
(let [params (us/conform spec params)]
(f cfg params))))))
mdata)))
(l/debug :hint "register method" :name (::sv/name mdata))
(let [f (wrap-all cfg f mdata)]
(with-meta #(f cfg %) mdata)))
(defn- process-method
[cfg vfn]
@ -263,74 +297,70 @@
(defn- resolve-query-methods
[cfg]
(let [cfg (assoc cfg ::type "query" ::metrics-id :rpc-query-timing)]
(->> (sv/scan-ns 'app.rpc.queries.projects
'app.rpc.queries.files
'app.rpc.queries.teams
'app.rpc.queries.profile
'app.rpc.queries.viewer
'app.rpc.queries.fonts)
(->> (sv/scan-ns
'app.rpc.queries.projects
'app.rpc.queries.files
'app.rpc.queries.teams
'app.rpc.queries.profile
'app.rpc.queries.viewer
'app.rpc.queries.fonts)
(map (partial process-method cfg))
(into {}))))
(defn- resolve-mutation-methods
[cfg]
(let [cfg (assoc cfg ::type "mutation" ::metrics-id :rpc-mutation-timing)]
(->> (sv/scan-ns 'app.rpc.mutations.media
'app.rpc.mutations.profile
'app.rpc.mutations.files
'app.rpc.mutations.projects
'app.rpc.mutations.teams
'app.rpc.mutations.fonts
'app.rpc.mutations.share-link)
(->> (sv/scan-ns
'app.rpc.mutations.media
'app.rpc.mutations.profile
'app.rpc.mutations.files
'app.rpc.mutations.projects
'app.rpc.mutations.teams
'app.rpc.mutations.fonts
'app.rpc.mutations.share-link)
(map (partial process-method cfg))
(into {}))))
(defn- resolve-command-methods
[cfg]
(let [cfg (assoc cfg ::type "command" ::metrics-id :rpc-command-timing)]
(->> (sv/scan-ns 'app.rpc.commands.binfile
'app.rpc.commands.comments
'app.rpc.commands.management
'app.rpc.commands.verify-token
'app.rpc.commands.search
'app.rpc.commands.media
'app.rpc.commands.teams
'app.rpc.commands.auth
'app.rpc.commands.ldap
'app.rpc.commands.demo
'app.rpc.commands.webhooks
'app.rpc.commands.audit
'app.rpc.commands.files
'app.rpc.commands.files.update
'app.rpc.commands.files.create
'app.rpc.commands.files.temp)
(->> (sv/scan-ns
'app.rpc.commands.access-token
'app.rpc.commands.audit
'app.rpc.commands.auth
'app.rpc.commands.binfile
'app.rpc.commands.comments
'app.rpc.commands.demo
'app.rpc.commands.files
'app.rpc.commands.files.create
'app.rpc.commands.files.temp
'app.rpc.commands.files.update
'app.rpc.commands.ldap
'app.rpc.commands.management
'app.rpc.commands.media
'app.rpc.commands.profile
'app.rpc.commands.search
'app.rpc.commands.teams
'app.rpc.commands.verify-token
'app.rpc.commands.webhooks)
(map (partial process-method cfg))
(into {}))))
(s/def ::ldap (s/nilable map?))
(s/def ::msgbus ::mbus/msgbus)
(s/def ::climit (s/nilable ::climit/climit))
(s/def ::rlimit (s/nilable ::rlimit/rlimit))
(s/def ::public-uri ::us/not-empty-string)
(s/def ::sprops map?)
(defmethod ig/pre-init-spec ::methods [_]
(s/keys :req [::audit/collector
::session/manager
::http.client/client
::db/pool
::mbus/msgbus
::ldap/provider
::wrk/executor]
:req-un [::sto/storage
::http.session/session
::sprops
::public-uri
::msgbus
::rlimit
::climit
::wrk/executor
::mtx/metrics
::db/pool]))
::sto/storage
::mtx/metrics
::main/props
::wrk/executor
]
:opt [::climit
::rlimit]
:req-un [::db/pool]))
(defmethod ig/init-key ::methods
[_ cfg]
@ -352,12 +382,20 @@
::queries
::commands]))
(s/def ::routes vector?)
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req-un [::methods]))
(s/keys :req [::methods
::db/pool
::main/props
::wrk/executor
::session/manager
::actoken/manager]))
(defmethod ig/init-key ::routes
[_ {:keys [methods] :as cfg}]
[["/rpc"
[_ {:keys [::methods] :as cfg}]
[["/rpc" {:middleware [[session/authz cfg]
[actoken/authz cfg]]}
["/command/:type" {:handler (partial rpc-command-handler (:commands methods))}]
["/query/:type" {:handler (partial rpc-query-handler (:queries methods))}]
["/mutation/:type" {:handler (partial rpc-mutation-handler (:mutations methods))

View file

@ -46,7 +46,7 @@
(p/rejected
(ex/error :type :internal
:code :concurrency-limit-reached
:queue (-> limiter meta :bkey name)
:queue (-> limiter meta ::bkey name)
:cause cause))
(some? cause)
@ -56,7 +56,7 @@
(p/resolved result))))))
(defn- create-limiter
[{:keys [executor metrics concurrency queue-size bkey skey]}]
[{:keys [::wrk/executor ::mtx/metrics ::bkey ::skey concurrency queue-size]}]
(let [labels (into-array String [(name bkey)])
on-queue (fn [instance]
(l/trace :hint "enqueued"
@ -100,10 +100,10 @@
:on-run on-run}]
(-> (pxb/create options)
(vary-meta assoc :bkey bkey :skey skey))))
(vary-meta assoc ::bkey bkey ::skey skey))))
(defn- create-cache
[{:keys [executor] :as params} config]
[{:keys [::wrk/executor] :as params} config]
(let [listener (reify RemovalListener
(onRemoval [_ key _val cause]
(l/trace :hint "cache: remove" :key key :reason (str cause))))
@ -113,8 +113,8 @@
(let [[bkey skey] key]
(when-let [config (get config bkey)]
(-> (merge params config)
(assoc :bkey bkey)
(assoc :skey skey)
(assoc ::bkey bkey)
(assoc ::skey skey)
(create-limiter))))))]
(.. (Caffeine/newBuilder)
@ -134,14 +134,16 @@
(defmethod ig/prep-key ::rpc/climit
[_ cfg]
(merge {:path (cf/get :rpc-climit-config)}
(merge {::path (cf/get :rpc-climit-config)}
(d/without-nils cfg)))
(s/def ::path ::fs/path)
(defmethod ig/pre-init-spec ::rpc/climit [_]
(s/keys :req-un [::wrk/executor ::mtx/metrics ::fs/path]))
(s/keys :req [::wrk/executor ::mtx/metrics ::path]))
(defmethod ig/init-key ::rpc/climit
[_ {:keys [path] :as params}]
[_ {:keys [::path] :as params}]
(when (contains? cf/flags :rpc-climit)
(if-let [config (some->> path slurp edn/read-string)]
(do
@ -163,7 +165,8 @@
(l/warn :hint "unable to load configuration" :config (str path)))))
(s/def ::climit #(satisfies? IConcurrencyManager %))
(s/def ::rpc/climit
(s/nilable #(satisfies? IConcurrencyManager %)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; PUBLIC API
@ -176,7 +179,7 @@
(p/wrap (do ~@body))))
(defn wrap
[{:keys [climit]} f {:keys [::queue ::key-fn] :as mdata}]
[{:keys [::rpc/climit]} f {:keys [::queue ::key-fn] :as mdata}]
(if (and (some? climit)
(some? queue))
(if-let [config (get @climit queue)]
@ -192,7 +195,6 @@
(let [key [queue (key-fn params)]
lim (get climit key)]
(invoke! lim (partial f cfg params))))
(let [lim (get climit queue)]
(fn [cfg params]
(invoke! lim (partial f cfg params))))))

View file

@ -0,0 +1,64 @@
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;;
;; Copyright (c) KALEIDOS INC
(ns app.rpc.commands.access-token
(:require
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.db :as db]
[app.main :as-alias main]
[app.rpc :as-alias rpc]
[app.rpc.doc :as-alias doc]
[app.rpc.quotes :as quotes]
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]))
(defn- create-access-token
[{:keys [::conn ::main/props]} profile-id name perms]
(let [created-at (dt/now)
token-id (uuid/next)
token (tokens/generate props {:iss "access-token"
:tid token-id
:iat created-at})]
(db/insert! conn :access-token
{:id token-id
:name name
:token token
:profile-id profile-id
:created-at created-at
:updated-at created-at
:perms (db/create-array conn "text" perms)})))
(defn repl-create-access-token
[{:keys [::db/pool] :as system} profile-id name perms]
(db/with-atomic [conn pool]
(let [props (:app.setup/props system)]
(create-access-token {::conn conn ::main/props props}
profile-id
name
perms))))
(s/def ::name ::us/not-empty-string)
(s/def ::perms ::us/set-of-strings)
(s/def ::create-access-token
(s/keys :req [::rpc/profile-id]
:req-un [::name ::perms]))
(sv/defmethod ::create-access-token
{::doc/added "1.18"}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id name perms]}]
(db/with-atomic [conn pool]
(let [cfg (assoc cfg ::conn conn)]
(quotes/check-quote! conn
{::quotes/id ::quotes/access-tokens-per-profile
::quotes/profile-id profile-id})
(create-access-token cfg profile-id name perms))))

View file

@ -42,7 +42,7 @@
:profile-id :ip-addr :props :context])
(defn- handle-events
[{:keys [::db/pool]} {:keys [::rpc/profile-id events ::http/request] :as params}]
[{:keys [::db/pool]} {:keys [::rpc/profile-id events ::http/request]}]
(let [ip-addr (audit/parse-client-ip request)
xform (comp
(map #(assoc % :profile-id profile-id))

View file

@ -69,7 +69,7 @@
;; ---- COMMAND: login with password
(defn login-with-password
[{:keys [::db/pool session] :as cfg} {:keys [email password] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [email password] :as params}]
(when-not (or (contains? cf/flags :login)
(contains? cf/flags :login-with-password))
@ -105,11 +105,10 @@
profile)]
(db/with-atomic [conn pool]
(let [profile (->> (profile/retrieve-profile-data-by-email conn email)
(let [profile (->> (profile/get-profile-by-email conn email)
(validate-profile)
(profile/strip-private-attrs)
(profile/populate-additional-data conn)
(profile/decode-profile-row))
(profile/decode-row)
(profile/strip-private-attrs))
invitation (when-let [token (:invitation-token params)]
(tokens/verify (::main/props cfg) {:token token :iss :team-invitation}))
@ -122,14 +121,13 @@
(assoc profile :is-admin (let [admins (cf/get :admins)]
(contains? admins (:email profile)))))]
(-> response
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/props (audit/profile->props profile)
::audit/profile-id (:id profile)}))))))
(s/def ::scope ::us/string)
(s/def ::login-with-password
(s/keys :req-un [::email ::password]
:opt-un [::invitation-token ::scope]))
:opt-un [::invitation-token]))
(sv/defmethod ::login-with-password
"Performs authentication using penpot password."
@ -148,8 +146,8 @@
"Clears the authentication cookie and logout the current session."
{::rpc/auth false
::doc/added "1.15"}
[{:keys [session] :as cfg} _]
(rph/with-transform {} (session/delete-fn session)))
[cfg _]
(rph/with-transform {} (session/delete-fn cfg)))
;; ---- COMMAND: Recover Profile
@ -226,7 +224,7 @@
(validate-register-attempt! cfg params)
(let [profile (when-let [profile (profile/retrieve-profile-data-by-email pool (:email params))]
(let [profile (when-let [profile (profile/get-profile-by-email pool (:email params))]
(cond
(:is-blocked profile)
(ex/raise :type :restriction
@ -267,10 +265,11 @@
;; ---- COMMAND: Register Profile
(defn create-profile
(defn create-profile!
"Create the profile entry on the database with limited set of input
attrs (all the other attrs are filled with default values)."
[conn params]
[conn {:keys [email] :as params}]
(us/assert! ::us/email email)
(let [id (or (:id params) (uuid/next))
props (-> (audit/extract-utm-params params)
(merge (:props params))
@ -291,7 +290,7 @@
is-demo (:is-demo params false)
is-muted (:is-muted params false)
is-active (:is-active params false)
email (str/lower (:email params))
email (str/lower email)
params {:id id
:fullname (:fullname params)
@ -306,7 +305,7 @@
:is-demo is-demo}]
(try
(-> (db/insert! conn :profile params)
(profile/decode-profile-row))
(profile/decode-row))
(catch org.postgresql.util.PSQLException e
(let [state (.getSQLState e)]
(if (not= state "23505")
@ -315,15 +314,17 @@
:code :email-already-exists
:cause e)))))))
(defn create-profile-relations
[conn profile]
(let [team (teams/create-team conn {:profile-id (:id profile)
(defn create-profile-rels!
[conn {:keys [id] :as profile}]
(let [team (teams/create-team conn {:profile-id id
:name "Default"
:is-default true})]
(-> profile
(profile/strip-private-attrs)
(assoc :default-team-id (:id team))
(assoc :default-project-id (:default-project-id team)))))
(-> (db/update! conn :profile
{:default-team-id (:id team)
:default-project-id (:default-project-id team)}
{:id id})
(profile/decode-row))))
(defn send-email-verification!
[conn props profile]
@ -347,22 +348,18 @@
:extra-data ptoken})))
(defn register-profile
[{:keys [conn session] :as cfg} {:keys [token] :as params}]
[{:keys [conn] :as cfg} {:keys [token] :as params}]
(let [claims (tokens/verify (::main/props cfg) {:token token :iss :prepared-register})
params (merge params claims)
is-active (or (:is-active params)
(not (contains? cf/flags :email-verification))
;; DEPRECATED: v1.15
(contains? cf/flags :insecure-register))
(not (contains? cf/flags :email-verification)))
profile (if-let [profile-id (:profile-id claims)]
(profile/retrieve-profile conn profile-id)
(->> (assoc params :is-active is-active)
(create-profile conn)
(create-profile-relations conn)
(profile/decode-profile-row)))
(profile/get-profile conn profile-id)
(->> (create-profile! conn (assoc params :is-active is-active))
(create-profile-rels! conn)))
invitation (when-let [token (:invitation-token params)]
(tokens/verify (::main/props cfg) {:token token :iss :team-invitation}))]
@ -389,7 +386,7 @@
token (tokens/generate (::main/props cfg) claims)
resp {:invitation-token token}]
(-> resp
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/replace-props (audit/profile->props profile)
::audit/profile-id (:id profile)})))
@ -398,7 +395,7 @@
;; we need to mark this session as logged.
(not= "penpot" (:auth-backend profile))
(-> (profile/strip-private-attrs profile)
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/replace-props (audit/profile->props profile)
::audit/profile-id (:id profile)}))
@ -406,7 +403,7 @@
;; to sign in the user directly, without email verification.
(true? is-active)
(-> (profile/strip-private-attrs profile)
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/replace-props (audit/profile->props profile)
::audit/profile-id (:id profile)}))
@ -448,7 +445,7 @@
:exp (dt/in-future {:days 30})})]
(eml/send! {::eml/conn conn
::eml/factory eml/password-recovery
:public-uri (:public-uri cfg)
:public-uri (cf/get :public-uri)
:to (:email profile)
:token (:token profile)
:name (:fullname profile)
@ -456,7 +453,7 @@
nil))]
(db/with-atomic [conn pool]
(when-let [profile (profile/retrieve-profile-data-by-email conn email)]
(when-let [profile (profile/get-profile-by-email conn email)]
(when-not (eml/allow-send-emails? conn profile)
(ex/raise :type :validation
:code :profile-is-muted

View file

@ -436,9 +436,8 @@
(s/def ::embed-assets? (s/nilable ::us/boolean))
(s/def ::write-export-options
(s/keys :req-un [::db/pool ::sto/storage]
:req [::output ::file-ids]
:opt [::include-libraries? ::embed-assets?]))
(s/keys :req [::db/pool ::sto/storage ::output ::file-ids]
:opt [::include-libraries? ::embed-assets?]))
(defn write-export!
"Do the exportation of a specified file in custom penpot binary
@ -555,9 +554,8 @@
(s/def ::ignore-index-errors? (s/nilable ::us/boolean))
(s/def ::read-import-options
(s/keys :req-un [::db/pool ::sto/storage]
:req [::project-id ::input]
:opt [::overwrite? ::migrate? ::ignore-index-errors?]))
(s/keys :req [::db/pool ::sto/storage ::project-id ::input]
:opt [::overwrite? ::migrate? ::ignore-index-errors?]))
(defn read-import!
"Do the importation of the specified resource in penpot custom binary
@ -580,7 +578,7 @@
(read-import (assoc options ::version version ::timestamp timestamp))))
(defmethod read-import :v1
[{:keys [pool ::input] :as options}]
[{:keys [::db/pool ::input] :as options}]
(with-open [input (zstd-input-stream input)]
(with-open [input (io/data-input-stream input)]
(db/with-atomic [conn pool]
@ -673,7 +671,7 @@
(db/insert! conn :file-library-rel rel)))))
(defmethod read-section :v1/sobjects
[{:keys [storage conn ::input ::overwrite?]}]
[{:keys [::sto/storage conn ::input ::overwrite?]}]
(let [storage (media/configure-assets-storage storage)
ids (read-obj! input)]
@ -871,13 +869,14 @@
(s/def ::embed-assets? ::us/boolean)
(s/def ::export-binfile
(s/keys :req [::rpc/profile-id] :req-un [::file-id ::include-libraries? ::embed-assets?]))
(s/keys :req [::rpc/profile-id]
:req-un [::file-id ::include-libraries? ::embed-assets?]))
(sv/defmethod ::export-binfile
"Export a penpot file in a binary format."
{::doc/added "1.15"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id include-libraries? embed-assets?] :as params}]
(files/check-read-permissions! pool profile-id file-id)
(let [body (reify yrs/StreamableResponseBody
(-write-body-to-stream [_ _ output-stream]
@ -892,7 +891,8 @@
(s/def ::file ::media/upload)
(s/def ::import-binfile
(s/keys :req [::rpc/profile-id] :req-un [::project-id ::file]))
(s/keys :req [::rpc/profile-id]
:req-un [::project-id ::file]))
(sv/defmethod ::import-binfile
"Import a penpot file in a binary format."

View file

@ -54,8 +54,8 @@
:hint "file not found"))))
(defn- get-comment-thread
[conn thread-id & {:keys [for-update?]}]
(-> (db/get-by-id conn :comment-thread thread-id {:for-update for-update?})
[conn thread-id & {:as opts}]
(-> (db/get-by-id conn :comment-thread thread-id opts)
(decode-row)))
(defn- get-comment
@ -374,7 +374,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id :for-update? true)]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(upsert-comment-thread-status! conn profile-id id))))
@ -391,7 +391,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id is-resolved share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id :for-update? true)]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(db/update! conn :comment-thread
{:is-resolved is-resolved}
@ -414,7 +414,7 @@
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id ::rpc/request-at thread-id share-id content] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [file-id page-id] :as thread} (get-comment-thread conn thread-id :for-update? true)
(let [{:keys [file-id page-id] :as thread} (get-comment-thread conn thread-id ::db/for-update? true)
{:keys [team-id project-id page-name] :as file} (get-file conn file-id page-id)]
(files/check-comment-permissions! conn profile-id (:id file) share-id)
@ -467,8 +467,8 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id ::rpc/request-at id share-id content] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [thread-id] :as comment} (get-comment conn id :for-update? true)
{:keys [file-id page-id owner-id] :as thread} (get-comment-thread conn thread-id :for-update? true)]
(let [{:keys [thread-id] :as comment} (get-comment conn id ::db/for-update? true)
{:keys [file-id page-id owner-id] :as thread} (get-comment-thread conn thread-id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
@ -500,7 +500,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [owner-id file-id] :as thread} (get-comment-thread conn id :for-update? true)]
(let [{:keys [owner-id file-id] :as thread} (get-comment-thread conn id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(when-not (= owner-id profile-id)
(ex/raise :type :validation
@ -520,7 +520,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [owner-id thread-id] :as comment} (get-comment conn id :for-update? true)
(let [{:keys [owner-id thread-id] :as comment} (get-comment conn id ::db/for-update? true)
{:keys [file-id] :as thread} (get-comment-thread conn thread-id)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(when-not (= owner-id profile-id)
@ -540,7 +540,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id position frame-id share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id :for-update? true)]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(db/update! conn :comment-thread
{:modified-at (::rpc/request-at params)
@ -560,7 +560,7 @@
{::doc/added "1.15"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id id frame-id share-id] :as params}]
(db/with-atomic [conn pool]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id :for-update? true)]
(let [{:keys [file-id] :as thread} (get-comment-thread conn id ::db/for-update? true)]
(files/check-comment-permissions! conn profile-id file-id share-id)
(db/update! conn :comment-thread
{:modified-at (::rpc/request-at params)

View file

@ -8,12 +8,11 @@
"A demo specific mutations."
(:require
[app.common.exceptions :as ex]
[app.common.uuid :as uuid]
[app.config :as cf]
[app.db :as db]
[app.loggers.audit :as audit]
[app.rpc :as-alias rpc]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.commands.auth :as auth]
[app.rpc.doc :as-alias doc]
[app.util.services :as sv]
[app.util.time :as dt]
@ -31,31 +30,30 @@
::doc/added "1.15"
::doc/changes ["1.15" "This method is migrated from mutations to commands."]}
[{:keys [pool] :as cfg} _]
(let [id (uuid/next)
sem (System/currentTimeMillis)
(when-not (contains? cf/flags :demo-users)
(ex/raise :type :validation
:code :demo-users-not-allowed
:hint "Demo users are disabled by config."))
(let [sem (System/currentTimeMillis)
email (str "demo-" sem ".demo@example.com")
fullname (str "Demo User " sem)
password (-> (bn/random-bytes 16)
(bc/bytes->b64u)
(bc/bytes->str))
params {:id id
:email email
params {:email email
:fullname fullname
:is-active true
:deleted-at (dt/in-future cf/deletion-delay)
:password password
:props {}
}]
(when-not (contains? cf/flags :demo-users)
(ex/raise :type :validation
:code :demo-users-not-allowed
:hint "Demo users are disabled by config."))
:props {}}]
(db/with-atomic [conn pool]
(->> (cmd.auth/create-profile conn params)
(cmd.auth/create-profile-relations conn))
(with-meta {:email email
:password password}
{::audit/profile-id id}))))
(let [profile (->> (auth/create-profile! conn params)
(auth/create-profile-rels! conn))]
(with-meta {:email email
:password password}
{::audit/profile-id (:id profile)})))))

View file

@ -189,7 +189,7 @@
(let [row (db/get conn :file-data-fragment
{:id id :file-id file-id}
{:columns [:content]
:check-deleted? false})]
::db/check-deleted? false})]
(blob/decode (:content row))))
(defn persist-pointers!
@ -811,7 +811,7 @@
(let [ldata (-> library decode-row pmg/migrate-file :data)]
(->> (db/query conn :file-library-rel {:library-file-id id})
(map :file-id)
(keep #(db/get-by-id conn :file % {:check-deleted? false}))
(keep #(db/get-by-id conn :file % ::db/check-deleted? false))
(map decode-row)
(map pmg/migrate-file)
(run! (fn [{:keys [id data revn] :as file}]

View file

@ -45,7 +45,7 @@
;; --- MUTATION COMMAND: update-temp-file
(defn update-temp-file
[conn {:keys [::rpc/profile-id session-id id revn changes] :as params}]
[conn {:keys [profile-id session-id id revn changes] :as params}]
(db/insert! conn :file-change
{:id (uuid/next)
:session-id session-id
@ -57,16 +57,17 @@
:changes (blob/encode changes)}))
(s/def ::update-temp-file
(s/keys :req-un [::files.update/changes
(s/keys :req [::rpc/profile-id]
:req-un [::files.update/changes
::files.update/revn
::files.update/session-id
::files/id]))
(sv/defmethod ::update-temp-file
{::doc/added "1.17"}
[{:keys [pool] :as cfg} params]
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id] :as params}]
(db/with-atomic [conn pool]
(update-temp-file conn params)
(update-temp-file conn (assoc params :profile-id profile-id))
nil))
;; --- MUTATION COMMAND: persist-temp-file

View file

@ -145,7 +145,7 @@
(l/trace :hint "update-file" :time (dt/format-duration elapsed))))))))
(defn update-file
[{:keys [conn metrics] :as cfg} {:keys [profile-id id changes changes-with-metadata] :as params}]
[{:keys [conn ::mtx/metrics] :as cfg} {:keys [profile-id id changes changes-with-metadata] :as params}]
(let [file (get-file conn id)
features (->> (concat (:features file)
(:features params))
@ -275,7 +275,7 @@
(defn- send-notifications!
[{:keys [conn] :as cfg} {:keys [file changes session-id] :as params}]
(let [lchanges (filter library-change? changes)
msgbus (:msgbus cfg)]
msgbus (::mbus/msgbus cfg)]
;; Asynchronously publish message to the msgbus
(mbus/pub! msgbus

View file

@ -14,7 +14,7 @@
[app.loggers.audit :as-alias audit]
[app.main :as-alias main]
[app.rpc :as-alias rpc]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.commands.auth :as auth]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.rpc.queries.profile :as profile]
@ -39,7 +39,7 @@
is properly configured and enabled with `login-with-ldap` flag."
{::rpc/auth false
::doc/added "1.15"}
[{:keys [::main/props ::ldap/provider session] :as cfg} params]
[{:keys [::main/props ::ldap/provider] :as cfg} params]
(when-not provider
(ex/raise :type :restriction
:code :ldap-not-initialized
@ -67,12 +67,12 @@
:member-email (:email profile))
token (tokens/generate props claims)]
(-> {:invitation-token token}
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/props (:props profile)
::audit/profile-id (:id profile)})))
(-> profile
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/props (:props profile)
::audit/profile-id (:id profile)}))))))
@ -80,11 +80,10 @@
[{:keys [pool] :as cfg} info]
(db/with-atomic [conn pool]
(or (some->> (:email info)
(profile/retrieve-profile-data-by-email conn)
(profile/populate-additional-data conn)
(profile/decode-profile-row))
(profile/get-profile-by-email conn)
(profile/decode-row))
(->> (assoc info :is-active true :is-demo false)
(cmd.auth/create-profile conn)
(cmd.auth/create-profile-relations conn)
(auth/create-profile! conn)
(auth/create-profile-rels! conn)
(profile/strip-private-attrs)))))

View file

@ -23,6 +23,7 @@
[app.storage.tmp :as tmp]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[datoteka.io :as io]
@ -66,8 +67,8 @@
(sv/defmethod ::upload-file-media-object
{::doc/added "1.17"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id content] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content)
(validate-content-size! content)
@ -110,7 +111,7 @@
;; inverse, soft referential integrity).
(defn create-file-media-object
[{:keys [storage pool climit executor]}
[{:keys [::sto/storage ::db/pool climit ::wrk/executor]}
{:keys [id file-id is-local name content]}]
(letfn [;; Function responsible to retrieve the file information, as
;; it is synchronous operation it should be wrapped into
@ -186,8 +187,8 @@
(sv/defmethod ::create-file-media-object-from-url
{::doc/added "1.17"}
[{:keys [pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(create-file-media-object-from-url cfg params)))

View file

@ -28,6 +28,7 @@
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[promesa.core :as p]
@ -114,8 +115,8 @@
(defn retrieve-teams
[conn profile-id]
(let [defaults (profile/retrieve-additional-data conn profile-id)]
(->> (db/exec! conn [sql:teams (:default-team-id defaults) profile-id])
(let [profile (profile/get-profile conn profile-id)]
(->> (db/exec! conn [sql:teams (:default-team-id profile) profile-id])
(mapv process-permissions))))
;; --- Query: Team (by ID)
@ -134,14 +135,15 @@
(defn retrieve-team
[conn profile-id team-id]
(let [defaults (profile/retrieve-additional-data conn profile-id)
sql (str "WITH teams AS (" sql:teams ") SELECT * FROM teams WHERE id=?")
result (db/exec-one! conn [sql (:default-team-id defaults) profile-id team-id])]
(let [profile (profile/get-profile conn profile-id)
sql (str "WITH teams AS (" sql:teams ") SELECT * FROM teams WHERE id=?")
result (db/exec-one! conn [sql (:default-team-id profile) profile-id team-id])]
(when-not result
(ex/raise :type :not-found
:code :team-does-not-exist))
(process-permissions result)))
(process-permissions result)))
;; --- Query: Team Members
@ -583,11 +585,11 @@
[cfg {:keys [::rpc/profile-id file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(update-team-photo cfg (assoc params :profile-id profile-id))))
(defn update-team-photo
[{:keys [pool storage executor] :as cfg} {:keys [profile-id team-id] :as params}]
[{:keys [::db/pool ::sto/storage ::wrk/executor] :as cfg} {:keys [profile-id team-id] :as params}]
(p/let [team (px/with-dispatch executor
(retrieve-team pool profile-id team-id))
photo (upload-photo cfg params)]
@ -605,7 +607,7 @@
(assoc team :photo-id (:id photo))))
(defn upload-photo
[{:keys [storage executor climit] :as cfg} {:keys [file]}]
[{:keys [::sto/storage ::wrk/executor climit] :as cfg} {:keys [file]}]
(letfn [(get-info [content]
(climit/with-dispatch (:process-image climit)
(media/run {:cmd :info :input content})))
@ -663,7 +665,7 @@
(defn- create-invitation
[{:keys [::conn] :as cfg} {:keys [team profile role email] :as params}]
(let [member (profile/retrieve-profile-data-by-email conn email)
(let [member (profile/get-profile-by-email conn email)
expire (dt/in-future "168h") ;; 7 days
itoken (create-invitation-token cfg {:profile-id (:id profile)
:valid-until expire
@ -838,7 +840,7 @@
{:team-id team-id
:email-to (str/lower email)})
(update :role keyword))
member (profile/retrieve-profile-data-by-email pool (:email invit))
member (profile/get-profile-by-email pool (:email invit))
token (create-invitation-token cfg {:team-id (:team-id invit)
:profile-id profile-id
:valid-until (:valid-until invit)

View file

@ -11,6 +11,7 @@
[app.db :as db]
[app.http.session :as session]
[app.loggers.audit :as audit]
[app.main :as-alias main]
[app.rpc :as-alias rpc]
[app.rpc.commands.teams :as teams]
[app.rpc.doc :as-alias doc]
@ -34,15 +35,15 @@
(sv/defmethod ::verify-token
{::rpc/auth false
::doc/added "1.15"}
[{:keys [pool sprops] :as cfg} {:keys [token] :as params}]
[{:keys [pool] :as cfg} {:keys [token] :as params}]
(db/with-atomic [conn pool]
(let [claims (tokens/verify sprops {:token token})
(let [claims (tokens/verify (::main/props cfg) {:token token})
cfg (assoc cfg :conn conn)]
(process-token cfg params claims))))
(defmethod process-token :change-email
[{:keys [conn] :as cfg} _params {:keys [profile-id email] :as claims}]
(when (profile/retrieve-profile-data-by-email conn email)
(when (profile/get-profile-by-email conn email)
(ex/raise :type :validation
:code :email-already-exists))
@ -56,8 +57,8 @@
::audit/profile-id profile-id}))
(defmethod process-token :verify-email
[{:keys [conn session] :as cfg} _ {:keys [profile-id] :as claims}]
(let [profile (profile/retrieve-profile conn profile-id)
[{:keys [conn] :as cfg} _ {:keys [profile-id] :as claims}]
(let [profile (profile/get-profile conn profile-id)
claims (assoc claims :profile profile)]
(when-not (:is-active profile)
@ -71,14 +72,14 @@
{:id (:id profile)}))
(-> claims
(rph/with-transform (session/create-fn session profile-id))
(rph/with-transform (session/create-fn cfg profile-id))
(rph/with-meta {::audit/name "verify-profile-email"
::audit/props (audit/profile->props profile)
::audit/profile-id (:id profile)}))))
(defmethod process-token :auth
[{:keys [conn] :as cfg} _params {:keys [profile-id] :as claims}]
(let [profile (profile/retrieve-profile conn profile-id)]
(let [profile (profile/get-profile conn profile-id)]
(assoc claims :profile profile)))
;; --- Team Invitation
@ -133,7 +134,7 @@
:opt-un [::spec.team-invitation/member-id]))
(defmethod process-token :team-invitation
[{:keys [conn session] :as cfg}
[{:keys [conn] :as cfg}
{:keys [::rpc/profile-id token]}
{:keys [member-id team-id member-email] :as claims}]
@ -179,7 +180,7 @@
{:columns [:id :email]})]
(let [profile (accept-invitation cfg claims invitation member)]
(-> (assoc claims :state :created)
(rph/with-transform (session/create-fn session (:id profile)))
(rph/with-transform (session/create-fn cfg (:id profile)))
(rph/with-meta {::audit/name "accept-team-invitation"
::audit/props (merge
(audit/profile->props profile)

View file

@ -70,6 +70,8 @@
(respond (yrs/response 404)))))
(s/def ::routes vector?)
(defmethod ig/pre-init-spec ::routes [_]
(s/keys :req-un [::rpc/methods]))

View file

@ -22,6 +22,7 @@
[app.storage :as sto]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[promesa.core :as p]
[promesa.exec :as px]))
@ -48,7 +49,7 @@
{::doc/added "1.3"
::webhooks/event? true}
[{:keys [pool] :as cfg} {:keys [team-id profile-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(teams/check-edition-permissions! pool profile-id team-id)
(quotes/check-quote! pool {::quotes/id ::quotes/font-variants-per-team
::quotes/profile-id profile-id
@ -56,7 +57,7 @@
(create-font-variant cfg params)))
(defn create-font-variant
[{:keys [storage pool executor climit] :as cfg} {:keys [data] :as params}]
[{:keys [::sto/storage ::db/pool ::wrk/executor climit] :as cfg} {:keys [data] :as params}]
(letfn [(generate-fonts [data]
(climit/with-dispatch (:process-font climit)
(media/run {:cmd :generate-fonts :input data})))

View file

@ -11,6 +11,7 @@
[app.rpc.commands.files :as files]
[app.rpc.commands.media :as cmd.media]
[app.rpc.doc :as-alias doc]
[app.storage :as-alias sto]
[app.util.services :as sv]
[clojure.spec.alpha :as s]))
@ -22,7 +23,7 @@
{::doc/added "1.2"
::doc/deprecated "1.17"}
[{:keys [pool] :as cfg} {:keys [profile-id file-id content] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(media/validate-media-type! content)
(cmd.media/validate-content-size! content)
@ -36,7 +37,7 @@
{::doc/added "1.3"
::doc/deprecated "1.17"}
[{:keys [pool] :as cfg} {:keys [profile-id file-id] :as params}]
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(files/check-edition-permissions! pool profile-id file-id)
(#'cmd.media/create-file-media-object-from-url cfg params)))

View file

@ -15,6 +15,7 @@
[app.emails :as eml]
[app.http.session :as session]
[app.loggers.audit :as audit]
[app.main :as-alias main]
[app.media :as media]
[app.rpc :as-alias rpc]
[app.rpc.climit :as-alias climit]
@ -27,6 +28,7 @@
[app.tokens :as tokens]
[app.util.services :as sv]
[app.util.time :as dt]
[app.worker :as-alias wrk]
[clojure.spec.alpha :as s]
[cuerdas.core :as str]
[promesa.core :as p]
@ -51,13 +53,13 @@
(sv/defmethod ::update-profile
{::doc/added "1.0"}
[{:keys [pool] :as cfg} {:keys [profile-id fullname lang theme] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [profile-id fullname lang theme] :as params}]
(db/with-atomic [conn pool]
;; NOTE: we need to retrieve the profile independently if we use
;; it or not for explicit locking and avoid concurrent updates of
;; the same row/object.
(let [profile (-> (db/get-by-id conn :profile profile-id {:for-update true})
(profile/decode-profile-row))
(let [profile (-> (db/get-by-id conn :profile profile-id ::db/for-update? true)
(profile/decode-row))
;; Update the profile map with direct params
profile (-> profile
@ -90,7 +92,7 @@
(sv/defmethod ::update-profile-password
{::climit/queue :auth}
[{:keys [pool] :as cfg} {:keys [password] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [password] :as params}]
(db/with-atomic [conn pool]
(let [profile (validate-password! conn params)
session-id (::rpc/session-id params)]
@ -135,11 +137,11 @@
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(update-profile-photo cfg params)))
(defn update-profile-photo
[{:keys [pool storage executor] :as cfg} {:keys [profile-id file] :as params}]
[{:keys [::db/pool ::sto/storage ::wrk/executor] :as cfg} {:keys [profile-id file] :as params}]
(p/let [profile (px/with-dispatch executor
(db/get-by-id pool :profile profile-id))
photo (teams/upload-photo cfg params)]
@ -169,7 +171,7 @@
(s/keys :req-un [::email]))
(sv/defmethod ::request-email-change
[{:keys [pool] :as cfg} {:keys [profile-id email] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [profile-id email] :as params}]
(db/with-atomic [conn pool]
(let [profile (db/get-by-id conn :profile profile-id)
cfg (assoc cfg :conn conn)
@ -190,13 +192,13 @@
{:changed true})
(defn- request-email-change
[{:keys [conn sprops] :as cfg} {:keys [profile email] :as params}]
(let [token (tokens/generate sprops
[{:keys [conn] :as cfg} {:keys [profile email] :as params}]
(let [token (tokens/generate (::main/props cfg)
{:iss :change-email
:exp (dt/in-future "15m")
:profile-id (:id profile)
:email email})
ptoken (tokens/generate sprops
ptoken (tokens/generate (::main/props cfg)
{:iss :profile-identity
:profile-id (:id profile)
:exp (dt/in-future {:days 30})})]
@ -216,7 +218,7 @@
(eml/send! {::eml/conn conn
::eml/factory eml/change-email
:public-uri (:public-uri cfg)
:public-uri (cf/get :public-uri)
:to (:email profile)
:name (:fullname profile)
:pending-email email
@ -225,11 +227,6 @@
nil))
(defn select-profile-for-update
[conn id]
(db/get-by-id conn :profile id {:for-update true}))
;; --- MUTATION: Update Profile Props
(s/def ::props map?)
@ -237,9 +234,9 @@
(s/keys :req-un [::profile-id ::props]))
(sv/defmethod ::update-profile-props
[{:keys [pool] :as cfg} {:keys [profile-id props]}]
[{:keys [::db/pool] :as cfg} {:keys [profile-id props]}]
(db/with-atomic [conn pool]
(let [profile (profile/retrieve-profile-data conn profile-id)
(let [profile (profile/get-profile conn profile-id ::db/for-update? true)
props (reduce-kv (fn [props k v]
;; We don't accept namespaced keys
(if (simple-ident? k)
@ -254,7 +251,7 @@
{:props (db/tjson props)}
{:id profile-id})
(profile/filter-profile-props props))))
(profile/filter-props props))))
;; --- MUTATION: Delete Profile
@ -267,7 +264,7 @@
(s/keys :req-un [::profile-id]))
(sv/defmethod ::delete-profile
[{:keys [pool session] :as cfg} {:keys [profile-id] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [profile-id] :as params}]
(db/with-atomic [conn pool]
(let [teams (get-owned-teams-with-participants conn profile-id)
deleted-at (dt/now)]
@ -290,7 +287,7 @@
{:deleted-at deleted-at}
{:id profile-id})
(rph/with-transform {} (session/delete-fn session)))))
(rph/with-transform {} (session/delete-fn cfg)))))
(def sql:owned-teams
"with owner_teams as (

View file

@ -15,6 +15,7 @@
[app.rpc.commands.teams :as cmd.teams]
[app.rpc.doc :as-alias doc]
[app.rpc.helpers :as rph]
[app.storage :as-alias sto]
[app.util.services :as sv]
[app.util.time :as dt]
[clojure.spec.alpha :as s]
@ -126,7 +127,7 @@
[cfg {:keys [file] :as params}]
;; Validate incoming mime type
(media/validate-media-type! file #{"image/jpeg" "image/png" "image/webp"})
(let [cfg (update cfg :storage media/configure-assets-storage)]
(let [cfg (update cfg ::sto/storage media/configure-assets-storage)]
(cmd.teams/update-team-photo cfg params)))
;; --- Mutation: Invite Member

View file

@ -6,7 +6,6 @@
(ns app.rpc.queries.profile
(:require
[app.common.exceptions :as ex]
[app.common.spec :as us]
[app.common.uuid :as uuid]
[app.db :as db]
@ -17,8 +16,6 @@
;; --- Helpers & Specs
(declare strip-private-attrs)
(s/def ::email ::us/email)
(s/def ::fullname ::us/string)
(s/def ::old-password ::us/string)
@ -30,73 +27,32 @@
;; --- Query: Profile (own)
(declare retrieve-profile)
(declare retrieve-additional-data)
(declare decode-row)
(declare get-profile)
(declare strip-private-attrs)
(declare filter-props)
(s/def ::profile
(s/keys :opt-un [::profile-id]))
(sv/defmethod ::profile
{::rpc/auth false}
[{:keys [pool] :as cfg} {:keys [profile-id] :as params}]
[{:keys [::db/pool] :as cfg} {:keys [profile-id]}]
;; We need to return the anonymous profile object in two cases, when
;; no profile-id is in session, and when db call raises not found. In all other
;; cases we need to reraise the exception.
(or (ex/try*
#(some->> profile-id (retrieve-profile pool))
#(when (not= :not-found (:type (ex-data %))) (throw %)))
{:id uuid/zero
:fullname "Anonymous User"}))
(try
(-> (get-profile pool profile-id)
(strip-private-attrs)
(update :props filter-props))
(catch Throwable _
{:id uuid/zero :fullname "Anonymous User"})))
(def ^:private sql:default-profile-team
"select t.id, name
from team as t
inner join team_profile_rel as tp on (tp.team_id = t.id)
where tp.profile_id = ?
and tp.is_owner is true
and t.is_default is true")
(def ^:private sql:default-profile-project
"select p.id, name
from project as p
inner join project_profile_rel as tp on (tp.project_id = p.id)
where tp.profile_id = ?
and tp.is_owner is true
and p.is_default is true
and p.team_id = ?")
(defn retrieve-additional-data
[conn id]
(let [team (db/exec-one! conn [sql:default-profile-team id])
project (db/exec-one! conn [sql:default-profile-project id (:id team)])]
{:default-team-id (:id team)
:default-project-id (:id project)}))
(defn populate-additional-data
[conn profile]
(merge profile (retrieve-additional-data conn (:id profile))))
(defn filter-profile-props
[props]
(into {} (filter (fn [[k _]] (simple-ident? k))) props))
(defn decode-profile-row
[{:keys [props] :as row}]
(cond-> row
(db/pgobject? props "jsonb")
(assoc :props (db/decode-transit-pgobject props))))
(defn retrieve-profile-data
[conn id]
(-> (db/get-by-id conn :profile id)
(decode-profile-row)))
(defn retrieve-profile
[conn id]
(let [profile (->> (retrieve-profile-data conn id)
(strip-private-attrs)
(populate-additional-data conn))]
(update profile :props filter-profile-props)))
(defn get-profile
"Get profile by id. Throws not-found exception if no profile found."
[conn id & {:as attrs}]
(-> (db/get-by-id conn :profile id attrs)
(decode-row)))
(def ^:private sql:profile-by-email
"select p.* from profile as p
@ -104,14 +60,27 @@
and (p.deleted_at is null or
p.deleted_at > now())")
(defn retrieve-profile-data-by-email
(defn get-profile-by-email
"Returns a profile looked up by email or `nil` if not match found."
[conn email]
(ex/ignoring
(db/exec-one! conn [sql:profile-by-email (str/lower email)])))
(->> (db/exec! conn [sql:profile-by-email (str/lower email)])
(map decode-row)
(first)))
;; --- Attrs Helpers
;; --- HELPERS
(defn strip-private-attrs
"Only selects a publicly visible profile attrs."
[row]
(dissoc row :password :deleted-at))
(defn filter-props
"Removes all namespace qualified props from `props` attr."
[props]
(into {} (filter (fn [[k _]] (simple-ident? k))) props))
(defn decode-row
[{:keys [props] :as row}]
(cond-> row
(db/pgobject? props "jsonb")
(assoc :props (db/decode-transit-pgobject props))))

View file

@ -16,8 +16,7 @@
(defn retrieve-share-link
[conn file-id share-id]
(some-> (db/get-by-params conn :share-link
{:id share-id :file-id file-id}
{:check-not-found false})
(some-> (db/get* conn :share-link
{:id share-id :file-id file-id})
(decode-share-link-row)))

View file

@ -160,6 +160,28 @@
(assoc ::count-sql [sql:get-teams-per-profile profile-id])
(generic-check!)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; QUOTE: ACCESS-TOKENS-PER-PROFILE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
(def ^:private sql:get-access-tokens-per-profile
"select count(*) as total
from access_token
where profile_id = ?")
(s/def ::access-tokens-per-profile
(s/keys :req [::profile-id ::target]))
(defmethod check-quote ::access-tokens-per-profile
[{:keys [::profile-id ::target] :as quote}]
(us/assert! ::access-tokens-per-profile quote)
(-> quote
(assoc ::default (cf/get :quotes-access-tokens-per-profile Integer/MAX_VALUE))
(assoc ::quote-sql [sql:get-quotes-1 target profile-id])
(assoc ::count-sql [sql:get-access-tokens-per-profile profile-id])
(generic-check!)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; QUOTE: PROJECTS-PER-TEAM
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -279,7 +301,6 @@
(assoc ::count-sql [sql:get-files-per-project project-id])
(generic-check!)))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; QUOTE: COMMENT-THREADS-PER-FILE
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;

View file

@ -52,7 +52,7 @@
[app.config :as cf]
[app.http :as-alias http]
[app.loggers.audit :refer [parse-client-ip]]
[app.redis :as redis]
[app.redis :as rds]
[app.redis.script :as-alias rscript]
[app.rpc :as-alias rpc]
[app.rpc.rlimit.result :as-alias lresult]
@ -71,7 +71,7 @@
(dt/duration 400))
(def ^:private default-options
{:codec redis/string-codec
{:codec rds/string-codec
:timeout default-timeout})
(def ^:private bucket-rate-limit-script
@ -141,23 +141,23 @@
(let [script (-> bucket-rate-limit-script
(assoc ::rscript/keys [(str key "." service "." user-id)])
(assoc ::rscript/vals (conj params (dt/->seconds now))))]
(-> (redis/eval! redis script)
(p/then (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)
reset (* (/ (inst-ms interval) rate)
(- capacity remaining))]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
(->> (rds/eval! redis script)
(p/fmap (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)
reset (* (/ (inst-ms interval) rate)
(- capacity remaining))]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
:allowed? allowed?
:remaining remaining)
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/reset (dt/plus now reset))
(assoc ::lresult/remaining remaining))))))))
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/reset (dt/plus now reset))
(assoc ::lresult/remaining remaining))))))))
(defmethod process-limit :window
[redis user-id now {:keys [::nreq ::unit ::key ::service] :as limit}]
@ -166,94 +166,113 @@
script (-> window-rate-limit-script
(assoc ::rscript/keys [(str key "." service "." user-id "." (dt/format-instant ts))])
(assoc ::rscript/vals [nreq (dt/->seconds ttl)]))]
(-> (redis/eval! redis script)
(p/then (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
:allowed? allowed?
(->> (rds/eval! redis script)
(p/fmap (fn [result]
(let [allowed? (boolean (nth result 0))
remaining (nth result 1)]
(l/trace :hint "limit processed"
:service service
:limit (name (::name limit))
:strategy (name (::strategy limit))
:opts (::opts limit)
:allowed? allowed?
:remaining remaining)
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/remaining remaining)
(assoc ::lresult/reset (dt/plus ts {unit 1})))))))))
(-> limit
(assoc ::lresult/allowed? allowed?)
(assoc ::lresult/remaining remaining)
(assoc ::lresult/reset (dt/plus ts {unit 1})))))))))
(defn- process-limits
(defn- process-limits!
[redis user-id limits now]
(-> (p/all (map (partial process-limit redis user-id now) limits))
(p/then (fn [results]
(let [remaining (->> results
(d/index-by ::name ::lresult/remaining)
(uri/map->query-string))
reset (->> results
(d/index-by ::name (comp dt/->seconds ::lresult/reset))
(uri/map->query-string))
rejected (->> results
(filter (complement ::lresult/allowed?))
(first))]
(->> (p/all (map (partial process-limit redis user-id now) limits))
(p/fmap (fn [results]
(let [remaining (->> results
(d/index-by ::name ::lresult/remaining)
(uri/map->query-string))
reset (->> results
(d/index-by ::name (comp dt/->seconds ::lresult/reset))
(uri/map->query-string))
rejected (->> results
(filter (complement ::lresult/allowed?))
(first))]
(when rejected
(l/warn :hint "rejected rate limit"
:user-id (str user-id)
:limit-service (-> rejected ::service name)
:limit-name (-> rejected ::name name)
:limit-strategy (-> rejected ::strategy name)))
(when rejected
(l/warn :hint "rejected rate limit"
:user-id (str user-id)
:limit-service (-> rejected ::service name)
:limit-name (-> rejected ::name name)
:limit-strategy (-> rejected ::strategy name)))
{:enabled? true
:allowed? (not (some? rejected))
:headers {"x-rate-limit-remaining" remaining
"x-rate-limit-reset" reset}})))))
{:enabled? true
:allowed? (not (some? rejected))
:headers {"x-rate-limit-remaining" remaining
"x-rate-limit-reset" reset}})))))
(defn- handle-response
[f cfg params result]
(if (:enabled? result)
(let [headers (:headers result)]
(when-not (:allowed? result)
(ex/raise :type :rate-limit
:code :request-blocked
:hint "rate limit reached"
::http/headers headers))
(-> (f cfg params)
(p/then (fn [response]
(vary-meta response update ::http/headers merge headers)))))
(if (:allowed? result)
(->> (f cfg params)
(p/fmap (fn [response]
(vary-meta response update ::http/headers merge headers))))
(p/rejected
(ex/error :type :rate-limit
:code :request-blocked
:hint "rate limit reached"
::http/headers headers))))
(f cfg params)))
(defn- get-limits
[state skey sname]
(some->> (or (get-in @state [::limits skey])
(get-in @state [::limits :default]))
(map #(assoc % ::service sname))
(seq)))
(defn- get-uid
[{:keys [::http/request] :as params}]
(or (::rpc/profile-id params)
(some-> request parse-client-ip)
uuid/zero))
(defn wrap
[{:keys [rlimit redis] :as cfg} f mdata]
[{:keys [::rpc/rlimit ::rds/redis] :as cfg} f mdata]
(us/assert! ::rpc/rlimit rlimit)
(us/assert! ::rds/redis redis)
(if rlimit
(let [skey (keyword (::rpc/type cfg) (->> mdata ::sv/spec name))
sname (str (::rpc/type cfg) "." (->> mdata ::sv/spec name))]
(fn [cfg {:keys [::http/request] :as params}]
(let [uid (or (:profile-id params)
(some-> request parse-client-ip)
uuid/zero)
rsp (when (and uid @enabled?)
(when-let [limits (or (get-in @rlimit [::limits skey])
(get-in @rlimit [::limits :default]))]
(let [redis (redis/get-or-connect redis ::rlimit default-options)
limits (map #(assoc % ::service sname) limits)
resp (-> (process-limits redis uid limits (dt/now))
(p/catch (fn [cause]
;; If we have an error on processing the rate-limit we just skip
;; it for do not cause service interruption because of redis
;; downtime or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
{:enabled? false})))]
(fn [cfg params]
(if @enabled?
(try
(let [uid (get-uid params)
rsp (when-let [limits (get-limits rlimit skey sname)]
(let [redis (rds/get-or-connect redis ::rpc/rlimit default-options)
rsp (->> (process-limits! redis uid limits (dt/now))
(p/merr (fn [cause]
;; If we have an error on processing the rate-limit we just skip
;; it for do not cause service interruption because of redis
;; downtime or similar situation.
(l/error :hint "error on processing rate-limit" :cause cause)
(p/resolved {:enabled? false}))))]
;; If soft rate are enabled, we process the rate-limit but return unprotected
;; response.
(if (contains? cf/flags :soft-rpc-rlimit)
(p/resolved {:enabled? false})
resp))))
;; If soft rate are enabled, we process the rate-limit but return unprotected
;; response.
(if (contains? cf/flags :soft-rpc-rlimit)
{:enabled? false}
rsp)))]
rsp (or rsp (p/resolved {:enabled? false}))]
(->> (p/promise rsp)
(p/fmap #(or % {:enabled? false}))
(p/mcat #(handle-response f cfg params %))))
(p/then rsp (partial handle-response f cfg params)))))
(catch Throwable cause
(p/rejected cause)))
(f cfg params))))
f))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
@ -289,8 +308,9 @@
(s/keys :req [::nreq
::unit]))))
(s/def ::rlimit
#(instance? clojure.lang.Agent %))
(s/def ::rpc/rlimit
(s/nilable
#(instance? clojure.lang.Agent %)))
(s/def ::config
(s/map-of (s/or :kw keyword? :set set?)
@ -332,7 +352,7 @@
::limits limits}))))
(defn- refresh-config
[{:keys [state path executor scheduled-executor] :as params}]
[{:keys [::state ::path ::wrk/executor ::wrk/scheduled-executor] :as cfg}]
(letfn [(update-config [{:keys [::updated-at] :as state}]
(let [updated-at' (fs/last-modified-time path)]
(merge state
@ -349,7 +369,7 @@
(schedule-next [state]
(px/schedule! scheduled-executor
(inst-ms (::refresh state))
(partial refresh-config params))
(partial refresh-config cfg))
state)]
(send-via executor state update-config)
@ -371,10 +391,11 @@
(and (fs/exists? path) (fs/regular-file? path) path)))
(defmethod ig/pre-init-spec :app.rpc/rlimit [_]
(s/keys :req-un [::wrk/executor ::wrk/scheduled-executor]))
(s/keys :req [::wrk/executor
::wrk/scheduled-executor]))
(defmethod ig/init-key ::rpc/rlimit
[_ {:keys [executor] :as params}]
[_ {:keys [::wrk/executor] :as cfg}]
(when (contains? cf/flags :rpc-rlimit)
(let [state (agent {})]
(set-error-handler! state on-refresh-error)
@ -387,6 +408,6 @@
(send-via executor state (constantly {::refresh (dt/duration "5s")}))
;; Force a refresh
(refresh-config (assoc params :path path :state state)))
(refresh-config (assoc cfg ::path path ::state state)))
state)))

View file

@ -70,7 +70,7 @@
[system & {:keys [update-fn id save? migrate? inc-revn?]
:or {save? false migrate? true inc-revn? true}}]
(db/with-atomic [conn (:app.db/pool system)]
(let [file (-> (db/get-by-id conn :file id {:for-update true})
(let [file (-> (db/get-by-id conn :file id {::db/for-update? true})
(update :features db/decode-pgarray #{}))]
(binding [*conn* conn
pmap/*tracked* (atom {})

View file

@ -71,7 +71,7 @@
(let [sprops (:app.setup/props system)
pool (:app.db/pool system)
profile (profile/retrieve-profile-data-by-email pool email)]
profile (profile/get-profile-by-email pool email)]
(cmd.auth/send-email-verification! pool sprops profile)
:email-sent))
@ -81,10 +81,9 @@
associated with the profile-id."
[system email]
(db/with-atomic [conn (:app.db/pool system)]
(when-let [profile (db/get-by-params conn :profile
{:email (str/lower email)}
{:columns [:id :email]
:check-not-found false})]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-active true} {:id (:id profile)})
:activated))))
@ -94,10 +93,9 @@
associated with the profile-id."
[system email]
(db/with-atomic [conn (:app.db/pool system)]
(when-let [profile (db/get-by-params conn :profile
{:email (str/lower email)}
{:columns [:id :email]
:check-not-found false})]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-blocked true} {:id (:id profile)})
(db/delete! conn :http-session {:profile-id (:id profile)})

View file

@ -188,7 +188,7 @@
res (db/update! (or conn pool) :storage-object
{:touched-at (dt/now)}
{:id id}
{:return-keys false})]
{::db/return-keys? false})]
(pos? (:next.jdbc/update-count res)))))
(defn get-object-data
@ -247,7 +247,7 @@
res (db/update! (or conn pool) :storage-object
{:deleted-at (dt/now)}
{:id id}
{:return-keys false})]
{::db/return-keys? false})]
(pos? (:next.jdbc/update-count res)))))
(dm/export impl/resolve-backend)

View file

@ -29,6 +29,6 @@
(throw cause#))))]
(if (= ::retry result#)
(do
(l/warn :hint "retrying operation" :label ~label)
(l/warn :hint "retrying operation" :label ~label :retry tnum#)
(recur (inc tnum#)))
result#))))

View file

@ -60,7 +60,7 @@
(assert (fn? on-snd-message) "'on-snd-message' should be a function")
(assert (fn? on-connect) "'on-connect' should be a function")
(fn [{:keys [::yws/channel session-id] :as request}]
(fn [{:keys [::yws/channel] :as request}]
(let [input-ch (a/chan input-buff-size)
output-ch (a/chan output-buff-size)
hbeat-ch (a/chan (a/sliding-buffer 6))
@ -81,7 +81,6 @@
::stop-ch stop-ch
::channel channel
::remote-addr ip-addr
::http-session-id session-id
::user-agent uagent})
(atom))

View file

@ -45,7 +45,7 @@
(defmethod ig/init-key ::executor
[skey {:keys [::parallelism]}]
(let [prefix (if (vector? skey) (-> skey first name keyword) :default)
(let [prefix (if (vector? skey) (-> skey first name keyword) "default")
tname (str "penpot/" prefix "/%s")
factory (px/forkjoin-thread-factory :name tname)]
(px/forkjoin-executor

View file

@ -6,12 +6,12 @@
(ns backend-tests.bounce-handling-test
(:require
[backend-tests.helpers :as th]
[app.db :as db]
[app.emails :as emails]
[app.http.awsns :as awsns]
[app.tokens :as tokens]
[app.util.time :as dt]
[backend-tests.helpers :as th]
[clojure.pprint :refer [pprint]]
[clojure.test :as t]
[mockery.core :refer [with-mocks]]))

View file

@ -16,8 +16,10 @@
[app.config :as cf]
[app.db :as db]
[app.main :as main]
[app.media :as-alias mtx]
[app.media]
[app.migrations]
[app.msgbus :as-alias mbus]
[app.rpc :as-alias rpc]
[app.rpc.commands.auth :as cmd.auth]
[app.rpc.commands.files :as files]
@ -64,52 +66,50 @@
(defn state-init
[next]
(let [templates [{:id "test"
:name "test"
:file-uri "test"
:thumbnail-uri "test"
:path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}]
system (-> (merge main/system-config main/worker-config)
(assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config))
(assoc-in [:app.db/pool :password] (:database-password config))
(assoc-in [:app.rpc/methods :templates] templates)
(dissoc :app.srepl/server
:app.http/server
:app.http/router
:app.http.awsns/handler
:app.http.session/updater
:app.auth.oidc/google-provider
:app.auth.oidc/gitlab-provider
:app.auth.oidc/github-provider
:app.auth.oidc/generic-provider
:app.setup/builtin-templates
:app.auth.oidc/routes
:app.worker/executors-monitor
:app.http.oauth/handler
:app.notifications/handler
:app.loggers.sentry/reporter
:app.loggers.mattermost/reporter
:app.loggers.loki/reporter
:app.loggers.database/reporter
:app.loggers.zmq/receiver
:app.worker/cron
:app.worker/worker))
_ (ig/load-namespaces system)
system (-> (ig/prep system)
(ig/init))]
(try
(binding [*system* system
*pool* (:app.db/pool system)]
(with-redefs [app.config/flags (flags/parse flags/default default-flags (:flags config))
app.config/config config
app.loggers.audit/submit! (constantly nil)
app.auth/derive-password identity
app.auth/verify-password (fn [a b] {:valid (= a b)})]
(next)))
(finally
(ig/halt! system)))))
(with-redefs [app.config/flags (flags/parse flags/default default-flags)
app.config/config config
app.loggers.audit/submit! (constantly nil)
app.auth/derive-password identity
app.auth/verify-password (fn [a b] {:valid (= a b)})]
(let [templates [{:id "test"
:name "test"
:file-uri "test"
:thumbnail-uri "test"
:path (-> "backend_tests/test_files/template.penpot" io/resource fs/path)}]
system (-> (merge main/system-config main/worker-config)
(assoc-in [:app.redis/redis :app.redis/uri] (:redis-uri config))
(assoc-in [:app.db/pool :uri] (:database-uri config))
(assoc-in [:app.db/pool :username] (:database-username config))
(assoc-in [:app.db/pool :password] (:database-password config))
(assoc-in [:app.rpc/methods :templates] templates)
(dissoc :app.srepl/server
:app.http/server
:app.http/router
:app.auth.oidc/google-provider
:app.auth.oidc/gitlab-provider
:app.auth.oidc/github-provider
:app.auth.oidc/generic-provider
:app.setup/builtin-templates
:app.auth.oidc/routes
:app.worker/executors-monitor
:app.http.oauth/handler
:app.notifications/handler
:app.loggers.mattermost/reporter
:app.loggers.loki/reporter
:app.loggers.database/reporter
:app.loggers.zmq/receiver
:app.worker/cron
:app.worker/worker))
_ (ig/load-namespaces system)
system (-> (ig/prep system)
(ig/init))]
(try
(binding [*system* system
*pool* (:app.db/pool system)]
(next))
(finally
(ig/halt! system))))))
(defn database-reset
[next]
@ -163,8 +163,8 @@
params)]
(with-open [conn (db/open pool)]
(->> params
(cmd.auth/create-profile conn)
(cmd.auth/create-profile-relations conn))))))
(cmd.auth/create-profile! conn)
(cmd.auth/create-profile-rels! conn))))))
(defn create-project*
([i params] (create-project* *pool* i params))
@ -274,12 +274,10 @@
([pool {:keys [file-id changes session-id profile-id revn]
:or {session-id (uuid/next) revn 0}}]
(with-open [conn (db/open pool)]
(let [msgbus (:app.msgbus/msgbus *system*)
metrics (:app.metrics/metrics *system*)
features #{"components/v2"}]
(files.update/update-file {:conn conn
:msgbus msgbus
:metrics metrics}
(let [features #{"components/v2"}
cfg (-> (select-keys *system* [::mbus/msgbus ::mtx/metrics])
(assoc :conn conn))]
(files.update/update-file cfg
{:id file-id
:revn revn
:features features

View file

@ -652,7 +652,9 @@
;; check that the unknown frame thumbnail is deleted
(let [res (th/db-exec! ["select * from file_object_thumbnail"])]
(t/is (= 1 (count res)))
(t/is (= "new-data" (get-in res [0 :data])))))))
(t/is (= "new-data" (get-in res [0 :data])))))
))
(t/deftest file-thumbnail-ops

View file

@ -150,7 +150,7 @@
(let [row (th/db-get :team
{:id (:default-team-id prof)}
{:check-deleted? false})]
{::db/remove-deleted? false})]
(t/is (dt/instant? (:deleted-at row))))
;; query profile after delete

View file

@ -59,3 +59,10 @@
(.putLong buf (.getMostSignificantBits o))
(.putLong buf (.getLeastSignificantBits o))
(.array buf))))
#?(:clj
(defn from-bytes
[^bytes o]
(let [buf (ByteBuffer/wrap o)]
(UUID. ^long (.getLong buf)
^long (.getLong buf)))))