Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import com.socrata.soql.types._
import com.socrata.soql.types.obfuscation.{CryptProvider, Quadifier}
import java.io.{File, OutputStream, Reader}
import java.security.SecureRandom
import java.sql.Connection
import java.sql.{Connection, SQLException}
import java.util.concurrent.ExecutorService
import javax.sql.DataSource

Expand Down Expand Up @@ -50,6 +50,10 @@ object SoQLSystemColumns { sc =>
}
}

object SoQLCommon {
  // Shared SLF4J logger for the SoQLCommon class; imported into the class
  // body (see `import SoQLCommon._`) so helpers such as the advisory-unlock
  // hook can log without each holding their own logger instance.
  val log = org.slf4j.LoggerFactory.getLogger(classOf[SoQLCommon])
}

class SoQLCommon(dataSource: DataSource,
copyInProvider: (Connection, String, OutputStream => Unit) => Long,
executorService: ExecutorService,
Expand All @@ -64,6 +68,8 @@ class SoQLCommon(dataSource: DataSource,
//tableCleanupDelay: FiniteDuration,
cache: Cache)
{ common =>
import SoQLCommon._

type CT = SoQLType
type CV = SoQLValue

Expand Down Expand Up @@ -115,11 +121,31 @@ class SoQLCommon(dataSource: DataSource,
def isSystemColumnId(name: UserColumnId): Boolean =
SoQLSystemColumns.isSystemColumnId(name)

// Resource-scope hook that releases every advisory lock held on `conn` when
// the scope is torn down (registered via `rs.open(new UnlockAll(conn))`).
// Ordering matters: autocommit is re-enabled first so the unlock statement
// runs outside any still-open transaction.
private class UnlockAll(conn: Connection) extends AutoCloseable {
  override def close() {
    try {
      // Leave autocommit on: this also implicitly ends any in-progress
      // transaction before we drop the session's advisory locks.
      conn.setAutoCommit(true)
      using(conn.prepareStatement("select pg_advisory_unlock_all()")) { stmt =>
        stmt.execute()
      }
    } catch {
      case e: SQLException =>
        // NOTE(review): rethrowing from close() may mask an exception already
        // propagating through the enclosing ResourceScope — confirm intended.
        log.warn("Exception while doing the advisory unlock - this probably means the connection was lost", e)
        throw e
    }
  }
}

val universe: Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] = new Managed[PostgresUniverse[CT, CV] with SchemaFinderProvider] {
def run[B](f: PostgresUniverse[CT, CV] with SchemaFinderProvider => B): B = {
val conn = dataSource.getConnection()
try {
using(new ResourceScope) { rs =>
val conn = rs.open(dataSource.getConnection());
conn.setAutoCommit(false)
using(conn.prepareStatement("select pg_advisory_unlock_all()")) { stmt =>
stmt.execute()
}
conn.commit();
rs.open(new UnlockAll(conn))
val u = new PostgresUniverse(conn, PostgresUniverseCommon) with SchemaFinderProvider {
lazy val cache: Cache = common.cache
lazy val schemaFinder = new SchemaFinder(common.typeContext.typeNamespace.userTypeForType, cache)
Expand All @@ -131,8 +157,6 @@ class SoQLCommon(dataSource: DataSource,
} finally {
u.rollback()
}
} finally {
conn.close()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV],
}
}
}
updateSecondaryMap(dataVersion)
updateSecondaryMap(datasetInfo, dataVersion)
dataVersion += 1
}
}
Expand Down Expand Up @@ -755,7 +755,7 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV],
retrying[Unit]({
mostRecentlyUpdatedCopyInfo.foreach { case mostRecent =>
timingReport("resync-update-secondary-map", "dataset" -> datasetId) {
updateSecondaryMap(mostRecent.dataVersion)
updateSecondaryMap(mostRecent.datasetInfo, mostRecent.dataVersion)
}
}
}, ignoreSerializationFailure)
Expand Down Expand Up @@ -829,14 +829,15 @@ class PlaybackToSecondary[CT, CV](u: PlaybackToSecondary.SuperUniverse[CT, CV],
}
}

