diff --git a/pom.xml b/pom.xml
index 2fba5fe..8ac5046 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1,252 +1,292 @@
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.gravity</groupId>
   <artifactId>gravity-hpaste</artifactId>
   <version>0.1.24-SNAPSHOT</version>
   <packaging>jar</packaging>
   <name>hpaste</name>
   <url>http://github.com/GravityLabs/HPaste</url>
   <description>Scala mapper for hbase</description>
   <inceptionYear>2011</inceptionYear>
   <organization>
     <name>Gravity</name>
     <url>http://www.gravity.com/</url>
   </organization>
 
   <parent>
     <groupId>org.sonatype.oss</groupId>
     <artifactId>oss-parent</artifactId>
     <version>7</version>
   </parent>
 
   <developers>
     <developer>
       <id>Lemmsjid</id>
       <name>Chris Bissell</name>
       <email>chris@gravity.com</email>
       <url>http://github.com/Lemmsjid</url>
       <organization>Gravity</organization>
       <organizationUrl>http://www.gravity.com/</organizationUrl>
       <roles>
         <role>developer</role>
         <role>architect</role>
       </roles>
       <timezone>-8</timezone>
     </developer>
     <developer>
       <id>erraggy</id>
       <name>Robbie Coleman</name>
       <email>robbie@gravity.com</email>
       <url>http://robbie.robnrob.com/</url>
       <organization>Gravity</organization>
       <organizationUrl>http://www.gravity.com/</organizationUrl>
       <roles>
         <role>developer</role>
       </roles>
       <timezone>-8</timezone>
       <properties>
         <picUrl>http://1.gravatar.com/avatar/dc77b368ec1f077dcc4aca3b9c003d2d</picUrl>
       </properties>
     </developer>
+    <developer>
+      <id>mtrelinski</id>
+      <name>Michael Trelinski</name>
+      <email>mtrelinski@gravity.com</email>
+      <url>https://github.com/michael-trelinski</url>
+      <organization>Gravity</organization>
+      <organizationUrl>http://www.gravity.com/</organizationUrl>
+      <roles>
+        <role>developer</role>
+      </roles>
+      <timezone>-8</timezone>
+      <properties>
+        <picUrl>https://1.gravatar.com/avatar/01a9f612dd1762578eeb4bc3e56443b4</picUrl>
+      </properties>
+    </developer>
   </developers>
 
   <scm>
     <connection>scm:git:git@github.com:GravityLabs/HPaste.git</connection>
     <developerConnection>scm:git:git@github.com:GravityLabs/HPaste.git</developerConnection>
     <url>scm:git:git@github.com:GravityLabs/HPaste.git</url>
   </scm>
 
   <licenses>
     <license>
       <name>Apache 2</name>
       <url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
       <distribution>repo</distribution>
       <comments>A business-friendly OSS license</comments>
     </license>
   </licenses>
 
   <properties>
     <maven.compiler.source>1.5</maven.compiler.source>
     <maven.compiler.target>1.5</maven.compiler.target>
     <encoding>UTF-8</encoding>
     <scala.version>2.9.1</scala.version>
   </properties>
 
   <dependencies>
     <dependency>
       <groupId>joda-time</groupId>
       <artifactId>joda-time</artifactId>
       <version>1.6.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-core</artifactId>
       <version>0.20.2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.hbase</groupId>
       <artifactId>asynchbase</artifactId>
       <version>1.4.0</version>
       <exclusions>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>log4j-over-slf4j</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>jcl-over-slf4j</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase</artifactId>
       <version>0.90.4</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-core</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.slf4j</groupId>
           <artifactId>slf4j-log4j12</artifactId>
         </exclusion>
         <exclusion>
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.jruby</groupId>
           <artifactId>jruby-complete</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>net.sf.trove4j</groupId>
       <artifactId>trove4j</artifactId>
       <version>3.0.3</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hbase</groupId>
       <artifactId>hbase</artifactId>
       <version>0.90.4</version>
       <type>test-jar</type>
       <scope>test</scope>
       <exclusions>
         <exclusion>
           <groupId>org.jruby</groupId>
           <artifactId>jruby-complete</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-test</artifactId>
       <version>0.20.2</version>
       <scope>test</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.scala-lang</groupId>
       <artifactId>scala-library</artifactId>
       <version>${scala.version}</version>
     </dependency>
     <dependency>
       <groupId>junit</groupId>
       <artifactId>junit</artifactId>
       <version>4.8.1</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
 
   <build>
     <plugins>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-source-plugin</artifactId>
         <version>2.1.2</version>
         <executions>
           <execution>
             <goals>
               <goal>jar</goal>
             </goals>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <groupId>org.scala-tools</groupId>
         <artifactId>maven-scala-plugin</artifactId>
         <version>2.15.0</version>
         <executions>
           <execution>
             <goals>
               <goal>compile</goal>
               <goal>testCompile</goal>
             </goals>
             <configuration>
               <args>
                 <arg>-make:transitive</arg>
                 <arg>-dependencyfile</arg>
                 <arg>${project.build.directory}/.scala_dependencies</arg>
               </args>
             </configuration>
           </execution>
         </executions>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.6</version>
         <configuration>
           <argLine>-Xmx1024m</argLine>
           <useFile>false</useFile>
           <disableXmlReport>true</disableXmlReport>
           <includes>
             <include>**/*Test.*</include>
           </includes>
           <excludes>
             <exclude>**/*IT.*</exclude>
           </excludes>
         </configuration>
       </plugin>
       <plugin>
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-release-plugin</artifactId>
         <version>2.2.2</version>
       </plugin>
     </plugins>
   </build>
 
   <profiles>
     <profile>
       <id>release-sign-artifacts</id>
       <activation>
         <property>
           <name>performRelease</name>
           <value>true</value>
         </property>
       </activation>
       <build>
         <plugins>
           <plugin>
             <groupId>org.apache.maven.plugins</groupId>
             <artifactId>maven-gpg-plugin</artifactId>
             <version>1.4</version>
             <executions>
               <execution>
                 <id>sign-artifacts</id>
                 <phase>verify</phase>
                 <goals>
                   <goal>sign</goal>
                 </goals>
               </execution>
             </executions>
           </plugin>
         </plugins>
       </build>
     </profile>
   </profiles>
 </project>
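All of the Hadoop- and HBase-side artifacts above now exclude the slf4j-log4j12 binding, leaving log4j routing to the library consumer. Whether any transitive copy survives can be checked from the dependency tree, e.g. with `mvn dependency:tree -Dincludes=org.slf4j:slf4j-log4j12` (an illustrative invocation; the dependency plugin is not declared in this POM and resolves through Maven's defaults).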
diff --git a/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala b/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala
index d27f22b..de0ed66 100644
--- a/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala
+++ b/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala
@@ -393,8 +393,8 @@ class HJob[S <: SettingsBase](val name: String, tasks: HTask[_, _, _, _]*) {
 
   def getMapperFunc[MK, MV, MOK, MOV](idx: Int) = {
     val task = tasks(idx)
-    if (task.isInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _]]) {
-      val tk = task.asInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _]]
+    if (task.isInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _, _, _]]) {
+      val tk = task.asInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _, _, _]]
       tk.mapper
     }
     else if (task.isInstanceOf[HMapTask[MK, MV, MOK, MOV]]) {
@@ -405,10 +405,10 @@ class HJob[S <: SettingsBase](val name: String, tasks: HTask[_, _, _, _]*) {
     }
   }
 
-  def getReducerFunc[MOK, MOV, ROK, ROV](idx: Int) = {
+  def getReducerFunc[MOK, MOV, RIK, RIV, ROK, ROV](idx: Int) = {
     val task = tasks(idx)
-    if (task.isInstanceOf[HMapReduceTask[_, _, MOK, MOV, ROK, ROV]]) {
-      val tk = task.asInstanceOf[HMapReduceTask[_, _, MOK, MOV, ROK, ROV]]
+    if (task.isInstanceOf[HMapReduceTask[_, _, MOK, MOV, RIK, RIV, ROK, ROV]]) {
+      val tk = task.asInstanceOf[HMapReduceTask[_, _, MOK, MOV, RIK, RIV, ROK, ROV]]
       tk.reducer
     } else {
       throw new RuntimeException("Unable to find reducer for index " + idx)
@@ -1013,11 +1013,11 @@ abstract class HMapper[MK, MV, MOK, MOV] extends Mapper[MK, MV, MOK, MOV] with M
   }
 }
 
-abstract class HReducer[MOK, MOV, ROK, ROV] extends Reducer[MOK, MOV, ROK, ROV] with MRWritable[ROK, ROV] {
+abstract class HReducer[RIK, RIV, ROK, ROV] extends Reducer[RIK, RIV, ROK, ROV] with MRWritable[ROK, ROV] {
   type SettingsClass <: SettingsBase
 
-  var context: Reducer[MOK, MOV, ROK, ROV]#Context = null
+  var context: Reducer[RIK, RIV, ROK, ROV]#Context = null
 
   var settings: SettingsClass = _
 
   def counter(message: String, count: Long) {
@@ -1035,13 +1035,13 @@ abstract class HReducer[MOK, MOV, ROK, ROV] extends Reducer[MOK, MOV, ROK, ROV]
 
   def values = context.getValues
 
-  override def setup(context: Reducer[MOK, MOV, ROK, ROV]#Context) {
+  override def setup(context: Reducer[RIK, RIV, ROK, ROV]#Context) {
     this.context = context
     settings = Class.forName(context.getConfiguration.get("hpaste.settingsclass")).newInstance().asInstanceOf[SettingsClass]
     settings.fromSettings(context.getConfiguration)
   }
 
-  override def reduce(key: MOK, values: Iterable[MOV], context: Reducer[MOK, MOV, ROK, ROV]#Context) {
+  override def reduce(key: RIK, values: Iterable[RIV], context: Reducer[RIK, RIV, ROK, ROV]#Context) {
     reduce()
   }
 
@@ -1050,7 +1050,7 @@
 }
 
 object HMapReduceTask {
-  def apply[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, ROV: Manifest](name: String, mapper: HMapper[MK, MV, MOK, MOV], reducer: HReducer[MOK, MOV, ROK, ROV]): HMapReduceTask[MK, MV, MOK, MOV, ROK, ROV] = {
+  def apply[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest](name: String, mapper: HMapper[MK, MV, MOK, MOV], reducer: HReducer[MOK, MOV, ROK, ROV]): HMapReduceTask[MK, MV, MOK, MOV, MOK, MOV, ROK, ROV] = {
     HMapReduceTask(
       HTaskID(name),
       HTaskConfigs(),
@@ -1089,13 +1089,13 @@ case class HGroupingTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, RO
 
 /**
  * An HTask that wraps a standard mapper and reducer function.
  */
-case class HMapReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, ROV: Manifest](
+case class HMapReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest](
   id: HTaskID,
   configs: HTaskConfigsBase = HTaskConfigs(),
   io: HIO[MK, MV, ROK, ROV] = HIO(),
   mapper: HMapper[MK, MV, MOK, MOV],
-  reducer: HReducer[MOK, MOV, ROK, ROV],
-  combiner: HReducer[MOK, MOV, MOK, MOV] = null)
+  reducer: HReducer[RIK, RIV, ROK, ROV],
+  combiner: HReducer[MOK, MOV, RIK, RIV] = null)
   extends HTask[MK, MV, ROK, ROV](id, configs, io) {
@@ -1170,11 +1170,15 @@ case class HMapTask[MK, MV, MOK: Manifest, MOV: Manifest](id: HTaskID, configs:
 /**
  * A task for a Mapper / Combiner / Reducer combo
  */
-case class HMapCombineReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK, ROV](id: HTaskID, configs: HTaskConfigs = HTaskConfigs(), io: HIO[MK, MV, ROK, ROV] = HIO(), mapper: HMapper[MK, MV, MOK, MOV], combiner: HReducer[MOK, MOV, ROK, ROV], reducer: HReducer[MOK, MOV, ROK, ROV]) extends HTask[MK, MV, ROK, ROV](id, configs, io) {
+case class HMapCombineReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest](id: HTaskID, configs: HTaskConfigs = HTaskConfigs(), io: HIO[MK, MV, ROK, ROV] = HIO(), mapper: HMapper[MK, MV, MOK, MOV], combiner: HReducer[MOK, MOV, RIK, RIV], reducer: HReducer[RIK, RIV, ROK, ROV]) extends HTask[MK, MV, ROK, ROV](id, configs, io) {
 
   def decorateJob(job: Job) {
     job.setMapperClass(mapper.getClass)
+    job.setMapOutputKeyClass(classManifest[MOK].erasure)
     job.setMapOutputValueClass(classManifest[MOV].erasure)
+    job.setOutputKeyClass(classManifest[ROK].erasure)
+    job.setOutputValueClass(classManifest[ROV].erasure)
+
     job.setReducerClass(reducer.getClass)
     job.setCombinerClass(combiner.getClass)
   }
@@ -1272,7 +1276,7 @@ class HMapContext[MK, MV, MOK, MOV, S <: SettingsBase](conf: Configuration, coun
 /**
  * This is the context object for a Reduce function. It gets passed into the reducer defined in an HTask.
  */
-class HReduceContext[MOK, MOV, ROK, ROV, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, val context: Reducer[MOK, MOV, ROK, ROV]#Context) extends HContext[S](conf, counter) {
+class HReduceContext[RIK, RIV, ROK, ROV, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, val context: Reducer[RIK, RIV, ROK, ROV]#Context) extends HContext[S](conf, counter) {
   def key = context.getCurrentKey
 
   def values = context.getValues
@@ -1280,7 +1284,7 @@ class HReduceContext[MOK, MOV, ROK, ROV, S <: SettingsBase](conf: Configuration,
 
   def write(key: ROK, value: ROV) {context.write(key, value)}
 }
 
-class ToTableReduceContext[MOK, MOV, T <: HbaseTable[T, R, _], R, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, context: Reducer[MOK, MOV, NullWritable, Writable]#Context) extends HReduceContext[MOK, MOV, NullWritable, Writable, S](conf, counter, context) {
+class ToTableReduceContext[RIK, RIV, T <: HbaseTable[T, R, _], R, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, context: Reducer[RIK, RIV, NullWritable, Writable]#Context) extends HReduceContext[RIK, RIV, NullWritable, Writable, S](conf, counter, context) {
   def write(operation: OpBase[T, R]) {
     operation.getOperations.foreach {op => write(NullWritable.get(), op)}
   }
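The point of the widened signatures above is to decouple the reducer's input types (RIK/RIV) from the mapper's output types (MOK/MOV). A reading aid, using only the classes from this patch:

// How the type parameters line up across the widened task signatures:
//
//   mapper   : HMapper [MK,  MV,  MOK, MOV]   // input split   -> map output
//   combiner : HReducer[MOK, MOV, RIK, RIV]   // map output    -> reduce input
//   reducer  : HReducer[RIK, RIV, ROK, ROV]   // reduce input  -> job output
//
// Hadoop still requires a combiner to emit the configured map output types, so in
// practice RIK/RIV mirror MOK/MOV; what the split buys is that ROK/ROV are no longer
// forced onto the combiner, leaving the final reducer free to emit different types
// (exercised by the tests added below).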
diff --git a/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala b/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala
index 3eba5a0..015a6eb 100644
--- a/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala
+++ b/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala
@@ -111,7 +111,7 @@ abstract class HbaseTable[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]](val ta
    */
   def converterByBytes(famBytes: Array[Byte], colBytes: Array[Byte]): KeyValueConvertible[_, _, _] = {
 
-    if (colFamLookup.length == 0 || famLookup.length == 0) {
+    if (colFamLookup.length == 0 && famLookup.length == 0) {
       throw new RuntimeException("Attempting to lookup 0 length columns and families--HBaseTable is corrupt")
     }
diff --git a/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala b/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala
new file mode 100644
index 0000000..5ec0748
--- /dev/null
+++ b/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala
@@ -0,0 +1,187 @@
+package com.gravity.hbase.mapreduce.tests
+
+import org.apache.commons.logging.Log
+import org.apache.commons.logging.LogFactory
+import com.gravity.hbase.mapreduce._
+import org.apache.hadoop.io.{NullWritable, Text, IntWritable}
+import scala.collection.JavaConversions._
+import junit.framework.TestCase
+import org.junit._
+import org.apache.hadoop.mapred.MiniMRCluster
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import java.io.{InputStreamReader, BufferedReader, File}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import com.gravity.hbase.schema.HPasteMapReduceTestCase
+import com.gravity.hbase.mapreduce.HMapCombineReduceTask
+import com.gravity.hbase.mapreduce.HTaskConfigs
+import com.gravity.hbase.mapreduce.BigMemoryConf
+import com.gravity.hbase.mapreduce.HIO
+import com.gravity.hbase.mapreduce.HPathInput
+import com.gravity.hbase.mapreduce.HTaskID
+import com.gravity.hbase.mapreduce.ReducerCountConf
+import com.gravity.hbase.mapreduce.HPathOutput
+
+/**
+ * User: mtrelinski
+ */
+
+object TemporaryIOPaths {
+
+  val NONCE = "com-gravity-hpaste-"
+  val INPUT = NONCE + "input"
+
+  val ClassicWordCount_OUTPUT = NONCE + "classicwordcount-output"
+  val ClassicWordCountWithCombiner_OUTPUT = NONCE + "classicwordcountcombiner-output"
+  val ClassicWordCountWithCombinerDifferentReducer_OUTPUT = NONCE + "wordcountcombinerdiffreducer-output"
+
+}
+
+// The Jobs:
+
+class ClassicWordCount extends HJob[NoSettings](
+  "A Contrived and Classic Word Count",
+  HMapReduceTask(
+    HTaskID("AContrivedAndClassicWordCount"),
+    HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)),
+    HIO(
+      HPathInput(TemporaryIOPaths.INPUT :: Nil),
+      HPathOutput(TemporaryIOPaths.ClassicWordCount_OUTPUT)
+    ),
+    new LineToWordsWithCounts,
+    new WordWithCountAggregator
+  )
+)
+
+class ClassicWordCountWithCombiner extends HJob[NoSettings](
+  "Another Contrived and Classic Word Count using a Combiner",
+  HMapCombineReduceTask(
+    HTaskID("AnotherContrivedWordCountWithACombiner"),
+    HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)),
+    HIO(
+      HPathInput(TemporaryIOPaths.INPUT :: Nil),
+      HPathOutput(TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT)
+    ),
+    new LineToWordsWithCounts,
+    new WordWithCountAggregator,
+    new WordWithCountAggregator
+  )
+)
+
+class ClassicWordCountWithCombinerAndDifferentReducer extends HJob[NoSettings](
+  "Another Contrived and Classic Word Count using a Combiner with a Reducer that doesn't match the output K, V types",
+  HMapCombineReduceTask(
+    HTaskID("YetAnotherContrivedWordCountWithACombiner"),
+    HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)),
+    HIO(
+      HPathInput(TemporaryIOPaths.INPUT :: Nil),
+      HPathOutput(TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT)
+    ),
+    new LineToWordsWithCounts,
+    new WordWithCountAggregator,
+    new WordWithCountFinalAggregator
+  )
+)
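+
+// Note: WordWithCountAggregator can act as a combiner because its input and output
+// types are identical (Text, IntWritable) and summation is associative, so Hadoop may
+// run it zero or more times on the map side without changing the final counts. In the
+// second job above it also doubles as the reducer.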
+
+// The Transformers and Aggregators:
+
+class LineToWordsWithCounts extends HMapper[IntWritable, Text, Text, IntWritable] {
+
+  val wordWritable = new Text
+  val one = new IntWritable(1)
+
+  override def map() {
+    this.value.toString.split("\\s").foreach {
+      word =>
+        wordWritable.set(word)
+        this.write(wordWritable, one)
+    }
+
+  }
+
+}
+
+class WordWithCountAggregator extends HReducer[Text, IntWritable, Text, IntWritable] {
+
+  override def reduce() {
+
+    var total: Int = 0
+    val valuesIterator = this.values.iterator()
+    while (valuesIterator.hasNext)
+      total = total + valuesIterator.next().get()
+
+    this.write(this.key, new IntWritable(total))
+  }
+
+}
+
+class WordWithCountFinalAggregator extends HReducer[Text, IntWritable, Text, NullWritable] {
+
+  override def reduce() {
+
+    var total: Int = 0
+    val valuesIterator = this.values.iterator()
+    while (valuesIterator.hasNext)
+      total = total + valuesIterator.next().get()
+
+    this.write(new Text(this.key.toString + "\t" + total), NullWritable.get())
+  }
+
+}
+
+// The Tests:
+
+class MapReduceTests extends HPasteMapReduceTestCase(TemporaryIOPaths.INPUT :: TemporaryIOPaths.ClassicWordCount_OUTPUT :: TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT :: TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT :: Nil) {
+
+  val sentences = "the quick brown fox jumped over the lazy dog" :: "cat and dog" :: Nil
+  val sentencesWordFrequencies = Map("the" -> 2, "quick" -> 1, "brown" -> 1, "fox" -> 1, "jumped" -> 1, "over" -> 1, "lazy" -> 1, "dog" -> 2, "and" -> 1, "cat" -> 1)
+
+  def verifyOutput(path: String) = {
+    val output: String = getJobOutput(path)
+    println("Job Output: \n" + output)
+    output.split("\n").foreach {
+      line =>
+        val lineSplit = line.split("\t")
+        val (word, freq) = (lineSplit(0) -> lineSplit(1))
+        Assert.assertEquals("'" + word + "' should be " + sentencesWordFrequencies(word) + " but got " + freq + " instead", sentencesWordFrequencies(word).toLong, freq.toLong)
+    }
+  }
+
+  def writeSentences() {
+    val stream = getFileSystem().create(new Path(TemporaryIOPaths.INPUT + "/sentences.txt"), true)
+    sentences.foreach {
+      sentence => stream.write((sentence + "\n").getBytes())
+    }
+    stream.flush()
+    stream.sync()
+    stream.close()
+  }
+
+  @Test
+  def testClassicWordCount() {
+    writeSentences()
+    val classicWordCount = new ClassicWordCount
+    val resultOfClassicWordCount = classicWordCount.run(new com.gravity.hbase.mapreduce.NoSettings, conf)
+    Assert.assertTrue("job failed", resultOfClassicWordCount._1)
+    verifyOutput(TemporaryIOPaths.ClassicWordCount_OUTPUT)
+  }
+
+  @Test
+  def testClassicWordCountWithCombiner() {
+    writeSentences()
+    val classicWordCountWithCombiner = new ClassicWordCountWithCombiner
+    val resultOfClassicWordCountWithCombiner = classicWordCountWithCombiner.run(new com.gravity.hbase.mapreduce.NoSettings, conf)
+    Assert.assertTrue("job failed", resultOfClassicWordCountWithCombiner._1)
+    verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT)
+  }
+
+  @Test
+  def testClassicWordCountWithCombinerAndDifferentReducer() {
+    writeSentences()
+    val classicWordCountWithCombinerAndDifferentReducer = new ClassicWordCountWithCombinerAndDifferentReducer
+    val resultOfClassicWordCountWithCombinerAndDifferentReducer = classicWordCountWithCombinerAndDifferentReducer.run(new com.gravity.hbase.mapreduce.NoSettings, conf)
+    Assert.assertTrue("job failed", resultOfClassicWordCountWithCombinerAndDifferentReducer._1)
+    verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT)
+  }
+
+}
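For the third job, the inferred HMapCombineReduceTask type parameters work out as follows (a reading aid; the input key type is as declared by LineToWordsWithCounts):

// HMapCombineReduceTask[IntWritable, Text,    // MK,  MV : input split key/value as declared
//                       Text, IntWritable,    // MOK, MOV: LineToWordsWithCounts output (word, 1)
//                       Text, IntWritable,    // RIK, RIV: WordWithCountAggregator (combiner) output
//                       Text, NullWritable]   // ROK, ROV: WordWithCountFinalAggregator output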
"' should be " + sentencesWordFrequencies(word) + " but got " + freq + " instead", sentencesWordFrequencies(word).toLong, freq.toLong) + } + } + + def writeSentences() { + val stream = getFileSystem().create(new Path(TemporaryIOPaths.INPUT + "/sentences.txt"), true) + sentences.foreach { + sentence => stream.write((sentence + "\n").getBytes()) + } + stream.flush() + stream.sync() + stream.close() + } + + @Test + def testClassicWordCount() { + writeSentences() + val classicWordCount = new ClassicWordCount + val resultOfClassicWordCount = classicWordCount.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCount._1) + verifyOutput(TemporaryIOPaths.ClassicWordCount_OUTPUT) + } + + @Test + def testClassicWordCountWithCombiner() { + writeSentences() + val classicWordCountWithCombiner = new ClassicWordCountWithCombiner + val resultOfClassicWordCountWithCombiner = classicWordCountWithCombiner.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCountWithCombiner._1) + verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT) + } + + @Test + def testClassicWordCountWithCombinerAndDifferentReducer() { + writeSentences() + val classicWordCountWithCombinerAndDifferentReducer = new ClassicWordCountWithCombinerAndDifferentReducer + val resultOfClassicWordCountWithCombinerAndDifferentReducer = classicWordCountWithCombinerAndDifferentReducer.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCountWithCombinerAndDifferentReducer._1) + verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT) + } + +} diff --git a/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala b/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala index 2cf6706..725aee4 100644 --- a/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala +++ b/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala @@ -119,7 +119,52 @@ object ExampleSchema extends Schema { class ExampleTableRow(table:ExampleTable,result:DeserializedResult) extends HRow[ExampleTable,String](result,table) + + trait Kitty[T <: HbaseTable[T, R, _], R] { + this: HbaseTable[T, R, _] => + + val kittyCutenessStats = family[String, String, Double]("kcs", rowTtlInSeconds = 604800, compressed = true) + + } + + trait KittyRow[T <: HbaseTable[T, R, RR] with Kitty[T, R], R, RR <: HRow[T, R]] { + this: HRow[T, R] => + + lazy val kittyCutenessStats = family(_.kittyCutenessStats) + + } + + trait AdultCat[T <: HbaseTable[T, R, _], R] { + this: HbaseTable[T, R, _] => + + val catCutenessStats = family[String, String, Double]("ccs", rowTtlInSeconds = 604800, compressed = true) + + } + + trait AdultCatRow[T <: HbaseTable[T, R, RR] with AdultCat[T, R], R, RR <: HRow[T, R]] { + this: HRow[T, R] => + + lazy val catCuteness = family(_.catCutenessStats) + + } + + class CatTable extends HbaseTable[CatTable, String, CatRow](tableName = "cats_by_cuteness", rowKeyClass = classOf[String], logSchemaInconsistencies = false, tableConfig = HbaseTableConfig(maxFileSizeInBytes=1073741824)) + with Kitty[CatTable, String] + with AdultCat[CatTable, String] { + + override def rowBuilder(result: DeserializedResult) = new CatRow(result, this) + + } + + class CatRow(result: DeserializedResult, table: CatTable) extends HRow[CatTable, String](result, table) + with KittyRow[CatTable, String, CatRow] + with AdultCatRow[CatTable, String, CatRow] { + + } + //Register the table (DON'T FORGET TO DO 
diff --git a/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala b/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala
index cb54eca..5bb2ccc 100644
--- a/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala
+++ b/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala
@@ -4,6 +4,15 @@ import junit.framework.TestCase
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.hadoop.hbase.HBaseTestingUtility
 import scala.collection.mutable.{SynchronizedSet, HashSet}
+import org.junit.{After, Before, Test}
+import org.apache.hadoop.hdfs.MiniDFSCluster
+import org.apache.hadoop.mapred.MiniMRCluster
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import java.io.{File, InputStreamReader, BufferedReader}
+import java.net.ServerSocket
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption
+import collection.mutable
 
 /*
 )\._.,--....,'``.
 .b--.        /;   _.. \   _\  (`._ ,.
@@ -32,7 +41,106 @@ object LocalCluster {
   }
 }
 
-class HPasteTestCase(schema:Schema) extends TestCase {
+class HPasteTestCase(schema:Schema, name: String) extends TestCase(name) {
+
+  if (schema != null) LocalCluster.initializeSchema(schema)
+
+  def this() = this(null, "none")
+
+  def this(name: String) = this(null, name)
+
+  def this(schema: Schema) = this(schema, "none")
+
+  @Test
+  def testEliminateWarning() { /* this avoids a warning */ }
+
+}
+
+class HPasteMapReduceTestCase(paths: Seq[String], name: String) extends TestCase(name) {
+
+  var dfsCluster: MiniDFSCluster = _
+  var mrCluster: MiniMRCluster = _
+  var conf: Configuration = _
+
+  def this() = this(Seq.empty[String], "none")
+
+  def this(name: String) = this(Seq.empty[String], name)
+
+  def this(paths: Seq[String]) = this(paths, "none")
+
+  def getFileSystem() = dfsCluster.getFileSystem
+
+  def getJobOutput(path: String) = {
+    val sb: StringBuilder = new StringBuilder
+    getFileSystem().listStatus(new Path(path)).foreach {
+      fileStatus =>
+        if (!fileStatus.isDir) {
+          val stream = new BufferedReader(new InputStreamReader(getFileSystem().open(fileStatus.getPath)))
+          var currentLine = ""
+          while ( {
+            currentLine = stream.readLine(); currentLine
+          } != null) sb.append(currentLine + "\n")
+          stream.close()
+        }
+    }
+    sb.toString()
+  }
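+
+  // Note: a port obtained this way can, in principle, be re-bound by another process
+  // between close() and the moment the mini cluster binds it; for a single-process
+  // test run this race is acceptable.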
+  def getRandomPort(): Int = {
+    val localmachine: ServerSocket = new ServerSocket(0)
+    val randomPort = localmachine.getLocalPort()
+    localmachine.close()
+    return randomPort
+  }
+
+  @Before
+  override def setUp() {
+    super.setUp()
+    val logs = File.createTempFile("hadoop-logs", "").getName
+    new File(logs).mkdirs()
+    System.setProperty("hadoop.log.dir", logs)
+    System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog")
+    System.setProperty("javax.xml.parsers.SAXParserFactory", "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl")
+
+    val location = File.createTempFile("dfs", "")
+    location.delete()
+    val dfsName = new File(location.getName + File.separatorChar + "name")
+    val dfsData = new File(location.getName + File.separatorChar + "data")
+    dfsName.mkdirs()
+    dfsData.mkdirs()
+
+    conf = new Configuration
+    conf.set("dfs.name.dir", dfsName.getAbsolutePath)
+    conf.set("dfs.data.dir", dfsData.getAbsolutePath)
+
+    dfsCluster = new MiniDFSCluster(getRandomPort(), conf, 1, true, false, StartupOption.REGULAR, Array[String]("/rack1"))
+    for (p <- paths) getFileSystem().makeQualified(new Path(p))
+
+    mrCluster = new MiniMRCluster(1, getFileSystem().getUri().toString(), 1)
+  }
+
+  @Test
+  def testEliminateWarning() { /* this avoids a warning */ }
+
+  @After
+  override def tearDown() {
+    super.tearDown()
+    println("stopping services")
+    if (mrCluster != null) {
+      println("stopping mapred...")
+      mrCluster.shutdown()
+      mrCluster = null
+    }
+    if (dfsCluster != null) {
+      println("stopping datanodes...")
+      dfsCluster.shutdownDataNodes()
+      println("stopping hdfs...")
+      dfsCluster.shutdown()
+      dfsCluster = null
+    }
+    println("done stopping services")
+  }
+
-  LocalCluster.initializeSchema(schema)
 }
\ No newline at end of file
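Note: the Surefire include pattern `**/*Test.*` configured in the POM above does not match a class named MapReduceTests, so the new suite is most easily run explicitly, e.g. `mvn test -Dtest=MapReduceTests` (standard Surefire test selection; no extra profile is assumed).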