Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions tidb/resources/tidb.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ split-table = true

[log]
slow-query-file = "slow.log"
general-log-file = "general.log"

[tikv-client]

Expand Down
3 changes: 3 additions & 0 deletions tidb/src/tidb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
:monotonic monotonic/inc-workload
:txn-cycle monotonic/txn-workload
:append monotonic/append-workload
:append-slock monotonic/append-slock-workload
:register register/workload
:set set/workload
:set-cas set/cas-workload
Expand All @@ -64,6 +65,8 @@
:auto-retry-limit [10 0]
:read-lock [nil "FOR UPDATE"]
:predicate-read [true false]}
:append-slock {:auto-retry [true false]
:auto-retry-limit [10 0]}
:bank {:auto-retry [true false]
:auto-retry-limit [10 0]
:update-in-place [true false]
Expand Down
11 changes: 7 additions & 4 deletions tidb/src/tidb/db.clj
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@
(def kv-data-dir (str tidb-dir "/data/kv"))
(def db-config-file (str tidb-dir "/db.conf"))
(def db-log-file (str tidb-dir "/db.log"))
(def db-slow-file (str tidb-dir "/slow.log"))
(def db-slog-file (str tidb-dir "/slow.log"))
(def db-glog-file (str tidb-dir "/general.log"))
(def db-stdout (str tidb-dir "/db.stdout"))
(def db-pid-file (str tidb-dir "/db.pid"))
(def system-db-config-file (str tidb-dir "/system-db.conf"))
Expand Down Expand Up @@ -356,7 +357,7 @@
(start!))

; Give it a bit
(Thread/sleep 10000)
(Thread/sleep (if (= name :db) 20000 10000))

; OK, how's it doing?
(let [status (get-status)]
Expand Down Expand Up @@ -576,7 +577,8 @@
; We have to wait for every region to become totally replicated
; before starting any TiDB instance: if we start TiDB first, it
; might take 80+ minutes to converge.
(wait-for-replica-count node)
(when (>= (count (:nodes test)) 3)
(wait-for-replica-count node))
(jepsen/synchronize test)

(Thread/sleep 5000)
Expand Down Expand Up @@ -618,7 +620,8 @@
(log-files [_ test node]
(when-not (:skip-collect-logs test)
(let [base (cond-> [db-log-file
db-slow-file
db-slog-file
db-glog-file
db-stdout
kv-log-file
kv-stdout]
Expand Down
65 changes: 65 additions & 0 deletions tidb/src/tidb/monotonic.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
key y decrease."
(:require [clojure.string :as str]
[clojure.tools.logging :refer [info]]
[clojure.java.jdbc :as j]
[jepsen [client :as client]
[checker :as checker]
[generator :as gen]]
Expand Down Expand Up @@ -212,6 +213,58 @@
(close! [this test]
(client/close! client test))))

(defn do-append-with-slock!
[conn spec [f k v]]
[f k
(case f
:r
(let [tx (:tx (first (c/query conn ["select @@tidb_current_ts as tx"])))
n (-> conn
(c/query ["select n from xlock where k = ?" k])
first (:n 0))]
(if (> n 0)
(do ; slock read
(c/execute! conn ["insert into slock(k, tx) values (?, ?)" k, tx])
(mapv :v (j/with-db-transaction [cc spec] ; do snapshot read in an another session
; the op txn might be [(append k) (read k)], then the op txn is actually holding a xlock on k,
; then the following insert should fail with timeout or constraint error (when the xlock record is
; inserted by op txn).
(c/query cc [(str "/* read within " tx " */ insert into slock(k, tx) values (?, @@tidb_current_ts)") k])
; now the op txn holds slock and the read txn also holds slock, we can do snapshot read safely.
(c/query cc [(str "/* read within " tx " */ select v from item where k = ? order by n") k]))))
(do ; xlock read
(c/execute! conn ["insert into xlock(k, n) values (?, 0) on duplicate key update n=n" k])
(mapv :v (c/query conn ["select v from item where k = ? order by n for update" k])))))
:append
(let [n (-> conn
(c/query ["select n from xlock where k = ? for update" k])
first (:n 0) inc)]
(c/execute! conn ["replace into xlock(k, n) values (?, ?)" k n])
(c/execute! conn ["insert into item(k, v, n) values (?, ?, ?)" k v n])
v))])

(defrecord AppendClientWithSLock [conn spec]
client/Client
(open! [this test node] (assoc this :conn (c/open node test) :spec (c/conn-spec node)))
(close! [this test] (c/close! conn))

(setup!
[this test]
(c/with-conn-failure-retry conn
(c/execute! conn ["create table if not exists item (k int, v int, n int)"])
(c/execute! conn ["create table if not exists xlock (k int primary key, n int)"])
(c/execute! conn [(str "create table if not exists slock (k int, tx bigint, "
"constraint fk_lock foreign key (k) references xlock(k) "
"on delete cascade on update cascade)")])))
(invoke!
[this test op]
(c/with-txn op [c conn {:isolation (util/isolation-level test)
:before-hook (partial c/rand-init-txn! test conn)}]
(assoc op :type :ok, :value
(mapv (partial do-append-with-slock! c spec) (:value op)))))

(teardown! [this test]))

(defn append-txns
"Like wr-txns, we just rewrite writes to be appends."
[opts]
Expand All @@ -231,3 +284,15 @@
; Jepsen may raise an IllegalStateException("Don't know how to classify") if a cycle only
; consists of realtime edges and tso edges, which is typically caused by wrong tso info.
:additional-graphs [cycle/realtime-graph]})})

(defn append-slock-workload
[opts]
{:client (AppendClientWithSLock. nil nil)
:generator (->> (append-txns {:min-txn-length 1
:max-txn-length 2
:key-count 5
:max-writes-per-key 16})
(map (fn [txn] {:type :invoke, :f :txn, :value txn}))
gen/seq)
:checker (append/checker {:anomalies [(if (= :read-committed (:isolation opts)) :G1 :G-single)]
:additional-graphs [cycle/realtime-graph]})})
43 changes: 26 additions & 17 deletions tidb/src/tidb/sql.clj
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,19 @@
(defn init-sql
[test]
(cond-> (:init-sql test)
(not= :default (:auto-retry test)) (conj (str "set @@tidb_disable_txn_auto_retry = " (if (:auto-retry test) 0 1)))
(not= :default (:auto-retry-limit test)) (conj (str "set @@tidb_retry_limit = " (:auto-retry-limit test 10)))
(:follower-read test) (conj "set @@tidb_replica_read = 'follower'")
true (conj (str "set @@tidb_txn_mode = '" (txn-mode test) "'"))
true (conj "set @@tidb_general_log = 1")))
(:follower-read test) (conj "set @@tidb_replica_read = 'follower'")))

