diff --git a/build.sbt b/build.sbt index cf461fbd..848ba0a1 100644 --- a/build.sbt +++ b/build.sbt @@ -3,7 +3,6 @@ val Library = new { object Version { val pekko = "1.4.0" val pekkoHttp = "1.3.0" - // val pekkoHttpCirce = "1.39.2" val circe = "0.14.14" val refined = "0.11.3" val scalaTest = "3.2.12" diff --git a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala index fa58ea2f..38648a86 100644 --- a/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala +++ b/libats-messaging/src/main/scala/com/advancedtelematic/libats/messaging/kafka/KafkaClient.scala @@ -21,8 +21,6 @@ import io.circe.syntax.* import org.apache.kafka.clients.consumer.ConsumerConfig import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata} import org.apache.kafka.common.serialization.* - -import java.time.temporal.ChronoUnit import scala.concurrent.{ExecutionContext, Future, Promise} object KafkaClient { diff --git a/libats-slick/src/main/scala/com/advancedtelematic/libats/slick/db/SlickExtensions.scala b/libats-slick/src/main/scala/com/advancedtelematic/libats/slick/db/SlickExtensions.scala index 6146538f..9e61af00 100644 --- a/libats-slick/src/main/scala/com/advancedtelematic/libats/slick/db/SlickExtensions.scala +++ b/libats-slick/src/main/scala/com/advancedtelematic/libats/slick/db/SlickExtensions.scala @@ -1,10 +1,15 @@ /** - * Copyright: Copyright (C) 2016, ATS Advanced Telematic Systems GmbH - * License: MPL-2.0 - */ + * Copyright: Copyright (C) 2016, ATS Advanced Telematic Systems GmbH + * License: MPL-2.0 + */ package com.advancedtelematic.libats.slick.db -import java.sql.{BatchUpdateException, SQLException, SQLIntegrityConstraintViolationException, Timestamp} +import java.sql.{ + BatchUpdateException, + SQLException, + SQLIntegrityConstraintViolationException, + Timestamp +} import java.time.Instant import java.util.UUID @@ -23,37 +28,50 @@ import scala.language.implicitConversions import scala.util.{Failure, Success, Try} import PaginationResult.* -object SlickPipeToUnit { - implicit def pipeToUnit(value: DBIO[Any])(implicit ec: ExecutionContext): DBIO[Unit] = value.map(_ => ()) +object SlickDiscardOps { + implicit class SlickDiscardOps[T](value: DBIO[T]) { + // we can use `parasitic` since we just immediately ignore the argument and return Unit + implicit def discard: DBIO[Unit] = + value.map(_ => ())(ExecutionContext.parasitic) + } } object SqlExceptions { object NoReferencedRow { - def unapply(t: Throwable): Option[SQLIntegrityConstraintViolationException] = t match { - case e: SQLIntegrityConstraintViolationException if e.getErrorCode == 1452 => Some(e) - case _ => None - } + def unapply( + t: Throwable): Option[SQLIntegrityConstraintViolationException] = + t match { + case e: SQLIntegrityConstraintViolationException + if e.getErrorCode == 1452 => + Some(e) + case _ => None + } } // ER_KEY_NOT_FOUND See https://mariadb.com/kb/en/library/mariadb-error-codes/ object KeyNotFound { def unapply(arg: Throwable): Option[SQLException] = arg match { case e: SQLException if e.getErrorCode == 1032 => Some(e) - case _ => None + case _ => None } } object IntegrityConstraintViolation { - def unapply(arg: Throwable): Option[SQLIntegrityConstraintViolationException] = arg match { - case e: SQLIntegrityConstraintViolationException => - Some(e) - - case e: BatchUpdateException if e.getCause.isInstanceOf[SQLIntegrityConstraintViolationException] => - Some(e.getCause.asInstanceOf[SQLIntegrityConstraintViolationException]) - - case _ => None - } + def unapply( + arg: Throwable): Option[SQLIntegrityConstraintViolationException] = + arg match { + case e: SQLIntegrityConstraintViolationException => + Some(e) + + case e: BatchUpdateException + if e.getCause + .isInstanceOf[SQLIntegrityConstraintViolationException] => + Some( + e.getCause.asInstanceOf[SQLIntegrityConstraintViolationException]) + + case _ => None + } } } @@ -61,31 +79,39 @@ trait SlickResultExtensions { implicit class DBIOActionExtensions[T](action: DBIO[T]) { import SqlExceptions._ - def mapError(mapping: PartialFunction[Throwable, Throwable])(implicit ec: ExecutionContext): DBIO[T] = + def mapError(mapping: PartialFunction[Throwable, Throwable])( + implicit ec: ExecutionContext): DBIO[T] = recover { - case Failure(t) if mapping.isDefinedAt(t) => DBIO.failed(mapping.apply(t)) + case Failure(t) if mapping.isDefinedAt(t) => + DBIO.failed(mapping.apply(t)) } - def recover(handler: PartialFunction[Try[T], DBIO[T]])(implicit ec: ExecutionContext): DBIO[T] = - action.asTry.flatMap{ x => - handler.applyOrElse(x, (t: Try[T]) => t match { - case Success(a) => DBIO.successful(a) - case Failure(e) => DBIO.failed(e) - }) + def recover(handler: PartialFunction[Try[T], DBIO[T]])( + implicit ec: ExecutionContext): DBIO[T] = + action.asTry.flatMap { x => + handler.applyOrElse(x, + (t: Try[T]) => + t match { + case Success(a) => DBIO.successful(a) + case Failure(e) => DBIO.failed(e) + }) } - def handleForeignKeyError(error: Throwable)(implicit ec: ExecutionContext): DBIO[T] = + def handleForeignKeyError(error: Throwable)( + implicit ec: ExecutionContext): DBIO[T] = mapError { case NoReferencedRow(_) => error } - def handleIntegrityErrors(error: Throwable)(implicit ec: ExecutionContext): DBIO[T] = + def handleIntegrityErrors(error: Throwable)( + implicit ec: ExecutionContext): DBIO[T] = mapError { case IntegrityConstraintViolation(_) => error - case KeyNotFound(_) => error - } + case KeyNotFound(_) => error + } } implicit class DbioUpdateActionExtensions(action: DBIO[Int]) { - def handleSingleUpdateError(result: Throwable)(implicit ec: ExecutionContext): DBIO[Unit] = { + def handleSingleUpdateError(result: Throwable)( + implicit ec: ExecutionContext): DBIO[Unit] = { action.flatMap { case c if c == 1 => DBIO.successful(()) @@ -98,8 +124,7 @@ trait SlickResultExtensions { } implicit class DBIOOptionOps[T](io: DBIO[Option[T]]) { - def failIfNone(t: Throwable) - (implicit ec: ExecutionContext): DBIO[T] = + def failIfNone(t: Throwable)(implicit ec: ExecutionContext): DBIO[T] = io.flatMap(_.fold[DBIO[T]](DBIO.failed(t))(DBIO.successful)) } @@ -116,7 +141,7 @@ trait SlickResultExtensions { query.withFilter { (e: E) => exp match { case Some(s) if s.nonEmpty => f(e).mappedTo[String].like(s"%$s%") - case _ => true.bind + case _ => true.bind } } } @@ -124,20 +149,19 @@ trait SlickResultExtensions { implicit class DBIOSeqOps[+T](io: DBIO[Seq[T]]) { def failIfMany(implicit ec: ExecutionContext): DBIO[Option[T]] = io.flatMap { result => - if(result.size > 1) + if (result.size > 1) DBIO.failed(Errors.TooManyElements) else DBIO.successful(result.headOption) } - def failIfNotSingle(t: Throwable) - (implicit ec: ExecutionContext): DBIO[T] = + def failIfNotSingle(t: Throwable)(implicit ec: ExecutionContext): DBIO[T] = DBIOOptionOps(failIfMany).failIfNone(t) - def failIfEmpty(t: Throwable) - (implicit ec: ExecutionContext): DBIO[Seq[T]] = { + def failIfEmpty(t: Throwable)( + implicit ec: ExecutionContext): DBIO[Seq[T]] = { io.flatMap { result => - if(result.isEmpty) + if (result.isEmpty) DBIO.failed(t) else DBIO.successful(result) @@ -155,23 +179,30 @@ trait SlickPagination { .drop(offset) .take(limit) - def paginateAndSort[T](fn: E => T, offset: Long, limit: Long)(implicit ev: T => slick.lifted.Ordered): Query[E, U, Seq] = + def paginateAndSort[T](fn: E => T, offset: Long, limit: Long)( + implicit ev: T => slick.lifted.Ordered): Query[E, U, Seq] = action .sortBy(fn) .drop(offset) .take(limit) - def paginateResult(offset: Offset, limit: Limit)(implicit ec: ExecutionContext): DBIO[PaginationResult[U]] = { + def paginateResult(offset: Offset, limit: Limit)( + implicit ec: ExecutionContext): DBIO[PaginationResult[U]] = { val tot = action.length.result val pag = action.paginate(offset, limit).result - tot.zip(pag).map{ case (total, values) => PaginationResult(values, total, offset, limit) } + tot.zip(pag).map { + case (total, values) => PaginationResult(values, total, offset, limit) + } } - def paginateAndSortResult[T](fn: E => T, offset: Offset, limit: Limit) - (implicit ec: ExecutionContext, ev: T => slick.lifted.Ordered): DBIO[PaginationResult[U]] = { + def paginateAndSortResult[T](fn: E => T, offset: Offset, limit: Limit)( + implicit ec: ExecutionContext, + ev: T => slick.lifted.Ordered): DBIO[PaginationResult[U]] = { val tot = action.length.result val pag = action.paginateAndSort(fn, offset, limit).result - tot.zip(pag).map{ case (total, values) => PaginationResult(values, total, offset, limit) } + tot.zip(pag).map { + case (total, values) => PaginationResult(values, total, offset, limit) + } } } } @@ -179,30 +210,40 @@ trait SlickPagination { object SlickPagination extends SlickPagination object SlickExtensions extends SlickResultExtensions with SlickPagination { - implicit val UriColumnType: slick.jdbc.MySQLProfile.BaseColumnType[Uri] = MappedColumnType.base[Uri, String](_.toString(), Uri.apply) + implicit val UriColumnType: slick.jdbc.MySQLProfile.BaseColumnType[Uri] = + MappedColumnType.base[Uri, String](_.toString(), Uri.apply) - implicit val uuidColumnType: slick.jdbc.MySQLProfile.BaseColumnType[java.util.UUID] = MappedColumnType.base[UUID, String](_.toString(), UUID.fromString) + implicit val uuidColumnType + : slick.jdbc.MySQLProfile.BaseColumnType[java.util.UUID] = + MappedColumnType.base[UUID, String](_.toString(), UUID.fromString) - implicit val javaInstantMapping: slick.jdbc.MySQLProfile.BaseColumnType[java.time.Instant] = MappedColumnType.base[Instant, Timestamp](Timestamp.from, _.toInstant) + implicit val javaInstantMapping + : slick.jdbc.MySQLProfile.BaseColumnType[java.time.Instant] = + MappedColumnType.base[Instant, Timestamp](Timestamp.from, _.toInstant) implicit class MappedColumnExtensions(c: Rep[_]) { def mappedTo[U: TypedType] = Rep.forNode[U](c.toNode) } - implicit def uuidToJava(refined: Refined[String, Uuid]): Rep[UUID] = UUID.fromString(refined.value).bind + implicit def uuidToJava(refined: Refined[String, Uuid]): Rep[UUID] = + UUID.fromString(refined.value).bind - implicit class InsertOrUpdateWithKeyOps[Q <: AbstractTable[_]](tableQuery: TableQuery[Q])(implicit ec: ExecutionContext) { + implicit class InsertOrUpdateWithKeyOps[Q <: AbstractTable[_]]( + tableQuery: TableQuery[Q])(implicit ec: ExecutionContext) { type E = Q#TableElementType - def insertIfNotExists(element: E)(findElement: TableQuery[Q] => Query[Q, E, Seq]): DBIO[Unit] = + def insertIfNotExists(element: E)( + findElement: TableQuery[Q] => Query[Q, E, Seq]): DBIO[Unit] = findElement(tableQuery).exists.result.flatMap { - case true => DBIO.successful(()) + case true => DBIO.successful(()) case false => (tableQuery += element).map(_ => ()) } - def insertOrUpdateWithKey(element: E, primaryKeyQuery: TableQuery[Q] => Query[Q, E, Seq], onUpdate: E => E) - (implicit ec: ExecutionContext): DBIO[E] = { + def insertOrUpdateWithKey( + element: E, + primaryKeyQuery: TableQuery[Q] => Query[Q, E, Seq], + onUpdate: E => E)(implicit ec: ExecutionContext): DBIO[E] = { val findQuery = primaryKeyQuery(tableQuery) @@ -212,12 +253,13 @@ object SlickExtensions extends SlickResultExtensions with SlickPagination { } val io = findQuery.result.flatMap { res => - if(res.isEmpty) + if (res.isEmpty) (tableQuery += element).map(_ => element) - else if(res.size == 1) + else if (res.size == 1) update(res.head) else - DBIO.failed(new Exception("Too many elements found to update. primaryKeyQuery must define a unique key")) + DBIO.failed(new Exception( + "Too many elements found to update. primaryKeyQuery must define a unique key")) } io.transactionally diff --git a/libats/src/main/scala/com/advancedtelematic/libats/data/DataType.scala b/libats/src/main/scala/com/advancedtelematic/libats/data/DataType.scala index 7ad1c5d2..3cdfe4c0 100644 --- a/libats/src/main/scala/com/advancedtelematic/libats/data/DataType.scala +++ b/libats/src/main/scala/com/advancedtelematic/libats/data/DataType.scala @@ -57,6 +57,10 @@ object DataType { override def toString: String = s"urn:tdx-ota:update:$value" } + final case class OfflineInstallCorrelationId(value: UUID) extends CorrelationId { + override def toString: String = s"urn:tdx-ota:offline-install:$value" + } + type ValidLockboxHash = String Refined (HexStringSpec And Size[Equal[12]]) final case class OfflineUpdateId(name: String, version: Long, hash: ValidLockboxHash) extends CorrelationId { @@ -71,6 +75,8 @@ object DataType { private[this] val TrxUpdateIdRe = """^urn:tdx-ota:update:([0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})$"""r + private[this] val TrxOfflineInstallIdRe = """^urn:tdx-ota:offline-install:([0-9a-fA-F]{8}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{4}\-[0-9a-fA-F]{12})$"""r + def fromString(s: String): Either[String, CorrelationId] = s match { case CorrelationIdRe("mtu", uuid) => Right(MultiTargetUpdateCorrelationId(UUID.fromString(uuid))) @@ -80,6 +86,8 @@ object DataType { Right(TargetSpecCorrelationId(UUID.fromString(uuid))) case TrxUpdateIdRe(uuid) => Right(UpdateCorrelationId(UUID.fromString(uuid))) + case TrxOfflineInstallIdRe(uuid) => + Right(OfflineInstallCorrelationId(UUID.fromString(uuid))) case TrxLockBoxIdRe(name, version, hash) => RefType.applyRef[ValidLockboxHash](hash).map { hash => OfflineUpdateId(name, version.toLong, hash) diff --git a/src/main/scala/com/advancedtelematic/libats/macros/UUIDKeyMacros.scala b/src/main/scala/com/advancedtelematic/libats/macros/UUIDKeyMacros.scala new file mode 100644 index 00000000..e69de29b