Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 8 additions & 10 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ jobs:
cluster:
runs-on: ubuntu-latest

strategy:
fail-fast: false
matrix:
db:
- type: "cluster"
- type: "cluster-cassandra"

steps:
- uses: actions/checkout@v4

Expand Down Expand Up @@ -121,16 +128,7 @@ jobs:
- name: ScalarDB Cluster test
run: |
cd scalardb
lein with-profile cluster run test --workload transfer --nodes localhost --db cluster --username runner --ssh-private-key ~/.ssh/id_rsa --docker-username ${{ github.repository_owner }} --docker-access-token ${{ secrets.CR_PAT }} --nemesis crash

- name: Check test result
run: |
if grep -q "Everything looks good!" scalardb/store/latest/jepsen.log; then
echo "Test passed"
else
echo "Test failed"
exit 1
fi
lein with-profile cluster run test --workload transfer --nodes localhost --db ${{ matrix.db.type }} --username runner --ssh-private-key ~/.ssh/id_rsa --docker-username ${{ github.repository_owner }} --docker-access-token ${{ secrets.CR_PAT }} --nemesis crash

- name: Upload logs
if: always()
Expand Down
5 changes: 0 additions & 5 deletions scalardb/src/scalardb/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,6 @@
[test]
(prepare-service! test :2pc))

(defn check-storage-connection!
  "Calls `prepare-storage-service!` with `test` when the reference held under
  `:storage` in `test` currently dereferences to nil or false. Returns nil
  without doing anything when it dereferences to a truthy value."
  [test]
  (let [storage @(:storage test)]
    (when (not storage)
      (prepare-storage-service! test))))

