Skip to content
13 changes: 12 additions & 1 deletion api/src/org/labkey/api/data/SimpleFilter.java
Original file line number Diff line number Diff line change
Expand Up @@ -664,11 +664,19 @@ public static DatabaseIdentifier getAliasForColumnFilter(SqlDialect dialect, Col

public static class InClause extends MultiValuedFilterClause
{
private InClauseGenerator _tempTableGenerator = null;

// Creates an IN clause over the given column using the dialect's default IN-clause generation.
// Delegates with both boolean flags false — presumably urlClause and a negation flag; confirm
// against the four-arg constructor (not visible here).
public InClause(FieldKey fieldKey, Collection<?> params)
{
this(fieldKey, params, false, false);
}

// Creates an IN clause that renders SQL via the supplied custom InClauseGenerator (e.g., a
// temp-table-backed generator targeting a non-default scope) instead of the dialect default.
// toSQLFragment() checks _tempTableGenerator and routes to
// appendInClauseSqlWithCustomInClauseGenerator() when one was provided.
public InClause(FieldKey fieldKey, Collection<?> params, InClauseGenerator tempTableGenerator)
{
this(fieldKey, params, false, false);
_tempTableGenerator = tempTableGenerator;
}

public InClause(FieldKey fieldKey, Collection<?> params, boolean urlClause)
{
this(fieldKey, params, urlClause, false);
Expand Down Expand Up @@ -837,7 +845,10 @@ public SQLFragment toSQLFragment(Map<FieldKey, ? extends ColumnInfo> columnMap,
in.appendIdentifier(alias);

// Dialect may want to generate database-specific SQL, especially for very large IN clauses
dialect.appendInClauseSql(in, convertedParams);
if (null == _tempTableGenerator)
dialect.appendInClauseSql(in, convertedParams);
else
dialect.appendInClauseSqlWithCustomInClauseGenerator(in, convertedParams, _tempTableGenerator);

if (isIncludeNull())
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,11 +292,11 @@ public String addReselect(SQLFragment sql, ColumnInfo column, @Nullable String p
@Override
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params)
{
return appendInClauseSql(sql, params, _tempTableInClauseGenerator);
return appendInClauseSqlWithCustomInClauseGenerator(sql, params, _tempTableInClauseGenerator);
}

@Override
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
public SQLFragment appendInClauseSqlWithCustomInClauseGenerator(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
{
if (params.size() >= TEMPTABLE_GENERATOR_MINSIZE)
{
Expand Down
6 changes: 3 additions & 3 deletions api/src/org/labkey/api/data/dialect/SqlDialect.java
Original file line number Diff line number Diff line change
Expand Up @@ -530,11 +530,11 @@ protected Set<String> getJdbcKeywords(SqlExecutor executor) throws SQLException,
// Most callers should use this method
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params)
{
return appendInClauseSql(sql, params, null);
return appendInClauseSqlWithCustomInClauseGenerator(sql, params, null);
}

// Use in cases where the default temp schema won't do, e.g., you need to apply a large IN clause in an external data source
public SQLFragment appendInClauseSql(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
// Use only in cases where the default temp-table generator won't do, e.g., you need to apply a large IN clause in an external data source
public SQLFragment appendInClauseSqlWithCustomInClauseGenerator(SQLFragment sql, @NotNull Collection<?> params, InClauseGenerator tempTableGenerator)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tempTableGenerator not used?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the default case, no... we don't create temp tables for every dialect (our support for some databases is read-only, e.g.). Providing a custom generator is implemented only for PostgreSQL because that's all I needed. But I certainly can (and will) clean this up in a follow-on PR.

{
return DEFAULT_GENERATOR.appendInClauseSql(sql, params);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ default void beforeMigration(){}
DbScope getTargetScope();
@NotNull Set<String> getSkipSchemas();
Predicate<String> getColumnNameFilter();
@Nullable TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler);
@Nullable TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler, @Nullable MigrationTableHandler tableHandler);
default void copyAttachments(DbSchema sourceSchema, DbSchema targetSchema, MigrationSchemaHandler schemaHandler){}
default void afterMigration(){}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ default void migrate(DatabaseMigrationConfiguration configuration)

// By default, no-op implementations
default void registerSchemaHandler(MigrationSchemaHandler schemaHandler) {}
default void registerTableHandler(MigrationTableHandler tableHandler) {}
default void registerMigrationFilter(MigrationFilter filter) {}

default @Nullable MigrationFilter getMigrationFilter(String propertyName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public Predicate<String> getColumnNameFilter()
}

@Override
public TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler)
public TableSelector getTableSelector(DbSchemaType schemaType, TableInfo sourceTable, TableInfo targetTable, Set<String> selectColumnNames, MigrationSchemaHandler schemaHandler, @Nullable MigrationTableHandler tableHandler)
{
return null;
}
Expand Down
43 changes: 32 additions & 11 deletions api/src/org/labkey/api/migration/DefaultMigrationSchemaHandler.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.labkey.api.migration;

import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.labkey.api.attachments.AttachmentService;
Expand Down Expand Up @@ -27,8 +28,11 @@
import org.labkey.api.query.FieldKey;
import org.labkey.api.query.SchemaKey;
import org.labkey.api.query.TableSorter;
import org.labkey.api.util.ConfigurationException;
import org.labkey.api.util.GUID;
import org.labkey.api.util.JobRunner;
import org.labkey.api.util.StringUtilsLabKey;
import org.labkey.api.util.logging.LogHelper;

import java.util.ArrayList;
import java.util.Arrays;
Expand All @@ -38,10 +42,13 @@
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

public class DefaultMigrationSchemaHandler implements MigrationSchemaHandler
{
private static final Logger LOG = LogHelper.getLogger(DefaultMigrationSchemaHandler.class, "Migration shutdown status");

private final DbSchema _schema;

public DefaultMigrationSchemaHandler(DbSchema schema)
Expand Down Expand Up @@ -77,7 +84,7 @@ public List<TableInfo> getTablesToCopy()

if (!allTables.isEmpty())
{
DatabaseMigrationService.LOG.info("These tables were removed by TableSorter: {}", allTables);
LOG.info("These tables were removed by TableSorter: {}", allTables);
}

return sortedTables.stream()
Expand Down Expand Up @@ -250,12 +257,13 @@ public void copyAttachments(DatabaseMigrationConfiguration configuration, DbSche
Collection<String> entityIds = new SqlSelector(targetSchema, sql).getCollection(String.class);
SQLFragment selectParents = new SQLFragment("Parent");
// This query against the source database is likely to contain a large IN clause, so use an alternative InClauseGenerator
sourceSchema.getSqlDialect().appendInClauseSql(selectParents, entityIds, getTempTableInClauseGenerator(sourceSchema.getScope()));
sourceSchema.getSqlDialect().appendInClauseSqlWithCustomInClauseGenerator(selectParents, entityIds, getTempTableInClauseGenerator(sourceSchema.getScope()));
copyAttachments(configuration, sourceSchema, new SQLClause(selectParents), type);
}

// TODO: fail if type.getSelectParentEntityIdsSql() returns null?
// TODO: throw if some registered AttachmentType is not seen
else
{
throw new ConfigurationException("AttachmentType \"" + type.getUniqueName() + "\" is not configured to find parent EntityIds!");
}
});
}

Expand All @@ -267,33 +275,46 @@ protected InClauseGenerator getTempTableInClauseGenerator(DbScope sourceScope)
}

private static final Set<AttachmentType> SEEN = new HashSet<>();
private static final JobRunner ATTACHMENT_JOB_RUNNER = new JobRunner("Attachment JobRunner", 1);

// Copy all core.Documents rows that match the provided filter clause
protected void copyAttachments(DatabaseMigrationConfiguration configuration, DbSchema sourceSchema, FilterClause filterClause, AttachmentType... type)
protected final void copyAttachments(DatabaseMigrationConfiguration configuration, DbSchema sourceSchema, FilterClause filterClause, AttachmentType... type)
{
SEEN.addAll(Arrays.asList(type));
String additionalMessage = " associated with " + Arrays.stream(type).map(t -> t.getClass().getSimpleName()).collect(Collectors.joining(", "));
TableInfo sourceDocumentsTable = sourceSchema.getScope().getSchema("core", DbSchemaType.Migration).getTable("Documents");
TableInfo targetDocumentsTable = CoreSchema.getInstance().getTableInfoDocuments();
DatabaseMigrationService.get().copySourceTableToTargetTable(configuration, sourceDocumentsTable, targetDocumentsTable, DbSchemaType.Module, false, additionalMessage, new DefaultMigrationSchemaHandler(CoreSchema.getInstance().getSchema())

// Queue up the core.Documents transfers and let them run in the background
ATTACHMENT_JOB_RUNNER.execute(() -> DatabaseMigrationService.get().copySourceTableToTargetTable(configuration, sourceDocumentsTable, targetDocumentsTable, DbSchemaType.Module, false, additionalMessage, new DefaultMigrationSchemaHandler(CoreSchema.getInstance().getSchema())
{
@Override
public FilterClause getTableFilterClause(TableInfo sourceTable, Set<GUID> containers)
{
return filterClause;
}
});
}));
}

public static void logUnseenAttachmentTypes()
// Global (not schema- or configuration-specific) cleanup
public static void afterMigration() throws InterruptedException
{
// Report any unseen attachment types
Set<AttachmentType> unseen = new HashSet<>(AttachmentService.get().getAttachmentTypes());
unseen.removeAll(SEEN);

if (unseen.isEmpty())
DatabaseMigrationService.LOG.info("All AttachmentTypes have been seen");
LOG.info("All AttachmentTypes have been seen");
else
throw new ConfigurationException("These AttachmentTypes have not been seen: " + unseen.stream().map(type -> type.getClass().getSimpleName()).collect(Collectors.joining(", ")));

// Shut down the attachment JobRunner
LOG.info("Waiting for core.Documents background transfer to complete");
ATTACHMENT_JOB_RUNNER.shutdown();
if (ATTACHMENT_JOB_RUNNER.awaitTermination(1, TimeUnit.HOURS))
LOG.info("core.Documents background transfer is complete");
else
DatabaseMigrationService.LOG.info("These AttachmentTypes have not been seen: {}", unseen.stream().map(type -> type.getClass().getSimpleName()).collect(Collectors.joining(", ")));
LOG.error("core.Documents background transfer did not complete after one hour! Giving up.");
}

@Override
Expand Down
18 changes: 18 additions & 0 deletions api/src/org/labkey/api/migration/MigrationTableHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package org.labkey.api.migration;

import org.labkey.api.data.SimpleFilter;
import org.labkey.api.data.TableInfo;
import org.labkey.api.util.GUID;

import java.util.Set;

/**
 * Rarely needed, this interface lets a module filter the rows of another module's table. The specific use case: LabBook
 * needs to filter the compliance.SignedSnapshots table of snapshots associated with Notebooks that are excluded by a
 * NotebookFilter.
 */
public interface MigrationTableHandler
{
    /** Returns the table whose row migration this handler adjusts. */
    TableInfo getTableInfo();

    /**
     * Adjusts the filter applied when selecting rows from {@code sourceTable} during migration —
     * presumably by adding clauses that exclude unwanted rows; confirm against callers.
     *
     * @param sourceTable the table being migrated (the one returned by {@link #getTableInfo()})
     * @param filter the filter to mutate in place
     * @param containers container GUIDs in scope for the migration — TODO confirm exact semantics
     */
    void adjustFilter(TableInfo sourceTable, SimpleFilter filter, Set<GUID> containers);
}
26 changes: 14 additions & 12 deletions api/src/org/labkey/api/util/JobRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.labkey.api.data.DbScope;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -51,7 +53,7 @@ public class JobRunner implements Executor
private static final JobRunner _defaultJobRunner = new JobRunner("Default", 1);

private final ScheduledThreadPoolExecutor _executor;
private final HashMap<Future, Job> _jobs = new HashMap<>();
private final Map<Future<?>, Job> _jobs = new HashMap<>();


public JobRunner(String name, int max)
Expand All @@ -77,11 +79,6 @@ public void shutdownPre()
{
_executor.shutdown();
}

@Override
public void shutdownStarted()
{
}
});
}

