-
Notifications
You must be signed in to change notification settings - Fork 82
Implementing storehaus with Postgres backend #331
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Conversation
johnynek
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the contribution. I made some suggestions. Let me know if you have any questions.
| * | ||
| * Created by ponkin on 10/20/16. | ||
| */ | ||
| trait Read[A] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we instead follow the style of the rest of the project and use Injection[A, PostgresValue] here instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was using Injections at first but it went me to many ugly implicit arguments like
PostgresStoreK, V(implicit inj1: Injection[K, PostgresValue], inj2: Injection[V, PostgresValue], inj3: Injection[K, Element], inj1: Injection[V, Element]) - and also expose details of implementation( Element is object from finagle-roc postgres driver). May be you can point me some example, how to do it right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you could do in your package:
type PostgresValueConverter[A] = Injection[A, PostgresValue]then in the object PostgresValue { ... define the implicits and it should be pretty clean and pick them up automatically.
| case (key, Some(value)) => upsertSQL(List((key, value))) | ||
| case (key, None) => deleteSQL(List(key)) | ||
| } | ||
| println(sql) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we should have the println around.
| } | ||
| ks.foldLeft(Map.empty[K1, Future[Option[V]]]) { (acc, key) => | ||
| acc + (key -> reqMap.map(_.get(key))) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ks.iterator.map { key =>
(key, reqMap.map(_.get(key)))
}.toMapwould be more direct than building the map yourself (and maybe faster since it can use a MapBuilder).
| s"DELETE FROM $table WHERE $kCol" | ||
|
|
||
| override def get(key: K): Future[Option[V]] = { | ||
| val result = run(selectSQL(List(key))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we pass a limit here? I'm also worried that we may have more than one item. I would rather limit to 2, then do:
result.flatMap {
case Nil => Future.None
case h :: Nil => Future.value(Read[V].read(h.get(Symbol(vCol))))
case other => Future.exception(new Exception(s"for key: $key expected 0 or 1 items, found: $other")| * | ||
| * Created by ponkin on 10/18/16. | ||
| */ | ||
| sealed trait PostgresValue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why the extra wrapper on Element?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to change Postgres driver(finagle-roc) in future(if we we want to).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. That's fine if we are going to automatically handle the conversions I think.
| Map.empty | ||
| } else { | ||
| val sql = doMultiPut(kvs) | ||
| println(sql) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's remove this.
| multiPutSQL(toUpsert, toDelete.toList) | ||
| } | ||
|
|
||
| override def close(t: Time): Future[Unit] = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
don't need { } for single expression methods.
| new PostgresStore[K, V](client, table, kCol, vCol) | ||
| } | ||
|
|
||
| class PostgresStore[K: Show : Read, V: Show : Read] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would it be too much work to add a PostgresMergeableStore that does merge in a transaction? That would be nice to have for several use cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, I will do it!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, that part turns out to be kind of tricky). How can we implement merge since we use async driver( separate calls client.query("BEGIN") and client.query("COMMIT;") can lead to unknown result, since some requests can be made simultaneously and can be added in transaction). I do not know ho to tie Futures to particular connection. Any ideas? We can use optimistic locking but we need additional column to keep lock, like timestamp or version.
build.sbt
Outdated
| "com.github.finagle" %% "roc-core" % "0.0.5", | ||
| "com.github.finagle" %% "roc-types" % "0.0.5" | ||
| "com.github.finagle" %% "roc-types" % "0.0.5", | ||
| "com.chuusai" %% "shapeless" % "2.3.2" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we need this? I don't see where.
| /** | ||
| * Created by ponkin on 10/26/16. | ||
| */ | ||
| class MergeablePostgresStore[V] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need this?
| implicit val boolRead = new Read[Boolean] { | ||
| override def read(v: PostgresValue): Boolean = v match { | ||
| case RocPostgresValue(e) => e.as[Boolean] | ||
| implicit val boolInjection= new OneWayInjection[Boolean] { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
okay, I'm really sorry. I overlooked that your Read and Show were not inverses of each other. One is going to String the other is coming from a PostgresValue. I wish I had noticed that.
I'd actually prefer the way you had it to throwing exceptions. I mistakenly thought we could combine Read and Show into Injection but not the way you have the code.
What about a direct trait like:
trait PostgresCodec[A] {
def encode(a: A): String
def decode(p: PostgresValue): Try[A]
}I don't see a need to decouple these since they are so special purpose. I thought Injection would do, but the types are not the same on encode and decode.
|
@johnynek Sorry for long delay. I added MergeablePostgresStore and tests for it. Also I rewrote all with Injections. I was trying to keep overall style. please review) |
| * All values with the same key will be merged with Semigroup[V2].plus method | ||
| */ | ||
| private def mergeWithSemigroup[K1 <: K](m1: Map[K, V], m2: Map[K1, V]): Map[K1, V] = { | ||
| m2.keySet.iterator.map{ k2 => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
foldLeft is generally faster because you may get some sharing:
val smaller = if (m1.size < m2.size) m1 else m2
val bigger = if (m1.size < m2.size) m2 else m1
smaller.foldLeft(bigger) { case (m, (k, v)) =>
val newV = Semigroup.maybePlus(v, m.get(k))
m + ((k, newV))
}may need to bump the algebird dependency to get maybePlus, but that is fine.
| forUpdate: Boolean = false) | ||
| (implicit cli: Client): Future[Map[K, V]] = | ||
| if (ks.isEmpty) { | ||
| Future(Map.empty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Future.value(Map.empty) is better since it avoids the call-by-name parameter, and since that is a constant, you might save it as a private[this] val and avoid the reallocation.
|
|
||
| trait OneWayInjection[T] extends Injection[T, PostgresValue] { | ||
| override def apply(t: T): PostgresValue = | ||
| throw IllegalConversionException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we avoid the throwing? I know we don't use it, but I really don't want to add code that throws unless we have no other way.
Here I can see two ways:
- just add the code to wrap the primitives below in
PostgresValue, which does not seem hard. - make a type-class that only goes in one direction.
|
|
||
| private val toV: PostgresValue => V = vInj.invert(_) match { | ||
| case Success(v) => v | ||
| case Failure(e) => throw IllegalConversionException(e.getMessage) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why do we need to throw here? Why not just put this in the future?
private def toV(p: PostgresValue): Future[V] = vInj.invert(p) match {
case Success(v) => Future.value(v)
case Failure(e) => Future.exception(e)
}then use toV(v).flatMap { ... rather than throwing?
| } | ||
| } | ||
|
|
||
| def toEscString(a: Any): String = a match { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't like this approach since we have no guarantee that toString is going to work. Why can't use a full injection to PostgresValue then use some safe mechanism to go from Value[Any] to String that can be interpolated in a SQL statement. I assume the Value wrapper has some support for that, no?
|
Alexey Ponkin seems not to be a GitHub user. You need a GitHub account to be able to sign the CLA. If you have already a GitHub account, please add the email address used for this commit to your account. You have signed the CLA already but the status is still pending? Let us recheck it. |
Hi everybody. Here is storehouse implementation for postgres.
Need your review and comments.