Skip to content
This repository was archived by the owner on Oct 30, 2018. It is now read-only.
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 46 additions & 3 deletions src/main/scala/com/gravity/hbase/schema/Query2.scala
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import immutable.TreeSet
import java.util.NavigableSet
import scala.collection.mutable.Buffer
import org.apache.hadoop.hbase.filter.FilterList.Operator
import org.joda.time.{ReadableInstant, DateTime}
import org.joda.time.ReadableInstant
import org.apache.hadoop.hbase.filter._
import org.hbase.async.GetRequest

Expand Down Expand Up @@ -809,6 +809,10 @@ class Query2[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]] private(
scan
}

/**
* @see [[scanUntil2()]]
* @see [[scanWhile()]]
*/
def scan(handler: (RR) => Unit, maxVersions: Int = 1, cacheBlocks: Boolean = true, cacheSize: Int = 100, useLocalCache: Boolean = false, localTTL: Int = 30) {
val scan = makeScanner(maxVersions, cacheBlocks, cacheSize)

Expand Down Expand Up @@ -901,10 +905,13 @@ class Query2[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]] private(

object YouCanStopNow extends Stopable

/**Similar to the scan method but if your handler returns false, it will stop scanning.
/**
* Like [[scan()]], but scanning will cease if handler returns false ("scan while handler() == true").
*
* @see [[scanUntil2()]]
* @see [[scan()]]
*/
def scanUntil(handler: (RR) => Boolean, maxVersions: Int = 1, cacheBlocks: Boolean = true, cacheSize: Int = 100) {
def scanWhile(handler: (RR) => Boolean, maxVersions: Int = 1, cacheBlocks: Boolean = true, cacheSize: Int = 100) {
table.withTable() {
htable =>
val scan = makeScanner(maxVersions, cacheBlocks, cacheSize)
Expand All @@ -923,6 +930,42 @@ class Query2[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]] private(
}
}

/**
* Like [[scan()]], but scanning will cease if handler returns false ("while" semantics).
*
* @see [[scanUntil2()]]
* @see [[scanWhile()]]
* @see [[scan()]]
*/
@deprecated("Use scanWhile() or scanUntil2() for correct boolean semantics.")
def scanUntil(handler: (RR) => Boolean, maxVersions: Int = 1, cacheBlocks: Boolean = true, cacheSize: Int = 100)
= scanWhile(handler, maxVersions, cacheBlocks, cacheSize)

/**
* Like [[scan()]], but scanning will cease if handler returns true ("scan until handler() == false").
*
* @see [[scanWhile()]]
* @see [[scan()]]
*/
def scanUntil2(handler: (RR) => Boolean, maxVersions: Int = 1, cacheBlocks: Boolean = true, cacheSize: Int = 100) {
table.withTable() {
htable =>
val scan = makeScanner(maxVersions, cacheBlocks, cacheSize)

val scanner = htable.getScanner(scan)

try {
for (result <- scanner) {
if (handler(table.buildRow(result))) throw YouCanStopNow
}
} catch {
case _: Stopable => // nothing to see here... move along. move along.
} finally {
scanner.close()
}
}
}

}

object Query2 {
Expand Down