mirror of
synced 2025-03-13 00:01:51 -05:00
356 lines
9.2 KiB
356 lines
9.2 KiB
;; This Source Code Form is subject to the terms of the Mozilla Public
;; License, v. 2.0. If a copy of the MPL was not distributed with this
;; file, You can obtain one at http://mozilla.org/MPL/2.0/.
;; Copyright (c) UXBOX Labs SL
(ns app.db
[app.common.data :as d]
[app.common.exceptions :as ex]
[app.common.geom.point :as gpt]
[app.common.spec :as us]
[app.db.sql :as sql]
[app.metrics :as mtx]
[app.util.json :as json]
[app.util.logging :as l]
[app.util.migrations :as mg]
[app.util.time :as dt]
[app.util.transit :as t]
[clojure.java.io :as io]
[clojure.spec.alpha :as s]
[integrant.core :as ig]
[next.jdbc :as jdbc]
[next.jdbc.date-time :as jdbc-dt])
(declare open)
(declare create-pool)
;; Initialization
(declare instrument-jdbc!)
(s/def ::name keyword?)
(s/def ::uri ::us/not-empty-string)
(s/def ::min-pool-size ::us/integer)
(s/def ::max-pool-size ::us/integer)
(s/def ::migrations map?)
(defmethod ig/pre-init-spec ::pool [_]
(s/keys :req-un [::uri ::name ::min-pool-size ::max-pool-size ::migrations ::mtx/metrics]))
(defmethod ig/init-key ::pool
[_ {:keys [migrations metrics] :as cfg}]
(l/info :action "initialize connection pool"
:name (d/name (:name cfg))
:uri (:uri cfg))
(instrument-jdbc! (:registry metrics))
(let [pool (create-pool cfg)]
(when (seq migrations)
(with-open [conn ^AutoCloseable (open pool)]
(mg/setup! conn)
(doseq [[name steps] migrations]
(mg/migrate! conn {:name (d/name name) :steps steps}))))
(defmethod ig/halt-key! ::pool
[_ pool]
(.close ^HikariDataSource pool))
(defn- instrument-jdbc!
{:registry registry
:type :counter
:name "database_query_total"
:help "An absolute counter of database queries."}))
;; API & Impl
(def initsql
(str "SET statement_timeout = 120000;\n"
"SET idle_in_transaction_session_timeout = 120000;"))
(defn- create-datasource-config
[{:keys [metrics] :as cfg}]
(let [dburi (:uri cfg)
username (:username cfg)
password (:password cfg)
config (HikariConfig.)
mtf (PrometheusMetricsTrackerFactory. (:registry metrics))]
(doto config
(.setJdbcUrl (str "jdbc:" dburi))
(.setPoolName (d/name (:name cfg)))
(.setAutoCommit true)
(.setReadOnly false)
(.setConnectionTimeout 8000) ;; 8seg
(.setValidationTimeout 8000) ;; 8seg
(.setIdleTimeout 120000) ;; 2min
(.setMaxLifetime 1800000) ;; 30min
(.setMinimumIdle (:min-pool-size cfg 0))
(.setMaximumPoolSize (:max-pool-size cfg 30))
(.setMetricsTrackerFactory mtf)
(.setConnectionInitSql initsql)
(.setInitializationFailTimeout -1))
(when username (.setUsername config username))
(when password (.setPassword config password))
(defn pool?
(instance? javax.sql.DataSource v))
(s/def ::pool pool?)
(defn pool-closed?
(.isClosed ^HikariDataSource pool))
(defn- create-pool
(let [dsc (create-datasource-config cfg)]
(HikariDataSource. dsc)))
(defn unwrap
[conn klass]
(.unwrap ^Connection conn klass))
(defn lobj-manager
(let [conn (unwrap conn org.postgresql.PGConnection)]
(.getLargeObjectAPI ^PGConnection conn)))
(defn lobj-create
(.createLO ^LargeObjectManager manager LargeObjectManager/READWRITE))
(defn lobj-open
([manager oid]
(lobj-open manager oid {}))
([manager oid {:keys [mode] :or {mode :rw}}]
(let [mode (case mode
(:r :read) LargeObjectManager/READ
(:w :write) LargeObjectManager/WRITE
(:rw :read+write) LargeObjectManager/READWRITE)]
(.open ^LargeObjectManager manager (long oid) mode))))
(defn lobj-unlink
[manager oid]
(.unlink ^LargeObjectManager manager (long oid)))
(extend-type LargeObject
(make-reader [lobj opts]
(let [^InputStream is (.getInputStream ^LargeObject lobj)]
(io/make-reader is opts)))
(make-writer [lobj opts]
(let [^OutputStream os (.getOutputStream ^LargeObject lobj)]
(io/make-writer os opts)))
(make-input-stream [lobj opts]
(let [^InputStream is (.getInputStream ^LargeObject lobj)]
(io/make-input-stream is opts)))
(make-output-stream [lobj opts]
(let [^OutputStream os (.getOutputStream ^LargeObject lobj)]
(io/make-output-stream os opts))))
(defmacro with-atomic
[& args]
`(jdbc/with-transaction ~@args))
(defn ^Connection open
(jdbc/get-connection pool))
(defn exec!
([ds sv]
(exec! ds sv {}))
([ds sv opts]
(jdbc/execute! ds sv (assoc opts :builder-fn sql/as-kebab-maps))))
(defn exec-one!
([ds sv] (exec-one! ds sv {}))
([ds sv opts]
(jdbc/execute-one! ds sv (assoc opts :builder-fn sql/as-kebab-maps))))
(defn insert!
([ds table params] (insert! ds table params nil))
([ds table params opts]
(exec-one! ds
(sql/insert table params opts)
(assoc opts :return-keys true))))
(defn insert-multi!
([ds table cols rows] (insert-multi! ds table cols rows nil))
([ds table cols rows opts]
(exec! ds
(sql/insert-multi table cols rows opts)
(assoc opts :return-keys true))))
(defn update!
([ds table params where] (update! ds table params where nil))
([ds table params where opts]
(exec-one! ds
(sql/update table params where opts)
(assoc opts :return-keys true))))
(defn delete!
([ds table params] (delete! ds table params nil))
([ds table params opts]
(exec-one! ds
(sql/delete table params opts)
(assoc opts :return-keys true))))
(defn get-by-params
([ds table params]
(get-by-params ds table params nil))
([ds table params {:keys [uncheked] :or {uncheked false} :as opts}]
(let [res (exec-one! ds (sql/select table params opts))]
(when (and (not uncheked)
(or (:deleted-at res) (not res)))
(ex/raise :type :not-found
:hint "database object not found"))
(defn get-by-id
([ds table id]
(get-by-params ds table {:id id} nil))
([ds table id opts]
(get-by-params ds table {:id id} opts)))
(defn query
([ds table params]
(query ds table params nil))
([ds table params opts]
(exec! ds (sql/select table params opts))))
(defn pgobject?
(instance? PGobject v))
(defn pginterval?
(instance? PGInterval v))
(defn pgpoint?
(instance? PGpoint v))
(defn pgarray?
(instance? PgArray v))
(defn pgarray-of-uuid?
(and (pgarray? v) (= "uuid" (.getBaseTypeName ^PgArray v))))
(defn pgpoint
(PGpoint. (:x p) (:y p)))
(defn create-array
[conn type objects]
(let [^PGConnection conn (unwrap conn org.postgresql.PGConnection)]
(if (coll? objects)
(.createArrayOf conn ^String type (into-array Object objects))
(.createArrayOf conn ^String type objects))))
(defn decode-pgpoint
[^PGpoint v]
(gpt/point (.-x v) (.-y v)))
(defn pginterval
(org.postgresql.util.PGInterval. ^String data))
(defn savepoint
([^Connection conn]
(.setSavepoint conn))
([^Connection conn label]
(.setSavepoint conn (name label))))
(defn rollback!
([^Connection conn]
(.rollback conn))
([^Connection conn ^Savepoint sp]
(.rollback conn sp)))
(defn interval
(integer? data)
(->> (/ data 1000.0)
(format "%s seconds")
(string? data)
(pginterval data)
(dt/duration? data)
(->> (/ (.toMillis ^java.time.Duration data) 1000.0)
(format "%s seconds")
(ex/raise :type :not-implemented)))
(defn decode-json-pgobject
[^PGobject o]
(let [typ (.getType o)
val (.getValue o)]
(if (or (= typ "json")
(= typ "jsonb"))
(json/decode-str val)
(defn decode-transit-pgobject
[^PGobject o]
(let [typ (.getType o)
val (.getValue o)]
(if (or (= typ "json")
(= typ "jsonb"))
(t/decode-str val)
(defn tjson
"Encode as transit json."
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (t/encode-verbose-str data))))
(defn json
"Encode as plain json."
(doto (org.postgresql.util.PGobject.)
(.setType "jsonb")
(.setValue (json/encode-str data))))
(defn pgarray->set
(set (.getArray ^PgArray v)))
(defn pgarray->vector
(vec (.getArray ^PgArray v)))