0
Fork 0
mirror of https://github.com/penpot/penpot.git synced 2025-02-21 06:16:28 -05:00

♻️ Remove deprecated with-atomic and refactor tx-run!

This commit is contained in:
Andrey Antukh 2025-02-20 10:58:19 +01:00
parent 60530a80d9
commit cc2fada83c
21 changed files with 436 additions and 467 deletions

View file

@ -22,7 +22,8 @@
[clojure.set :as set]
[integrant.core :as ig]
[next.jdbc :as jdbc]
[next.jdbc.date-time :as jdbc-dt])
[next.jdbc.date-time :as jdbc-dt]
[next.jdbc.transaction])
(:import
com.zaxxer.hikari.HikariConfig
com.zaxxer.hikari.HikariDataSource
@ -223,16 +224,6 @@
(let [^OutputStream os (.getOutputStream ^LargeObject lobj)]
(io/make-output-stream os opts))))
(defmacro with-atomic
[& args]
(if (symbol? (first args))
(let [cfgs (first args)
body (rest args)]
`(jdbc/with-transaction [conn# (::pool ~cfgs)]
(let [~cfgs (assoc ~cfgs ::conn conn#)]
~@body)))
`(jdbc/with-transaction ~@args)))
(defn open
[system-or-pool]
(if (pool? system-or-pool)
@ -535,43 +526,31 @@
(l/trc :hint "explicit rollback requested (savepoint)")
(.rollback conn sp))))
(defn transact!
"A lower-level function for executing function in a transaction"
([transactable f] (transact! transactable f {}))
([transactable f opts]
(binding [next.jdbc.transaction/*nested-tx* :ignore]
(jdbc/transact transactable f opts))))
(defn tx-run!
"Run a function in a transaction."
[system f & params]
(cond
(connection? system)
(if (connection? system)
(tx-run! {::conn system} f)
(pool? system)
(tx-run! {::pool system} f)
(::conn system)
(let [conn (::conn system)
sp (savepoint conn)]
(try
(let [system' (-> system
(assoc ::savepoint sp)
(dissoc ::rollback))
result (apply f system' params)]
(if (::rollback system)
(rollback! conn sp)
(release! conn sp))
result)
(catch Throwable cause
(.rollback ^Connection conn ^Savepoint sp)
(throw cause))))
(::pool system)
(with-atomic [conn (::pool system)]
(let [system' (-> system
(assoc ::conn conn)
(dissoc ::rollback))
result (apply f system' params)]
(when (::rollback system)
(rollback! conn))
result))
:else
(throw (IllegalArgumentException. "invalid system/cfg provided"))))
(if (pool? system)
(tx-run! {::pool system} f)
(if-let [conn (or (::conn system)
(::pool system))]
(transact! conn
(fn [conn]
(let [system' (-> system
(dissoc ::rollback)
(assoc ::conn conn))]
(apply f system' params)))
{:rollback-only (::rollback system)
:read-only (::read-only system)})
(throw (IllegalArgumentException. "invalid system/cfg provided"))))))
(defn run!
[system f & params]

View file

@ -337,16 +337,17 @@
or (updated_at is null and
created_at < now() - ?::interval)")
(defmethod ig/init-key ::tasks/gc
[_ {:keys [::db/pool ::tasks/max-age] :as cfg}]
(l/debug :hint "initializing session gc task" :max-age max-age)
(fn [_]
(db/with-atomic [conn pool]
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-expired interval interval])
result (:next.jdbc/update-count result)]
(l/debug :task "gc"
:hint "clean http sessions"
:deleted result)
result))))
(defn- collect-expired-tasks
[{:keys [::db/conn ::tasks/max-age]}]
(let [interval (db/interval max-age)
result (db/exec-one! conn [sql:delete-expired interval interval])
result (:next.jdbc/update-count result)]
(l/debug :task "gc"
:hint "clean http sessions"
:deleted result)
result))
(defmethod ig/init-key ::tasks/gc
[_ {:keys [::tasks/max-age] :as cfg}]
(l/debug :hint "initializing session gc task" :max-age max-age)
(fn [_] (db/tx-run! cfg collect-expired-tasks)))

View file

@ -43,13 +43,8 @@
(decode-row token)))
(defn repl:create-access-token
[{:keys [::db/pool] :as system} profile-id name expiration]
(db/with-atomic [conn pool]
(let [props (:app.setup/props system)]
(create-access-token {::db/conn conn ::setup/props props}
profile-id
name
expiration))))
[cfg profile-id name expiration]
(db/tx-run! cfg create-access-token profile-id name expiration))
(def ^:private schema:create-access-token
[:map {:title "create-access-token"}

View file

@ -149,7 +149,7 @@
;; ---- COMMAND: Recover Profile
(defn recover-profile
[{:keys [::db/pool] :as cfg} {:keys [token password]}]
[{:keys [::db/conn] :as cfg} {:keys [token password]}]
(letfn [(validate-token [token]
(let [tdata (tokens/verify (::setup/props cfg) {:token token :iss :password-recovery})]
(:profile-id tdata)))
@ -159,10 +159,10 @@
(db/update! conn :profile {:password pwd :is-active true} {:id profile-id})
nil))]
(db/with-atomic [conn pool]
(->> (validate-token token)
(update-password conn))
nil)))
(->> (validate-token token)
(update-password conn))
nil))
(def schema:recover-profile
[:map {:title "recover-profile"}
@ -173,7 +173,8 @@
{::rpc/auth false
::doc/added "1.15"
::sm/params schema:recover-profile
::climit/id :auth/global}
::climit/id :auth/global
::db/transaction true}
[cfg params]
(recover-profile cfg params))

View file

@ -27,7 +27,7 @@
{::rpc/auth false
::doc/added "1.15"
::doc/changes ["1.15" "This method is migrated from mutations to commands."]}
[{:keys [::db/pool] :as cfg} _]
[cfg _]
(when-not (contains? cf/flags :demo-users)
(ex/raise :type :validation
@ -49,9 +49,11 @@
:password (profile/derive-password cfg password)
:props {}}]
(db/with-atomic [conn pool]
(let [profile (->> (auth/create-profile! conn params)
(auth/create-profile-rels! conn))]
(with-meta {:email email
:password password}
{::audit/profile-id (:id profile)})))))
(let [profile (db/tx-run! cfg (fn [{:keys [::db/conn]}]
(->> (auth/create-profile! conn params)
(auth/create-profile-rels! conn))))]
(with-meta {:email email
:password password}
{::audit/profile-id (:id profile)}))))

View file

@ -803,17 +803,17 @@
[:id ::sm/uuid]
[:name [:string {:max 250}]]
[:created-at ::dt/instant]
[:modified-at ::dt/instant]]}
[:modified-at ::dt/instant]]
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(let [file (rename-file conn params)]
(rph/with-meta
(select-keys file [:id :name :created-at :modified-at])
{::audit/props {:project-id (:project-id file)
:created-at (:created-at file)
:modified-at (:modified-at file)}}))))
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(check-edition-permissions! conn profile-id id)
(let [file (rename-file conn params)]
(rph/with-meta
(select-keys file [:id :name :created-at :modified-at])
{::audit/props {:project-id (:project-id file)
:created-at (:created-at file)
:modified-at (:modified-at file)}})))
;; --- MUTATION COMMAND: set-file-shared
@ -1005,15 +1005,17 @@
{::doc/added "1.17"
::webhooks/event? true
::sm/params schema:link-file-to-library}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id library-id] :as params}]
[cfg {:keys [::rpc/profile-id file-id library-id] :as params}]
(when (= file-id library-id)
(ex/raise :type :validation
:code :invalid-library
:hint "A file cannot be linked to itself"))
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(check-edition-permissions! conn profile-id library-id)
(link-file-to-library conn params)))
(db/tx-run! cfg
(fn [{:keys [::db/conn]}]
(check-edition-permissions! conn profile-id file-id)
(check-edition-permissions! conn profile-id library-id)
(link-file-to-library conn params))))
;; --- MUTATION COMMAND: unlink-file-from-library
@ -1031,12 +1033,12 @@
(sv/defmethod ::unlink-file-from-library
{::doc/added "1.17"
::webhooks/event? true
::sm/params schema:unlink-file-to-library}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(unlink-file-from-library conn params)
nil))
::sm/params schema:unlink-file-to-library
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id)
(unlink-file-from-library conn params)
nil)
;; --- MUTATION COMMAND: update-sync
@ -1056,12 +1058,11 @@
(sv/defmethod ::update-file-library-sync-status
"Update the synchronization status of a file->library link"
{::doc/added "1.17"
::sm/params schema:update-file-library-sync-status}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(update-sync conn params)))
::sm/params schema:update-file-library-sync-status
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id)
(update-sync conn params))
;; --- MUTATION COMMAND: ignore-sync
@ -1082,9 +1083,9 @@
(sv/defmethod ::ignore-file-library-sync-status
"Ignore updates in linked files"
{::doc/added "1.17"
::sm/params schema:ignore-file-library-sync-status}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id file-id)
(-> (ignore-sync conn params)
(update :features db/decode-pgarray #{}))))
::sm/params schema:ignore-file-library-sync-status
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(check-edition-permissions! conn profile-id file-id)
(-> (ignore-sync conn params)
(update :features db/decode-pgarray #{})))

View file

@ -33,11 +33,11 @@
pages of a file with specific permissions (who-comment and who-inspect)."
{::doc/added "1.18"
::doc/module :files
::sm/params schema:create-share-link}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(files/check-edition-permissions! conn profile-id file-id)
(create-share-link conn (assoc params :profile-id profile-id))))
::sm/params schema:create-share-link
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id file-id] :as params}]
(files/check-edition-permissions! conn profile-id file-id)
(create-share-link conn (assoc params :profile-id profile-id)))
(defn create-share-link
[conn {:keys [profile-id file-id pages who-comment who-inspect]}]
@ -61,10 +61,10 @@
(sv/defmethod ::delete-share-link
{::doc/added "1.18"
::doc/module ::files
::sm/params schema:delete-share-link}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool]
(let [slink (db/get-by-id conn :share-link id)]
(files/check-edition-permissions! conn profile-id (:file-id slink))
(db/delete! conn :share-link {:id id})
nil)))
::sm/params schema:delete-share-link
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id] :as params}]
(let [slink (db/get-by-id conn :share-link id)]
(files/check-edition-permissions! conn profile-id (:file-id slink))
(db/delete! conn :share-link {:id id})
nil))

View file

@ -397,44 +397,44 @@
(defn clone-template
[cfg {:keys [project-id profile-id] :as params} template]
(db/tx-run! cfg (fn [{:keys [::db/conn ::wrk/executor] :as cfg}]
;; NOTE: the importation process performs some operations
;; that are not very friendly with virtual threads, and for
;; avoid unexpected blocking of other concurrent operations
;; we dispatch that operation to a dedicated executor.
(let [template (tmp/tempfile-from template
:prefix "penpot.template."
:suffix ""
:min-age "30m")
(db/run! cfg (fn [{:keys [::db/conn ::wrk/executor] :as cfg}]
;; NOTE: the importation process performs some operations
;; that are not very friendly with virtual threads, and for
;; avoid unexpected blocking of other concurrent operations
;; we dispatch that operation to a dedicated executor.
(let [template (tmp/tempfile-from template
:prefix "penpot.template."
:suffix ""
:min-age "30m")
format (bfc/parse-file-format template)
team (teams/get-team conn
:profile-id profile-id
:project-id project-id)
cfg (-> cfg
(assoc ::bfc/project-id project-id)
(assoc ::bfc/profile-id profile-id)
(assoc ::bfc/input template)
(assoc ::bfc/features (cfeat/get-team-enabled-features cf/flags team)))
format (bfc/parse-file-format template)
team (teams/get-team conn
:profile-id profile-id
:project-id project-id)
cfg (-> cfg
(assoc ::bfc/project-id project-id)
(assoc ::bfc/profile-id profile-id)
(assoc ::bfc/input template)
(assoc ::bfc/features (cfeat/get-team-enabled-features cf/flags team)))
result (if (= format :binfile-v3)
(px/invoke! executor (partial bf.v3/import-files! cfg))
(px/invoke! executor (partial bf.v1/import-files! cfg)))]
result (if (= format :binfile-v3)
(px/invoke! executor (partial bf.v3/import-files! cfg))
(px/invoke! executor (partial bf.v1/import-files! cfg)))]
(db/update! conn :project
{:modified-at (dt/now)}
{:id project-id})
(db/update! conn :project
{:modified-at (dt/now)}
{:id project-id})
(let [props (audit/clean-props params)]
(doseq [file-id result]
(let [props (assoc props :id file-id)
event (-> (audit/event-from-rpc-params params)
(assoc ::audit/profile-id profile-id)
(assoc ::audit/name "create-file")
(assoc ::audit/props props))]
(audit/submit! cfg event))))
(let [props (audit/clean-props params)]
(doseq [file-id result]
(let [props (assoc props :id file-id)
event (-> (audit/event-from-rpc-params params)
(assoc ::audit/profile-id profile-id)
(assoc ::audit/name "create-file")
(assoc ::audit/props props))]
(audit/submit! cfg event))))
result))))
result))))
(def ^:private
schema:clone-template

View file

@ -273,15 +273,14 @@
(sv/defmethod ::clone-file-media-object
{::doc/added "1.17"
::sm/params schema:clone-file-media-object}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(db/with-atomic [conn pool]
(files/check-edition-permissions! conn profile-id file-id)
(-> (assoc cfg :conn conn)
(clone-file-media-object params))))
::sm/params schema:clone-file-media-object
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id file-id] :as params}]
(files/check-edition-permissions! conn profile-id file-id)
(clone-file-media-object cfg params))
(defn clone-file-media-object
[{:keys [conn]} {:keys [id file-id is-local]}]
[{:keys [::db/conn]} {:keys [id file-id is-local]}]
(let [mobj (db/get-by-id conn :file-media-object id)]
(db/insert! conn :file-media-object
{:id (uuid/next)

View file

@ -124,32 +124,32 @@
(sv/defmethod ::update-profile
{::doc/added "1.0"
::sm/params schema:update-profile
::sm/result schema:profile}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id fullname lang theme] :as params}]
(db/with-atomic [conn pool]
;; NOTE: we need to retrieve the profile independently if we use
;; it or not for explicit locking and avoid concurrent updates of
;; the same row/object.
(let [profile (-> (db/get-by-id conn :profile profile-id ::sql/for-update true)
(decode-row))
::sm/result schema:profile
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id fullname lang theme] :as params}]
;; NOTE: we need to retrieve the profile independently if we use
;; it or not for explicit locking and avoid concurrent updates of
;; the same row/object.
(let [profile (-> (db/get-by-id conn :profile profile-id ::sql/for-update true)
(decode-row))
;; Update the profile map with direct params
profile (-> profile
(assoc :fullname fullname)
(assoc :lang lang)
(assoc :theme theme))]
;; Update the profile map with direct params
profile (-> profile
(assoc :fullname fullname)
(assoc :lang lang)
(assoc :theme theme))]
(db/update! conn :profile
{:fullname fullname
:lang lang
:theme theme
:props (db/tjson (:props profile))}
{:id profile-id})
(db/update! conn :profile
{:fullname fullname
:lang lang
:theme theme
:props (db/tjson (:props profile))}
{:id profile-id})
(-> profile
(strip-private-attrs)
(d/without-nils)
(rph/with-meta {::audit/props (audit/profile->props profile)})))))
(-> profile
(strip-private-attrs)
(d/without-nils)
(rph/with-meta {::audit/props (audit/profile->props profile)}))))
;; --- MUTATION: Update Password
@ -168,21 +168,20 @@
(sv/defmethod ::update-profile-password
{::doc/added "1.0"
::sm/params schema:update-profile-password
::climit/id :auth/global}
::climit/id :auth/global
::db/transaction true}
[cfg {:keys [::rpc/profile-id password] :as params}]
(let [profile (validate-password! cfg (assoc params :profile-id profile-id))
session-id (::session/id params)]
(db/tx-run! cfg (fn [cfg]
(let [profile (validate-password! cfg (assoc params :profile-id profile-id))
session-id (::session/id params)]
(when (= (:email profile) (str/lower (:password params)))
(ex/raise :type :validation
:code :email-as-password
:hint "you can't use your email as password"))
(when (= (:email profile) (str/lower (:password params)))
(ex/raise :type :validation
:code :email-as-password
:hint "you can't use your email as password"))
(update-profile-password! cfg (assoc profile :password password))
(invalidate-profile-session! cfg profile-id session-id)
nil))))
(update-profile-password! cfg (assoc profile :password password))
(invalidate-profile-session! cfg profile-id session-id)
nil))
(defn- invalidate-profile-session!
"Removes all sessions except the current one."
@ -440,37 +439,36 @@
(declare ^:private get-owned-teams)
(sv/defmethod ::delete-profile
{::doc/added "1.0"}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}]
(db/with-atomic [conn pool]
(let [teams (get-owned-teams conn profile-id)
deleted-at (dt/now)]
{::doc/added "1.0"
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id] :as params}]
(let [teams (get-owned-teams conn profile-id)
deleted-at (dt/now)]
;; If we found owned teams with participants, we don't allow
;; delete profile until the user properly transfer ownership or
;; explicitly removes all participants from the team
(when (some pos? (map :participants teams))
(ex/raise :type :validation
:code :owner-teams-with-people
:hint "The user need to transfer ownership of owned teams."
:context {:teams (mapv :id teams)}))
;; If we found owned teams with participants, we don't allow
;; delete profile until the user properly transfer ownership or
;; explicitly removes all participants from the team
(when (some pos? (map :participants teams))
(ex/raise :type :validation
:code :owner-teams-with-people
:hint "The user need to transfer ownership of owned teams."
:context {:teams (mapv :id teams)}))
;; Mark profile deleted immediatelly
(db/update! conn :profile
{:deleted-at deleted-at}
{:id profile-id})
;; Mark profile deleted immediatelly
(db/update! conn :profile
{:deleted-at deleted-at}
{:id profile-id})
;; Schedule cascade deletion to a worker
(wrk/submit! {::db/conn conn
::wrk/task :delete-object
::wrk/params {:object :profile
:deleted-at deleted-at
:id profile-id}})
;; Schedule cascade deletion to a worker
(wrk/submit! {::db/conn conn
::wrk/task :delete-object
::wrk/params {:object :profile
:deleted-at deleted-at
:id profile-id}})
(-> (rph/wrap nil)
(rph/with-transform (session/delete-fn cfg))))))
(-> (rph/wrap nil)
(rph/with-transform (session/delete-fn cfg)))))
;; --- HELPERS

View file

@ -219,12 +219,12 @@
::sm/params schema:update-project-pin
::webhooks/batch-timeout (dt/duration "5s")
::webhooks/batch-key (webhooks/key-fn ::rpc/profile-id :id)
::webhooks/event? true}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id team-id is-pinned] :as params}]
(db/with-atomic [conn pool]
(check-read-permissions! conn profile-id id)
(db/exec-one! conn [sql:update-project-pin team-id id profile-id is-pinned is-pinned])
nil))
::webhooks/event? true
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id team-id is-pinned] :as params}]
(check-read-permissions! conn profile-id id)
(db/exec-one! conn [sql:update-project-pin team-id id profile-id is-pinned is-pinned])
nil)
;; --- MUTATION: Rename Project
@ -238,17 +238,17 @@
(sv/defmethod ::rename-project
{::doc/added "1.18"
::sm/params schema:rename-project
::webhooks/event? true}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id name] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(let [project (db/get-by-id conn :project id ::sql/for-update true)]
(db/update! conn :project
{:name name}
{:id id})
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)
:prev-name (:name project)}}))))
::webhooks/event? true
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id name] :as params}]
(check-edition-permissions! conn profile-id id)
(let [project (db/get-by-id conn :project id ::sql/for-update true)]
(db/update! conn :project
{:name name}
{:id id})
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)
:prev-name (:name project)}})))
;; --- MUTATION: Delete Project
@ -280,13 +280,13 @@
(sv/defmethod ::delete-project
{::doc/added "1.18"
::sm/params schema:delete-project
::webhooks/event? true}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(let [project (delete-project conn id)]
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)
:name (:name project)
:created-at (:created-at project)
:modified-at (:modified-at project)}}))))
::webhooks/event? true
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id] :as params}]
(check-edition-permissions! conn profile-id id)
(let [project (delete-project conn id)]
(rph/with-meta (rph/wrap)
{::audit/props {:team-id (:team-id project)
:name (:name project)
:created-at (:created-at project)
:modified-at (:modified-at project)}})))

View file

@ -527,14 +527,14 @@
(sv/defmethod ::update-team
{::doc/added "1.17"
::sm/params schema:update-team}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id name] :as params}]
(db/with-atomic [conn pool]
(check-edition-permissions! conn profile-id id)
(db/update! conn :team
{:name name}
{:id id})
nil))
::sm/params schema:update-team
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id name]}]
(check-edition-permissions! conn profile-id id)
(db/update! conn :team
{:name name}
{:id id})
nil)
;; --- Mutation: Leave Team
@ -592,10 +592,10 @@
(sv/defmethod ::leave-team
{::doc/added "1.17"
::sm/params schema:leave-team}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id] :as params}]
(db/with-atomic [conn pool]
(leave-team conn (assoc params :profile-id profile-id))))
::sm/params schema:leave-team
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id] :as params}]
(leave-team conn (assoc params :profile-id profile-id)))
;; --- Mutation: Delete Team
@ -627,16 +627,16 @@
(sv/defmethod ::delete-team
{::doc/added "1.17"
::sm/params schema:delete-team}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(db/with-atomic [conn pool]
(let [perms (get-permissions conn profile-id id)]
(when-not (:is-owner perms)
(ex/raise :type :validation
:code :only-owner-can-delete-team))
::sm/params schema:delete-team
::db/transaction true}
[{:keys [::db/conn] :as cfg} {:keys [::rpc/profile-id id] :as params}]
(let [perms (get-permissions conn profile-id id)]
(when-not (:is-owner perms)
(ex/raise :type :validation
:code :only-owner-can-delete-team))
(delete-team conn id)
nil)))
(delete-team conn id)
nil))
;; --- Mutation: Team Update Role
@ -714,31 +714,30 @@
(sv/defmethod ::delete-team-member
{::doc/added "1.17"
::sm/params schema:delete-team-member}
[{:keys [::db/pool ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id team-id member-id] :as params}]
(db/with-atomic [conn pool]
(let [team (get-team pool :profile-id profile-id :team-id team-id)
perms (get-permissions conn profile-id team-id)]
(when-not (or (:is-owner perms)
(:is-admin perms))
(ex/raise :type :validation
:code :insufficient-permissions))
::sm/params schema:delete-team-member
::db/transaction true}
[{:keys [::db/conn ::mbus/msgbus] :as cfg} {:keys [::rpc/profile-id team-id member-id] :as params}]
(let [team (get-team conn :profile-id profile-id :team-id team-id)
perms (get-permissions conn profile-id team-id)]
(when-not (or (:is-owner perms)
(:is-admin perms))
(ex/raise :type :validation
:code :insufficient-permissions))
(when (= member-id profile-id)
(ex/raise :type :validation
:code :cant-remove-yourself))
(when (= member-id profile-id)
(ex/raise :type :validation
:code :cant-remove-yourself))
(db/delete! conn :team-profile-rel {:profile-id member-id
:team-id team-id})
(db/delete! conn :team-profile-rel {:profile-id member-id
:team-id team-id})
(mbus/pub! msgbus
:topic member-id
:message {:type :team-membership-change
:change :removed
:team-id team-id
:team-name (:name team)})
(mbus/pub! msgbus
:topic member-id
:message {:type :team-membership-change
:change :removed
:team-id team-id
:team-name (:name team)})
nil)))
nil))
;; --- Mutation: Update Team Photo
@ -764,16 +763,16 @@
(let [team (get-team pool :profile-id profile-id :team-id team-id)
photo (profile/upload-photo cfg params)]
(db/with-atomic [conn pool]
(check-admin-permissions! conn profile-id team-id)
;; Mark object as touched for make it ellegible for tentative
;; garbage collection.
(when-let [id (:photo-id team)]
(sto/touch-object! storage id))
(check-admin-permissions! pool profile-id team-id)
;; Save new photo
(db/update! pool :team
{:photo-id (:id photo)}
{:id team-id})
;; Mark object as touched for make it ellegible for tentative
;; garbage collection.
(when-let [id (:photo-id team)]
(sto/touch-object! storage id))
(assoc team :photo-id (:id photo)))))
;; Save new photo
(db/update! pool :team
{:photo-id (:id photo)}
{:id team-id})
(assoc team :photo-id (:id photo))))

View file

@ -407,20 +407,20 @@
(sv/defmethod ::update-team-invitation-role
{::doc/added "1.17"
::doc/module :teams
::sm/params schema:update-team-invitation-role}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email role] :as params}]
(db/with-atomic [conn pool]
(let [perms (teams/get-permissions conn profile-id team-id)]
::sm/params schema:update-team-invitation-role
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id team-id email role] :as params}]
(let [perms (teams/get-permissions conn profile-id team-id)]
(when-not (:is-admin perms)
(ex/raise :type :validation
:code :insufficient-permissions))
(when-not (:is-admin perms)
(ex/raise :type :validation
:code :insufficient-permissions))
(db/update! conn :team-invitation
{:role (name role) :updated-at (dt/now)}
{:team-id team-id :email-to (profile/clean-email email)})
(db/update! conn :team-invitation
{:role (name role) :updated-at (dt/now)}
{:team-id team-id :email-to (profile/clean-email email)})
nil)))
nil))
;; --- Mutation: Delete invitation
@ -431,20 +431,20 @@
(sv/defmethod ::delete-team-invitation
{::doc/added "1.17"
::sm/params schema:delete-team-invition}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id team-id email] :as params}]
(db/with-atomic [conn pool]
(let [perms (teams/get-permissions conn profile-id team-id)]
::sm/params schema:delete-team-invition
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id team-id email] :as params}]
(let [perms (teams/get-permissions conn profile-id team-id)]
(when-not (:is-admin perms)
(ex/raise :type :validation
:code :insufficient-permissions))
(when-not (:is-admin perms)
(ex/raise :type :validation
:code :insufficient-permissions))
(let [invitation (db/delete! conn :team-invitation
{:team-id team-id
:email-to (profile/clean-email email)}
{::db/return-keys true})]
(rph/wrap nil {::audit/props {:invitation-id (:id invitation)}})))))
(let [invitation (db/delete! conn :team-invitation
{:team-id team-id
:email-to (profile/clean-email email)}
{::db/return-keys true})]
(rph/wrap nil {::audit/props {:invitation-id (:id invitation)}}))))
;; --- Mutation: Request Team Invitation

View file

@ -144,20 +144,20 @@
(sv/defmethod ::delete-webhook
{::doc/added "1.17"
::sm/params schema:delete-webhook}
[{:keys [::db/pool] :as cfg} {:keys [::rpc/profile-id id]}]
(db/with-atomic [conn pool]
(let [whook (-> (db/get conn :webhook {:id id}) decode-row)]
(check-webhook-edition-permissions! conn profile-id (:team-id whook) (:profile-id whook))
(db/delete! conn :webhook {:id id})
nil)))
::sm/params schema:delete-webhook
::db/transaction true}
[{:keys [::db/conn]} {:keys [::rpc/profile-id id]}]
(let [whook (-> (db/get conn :webhook {:id id}) decode-row)]
(check-webhook-edition-permissions! conn profile-id (:team-id whook) (:profile-id whook))
(db/delete! conn :webhook {:id id})
nil))
;; --- Query: Webhooks
(def sql:get-webhooks
"SELECT id, uri, mtype, is_active, error_code, error_count, profile_id
FROM webhook
WHERE team_id = ?
"SELECT id, uri, mtype, is_active, error_code, error_count, profile_id
FROM webhook
WHERE team_id = ?
ORDER BY uri")
(def ^:private schema:get-webhooks

View file

@ -78,19 +78,19 @@
(defmethod ig/init-key ::props
[_ {:keys [::db/pool ::key] :as cfg}]
(db/with-atomic [conn pool]
(db/xact-lock! conn 0)
(when-not key
(l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate "
"all sessions on each restart, it is highly recommended setting up the "
"PENPOT_SECRET_KEY environment variable")))
(let [secret (or key (generate-random-key))]
(-> (get-all-props conn)
(assoc :secret-key secret)
(assoc :tokens-key (keys/derive secret :salt "tokens"))
(update :instance-id handle-instance-id conn (db/read-only? pool))))))
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
(db/xact-lock! conn 0)
(when-not key
(l/warn :hint (str "using autogenerated secret-key, it will change on each restart and will invalidate "
"all sessions on each restart, it is highly recommended setting up the "
"PENPOT_SECRET_KEY environment variable")))
(let [secret (or key (generate-random-key))]
(-> (get-all-props conn)
(assoc :secret-key secret)
(assoc :tokens-key (keys/derive secret :salt "tokens"))
(update :instance-id handle-instance-id conn (db/read-only? pool)))))))
;; FIXME
(sm/register! ::props :any)

View file

@ -36,37 +36,39 @@
(defmethod exec-command :create-profile
[{:keys [fullname email password is-active]
:or {is-active true}}]
(when-let [system (get-current-system)]
(db/with-atomic [conn (:app.db/pool system)]
(let [password (cmd.profile/derive-password system password)
params {:id (uuid/next)
:email email
:fullname fullname
:is-active is-active
:password password
:props {}}]
(->> (cmd.auth/create-profile! conn params)
(cmd.auth/create-profile-rels! conn))))))
(some-> (get-current-system)
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [password (cmd.profile/derive-password system password)
params {:id (uuid/next)
:email email
:fullname fullname
:is-active is-active
:password password
:props {}}]
(->> (cmd.auth/create-profile! conn params)
(cmd.auth/create-profile-rels! conn)))))))
(defmethod exec-command :update-profile
[{:keys [fullname email password is-active]}]
(when-let [system (get-current-system)]
(db/with-atomic [conn (:app.db/pool system)]
(let [params (cond-> {}
(some? fullname)
(assoc :fullname fullname)
(some-> (get-current-system)
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [params (cond-> {}
(some? fullname)
(assoc :fullname fullname)
(some? password)
(assoc :password (auth/derive-password password))
(some? password)
(assoc :password (auth/derive-password password))
(some? is-active)
(assoc :is-active is-active))]
(when (seq params)
(let [res (db/update! conn :profile
params
{:email email
:deleted-at nil})]
(pos? (db/get-update-count res))))))))
(some? is-active)
(assoc :is-active is-active))]
(when (seq params)
(let [res (db/update! conn :profile
params
{:email email
:deleted-at nil})]
(pos? (db/get-update-count res)))))))))
(defmethod exec-command :delete-profile
[{:keys [email soft]}]
@ -75,16 +77,16 @@
:code :invalid-arguments
:hint "email should be provided"))
(when-let [system (get-current-system)]
(db/with-atomic [conn (:app.db/pool system)]
(let [res (if soft
(db/update! conn :profile
{:deleted-at (dt/now)}
{:email email :deleted-at nil})
(db/delete! conn :profile
{:email email}))]
(pos? (db/get-update-count res))))))
(some-> (get-current-system)
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [res (if soft
(db/update! conn :profile
{:deleted-at (dt/now)}
{:email email :deleted-at nil})
(db/delete! conn :profile
{:email email}))]
(pos? (db/get-update-count res)))))))
(defmethod exec-command :search-profile
[{:keys [email]}]
@ -93,12 +95,12 @@
:code :invalid-arguments
:hint "email should be provided"))
(when-let [system (get-current-system)]
(db/with-atomic [conn (:app.db/pool system)]
(let [sql (str "select email, fullname, created_at, deleted_at from profile "
" where email similar to ? order by created_at desc limit 100")]
(db/exec! conn [sql email])))))
(some-> (get-current-system)
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [sql (str "select email, fullname, created_at, deleted_at from profile "
" where email similar to ? order by created_at desc limit 100")]
(db/exec! conn [sql email]))))))
(defmethod exec-command :derive-password
[{:keys [password]}]

View file

@ -101,38 +101,46 @@
"Mark the profile blocked and removes all the http sessiones
associated with the profile-id."
[email]
(db/with-atomic [conn (:app.db/pool main/system)]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-active true} {:id (:id profile)})
:activated))))
(some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-active true} {:id (:id profile)})
:activated))))))
(defn mark-profile-as-blocked!
"Mark the profile blocked and removes all the http sessiones
associated with the profile-id."
[email]
(db/with-atomic [conn (:app.db/pool main/system)]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-blocked true} {:id (:id profile)})
(db/delete! conn :http-session {:profile-id (:id profile)})
:blocked))))
(some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(when-let [profile (db/get* conn :profile
{:email (str/lower email)}
{:columns [:id :email]})]
(when-not (:is-blocked profile)
(db/update! conn :profile {:is-blocked true} {:id (:id profile)})
(db/delete! conn :http-session {:profile-id (:id profile)})
:blocked))))))
(defn reset-password!
"Reset a password to a specific one for a concrete user or all users
if email is `:all` keyword."
[& {:keys [email password] :or {password "123123"} :as params}]
(us/verify! (contains? params :email) "`email` parameter is mandatory")
(db/with-atomic [conn (:app.db/pool main/system)]
(let [password (derive-password password)]
(if (= email :all)
(db/exec! conn ["update profile set password=?" password])
(let [email (str/lower email)]
(db/exec! conn ["update profile set password=? where email=?" password email]))))))
(when-not email
(throw (IllegalArgumentException. "email is mandatory")))
(some-> main/system
(db/tx-run!
(fn [{:keys [::db/conn] :as system}]
(let [password (derive-password password)]
(if (= email :all)
(db/exec! conn ["update profile set password=?" password])
(let [email (str/lower email)]
(db/exec! conn ["update profile set password=? where email=?" password email]))))))))
;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
;; FEATURES

View file

@ -26,18 +26,14 @@
{k (assoc v ::min-age (cf/get-deletion-delay))})
(defmethod ig/init-key ::handler
[_ {:keys [::db/pool ::min-age] :as cfg}]
[_ {:keys [::min-age] :as cfg}]
(fn [{:keys [props] :as task}]
(let [min-age (or (:min-age props) min-age)]
(db/with-atomic [conn pool]
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (db/get-update-count result)]
(l/debug :hint "task finished" :total result)
(when (:rollback? props)
(db/rollback! conn))
result)))))
(-> cfg
(assoc ::db/rollback (:rollback? props))
(db/tx-run! (fn [{:keys [::db/conn]}]
(let [interval (db/interval min-age)
result (db/exec-one! conn [sql:delete-completed-tasks interval])
result (db/get-update-count result)]
(l/debug :hint "task finished" :total result)
result)))))))

View file

@ -71,11 +71,12 @@
(run-batch! [rconn]
(try
(db/with-atomic [conn pool]
(if-let [tasks (get-tasks conn)]
(->> (group-by :queue tasks)
(run! (partial push-tasks! conn rconn)))
(px/sleep (::wait-duration cfg))))
(db/tx-run! cfg (fn [{:keys [::db/conn]}]
(if-let [tasks (get-tasks conn)]
(->> (group-by :queue tasks)
(run! (partial push-tasks! conn rconn)))
;; FIXME: this sleep should be outside the transaction
(px/sleep (::wait-duration cfg)))))
(catch InterruptedException cause
(throw cause))
(catch Exception cause

View file

@ -138,14 +138,13 @@
" FROM information_schema.tables "
" WHERE table_schema = 'public' "
" AND table_name != 'migrations';")]
(db/with-atomic [conn *pool*]
(db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(let [result (->> (db/exec! conn [sql])
(map :table-name))]
(doseq [table result]
(db/exec! conn [(str "delete from " table ";")]))))
(db/transact! *pool* (fn [conn]
(db/exec-one! conn ["SET CONSTRAINTS ALL DEFERRED"])
(db/exec-one! conn ["SET LOCAL rules.deletion_protection TO off"])
(let [result (->> (db/exec! conn [sql])
(map :table-name))]
(doseq [table result]
(db/exec! conn [(str "delete from " table ";")])))))
(next)))
(defn clean-storage

View file

@ -20,45 +20,33 @@
(t/use-fixtures :each th/database-reset)
(t/deftest soft-auth-middleware
(db/with-atomic [conn (::db/pool th/*system*)]
(let [profile (th/create-profile* 1)
system (-> th/*system*
(assoc ::db/conn conn)
(assoc ::main/props (:app.setup/props th/*system*)))
(let [profile (th/create-profile* 1)
token (db/tx-run! th/*system* app.rpc.commands.access-token/create-access-token (:id profile) "test" nil)
token (app.rpc.commands.access-token/create-access-token
system (:id profile) "test" nil)
request (volatile! nil)
handler (#'app.http.access-token/wrap-soft-auth
(fn [req] (vreset! request req))
th/*system*)]
request (volatile! nil)
handler (#'app.http.access-token/wrap-soft-auth
(fn [req] (vreset! request req))
system)]
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return nil}]
(handler {})
(t/is (= {} @request)))
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return nil}]
(handler {})
(t/is (= {} @request)))
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return (:token token)}]
(handler {})
(with-mocks [m1 {:target 'app.http.access-token/get-token
:return (:token token)}]
(handler {})
(let [token-id (get @request :app.http.access-token/id)]
(t/is (= token-id (:id token))))))))
(let [token-id (get @request :app.http.access-token/id)]
(t/is (= token-id (:id token)))))))
(t/deftest authz-middleware
(let [profile (th/create-profile* 1)
system (assoc th/*system* ::main/props (:app.setup/props th/*system*))
token (db/with-atomic [conn (::db/pool th/*system*)]
(let [system (assoc system ::db/conn conn)]
(app.rpc.commands.access-token/create-access-token
system (:id profile) "test" nil)))
token (db/tx-run! th/*system* app.rpc.commands.access-token/create-access-token (:id profile) "test" nil)
request (volatile! {})
handler (#'app.http.access-token/wrap-authz
(fn [req] (vreset! request req))
system)]
th/*system*)]
(handler nil)
(t/is (nil? @request))