(defn check-transaction-connection!
[test]
(when-not @(:transaction test)
Expand Down
1 change: 0 additions & 1 deletion scalardb/src/scalardb/db/cassandra.clj
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

(defrecord ExtCassandra []
ext/DbExtension
(get-db-type [_] :cassandra)
(live-nodes [_ test] (cassandra/live-nodes test))
(wait-for-recovery [_ test] (cassandra/wait-rf-nodes test))
(create-table-opts
Expand Down
193 changes: 143 additions & 50 deletions scalardb/src/scalardb/db/cluster.clj
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,14 @@
(def ^:private ^:const DEFAULT_HELM_CHART_VERSION "1.7.2")
(def ^:private ^:const DEFAULT_CHAOS_MESH_VERSION "2.7.2")

(def ^:private ^:const DEFAULT_CLUSTER_NODE_COUNT 3)
(def ^:private ^:const DEFAULT_CASSANDRA_REPLICA_COUNT 1)

(def ^:private ^:const TIMEOUT_SEC 600)
(def ^:private ^:const INTERVAL_SEC 10)

(def ^:private ^:const CLUSTER_NODE_NAME "scalardb-cluster-node")
(def ^:private ^:const CASSANDRA_NODE_NAME "cassandra-scalardb-cluster")

(def ^:private ^:const CLUSTER_VALUES
{:envoy {:enabled true
Expand All @@ -30,19 +34,14 @@
:tag (or (some-> (env :scalardb-cluster-version) not-empty)
DEFAULT_SCALARDB_CLUSTER_VERSION)}

;; Storage configurations will be set later
:scalardbClusterNodeProperties
(str/join "\n"
;; ScalarDB Cluster configurations
["scalar.db.cluster.membership.type=KUBERNETES"
"scalar.db.cluster.membership.kubernetes.endpoint.namespace_name=${env:SCALAR_DB_CLUSTER_MEMBERSHIP_KUBERNETES_ENDPOINT_NAMESPACE_NAME}"
"scalar.db.cluster.membership.kubernetes.endpoint.name=${env:SCALAR_DB_CLUSTER_MEMBERSHIP_KUBERNETES_ENDPOINT_NAME}"
""
;; Storage configurations
"scalar.db.storage=jdbc"
"scalar.db.contact_points=jdbc:postgresql://postgresql-scalardb-cluster.default.svc.cluster.local:5432/postgres"
"scalar.db.username=postgres"
"scalar.db.password=postgres"
""
;; Set to true to include transaction metadata in the records
"scalar.db.consensus_commit.include_metadata.enabled=true"])

Expand All @@ -51,16 +50,39 @@
(defn- update-cluster-values
[test values]
(let [path [:scalardbCluster :scalardbClusterNodeProperties]
[storage contact-points user-pass]
(case (:db-type test)
:cluster ["jdbc"
"jdbc:postgresql://postgresql-scalardb-cluster.default.svc.cluster.local:5432/postgres"
"postgres"]
:cluster-cassandra ["cassandra"
(->> (map #(str "cassandra-scalardb-cluster-"
%
".cassandra-scalardb-cluster-headless.default.svc.cluster.local")
(range DEFAULT_CASSANDRA_REPLICA_COUNT))
(str/join ","))
"cassandra"]
(throw (ex-info "Unsupported DB type" {:db-type (:db-type test)})))
isolation-level (-> test
:isolation-level
name
str/upper-case
(str/replace #"-" "_"))
new-db-props (-> values
(get-in path)
(str
;; storage
"\nscalar.db.storage="
storage
"\nscalar.db.contact_points="
contact-points
"\nscalar.db.username="
user-pass
"\nscalar.db.password="
user-pass
;; isolation level
"\nscalar.db.consensus_commit.isolation_level="
(-> test
:isolation-level
name
str/upper-case
(str/replace #"-" "_"))
isolation-level
;; one phase commit
(when (:enable-one-phase-commit test)
"\nscalar.db.consensus_commit.one_phase_commit.enabled=true")
Expand Down Expand Up @@ -109,20 +131,33 @@

(defn- start!
[test]
;; postgresql
(c/exec
:helm :install "postgresql-scalardb-cluster" "bitnami/postgresql"
:--set "auth.postgresPassword=postgres"
:--set "primary.persistence.enabled=true"
;; Need an external IP for storage APIs
:--set "service.type=LoadBalancer"
:--set "primary.service.type=LoadBalancer"
;; Use legacy images
:--set "image.repository=bitnamilegacy/postgresql"
:--set "volumePermissions.image.repository=bitnamilegacy/os-shell"
:--set "metrics.image.repository=bitnamilegacy/postgres-exporter"
:--set "global.security.allowInsecureImages=true"
:--version "16.7.0")
;; PostgreSQL or Cassandra
(case (:db-type test)
:cluster (c/exec
:helm :install "postgresql-scalardb-cluster" "bitnami/postgresql"
:--set "auth.postgresPassword=postgres"
:--set "primary.persistence.enabled=true"
;; Need an external IP for storage APIs
:--set "service.type=LoadBalancer"
:--set "primary.service.type=LoadBalancer"
;; Use legacy images
:--set "image.repository=bitnamilegacy/postgresql"
:--set "volumePermissions.image.repository=bitnamilegacy/os-shell"
:--set "metrics.image.repository=bitnamilegacy/postgres-exporter"
:--set "global.security.allowInsecureImages=true"
:--version "16.7.0")
:cluster-cassandra (c/exec
:helm :install "cassandra-scalardb-cluster" "bitnami/cassandra"
:--set "dbUser.user=cassandra"
:--set "dbUser.user=cassandra"
:--set "dbUser.password=cassandra"
:--set (str "replicaCount=" DEFAULT_CASSANDRA_REPLICA_COUNT)
;; TODO: configure commitlog_sync in cassandra.yaml
:--set "persistence.enabled=true"
;; Need an external IP for storage APIs
:--set "service.type=LoadBalancer"
:--set "primary.service.type=LoadBalancer")
(throw (ex-info "Unsupported DB type" {:db-type (:db-type test)})))

;; ScalarDB Cluster
(let [chart-version (or (some-> (env :helm-chart-version) not-empty)
Expand All @@ -145,21 +180,36 @@
:--version DEFAULT_CHAOS_MESH_VERSION))

(defn- wipe!
[]
[test]
;; ignore errors because these files or pods might not exist
(try
(info "wiping old logs...")
(binding [c/*dir* (System/getProperty "user.dir")]
(some->> (-> (c/exec :ls) (str/split #"\s+"))
(filter #(re-matches #"scalardb-cluster-node-.*\.log" %))
seq
(apply c/exec :rm :-f)))
(info "wiping the pods...")
(c/exec :helm :uninstall :postgresql-scalardb-cluster)
(c/exec :kubectl :delete
:pvc :-l "app.kubernetes.io/instance=postgresql-scalardb-cluster")
(catch Exception _))
(info "wiping the pods...")
(try
(c/exec :helm :uninstall
(case (:db-type test)
:cluster :postgresql-scalardb-cluster
:cluster-cassandra :cassandra-scalardb-cluster))
(catch Exception _))
(try
(c/exec :kubectl :delete :pvc :-l
(str "app.kubernetes.io/instance="
(case (:db-type test)
:cluster "postgresql-scalardb-cluster"
:cluster-cassandra "cassandra-scalardb-cluster")))
(catch Exception _))
(try
(c/exec :helm :uninstall :scalardb-cluster)
(catch Exception _))
(try
(c/exec :helm :uninstall :chaos-mesh :-n "chaos-mesh")
(catch Exception _ nil)))
(catch Exception _)))

(defn- get-pod-list
[name]
Expand Down Expand Up @@ -188,7 +238,7 @@
first))

(defn get-postgres-ip
"Get the IP of the load balancer"
"Get the IP of the Postgres"
[]
(->> (c/exec :kubectl :get :svc)
str/split-lines
Expand All @@ -197,20 +247,34 @@
(map #(nth (str/split % #"\s+") 3))
first))

(defn get-cassandra-ip
  "Runs `kubectl get svc` and returns the element at index 3 of the first
  output line that contains both \"cassandra-scalardb-cluster\" and
  \"LoadBalancer\", after splitting that line on whitespace. Returns nil when
  no line matches.

  NOTE(review): index 3 is presumably the EXTERNAL-IP column of the default
  `kubectl get svc` table; confirm against the actual output."
  []
  (let [svc-lines (str/split-lines (c/exec :kubectl :get :svc))
        candidates (for [line svc-lines
                         :when (str/includes? line "cassandra-scalardb-cluster")
                         :when (str/includes? line "LoadBalancer")]
                     (nth (str/split line #"\s+") 3))]
    (first candidates)))
Comment on lines +250 to +258

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This function currently returns only one IP address for Cassandra, which can be a single point of failure for the test client. It would be more robust to return all available Cassandra LoadBalancer IPs. The docstring should also be updated to reflect this change.

(defn get-cassandra-ip
  "Get all IPs of the Cassandra nodes"
  []
  (->> (c/exec :kubectl :get :svc)
       str/split-lines
       (filter #(str/includes? % "cassandra-scalardb-cluster"))
       (filter #(str/includes? % "LoadBalancer"))
       (map #(nth (str/split % #"\s+") 3))))


(defn- running-pods?
"Check a live node."
[test]
"Check if nodes are running."
[test prefix num]
(try
(info "DEBUG:" (-> test :nodes first (c/on (c/exec :kubectl :describe :pod "cassandra-scalardb-cluster-0"))))
(info "DEBUG:" (-> test :nodes first (c/on (c/exec :kubectl :describe :pod "cassandra-scalardb-cluster-1"))))
(info "DEBUG:" (-> test :nodes first (c/on (c/exec :kubectl :describe :pod "cassandra-scalardb-cluster-2"))))
(catch Exception _ nil))
(-> test
:nodes
first
(c/on (get-pod-list CLUSTER_NODE_NAME))
(c/on (get-pod-list prefix))
count
;; TODO: check the number of pods
(= 3)))
(= num)))

(defn- cluster-nodes-ready?
[test]
(and (running-pods? test)
(and (running-pods? test CLUSTER_NODE_NAME DEFAULT_CLUSTER_NODE_COUNT)
(try
(c/on (-> test :nodes first)
(->> (get-pod-list CLUSTER_NODE_NAME)
Expand All @@ -223,12 +287,30 @@
(warn e "An error occurred")
false))))

(defn- cassandra-nodes-ready?
  "Readiness check for the Cassandra pods.

  Returns true immediately when the `:db-type` of `test` is not
  `:cluster-cassandra`. Otherwise returns true only when `running-pods?`
  holds for CASSANDRA_NODE_NAME with DEFAULT_CASSANDRA_REPLICA_COUNT and
  `kubectl wait --for=condition=Ready --timeout=120s` succeeds, on the first
  node of `test`, for every pod returned by `get-pod-list`. Any exception
  raised while waiting is logged with `warn` and makes the result false."
  [test]
  (if (= (:db-type test) :cluster-cassandra)
    (and (running-pods? test
                        CASSANDRA_NODE_NAME
                        DEFAULT_CASSANDRA_REPLICA_COUNT)
         (try
           (c/on (first (:nodes test))
                 ;; Wait on each pod in turn; only success or failure matters,
                 ;; so the command output is discarded.
                 (doseq [pod (get-pod-list CASSANDRA_NODE_NAME)]
                   (c/exec :kubectl :wait
                           "--for=condition=Ready"
                           "--timeout=120s"
                           (str "pod/" pod))))
           true
           (catch Exception e
             (warn (.getMessage e))
             false)))
    ;; Other DB types have no Cassandra pods to wait for.
    true))

(defn- wait-for-recovery
"Wait for the node bootstrapping."
([test]
(wait-for-recovery TIMEOUT_SEC INTERVAL_SEC test))
([timeout-sec interval-sec test]
(when-not (cluster-nodes-ready? test)
(when-not (and (cassandra-nodes-ready? test) (cluster-nodes-ready? test))
(Thread/sleep (* interval-sec 1000))
(if (>= timeout-sec interval-sec)
(wait-for-recovery (- timeout-sec interval-sec) interval-sec test)
Expand All @@ -242,7 +324,7 @@
db/DB
(setup! [_ test _]
(when-not (:leave-db-running? test)
(wipe!))
(wipe! test))
(install!)
(configure! test)
(start! test)
Expand All @@ -251,7 +333,7 @@

(teardown! [_ test _]
(when-not (:leave-db-running? test)
(wipe!)))
(wipe! test)))

db/Primary
(primaries [_ test] (:nodes test))
Expand All @@ -275,8 +357,9 @@

(defrecord ExtCluster []
ext/DbExtension
(get-db-type [_] :cluster)
(live-nodes [_ test] (running-pods? test))
(live-nodes [_ test] (running-pods? test
CLUSTER_NODE_NAME
DEFAULT_CLUSTER_NODE_COUNT))
(wait-for-recovery [_ test] (wait-for-recovery test))
(create-table-opts [_ _] {})
(create-properties
Expand All @@ -292,15 +375,25 @@
(.setProperty "scalar.db.sql.cluster_mode.contact_points"
(str "indirect:" ip)))
(ext/set-common-properties test)))))
(create-storage-properties [_ _]
(create-storage-properties [_ test]
(let [node (-> test :nodes first)
ip (c/on node (get-postgres-ip))]
db-type (:db-type test)
[storage contact-points user-pass]
(c/on node (case db-type
:cluster ["jdbc"
(str "jdbc:postgresql://"
(get-postgres-ip)
":5432/postgres")
"postgres"]
:cluster-cassandra ["cassandra"
(get-cassandra-ip)
"cassandra"]
Comment on lines +388 to +390

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

To make the client connection to Cassandra more robust, get-cassandra-ip should return a list of all available IPs (as suggested in another comment). This call should then be updated to join those IPs into a comma-separated string for the contact_points property.

                       :cluster-cassandra ["cassandra"
                                           (str/join "," (get-cassandra-ip))
                                           "cassandra"]

(throw (ex-info "Unsupported DB type" {:db-type db-type}))))]
(doto (Properties.)
(.setProperty "scalar.db.storage" "jdbc")
(.setProperty "scalar.db.contact_points"
(str "jdbc:postgresql://" ip ":5432/postgres"))
(.setProperty "scalar.db.username" "postgres")
(.setProperty "scalar.db.password" "postgres")))))
(.setProperty "scalar.db.storage" storage)
(.setProperty "scalar.db.contact_points" contact-points)
(.setProperty "scalar.db.username" user-pass)
(.setProperty "scalar.db.password" user-pass)))))

(defn gen-db
[faults admin]
Expand Down
1 change: 0 additions & 1 deletion scalardb/src/scalardb/db/postgres.clj
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,6 @@

(defrecord ExtPostgres []
ext/DbExtension
(get-db-type [_] :postgres)
(live-nodes [_ test] (live-node? test))
(wait-for-recovery [_ test] (wait-for-recovery test))
(create-table-opts [_ _] {})
Expand Down
2 changes: 0 additions & 2 deletions scalardb/src/scalardb/db_extend.clj
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
(.setProperty "scalar.db.consensus_commit.include_metadata.enabled" "true")))

(defprotocol DbExtension
(get-db-type [this])
(live-nodes [this test])
(wait-for-recovery [this test])
(create-table-opts [this test])
Expand All @@ -53,7 +52,6 @@
db/LogFiles
(log-files [_ test node] (db/log-files db test node))
DbExtension
(get-db-type [_] (get-db-type ext-db))
(live-nodes [_ test] (live-nodes ext-db test))
(wait-for-recovery [_ test] (wait-for-recovery ext-db test))
(create-table-opts [_ test] (create-table-opts ext-db test))
Expand Down
Loading
Loading