Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ Changelog
Unreleased
----------

* Prevent upgrades to CrateDB 6.0.0–6.0.5 if the cluster contains tables created
before 5.5, avoiding potential data loss.

2.59.1 (2026-04-20)
-------------------

Expand Down
5 changes: 3 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ RUN apt-get update && \
apt-get install git -y

COPY . /src
RUN python -m pip install -U setuptools==${SETUPTOOLS_VERSION} && \
python setup.py clean bdist_wheel
RUN python -m pip install -U setuptools==${SETUPTOOLS_VERSION} \
&& pip install setuptools_scm vcs_versioning \
&& python setup.py clean bdist_wheel


# Run container
Expand Down
73 changes: 73 additions & 0 deletions crate/operator/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,78 @@ async def check_reindexing_tables(
)


async def check_unsafe_upgrade_paths(
core: CoreV1Api,
namespace: str,
name: str,
body: kopf.Body,
old: kopf.Body,
logger: logging.Logger,
):
"""
Block upgrades to CrateDB 6.0.0–6.0.5 if the cluster contains tables
created before CrateDB 5.5, which are affected by a data-loss bug.

:param core: An instance of the Kubernetes Core V1 API.
:param namespace: The Kubernetes namespace for the CrateDB cluster.
:param name: The name for the ``CrateDB`` custom resource.
:param body: The full body of the ``CrateDB`` custom resource per
:class:`kopf.Body`.
:param old: The old resource body. Required to get the old version.
"""
target_version = CrateVersion(body.spec["cluster"]["version"])

if not (CrateVersion("6.0.0") <= target_version < CrateVersion("6.0.6")):
return

logger.info(
"Checking for unsafe upgrade path to %s (CrateDB 6.0.0–6.0.5)",
target_version,
)

host = await get_host(core, namespace, name)
password = await get_system_user_password(core, namespace, name)
conn_factory = connection_factory(host, password)

async with conn_factory() as conn:
async with conn.cursor() as cursor:
await cursor.execute(
"""
SELECT COUNT(*)
FROM sys.shards
WHERE
lpad(split_part(min_lucene_version, '.', 1), 2, '0') || '.' ||
lpad(split_part(min_lucene_version, '.', 2), 2, '0') || '.' ||
lpad(split_part(min_lucene_version, '.', 3), 2, '0') <= '09.06.00'
"""
)
q1_count = (await cursor.fetchone())[0]

await cursor.execute(
"""
SELECT COUNT(*)
FROM information_schema.tables
WHERE
split_part(version['created'], '.', 1) = '5'
AND lpad(split_part(version['created'], '.', 2), 2, '0') < '05'
"""
)
q2_count = (await cursor.fetchone())[0]

logger.info(
"Unsafe upgrade detection: lucene_check=%s, version_check=%s",
q1_count,
q2_count,
)

if q1_count > 0 or q2_count > 0:
raise kopf.PermanentError(
"Upgrade blocked: Cluster contains tables created before CrateDB 5.5. "
"Upgrading to CrateDB 6.0.0–6.0.5 may cause data loss. "
"Please upgrade to >= 6.0.6 instead."
)


async def recreate_internal_tables(
core: CoreV1Api,
namespace: str,
Expand Down Expand Up @@ -489,6 +561,7 @@ async def handle( # type: ignore
async with GlobalApiClient() as api_client:
core = CoreV1Api(api_client)
await check_reindexing_tables(core, namespace, name, body, old, logger)
await check_unsafe_upgrade_paths(core, namespace, name, body, old, logger)


class AfterUpgradeSubHandler(StateBasedSubHandler):
Expand Down
73 changes: 73 additions & 0 deletions tests/test_upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from crate.operator.create import get_statefulset_crate_command
from crate.operator.upgrade import (
check_reindexing_tables,
check_unsafe_upgrade_paths,
recreate_internal_tables,
upgrade_command_data_nodes,
upgrade_command_global_jwt_config,
Expand Down Expand Up @@ -717,3 +718,75 @@ async def test_recreate_internal_tables_skips_missing_tables(
assert f'INSERT INTO "{schema}"."{tmp_table}"' not in executed_sql
assert f'ALTER CLUSTER SWAP TABLE "{schema}"."{tmp_table}"' not in executed_sql
assert f'DROP TABLE IF EXISTS "{schema}"."{tmp_table}"' not in executed_sql


@pytest.mark.asyncio
@mock.patch("crate.operator.upgrade.get_host", new_callable=mock.AsyncMock)
@mock.patch(
"crate.operator.upgrade.get_system_user_password", new_callable=mock.AsyncMock
)
async def test_check_unsafe_upgrade_paths_blocks_on_old_tables(
mock_get_pwd,
mock_get_host,
mock_cratedb_connection,
):
mock_get_host.return_value = "localhost"
mock_get_pwd.return_value = "pwd"

mock_cursor = mock_cratedb_connection["mock_cursor"]

# First query (lucene) -> returns > 0
# Second query (version_created) -> returns 0
mock_cursor.fetchone.side_effect = [
(1,), # q1_count
(0,), # q2_count
]

body = mock.MagicMock()
old = {"spec": {"cluster": {"version": "5.10.16"}}}
body.spec = {"cluster": {"version": "6.0.5"}}

with pytest.raises(
kopf.PermanentError,
match="Upgrade blocked: Cluster contains tables created before CrateDB 5.5",
):
await check_unsafe_upgrade_paths(
core=mock.MagicMock(),
namespace="ns",
name="my-cluster",
body=body,
old=old,
logger=mock.MagicMock(),
)

# Ensure both queries were executed
assert mock_cursor.execute.call_count == 2


@pytest.mark.asyncio
@mock.patch("crate.operator.upgrade.get_host", new_callable=mock.AsyncMock)
@mock.patch(
"crate.operator.upgrade.get_system_user_password", new_callable=mock.AsyncMock
)
async def test_check_unsafe_upgrade_paths_allows_safe_versions(
mock_get_pwd,
mock_get_host,
mock_cratedb_connection,
):
mock_get_host.return_value = "localhost"
mock_get_pwd.return_value = "pwd"

body = mock.MagicMock()
old = {"spec": {"cluster": {"version": "5.10.16"}}}
body.spec = {"cluster": {"version": "6.0.6"}} # safe version

await check_unsafe_upgrade_paths(
core=mock.MagicMock(),
namespace="ns",
name="my-cluster",
body=body,
old=old,
logger=mock.MagicMock(),
)

mock_cratedb_connection["mock_cursor"].execute.assert_not_called()
Loading