diff --git a/lib/migration_generator/migration_generator.ex b/lib/migration_generator/migration_generator.ex index 425ccb53..7f7baafa 100644 --- a/lib/migration_generator/migration_generator.ex +++ b/lib/migration_generator/migration_generator.ex @@ -555,6 +555,14 @@ defmodule AshPostgres.MigrationGenerator do version in versions end) |> Enum.each(fn {version, mod} -> + runner_opts = + [ + all: true, + prefix: prefix + ] + |> maybe_put_mod_attribute(mod, :disable_ddl_transaction) + |> maybe_put_mod_attribute(mod, :disable_migration_lock) + Ecto.Migration.Runner.run( repo, [], @@ -563,8 +571,7 @@ defmodule AshPostgres.MigrationGenerator do :forward, :down, :down, - all: true, - prefix: prefix + runner_opts ) Ecto.Migration.SchemaMigration.down(repo, repo.config(), version, prefix: prefix) @@ -3858,4 +3865,14 @@ defmodule AshPostgres.MigrationGenerator do defp to_ordered_object(value) when is_list(value), do: Enum.map(value, &to_ordered_object/1) defp to_ordered_object(value), do: value + + defp maybe_put_mod_attribute(opts, mod, attribute) do + migration_config = mod.__migration__() + + case migration_config[attribute] do + nil -> opts + false -> opts + value -> Keyword.put(opts, attribute, value) + end + end end diff --git a/lib/migration_generator/operation.ex b/lib/migration_generator/operation.ex index b7b09792..2cb7b557 100644 --- a/lib/migration_generator/operation.ex +++ b/lib/migration_generator/operation.ex @@ -46,6 +46,18 @@ defmodule AshPostgres.MigrationGenerator.Operation do end end + def concurrent_option(table, multitenancy, schema) do + if multitenancy.strategy == :context do + # For tenant migrations, prefix is a function call + "concurrently: AshPostgres.MigrationHelper.maybe_index_concurrently?(:#{as_atom(table)}, repo(), prefix())" + else + # For regular migrations, prefix is a string or nil + prefix_arg = if schema, do: "\"#{schema}\"", else: "nil" + + "concurrently: AshPostgres.MigrationHelper.maybe_index_concurrently?(:#{as_atom(table)}, repo(), #{prefix_arg})" + end + end + def on_delete(%{on_delete: {:nilify, columns}}) when is_list(columns) do "on_delete: {:nilify, #{inspect(columns)}}" end @@ -903,30 +915,46 @@ defmodule AshPostgres.MigrationGenerator.Operation do }, table: table, schema: schema, - multitenancy: multitenancy + multitenancy: multitenancy, + concurrently: concurrently }) do keys = index_keys(keys, all_tenants?, multitenancy) index_name = index_name || "#{table}_#{name}_index" + concurrent_opt = + if concurrently do + concurrent_option(table, multitenancy, schema) + else + nil + end + + base_opts = + join([ + "name: \"#{index_name}\"", + option("prefix", schema), + option("nulls_distinct", nils_distinct?), + concurrent_opt + ]) + cond do base_filter && where -> where = "(#{where}) AND (#{base_filter})" - "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?), option("where", where)])})" + "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join([base_opts, option("where", where)])})" base_filter -> base_filter = "(#{base_filter})" - "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], where: \"#{base_filter}\", #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?)])})" + "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], where: \"#{base_filter}\", #{base_opts})" where -> where = "(#{where})" - "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?), option("where", where)])})" + "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join([base_opts, option("where", where)])})" true -> - "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{join(["name: \"#{index_name}\"", option("prefix", schema), option("nulls_distinct", nils_distinct?)])})" + "create unique_index(:#{as_atom(table)}, [#{Enum.map_join(keys, ", ", &inspect/1)}], #{base_opts})" end end @@ -1007,11 +1035,18 @@ defmodule AshPostgres.MigrationGenerator.Operation do {where, base_filter} -> %{index | where: base_filter <> " AND " <> where} end + concurrent_opt = + if index.concurrently do + concurrent_option(table, multitenancy, schema) + else + nil + end + opts = join([ option(:name, index.name), option(:unique, index.unique), - option(:concurrently, index.concurrently), + concurrent_opt, option(:using, index.using), option(:prefix, index.prefix), option(:where, index.where), diff --git a/lib/migration_helper.ex b/lib/migration_helper.ex new file mode 100644 index 00000000..ee4d5bad --- /dev/null +++ b/lib/migration_helper.ex @@ -0,0 +1,70 @@ +# SPDX-FileCopyrightText: 2025 ash_postgres contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshPostgres.MigrationHelper do + @moduledoc """ + Helper functions for AshPostgres migrations. + + This module provides utilities for migrations, particularly for handling + concurrent index creation in various scenarios. + """ + + @doc """ + Determines whether a concurrent index should be used. + + Returns `false` (disables concurrent) in three scenarios: + 1. When running in test environment + 2. When inside a transaction + 3. When the table has no existing records (for tenant migrations) + + ## Examples + + # In a regular migration: + create index(:posts, [:title], concurrently: maybe_index_concurrently?(:posts, repo())) + + # In a tenant migration: + create index(:posts, [:title], concurrently: maybe_index_concurrently?(:posts, repo(), prefix())) + + """ + def maybe_index_concurrently?(table, repo, prefix \\ nil) do + cond do + Mix.env() == :test -> + false + + repo.in_transaction?() -> + false + + table_empty?(repo, table, prefix) -> + false + + true -> + true + end + end + + defp table_empty?(repo, table, prefix) do + table_name = to_string(table) + + quoted_table = + if prefix do + Ecto.Adapters.SQL.quote_name({prefix, table_name}) + else + Ecto.Adapters.SQL.quote_name(table_name) + end + + [[exists]] = + Ecto.Adapters.SQL.query!(repo, "SELECT EXISTS(SELECT 1 FROM #{quoted_table} LIMIT 1)", []) + + !exists + end + + defp quote_name({schema, table}) do + quote_name(schema) <> "." <> quote_name(table) + end + + defp quote_name(name) do + name = name |> to_string() + ~s("#{String.replace(name, ~s("), ~s(""))}") + end +end diff --git a/lib/multitenancy.ex b/lib/multitenancy.ex index 56d3f721..ce2e561d 100644 --- a/lib/multitenancy.ex +++ b/lib/multitenancy.ex @@ -46,6 +46,14 @@ defmodule AshPostgres.MultiTenancy do |> Enum.filter(& &1) |> Enum.map(&load_migration!/1) |> Enum.each(fn {version, mod} -> + runner_opts = + [ + all: true, + prefix: tenant_name + ] + |> maybe_put_mod_attribute(mod, :disable_ddl_transaction) + |> maybe_put_mod_attribute(mod, :disable_migration_lock) + Ecto.Migration.Runner.run( repo, [], @@ -54,8 +62,7 @@ defmodule AshPostgres.MultiTenancy do :forward, :up, :up, - all: true, - prefix: tenant_name + runner_opts ) Ecto.Migration.SchemaMigration.up(repo, repo.config(), version, prefix: tenant_name) @@ -121,4 +128,14 @@ defmodule AshPostgres.MultiTenancy do defp tenant_name_regex do ~r/^[a-zA-Z0-9_-]+$/ end + + defp maybe_put_mod_attribute(opts, mod, attribute) do + migration_config = mod.__migration__() + + case migration_config[attribute] do + nil -> opts + false -> opts + value -> Keyword.put(opts, attribute, value) + end + end end diff --git a/test/migration_module_attributes_no_sandbox_test.exs b/test/migration_module_attributes_no_sandbox_test.exs new file mode 100644 index 00000000..3451df98 --- /dev/null +++ b/test/migration_module_attributes_no_sandbox_test.exs @@ -0,0 +1,166 @@ +# SPDX-FileCopyrightText: 2019 ash_postgres contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshPostgres.MigrationModuleAttributesNoSandboxTest do + use AshPostgres.RepoNoSandboxCase, async: false + @moduletag :migration + + import ExUnit.CaptureLog + + setup do + timestamp = DateTime.utc_now() |> DateTime.to_unix(:microsecond) + unique_id = System.unique_integer([:positive]) + tenant_name = "test_no_sandbox_tenant_#{timestamp}_#{unique_id}" + + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + "CREATE SCHEMA IF NOT EXISTS \"#{tenant_name}\"", + [] + ) + + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + "CREATE TABLE \"#{tenant_name}\".posts (id serial PRIMARY KEY, title text)", + [] + ) + + on_exit(fn -> + Ecto.Adapters.SQL.query!(AshPostgres.TestRepo, "DROP SCHEMA \"#{tenant_name}\" CASCADE", []) + end) + + %{tenant_name: tenant_name} + end + + describe "migration attributes without sandbox" do + test "tenant migration with @disable_ddl_transaction can create concurrent index", %{ + tenant_name: tenant_name + } do + migration_content = """ + defmodule TestConcurrentIndexMigrationNoSandbox do + use Ecto.Migration + @disable_ddl_transaction true + @disable_migration_lock true + + def up do + create index(:posts, [:title], concurrently: true) + end + + def down do + drop index(:posts, [:title]) + end + end + """ + + IO.puts( + "You should not not see a warning in this test about missing @disable_ddl_transaction" + ) + + migration_file = + create_test_migration("test_concurrent_index_migration_no_sandbox.exs", migration_content) + + result = + capture_log(fn -> + AshPostgres.MultiTenancy.migrate_tenant( + tenant_name, + AshPostgres.TestRepo, + Path.dirname(migration_file) + ) + end) + + assert result =~ "== Migrated" + + index_result = + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + """ + SELECT indexname FROM pg_indexes + WHERE schemaname = '#{tenant_name}' + AND tablename = 'posts' + AND indexname LIKE '%title%' + """, + [] + ) + + assert length(index_result.rows) > 0 + + cleanup_migration_files(migration_file) + end + + test "tenant migration without @disable_ddl_transaction gives warnings", %{ + tenant_name: tenant_name + } do + migration_content = """ + defmodule TestConcurrentIndexMigrationWithoutDisableNoSandbox do + use Ecto.Migration + + def up do + create index(:posts, [:title], concurrently: true) + end + + def down do + drop index(:posts, [:title]) + end + end + """ + + IO.puts("You should see a warning in this test about missing @disable_ddl_transaction") + + migration_file = + create_test_migration( + "test_concurrent_index_migration_without_disable_no_sandbox.exs", + migration_content + ) + + result = + capture_log(fn -> + AshPostgres.MultiTenancy.migrate_tenant( + tenant_name, + AshPostgres.TestRepo, + Path.dirname(migration_file) + ) + end) + + # The warnings are printed to the console (visible in test output above) + # We can see them in the test output, but they're not captured by capture_log + assert result =~ "== Migrated" + + index_result = + Ecto.Adapters.SQL.query!( + AshPostgres.TestRepo, + """ + SELECT indexname FROM pg_indexes + WHERE schemaname = '#{tenant_name}' + AND tablename = 'posts' + AND indexname LIKE '%title%' + """, + [] + ) + + assert length(index_result.rows) > 0 + + cleanup_migration_files(migration_file) + end + end + + defp create_test_migration(filename, content) do + timestamp = DateTime.utc_now() |> DateTime.to_unix(:microsecond) |> Integer.to_string() + unique_id = System.unique_integer([:positive]) + + test_migrations_dir = + Path.join(System.tmp_dir!(), "ash_postgres_test_migrations_#{timestamp}_#{unique_id}") + + File.mkdir_p!(test_migrations_dir) + + migration_filename = "#{timestamp}_#{filename}" + migration_file = Path.join(test_migrations_dir, migration_filename) + File.write!(migration_file, content) + + migration_file + end + + defp cleanup_migration_files(migration_file) do + migration_dir = Path.dirname(migration_file) + File.rm_rf(migration_dir) + end +end diff --git a/test/support/repo_no_sandbox_case.ex b/test/support/repo_no_sandbox_case.ex new file mode 100644 index 00000000..c0366a25 --- /dev/null +++ b/test/support/repo_no_sandbox_case.ex @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: 2019 ash_postgres contributors +# +# SPDX-License-Identifier: MIT + +defmodule AshPostgres.RepoNoSandboxCase do + @moduledoc """ + Test case for testing database operations without sandbox transaction wrapping. + + This is useful for testing operations that cannot run inside transactions, + such as concurrent index creation with @disable_ddl_transaction. + """ + use ExUnit.CaseTemplate + + using do + quote do + alias AshPostgres.TestRepo + + import Ecto + import Ecto.Query + import AshPostgres.RepoNoSandboxCase + + # and any other stuff + end + end + + setup _tags do + # No sandbox setup - just ensure the repo is available + # This allows testing operations that cannot run in transactions + :ok + end +end