From 0c9aee460fa94b479556be12345cc974b0cd59e6 Mon Sep 17 00:00:00 2001 From: Rob Norris Date: Wed, 2 Jul 2025 10:38:52 -0500 Subject: [PATCH] allow fetch to fail cleanly --- .../src/main/scala/DoobieMapping.scala | 4 +- .../shared/src/main/scala/SkunkMapping.scala | 84 +++++++++---------- .../sql-core/src/main/scala/SqlMapping.scala | 2 +- .../sql-core/src/main/scala/SqlModule.scala | 2 +- 4 files changed, 46 insertions(+), 46 deletions(-) diff --git a/modules/doobie-core/src/main/scala/DoobieMapping.scala b/modules/doobie-core/src/main/scala/DoobieMapping.scala index ff3a737c..37a7a5ca 100644 --- a/modules/doobie-core/src/main/scala/DoobieMapping.scala +++ b/modules/doobie-core/src/main/scala/DoobieMapping.scala @@ -147,7 +147,7 @@ trait DoobieMappingLike[F[_]] extends Mapping[F] with SqlMappingLike[F] { } } - def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Vector[Array[Any]]] = { + def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Result[Vector[Array[Any]]]] = { import cats.syntax.all._ val reads: Array[Read[Any]] = codecs.toArray.map { @@ -161,6 +161,6 @@ trait DoobieMappingLike[F[_]] extends Mapping[F] with SqlMappingLike[F] { } implicit val read: Read[Array[Any]] = new Read.CompositeOfInstances[Any](reads) - fragment.query[Array[Any]].to[Vector].transact(transactor) + fragment.query[Array[Any]].to[Vector].transact(transactor).map(Result.success) } } diff --git a/modules/skunk/shared/src/main/scala/SkunkMapping.scala b/modules/skunk/shared/src/main/scala/SkunkMapping.scala index 848cea3b..b3a755aa 100644 --- a/modules/skunk/shared/src/main/scala/SkunkMapping.scala +++ b/modules/skunk/shared/src/main/scala/SkunkMapping.scala @@ -117,55 +117,55 @@ trait SkunkMappingLike[F[_]] extends Mapping[F] with SqlPgMappingLike[F] { outer Some(codec._1.types.head.name) } - // And we need to be able to fetch `Rows` given a `Fragment` and a list of decoders. - def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Vector[Array[Any]]] = { - val rowDecoder: Decoder[Array[Any]] = - new Decoder[Array[Any]] { - val types = codecs.flatMap { case (_, (d, _)) => d.types } - - def arrToList(arr: Arr[_]): List[Any] = - (arr.foldLeft(List.empty[Any]) { case (acc, elem) => elem :: acc }).reverse - - def decode(start: Int, ss: List[Option[String]]): Either[Decoder.Error, Array[Any]] = { - val ncols = ss.length-start - val arr = scala.Array.ofDim[Any](ncols) - - var i = 0 - var ss0 = ss.drop(start) - var codecs0 = codecs - while(i < ncols) { - val (isJoin, (decoder, isNullable)) = codecs0.head - val len = decoder.length - val (seg, tl) = ss0.splitAt(len) - val elem: Either[Decoder.Error, Any] = - if(isJoin && !isNullable) - decoder.opt.decode(0, seg).map(_.getOrElse(FailedJoin)) - else - decoder.decode(0, seg) - - elem match { - case Left(err) => return Left(err) - case Right(v) => - v match { - case a: Arr[a] => arr(i) = arrToList(a) - case Some(a: Arr[a]) => arr(i) = Some(arrToList(a)) - case other => arr(i) = other - } - } - - i = i + 1 - ss0 = tl - codecs0 = codecs0.tail + protected def rowDecoder(codecs: List[(Boolean, Codec)]): Decoder[Array[Any]] = + new Decoder[Array[Any]] { + val types = codecs.flatMap { case (_, (d, _)) => d.types } + + def arrToList(arr: Arr[_]): List[Any] = + (arr.foldLeft(List.empty[Any]) { case (acc, elem) => elem :: acc }).reverse + + def decode(start: Int, ss: List[Option[String]]): Either[Decoder.Error, Array[Any]] = { + val ncols = ss.length-start + val arr = scala.Array.ofDim[Any](ncols) + + var i = 0 + var ss0 = ss.drop(start) + var codecs0 = codecs + while(i < ncols) { + val (isJoin, (decoder, isNullable)) = codecs0.head + val len = decoder.length + val (seg, tl) = ss0.splitAt(len) + val elem: Either[Decoder.Error, Any] = + if(isJoin && !isNullable) + decoder.opt.decode(0, seg).map(_.getOrElse(FailedJoin)) + else + decoder.decode(0, seg) + + elem match { + case Left(err) => return Left(err) + case Right(v) => + v match { + case a: Arr[a] => arr(i) = arrToList(a) + case Some(a: Arr[a]) => arr(i) = Some(arrToList(a)) + case other => arr(i) = other + } } - Right(arr) + i = i + 1 + ss0 = tl + codecs0 = codecs0.tail } + + Right(arr) } + } + // And we need to be able to fetch `Rows` given a `Fragment` and a list of decoders. + def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Result[Vector[Array[Any]]]] = { pool.use { s => - Resource.eval(s.prepare(fragment.fragment.query(rowDecoder))).use { ps => + Resource.eval(s.prepare(fragment.fragment.query(rowDecoder(codecs)))).use { ps => ps.stream(fragment.argument, 1024).compile.toVector - } + }.map(Result.success) }.onError { case NonFatal(e) => Sync[F].delay(e.printStackTrace()) } diff --git a/modules/sql-core/src/main/scala/SqlMapping.scala b/modules/sql-core/src/main/scala/SqlMapping.scala index 572277da..be928895 100644 --- a/modules/sql-core/src/main/scala/SqlMapping.scala +++ b/modules/sql-core/src/main/scala/SqlMapping.scala @@ -3308,7 +3308,7 @@ trait SqlMappingLike[F[_]] extends CirceMappingLike[F] with SqlModule[F] { self def fetch: F[Result[Table]] = { (for { frag <- ResultT(fragment.pure[F]) - rows <- ResultT(self.fetch(frag, query.codecs).map(_.success)) + rows <- ResultT(self.fetch(frag, query.codecs)) } yield Table(rows)).value } diff --git a/modules/sql-core/src/main/scala/SqlModule.scala b/modules/sql-core/src/main/scala/SqlModule.scala index dd90c8cc..01924c51 100644 --- a/modules/sql-core/src/main/scala/SqlModule.scala +++ b/modules/sql-core/src/main/scala/SqlModule.scala @@ -77,5 +77,5 @@ trait SqlModule[F[_]] { def intCodec: Codec - def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Vector[Array[Any]]] + def fetch(fragment: Fragment, codecs: List[(Boolean, Codec)]): F[Result[Vector[Array[Any]]]] }