Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ val Library = new {
object Version {
val pekko = "1.4.0"
val pekkoHttp = "1.3.0"
// val pekkoHttpCirce = "1.39.2"
val circe = "0.14.14"
val refined = "0.11.3"
val scalaTest = "3.2.12"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import io.circe.syntax.*
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.*

import java.time.temporal.ChronoUnit
import scala.concurrent.{ExecutionContext, Future, Promise}

object KafkaClient {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
/**
* Copyright: Copyright (C) 2016, ATS Advanced Telematic Systems GmbH
* License: MPL-2.0
*/
* Copyright: Copyright (C) 2016, ATS Advanced Telematic Systems GmbH
* License: MPL-2.0
*/
package com.advancedtelematic.libats.slick.db

import java.sql.{BatchUpdateException, SQLException, SQLIntegrityConstraintViolationException, Timestamp}
import java.sql.{
BatchUpdateException,
SQLException,
SQLIntegrityConstraintViolationException,
Timestamp
}
import java.time.Instant
import java.util.UUID

Expand All @@ -23,69 +28,90 @@ import scala.language.implicitConversions
import scala.util.{Failure, Success, Try}
import PaginationResult.*

object SlickPipeToUnit {
implicit def pipeToUnit(value: DBIO[Any])(implicit ec: ExecutionContext): DBIO[Unit] = value.map(_ => ())
object SlickDiscardOps {
implicit class SlickDiscardOps[T](value: DBIO[T]) {
// we can use `parasitic` since we just immediately ignore the argument and return Unit
implicit def discard: DBIO[Unit] =
value.map(_ => ())(ExecutionContext.parasitic)
}
}

object SqlExceptions {

object NoReferencedRow {
def unapply(t: Throwable): Option[SQLIntegrityConstraintViolationException] = t match {
case e: SQLIntegrityConstraintViolationException if e.getErrorCode == 1452 => Some(e)
case _ => None
}
def unapply(
t: Throwable): Option[SQLIntegrityConstraintViolationException] =
t match {
case e: SQLIntegrityConstraintViolationException
if e.getErrorCode == 1452 =>
Some(e)
case _ => None
}
}

// ER_KEY_NOT_FOUND See https://mariadb.com/kb/en/library/mariadb-error-codes/
object KeyNotFound {
def unapply(arg: Throwable): Option[SQLException] = arg match {
case e: SQLException if e.getErrorCode == 1032 => Some(e)
case _ => None
case _ => None
}
}

object IntegrityConstraintViolation {
def unapply(arg: Throwable): Option[SQLIntegrityConstraintViolationException] = arg match {
case e: SQLIntegrityConstraintViolationException =>
Some(e)

case e: BatchUpdateException if e.getCause.isInstanceOf[SQLIntegrityConstraintViolationException] =>
Some(e.getCause.asInstanceOf[SQLIntegrityConstraintViolationException])

case _ => None
}
def unapply(
arg: Throwable): Option[SQLIntegrityConstraintViolationException] =
arg match {
case e: SQLIntegrityConstraintViolationException =>
Some(e)

case e: BatchUpdateException
if e.getCause
.isInstanceOf[SQLIntegrityConstraintViolationException] =>
Some(
e.getCause.asInstanceOf[SQLIntegrityConstraintViolationException])

case _ => None
}
}
}

trait SlickResultExtensions {
implicit class DBIOActionExtensions[T](action: DBIO[T]) {
import SqlExceptions._

def mapError(mapping: PartialFunction[Throwable, Throwable])(implicit ec: ExecutionContext): DBIO[T] =
def mapError(mapping: PartialFunction[Throwable, Throwable])(
implicit ec: ExecutionContext): DBIO[T] =
recover {
case Failure(t) if mapping.isDefinedAt(t) => DBIO.failed(mapping.apply(t))
case Failure(t) if mapping.isDefinedAt(t) =>
DBIO.failed(mapping.apply(t))
}

def recover(handler: PartialFunction[Try[T], DBIO[T]])(implicit ec: ExecutionContext): DBIO[T] =
action.asTry.flatMap{ x =>
handler.applyOrElse(x, (t: Try[T]) => t match {
case Success(a) => DBIO.successful(a)
case Failure(e) => DBIO.failed(e)
})
def recover(handler: PartialFunction[Try[T], DBIO[T]])(
implicit ec: ExecutionContext): DBIO[T] =
action.asTry.flatMap { x =>
handler.applyOrElse(x,
(t: Try[T]) =>
t match {
case Success(a) => DBIO.successful(a)
case Failure(e) => DBIO.failed(e)
})
}

def handleForeignKeyError(error: Throwable)(implicit ec: ExecutionContext): DBIO[T] =
def handleForeignKeyError(error: Throwable)(
implicit ec: ExecutionContext): DBIO[T] =
mapError { case NoReferencedRow(_) => error }

def handleIntegrityErrors(error: Throwable)(implicit ec: ExecutionContext): DBIO[T] =
def handleIntegrityErrors(error: Throwable)(
implicit ec: ExecutionContext): DBIO[T] =
mapError {
case IntegrityConstraintViolation(_) => error
case KeyNotFound(_) => error
}
case KeyNotFound(_) => error
}
}

implicit class DbioUpdateActionExtensions(action: DBIO[Int]) {
def handleSingleUpdateError(result: Throwable)(implicit ec: ExecutionContext): DBIO[Unit] = {
def handleSingleUpdateError(result: Throwable)(
implicit ec: ExecutionContext): DBIO[Unit] = {
action.flatMap {
case c if c == 1 =>
DBIO.successful(())
Expand All @@ -98,8 +124,7 @@ trait SlickResultExtensions {
}

implicit class DBIOOptionOps[T](io: DBIO[Option[T]]) {
def failIfNone(t: Throwable)
(implicit ec: ExecutionContext): DBIO[T] =
def failIfNone(t: Throwable)(implicit ec: ExecutionContext): DBIO[T] =
io.flatMap(_.fold[DBIO[T]](DBIO.failed(t))(DBIO.successful))
}

Expand All @@ -116,28 +141,27 @@ trait SlickResultExtensions {
query.withFilter { (e: E) =>
exp match {
case Some(s) if s.nonEmpty => f(e).mappedTo[String].like(s"%$s%")
case _ => true.bind
case _ => true.bind
}
}
}

implicit class DBIOSeqOps[+T](io: DBIO[Seq[T]]) {
def failIfMany(implicit ec: ExecutionContext): DBIO[Option[T]] =
io.flatMap { result =>
if(result.size > 1)
if (result.size > 1)
DBIO.failed(Errors.TooManyElements)
else
DBIO.successful(result.headOption)
}

def failIfNotSingle(t: Throwable)
(implicit ec: ExecutionContext): DBIO[T] =
def failIfNotSingle(t: Throwable)(implicit ec: ExecutionContext): DBIO[T] =
DBIOOptionOps(failIfMany).failIfNone(t)

def failIfEmpty(t: Throwable)
(implicit ec: ExecutionContext): DBIO[Seq[T]] = {
def failIfEmpty(t: Throwable)(
implicit ec: ExecutionContext): DBIO[Seq[T]] = {
io.flatMap { result =>
if(result.isEmpty)
if (result.isEmpty)
DBIO.failed(t)
else
DBIO.successful(result)
Expand All @@ -155,54 +179,71 @@ trait SlickPagination {
.drop(offset)
.take(limit)

def paginateAndSort[T](fn: E => T, offset: Long, limit: Long)(implicit ev: T => slick.lifted.Ordered): Query[E, U, Seq] =
def paginateAndSort[T](fn: E => T, offset: Long, limit: Long)(
implicit ev: T => slick.lifted.Ordered): Query[E, U, Seq] =
action
.sortBy(fn)
.drop(offset)
.take(limit)

def paginateResult(offset: Offset, limit: Limit)(implicit ec: ExecutionContext): DBIO[PaginationResult[U]] = {
def paginateResult(offset: Offset, limit: Limit)(
implicit ec: ExecutionContext): DBIO[PaginationResult[U]] = {
val tot = action.length.result
val pag = action.paginate(offset, limit).result
tot.zip(pag).map{ case (total, values) => PaginationResult(values, total, offset, limit) }
tot.zip(pag).map {
case (total, values) => PaginationResult(values, total, offset, limit)
}
}

def paginateAndSortResult[T](fn: E => T, offset: Offset, limit: Limit)
(implicit ec: ExecutionContext, ev: T => slick.lifted.Ordered): DBIO[PaginationResult[U]] = {
def paginateAndSortResult[T](fn: E => T, offset: Offset, limit: Limit)(
implicit ec: ExecutionContext,
ev: T => slick.lifted.Ordered): DBIO[PaginationResult[U]] = {
val tot = action.length.result
val pag = action.paginateAndSort(fn, offset, limit).result
tot.zip(pag).map{ case (total, values) => PaginationResult(values, total, offset, limit) }
tot.zip(pag).map {
case (total, values) => PaginationResult(values, total, offset, limit)
}
}
}
}

object SlickPagination extends SlickPagination

object SlickExtensions extends SlickResultExtensions with SlickPagination {
implicit val UriColumnType: slick.jdbc.MySQLProfile.BaseColumnType[Uri] = MappedColumnType.base[Uri, String](_.toString(), Uri.apply)
implicit val UriColumnType: slick.jdbc.MySQLProfile.BaseColumnType[Uri] =
MappedColumnType.base[Uri, String](_.toString(), Uri.apply)

implicit val uuidColumnType: slick.jdbc.MySQLProfile.BaseColumnType[java.util.UUID] = MappedColumnType.base[UUID, String](_.toString(), UUID.fromString)
implicit val uuidColumnType
: slick.jdbc.MySQLProfile.BaseColumnType[java.util.UUID] =
MappedColumnType.base[UUID, String](_.toString(), UUID.fromString)

implicit val javaInstantMapping: slick.jdbc.MySQLProfile.BaseColumnType[java.time.Instant] = MappedColumnType.base[Instant, Timestamp](Timestamp.from, _.toInstant)
implicit val javaInstantMapping
: slick.jdbc.MySQLProfile.BaseColumnType[java.time.Instant] =
MappedColumnType.base[Instant, Timestamp](Timestamp.from, _.toInstant)

implicit class MappedColumnExtensions(c: Rep[_]) {
def mappedTo[U: TypedType] = Rep.forNode[U](c.toNode)
}

implicit def uuidToJava(refined: Refined[String, Uuid]): Rep[UUID] = UUID.fromString(refined.value).bind
implicit def uuidToJava(refined: Refined[String, Uuid]): Rep[UUID] =
UUID.fromString(refined.value).bind

implicit class InsertOrUpdateWithKeyOps[Q <: AbstractTable[_]](tableQuery: TableQuery[Q])(implicit ec: ExecutionContext) {
implicit class InsertOrUpdateWithKeyOps[Q <: AbstractTable[_]](
tableQuery: TableQuery[Q])(implicit ec: ExecutionContext) {

type E = Q#TableElementType

def insertIfNotExists(element: E)(findElement: TableQuery[Q] => Query[Q, E, Seq]): DBIO[Unit] =
def insertIfNotExists(element: E)(
findElement: TableQuery[Q] => Query[Q, E, Seq]): DBIO[Unit] =
findElement(tableQuery).exists.result.flatMap {
case true => DBIO.successful(())
case true => DBIO.successful(())
case false => (tableQuery += element).map(_ => ())
}

def insertOrUpdateWithKey(element: E, primaryKeyQuery: TableQuery[Q] => Query[Q, E, Seq], onUpdate: E => E)
(implicit ec: ExecutionContext): DBIO[E] = {
def insertOrUpdateWithKey(
element: E,
primaryKeyQuery: TableQuery[Q] => Query[Q, E, Seq],
onUpdate: E => E)(implicit ec: ExecutionContext): DBIO[E] = {

val findQuery = primaryKeyQuery(tableQuery)

Expand All @@ -212,12 +253,13 @@ object SlickExtensions extends SlickResultExtensions with SlickPagination {
}

val io = findQuery.result.flatMap { res =>
if(res.isEmpty)
if (res.isEmpty)
(tableQuery += element).map(_ => element)
else if(res.size == 1)
else if (res.size == 1)
update(res.head)
else
DBIO.failed(new Exception("Too many elements found to update. primaryKeyQuery must define a unique key"))
DBIO.failed(new Exception(
"Too many elements found to update. primaryKeyQuery must define a unique key"))
}

io.transactionally
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ object DataType {
override def toString: String = s"urn:tdx-ota:update:$value"
}

final case class OfflineInstallCorrelationId(value: UUID) extends CorrelationId {
override def toString: String = s"urn:tdx-ota:offline-install:$value"
}

type ValidLockboxHash = String Refined (HexStringSpec And Size[Equal[12]])

final case class OfflineUpdateId(name: String, version: Long, hash: ValidLockboxHash) extends CorrelationId {
Expand All @@ -71,6 +75,8 @@ object DataType {

private[this] val TrxUpdateIdRe = """^urn:tdx-ota:update:([0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})$"""r

private[this] val TrxOfflineInstallIdRe = """^urn:tdx-ota:offline-install:([0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})$"""r

def fromString(s: String): Either[String, CorrelationId] = s match {
case CorrelationIdRe("mtu", uuid) =>
Right(MultiTargetUpdateCorrelationId(UUID.fromString(uuid)))
Expand All @@ -80,6 +86,8 @@ object DataType {
Right(TargetSpecCorrelationId(UUID.fromString(uuid)))
case TrxUpdateIdRe(uuid) =>
Right(UpdateCorrelationId(UUID.fromString(uuid)))
case TrxOfflineInstallIdRe(uuid) =>
Right(OfflineInstallCorrelationId(UUID.fromString(uuid)))
case TrxLockBoxIdRe(name, version, hash) =>
RefType.applyRef[ValidLockboxHash](hash).map { hash =>
OfflineUpdateId(name, version.toLong, hash)
Expand Down
Empty file.