(defn init-conn!
"Sets initial variables on a connection, based on test options.
Returns conn."
[conn test]
(j/execute! conn [(str "set"
" @@innodb_lock_wait_timeout = 3,"
" @@tidb_general_log = 1,"
" @@tidb_idle_transaction_timeout = 2,"
" @@tidb_enable_mutation_checker=1,"
" @@tidb_txn_assertion_level=strict,"
" @@tidb_txn_mode = '" (txn-mode test) "'")])
(doseq [stmt (init-sql test)]
(info (str "init> " stmt))
(j/execute! conn [stmt]))
Expand Down Expand Up @@ -185,7 +188,7 @@

(def rollback-msg
"mariadb drivers have a few exception classes that use this message"
"Deadlock found when trying to get lock; try restarting transaction")
"try restarting transaction")

(defmacro capture-txn-abort
"Converts aborted transactions to an ::abort keyword"
Expand All @@ -202,6 +205,7 @@
(catch java.sql.SQLException e#
(condp re-find (.getMessage e#)
#"can not retry select for update statement" ::abort
#"try restarting transaction" ::abort
#"\[try again later\]" ::abort
(throw e#)))))

Expand Down Expand Up @@ -240,17 +244,22 @@
(throw e#)))

(catch clojure.lang.ExceptionInfo e#
(cond (= "Connection is closed" (.cause (:rollback (ex-data e#))))
(assoc ~op :type :info, :error :conn-closed-rollback-failed)

(= "createStatement() is called on closed connection"
(.cause (:rollback (ex-data e#))))
(assoc ~op :type :fail, :error :conn-closed-rollback-failed)

true (do (info e# :caught (pr-str (ex-data e#)))
(info :caught-rollback (:rollback (ex-data e#)))
(info :caught-cause (.cause (:rollback (ex-data e#))))
(throw e#))))))
(let [rollback-msg# (-> e# ex-data :rollback str)]
(cond (str/includes? rollback-msg#
"Interrupted reading remaining batch response")
(assoc ~op :type :info, :error :query-timed-out)

(str/includes? rollback-msg#
"Connection is closed")
(assoc ~op :type :info, :error :conn-closed-rollback-failed)

(str/includes? rollback-msg#
"createStatement() is called on closed connection")
(assoc ~op :type :fail, :error :conn-closed-rollback-failed)

true (do (info :caught (pr-str (ex-data e#)))
(info :caught-rollback rollback-msg#)
(throw e#)))))))

(defmacro with-txn
"Executes body in a transaction, with a timeout, automatically retrying
Expand Down