def updateSecondaryMap(newLastDataVersion: Long): Unit = {
def updateSecondaryMap(datasetInfo: metadata.DatasetInfo, newLastDataVersion: Long): Unit = {
// We want to end the current transaction here. We don't want to be holding share locks on data-tables like log
// tables while updating a row on the secondary_manifest. This is to avoid deadlocks when data-coordinator also has
// locks out on the data-tables and is also updating the same row on the secondary_manifest.
//
// The activity in the current transaction (before committing) should all
// be _reads_ from metadata tables and the dataset's log table.
if(!u.isAutoCommit) u.commit()
u.datasetMapReader.replicationLock(datasetInfo, shared = true)
u.secondaryManifest.completedReplicationTo(secondary.storeId,
claimantId,
datasetId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,11 @@ import com.socrata.soql.environment.ColumnName
import scala.concurrent.duration.Duration

/** Read-side dataset-map operations shared by readers and writers. */
trait DatasetMapBase[CT] extends `-impl`.BaseDatasetMapReader[CT] {
  /** Advisory: "I'm going to be mutating this dataset, inform the underlying database".
   * This method exists to (hopefully) prevent deadlock errors by acquiring secondary_manifest
   * locks early in a transaction, right after loading the dataset.
   * @param datasetInfo the dataset whose replication lock to take
   * @param shared if true, request the lock in shared (non-exclusive) mode so
   *               multiple holders may coexist — presumably mapped to a shared
   *               pg advisory lock by implementations; verify against them.
   */
  def replicationLock(datasetInfo: DatasetInfo, shared: Boolean): Unit
}

trait DatasetMapReader[CT] extends DatasetMapBase[CT] {
Expand Down Expand Up @@ -32,12 +37,6 @@ trait DatasetMapWriter[CT] extends DatasetMapBase[CT] with `-impl`.BaseDatasetMa
* @throws DatasetIdInUseByWriterException if some other writer has been used to look up this dataset. */
def datasetInfo(datasetId: DatasetId, timeout: Duration, semiExclusive: Boolean = false): Option[DatasetInfo]

/** Advisory: "I'm going to be mutating this dataset, inform the underlying database".
* This method exists to (hopefully) prevent deadlock errors by acquiring secondary_manifest
* locks early in a transaction, right after loading the dataset.
*/
def replicationLock(datasetInfo: DatasetInfo): Unit

/** Creates a new dataset in the truthstore.
* @note Does not actually create any tables; this just updates the bookkeeping.
* @return A `CopyInfo` that refers to an unpublished copy. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,30 @@ trait BasePostgresDatasetMapReader[CT] extends `-impl`.BaseDatasetMapReader[CT]

// Converts a JDBC Timestamp into a Joda-Time DateTime via its epoch-millisecond value.
private def toDateTime(time: Timestamp): DateTime = {
  val epochMillis = time.getTime
  new DateTime(epochMillis)
}

// Advisory-lock ID scheme: the top two bits of the 64-bit lock ID select one
// of four lock "types" (only the first two are currently in use).  This does
// mean dataset IDs are effectively 62-bit numbers rather than 64-bit ones.
private def taggedLockId(datasetId: DatasetId, tag: Long): Long =
  datasetId.underlying | (tag << 62)

protected def datasetLockId(datasetId: DatasetId): Long =
  taggedLockId(datasetId, 0L) // tag bits 00
protected def replicationLockId(datasetId: DatasetId): Long =
  taggedLockId(datasetId, 1L) // tag bits 01
protected def lockTypeThreeId(datasetId: DatasetId): Long =
  taggedLockId(datasetId, 2L) // tag bits 10 (reserved)
protected def lockTypeFourId(datasetId: DatasetId): Long =
  taggedLockId(datasetId, 3L) // tag bits 11 (reserved)

/** Takes the per-dataset replication advisory lock, blocking until acquired.
 * @param datasetInfo the dataset whose replication lock to take
 * @param shared if true, acquire the lock in shared mode (multiple shared
 *               holders may coexist); if false, acquire it exclusively.
 */
def replicationLock(datasetInfo: DatasetInfo, shared: Boolean) {
  // Fix: the `shared` flag was previously ignored and pg_advisory_lock was
  // always used, so "shared" callers still took the exclusive lock.  Mirror
  // the shared/exclusive selection used for the dataset lock elsewhere.
  val lockQuery =
    if(shared) "select pg_advisory_lock_shared(?)"
    else "select pg_advisory_lock(?)"
  for {
    stmt <- managed(conn.prepareStatement(lockQuery))
      .and(_.setLong(1, replicationLockId(datasetInfo.systemId)))
    rs <- managed(stmt.executeQuery())
  } {
    while(rs.next()) {
      // nothing, we're just reading the rows
    }
  }
}

// The time the current transaction was started.
def currentTime(): DateTime = {
using(conn.prepareStatement("SELECT CURRENT_TIMESTAMP")) { stmt=>
Expand Down Expand Up @@ -1400,10 +1424,13 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT],
// statement_timeout cannot be bound as a prepared-statement parameter, so the
// millisecond value is spliced directly into the SQL text.
def setTimeout(timeoutMs: Int): String =
  "SET LOCAL statement_timeout TO " + timeoutMs
// Builds the dataset_map lookup query, taking a row lock whose strength
// depends on the caller's intent: FOR SHARE when semi-exclusive access is
// acceptable, FOR UPDATE otherwise.
def datasetInfoBySystemIdQuery(semiExclusive: Boolean) = {
  val lockClause = if(semiExclusive) "SHARE" else "UPDATE"
  s"SELECT system_id, next_counter_value, latest_data_version, locale_name, obfuscation_key, resource_name FROM dataset_map WHERE system_id = ? FOR $lockClause"
}
// Selects the advisory-lock function for the dataset lock: shared mode when
// semi-exclusive access suffices, exclusive mode otherwise.
private def datasetLockBySystemIdQuery(semiExclusive: Boolean) = {
  val lockFunction = if(semiExclusive) "pg_advisory_lock_shared" else "pg_advisory_lock"
  s"select $lockFunction(?)"
}

// SQL that restores statement_timeout to the session default once the
// bounded-wait lookup has completed.
def resetTimeout: String = "SET LOCAL statement_timeout TO DEFAULT"
def datasetInfo(datasetId: DatasetId, timeout: Duration, semiExclusive: Boolean): Option[DatasetInfo] = {
// For now we assume that we're the only one setting the statement_timeout
Expand All @@ -1427,25 +1454,22 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT],
log.trace("Setting statement timeout to {}ms", ms)
execute(setTimeout(ms))
}
val result =
using(conn.prepareStatement(datasetInfoBySystemIdQuery(semiExclusive))) { stmt =>
stmt.setDatasetId(1, datasetId)
try {
t("lookup-dataset-for-update", "dataset_id" -> datasetId)(stmt.execute())
getInfoResult(stmt)
} catch {
case e: PSQLException if isStatementTimeout(e) =>
log.trace("Get dataset _with_ waiting failed; abandoning")
conn.rollback(savepoint)
throw new DatasetIdInUseByWriterException(datasetId, e)
case e: PSQLException if isReadOnlyTransaction(e) =>
log.trace("Get dataset for update failed due to read-only txn; abandoning")
conn.rollback(savepoint)
throw new DatabaseInReadOnlyMode(e)
}
using(conn.prepareStatement(datasetLockBySystemIdQuery(semiExclusive))) { stmt =>
stmt.setLong(1, datasetLockId(datasetId))
try {
t("lock-dataset-for-update", "dataset_id" -> datasetId)(stmt.execute())
} catch {
case e: PSQLException if isStatementTimeout(e) =>
log.trace("Get dataset _with_ waiting failed; abandoning")
conn.rollback(savepoint)
throw new DatasetIdInUseByWriterException(datasetId, e)
case e: PSQLException if isReadOnlyTransaction(e) =>
log.trace("Get dataset for update failed due to read-only txn; abandoning")
conn.rollback(savepoint)
throw new DatabaseInReadOnlyMode(e)
}
}
if(timeout.isFinite) execute(resetTimeout)
result
} finally {
try {
conn.releaseSavepoint(savepoint)
Expand All @@ -1463,16 +1487,15 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT],
log.warn("Unexpected exception while releasing savepoint", e)
}
}
}

def replicationLock(datasetInfo: DatasetInfo) {
for {
stmt <- managed(conn.prepareStatement("select store_id from secondary_manifest where dataset_system_id = ? order by store_id for update"))
.and(_.setDatasetId(1, datasetInfo.systemId))
rs <- managed(stmt.executeQuery())
} {
while(rs.next()) {
// nothing, we're just reading the rows
using(conn.prepareStatement("SELECT system_id, next_counter_value, latest_data_version, locale_name, obfuscation_key, resource_name FROM dataset_map WHERE system_id = ?")) { stmt =>
stmt.setDatasetId(1, datasetId)
using(t("lookup-dataset-for-update", "dataset_id" -> datasetId)(stmt.executeQuery())) { rs =>
if(rs.next()) {
Some(DatasetInfo(rs.getDatasetId("system_id"), rs.getLong("next_counter_value"), rs.getLong("latest_data_version"), rs.getString("locale_name"), rs.getBytes("obfuscation_key"), rs.getDatasetResourceName("resource_name")))
} else {
None
}
}
}
}
Expand All @@ -1483,15 +1506,6 @@ class PostgresDatasetMapWriter[CT](val conn: Connection, tns: TypeNamespace[CT],
stmt.execute(s)
}
}

// Extracts the DatasetInfo row, if any, from a just-executed statement's
// result set.  Assumes the query selected the dataset_map columns read below.
// NOTE(review): getDatasetId / getDatasetResourceName appear to be
// project-specific ResultSet enrichments — confirm their null handling.
def getInfoResult(stmt: Statement) =
  using(stmt.getResultSet) { rs =>
    if(rs.next()) {
      Some(DatasetInfo(rs.getDatasetId("system_id"), rs.getLong("next_counter_value"), rs.getLong("latest_data_version"), rs.getString("locale_name"), rs.getBytes("obfuscation_key"), rs.getDatasetResourceName("resource_name")))
    } else {
      None
    }
  }
}

object PostgresDatasetMapWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class PostgresDatabaseMutator[CT, CV](universe: Managed[Universe[CT, CV] with Lo
final def loadLatestVersionOfDataset(datasetId: DatasetId, lockTimeout: Duration): Option[DatasetCopyContext[CT]] = {
val map = datasetMap
map.datasetInfo(datasetId, lockTimeout) map { datasetInfo =>
map.replicationLock(datasetInfo)
map.replicationLock(datasetInfo, shared = false)
val latest = map.latest(datasetInfo)
val schema = map.schema(latest)
new DatasetCopyContext(latest, schema)
Expand Down