diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala index e50c7f24..afd073b7 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/common/SoQLCommon.scala @@ -16,7 +16,7 @@ import com.socrata.soql.types._ import com.socrata.soql.types.obfuscation.{CryptProvider, Quadifier} import java.io.{File, OutputStream, Reader} import java.security.SecureRandom -import java.sql.Connection +import java.sql.{Connection, SQLException} import java.util.concurrent.ExecutorService import javax.sql.DataSource @@ -50,6 +50,10 @@ object SoQLSystemColumns { sc => } } +object SoQLCommon { + val log = org.slf4j.LoggerFactory.getLogger(classOf[SoQLCommon]) +} + class SoQLCommon(dataSource: DataSource, copyInProvider: (Connection, String, OutputStream => Unit) => Long, executorService: ExecutorService, @@ -64,6 +68,8 @@ class SoQLCommon(dataSource: DataSource, //tableCleanupDelay: FiniteDuration, cache: Cache) { common => + import SoQLCommon._ + type CT = SoQLType type CV = SoQLValue @@ -115,11 +121,31 @@ class SoQLCommon(dataSource: DataSource, def isSystemColumnId(name: UserColumnId): Boolean = SoQLSystemColumns.isSystemColumnId(name) + private class UnlockAll(conn: Connection) extends AutoCloseable { + override def close() { + try { + conn.setAutoCommit(true) + using(conn.prepareStatement("select pg_advisory_unlock_all()")) { stmt => + stmt.execute() + } + } catch { + case e: SQLException => + log.warn("Exception while doing the advisory unlock - this probably means the connection was lost", e) + throw e + } + } + } + val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] { def run[B](f: PostgresUniverse[CT, CV] with SchemaFinderProvider => B): B = { - val conn = 
dataSource.getConnection() - try { + using(new ResourceScope) { rs => + val conn = rs.open(dataSource.getConnection()); conn.setAutoCommit(false) + using(conn.prepareStatement("select pg_advisory_unlock_all()")) { stmt => + stmt.execute() + } + conn.commit(); + rs.open(new UnlockAll(conn)) val u = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider { lazy val cache: Cache = common.cache lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache) @@ -131,8 +157,6 @@ class SoQLCommon(dataSource: DataSource, } finally { u.rollback() } - } finally { - conn.close() } } } diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala index 1b38d4b2..e0fa5847 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/secondary/PlaybackToSecondary.scala @@ -264,7 +264,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], } } } - updateSecondaryMap(dataVersion) + updateSecondaryMap(datasetInfo, dataVersion) dataVersion += 1 } } @@ -755,7 +755,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], retrying[Unit]({ mostRecentlyUpdatedCopyInfo.foreach { case mostRecent => timingReport("resync-update-secondary-map", "dataset" -> datasetId) { - updateSecondaryMap(mostRecent.dataVersion) + updateSecondaryMap(mostRecent.datasetInfo, mostRecent.dataVersion) } } }, ignoreSerializationFailure) @@ -829,7 +829,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], } } - def updateSecondaryMap(newLastDataVersion: Long): Unit = { + def updateSecondaryMap(datasetInfo: metadata.DatasetInfo, newLastDataVersion: Long): Unit = { // We want to end the current transaction here. 
We don't want to be holding share locks on data-tables like log // tables while updating a row on the secondary_manifest. This is to avoid deadlocks when data-coordinator also has // locks out on the data-tables and is also updating the same row on the secondary_manifest. @@ -837,6 +837,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV], // The activity in the current transaction (before committing) should all // be _reads_ from metadata tables and the dataset's log table. if(!u.isAutoCommit) u.commit() + u.datasetMapReader.replicationLock(datasetInfo, shared = true) u.secondaryManifest.completedReplicationTo(secondary.storeId, claimantId, datasetId, diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/DatasetMap.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/DatasetMap.scala index 26993b43..c7798504 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/DatasetMap.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/DatasetMap.scala @@ -5,6 +5,11 @@ import com.socrata.soql.environment.ColumnName import scala.concurrent.duration.Duration trait DatasetMapBase[CT] extends `-impl`.BaseDatasetMapReader[CT] { + /** Advisory: "I'm going to be mutating this dataset, inform the underlying database". + * This method exists to (hopefully) prevent deadlock errors by acquiring secondary_manifest + * locks early in a transaction, right after loading the dataset. + */ + def replicationLock(datasetInfo: DatasetInfo, shared: Boolean): Unit } trait DatasetMapReader[CT] extends DatasetMapBase[CT] { @@ -32,12 +37,6 @@ trait DatasetMapWriter[CT] extends DatasetMapBase[CT] with `-impl`.BaseDatasetMa * @throws DatasetIdInUseByWriterException if some other writer has been used to look up this dataset.
*/ def datasetInfo(datasetId: DatasetId, timeout: Duration, semiExclusive: Boolean = false): Option[DatasetInfo] - /** Advisory: "I'm going to be mutating this dataset, inform the underlying database". - * This method exists to (hopefully) prevent deadlock errors by acquiring secondary_manifest - * locks early in a transaction, right after loading the dataset. - */ - def replicationLock(datasetInfo: DatasetInfo): Unit - /** Creates a new dataset in the truthstore. * @note Does not actually create any tables; this just updates the bookkeeping. * @return A `CopyInfo` that refers to an unpublished copy. */ diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/sql/PostgresDatasetMap.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/sql/PostgresDatasetMap.scala index c82f877f..2e3ed4e9 100644 --- a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/sql/PostgresDatasetMap.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/metadata/sql/PostgresDatasetMap.scala @@ -35,6 +35,30 @@ trait BasePostgresDatasetMapReader[CT] extends `-impl`.BaseDatasetMapReader[CT] private def toDateTime(time: Timestamp): DateTime = new DateTime(time.getTime) + // we'll use the top 2 bits for four different types of locks, of + // which there are presently only two defined. This _does_ mean + // dataset IDs are now 62-bit numbers instead of 64-bit numbers... 
+ protected def datasetLockId(datasetId: DatasetId): Long = + datasetId.underlying // 00 + protected def replicationLockId(datasetId: DatasetId): Long = + datasetId.underlying | (1L << 62) // 01 + protected def lockTypeThreeId(datasetId: DatasetId): Long = + datasetId.underlying | (2L << 62) // 10 + protected def lockTypeFourId(datasetId: DatasetId): Long = + datasetId.underlying | (3L << 62) // 11 + + def replicationLock(datasetInfo: DatasetInfo, shared: Boolean) { + for { + stmt <- managed(conn.prepareStatement("select pg_advisory_lock(?)")) + .and(_.setLong(1, replicationLockId(datasetInfo.systemId))) + rs <- managed(stmt.executeQuery()) + } { + while(rs.next()) { + // nothing, we're just reading the rows + } + } + } + // The time the current transaction was started. def currentTime(): DateTime = { using(conn.prepareStatement("SELECT CURRENT_TIMESTAMP")) { stmt=> @@ -1400,10 +1424,13 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT], // Can't set parameters' values via prepared statement placeholders def setTimeout(timeoutMs: Int) = s"SET LOCAL statement_timeout TO $timeoutMs" - def datasetInfoBySystemIdQuery(semiExclusive: Boolean) = - "SELECT system_id, next_counter_value, latest_data_version, locale_name, obfuscation_key, resource_name FROM dataset_map WHERE system_id = ? 
FOR %s".format( - if(semiExclusive) "SHARE" else "UPDATE" - ) + private def datasetLockBySystemIdQuery(semiExclusive: Boolean) = + if(semiExclusive) { + "select pg_advisory_lock_shared(?)" + } else { + "select pg_advisory_lock(?)" + } + def resetTimeout = "SET LOCAL statement_timeout TO DEFAULT" def datasetInfo(datasetId: DatasetId, timeout: Duration, semiExclusive: Boolean): Option[DatasetInfo] = { // For now we assume that we're the only one setting the statement_timeout @@ -1427,25 +1454,22 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT], log.trace("Setting statement timeout to {}ms", ms) execute(setTimeout(ms)) } - val result = - using(conn.prepareStatement(datasetInfoBySystemIdQuery(semiExclusive))) { stmt => - stmt.setDatasetId(1, datasetId) - try { - t("lookup-dataset-for-update", "dataset_id" -> datasetId)(stmt.execute()) - getInfoResult(stmt) - } catch { - case e: PSQLException if isStatementTimeout(e) => - log.trace("Get dataset _with_ waiting failed; abandoning") - conn.rollback(savepoint) - throw new DatasetIdInUseByWriterException(datasetId, e) - case e: PSQLException if isReadOnlyTransaction(e) => - log.trace("Get dataset for update failed due to read-only txn; abandoning") - conn.rollback(savepoint) - throw new DatabaseInReadOnlyMode(e) - } + using(conn.prepareStatement(datasetLockBySystemIdQuery(semiExclusive))) { stmt => + stmt.setLong(1, datasetLockId(datasetId)) + try { + t("lock-dataset-for-update", "dataset_id" -> datasetId)(stmt.execute()) + } catch { + case e: PSQLException if isStatementTimeout(e) => + log.trace("Get dataset _with_ waiting failed; abandoning") + conn.rollback(savepoint) + throw new DatasetIdInUseByWriterException(datasetId, e) + case e: PSQLException if isReadOnlyTransaction(e) => + log.trace("Get dataset for update failed due to read-only txn; abandoning") + conn.rollback(savepoint) + throw new DatabaseInReadOnlyMode(e) } + } if(timeout.isFinite) execute(resetTimeout) - result } finally { 
try { conn.releaseSavepoint(savepoint) @@ -1463,16 +1487,15 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT], log.warn("Unexpected exception while releasing savepoint", e) } } - } - def replicationLock(datasetInfo: DatasetInfo) { - for { - stmt <- managed(conn.prepareStatement("select store_id from secondary_manifest where dataset_system_id = ? order by store_id for update")) - .and(_.setDatasetId(1, datasetInfo.systemId)) - rs <- managed(stmt.executeQuery()) - } { - while(rs.next()) { - // nothing, we're just reading the rows + using(conn.prepareStatement("SELECT system_id, next_counter_value, latest_data_version, locale_name, obfuscation_key, resource_name FROM dataset_map WHERE system_id = ?")) { stmt => + stmt.setDatasetId(1, datasetId) + using(t("lookup-dataset-for-update", "dataset_id" -> datasetId)(stmt.executeQuery())) { rs => + if(rs.next()) { + Some(DatasetInfo(rs.getDatasetId("system_id"), rs.getLong("next_counter_value"), rs.getLong("latest_data_version"), rs.getString("locale_name"), rs.getBytes("obfuscation_key"), rs.getDatasetResourceName("resource_name"))) + } else { + None + } } } } @@ -1483,15 +1506,6 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT], stmt.execute(s) } } - - def getInfoResult(stmt: Statement) = - using(stmt.getResultSet) { rs => - if(rs.next()) { - Some(DatasetInfo(rs.getDatasetId("system_id"), rs.getLong("next_counter_value"), rs.getLong("latest_data_version"), rs.getString("locale_name"), rs.getBytes("obfuscation_key"), rs.getDatasetResourceName("resource_name"))) - } else { - None - } - } } object PostgresDatasetMapWriter { diff --git a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/sql/PostgresDatabaseMutator.scala b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/sql/PostgresDatabaseMutator.scala index c57868b7..6cc31706 100644 --- 
a/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/sql/PostgresDatabaseMutator.scala +++ b/coordinatorlib/src/main/scala/com/socrata/datacoordinator/truth/sql/PostgresDatabaseMutator.scala @@ -25,7 +25,7 @@ class PostgresDatabaseMutator[CT, CV](universe: Managed[Universe[CT, CV] with Lo final def loadLatestVersionOfDataset(datasetId: DatasetId, lockTimeout: Duration): Option[DatasetCopyContext[CT]] = { val map = datasetMap map.datasetInfo(datasetId, lockTimeout) map { datasetInfo => - map.replicationLock(datasetInfo) + map.replicationLock(datasetInfo, shared = false) val latest = map.latest(datasetInfo) val schema = map.schema(latest) new DatasetCopyContext(latest, schema)