Expand Down Expand Up @@ -111,11 +108,16 @@ public void shutdown()
_executor.shutdown();
}

/**
 * Blocks until all tasks have completed after a {@link #shutdown()} request, the timeout elapses,
 * or the current thread is interrupted, whichever happens first. Delegates directly to the
 * underlying {@code ScheduledThreadPoolExecutor}.
 *
 * @param timeout the maximum time to wait
 * @param unit the time unit of the timeout argument
 * @return true if the executor terminated; false if the timeout elapsed first
 * @throws InterruptedException if interrupted while waiting
 */
public boolean awaitTermination(long timeout, @NotNull TimeUnit unit) throws InterruptedException
{
return _executor.awaitTermination(timeout, unit);
}

/**
 * Schedules the runnable to execute immediately (zero delay) by delegating to the
 * delay-accepting overload, which also records the task's Future when the command is a Job.
 *
 * @param command the task to run
 */
@Override
public void execute(@NotNull Runnable command)
{
execute(command, 0);
}
Expand All @@ -132,7 +134,7 @@ public void execute(Runnable command, long delay)
{
synchronized (_jobs)
{
Future task = _executor.schedule(command, delay, TimeUnit.MILLISECONDS);
Future<?> task = _executor.schedule(command, delay, TimeUnit.MILLISECONDS);
if (command instanceof Job job)
{
job._task = task;
Expand All @@ -141,7 +143,7 @@ public void execute(Runnable command, long delay)
}
}

public Future submit(Runnable run)
public Future<?> submit(Runnable run)
{
if (run instanceof Job)
{
Expand Down Expand Up @@ -221,13 +223,13 @@ protected void afterExecute(Runnable r, Throwable t)
}
else
{
if (r instanceof Future)
if (r instanceof Future<?> f)
{
if (null == t)
{
try
{
((Future)r).get();
f.get();
}
catch (ExecutionException x)
{
Expand Down Expand Up @@ -277,7 +279,7 @@ static class JobThreadFactory implements ThreadFactory
}

@Override
public Thread newThread(Runnable r)
public Thread newThread(@NotNull Runnable r)
{
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
if (t.isDaemon())
Expand Down
Loading
Loading