From 574995a9f1c0895992ea4d329dee519ab7680afb Mon Sep 17 00:00:00 2001 From: Mike Trelinski Date: Wed, 21 Aug 2013 17:50:57 -0700 Subject: [PATCH] Loosened MOK, MOV, ROK, ROV types to be more permissive for HMapCombineReduceTask, fixed ability to make table from traits and only having families, added MapReduce tests for type changes, removed test warnings from HPasteTestCase, and added HPasteMapReduceTestCase for just a basic HDFS and MR cluster for testing without any HBase --- pom.xml | 510 ++++++++++-------- .../gravity/hbase/mapreduce/mapreduce2.scala | 36 +- .../com/gravity/hbase/schema/HbaseTable.scala | 2 +- .../mapreduce.tests/MapReduceTests.scala | 187 +++++++ .../gravity/hbase/schema/ExampleSchema.scala | 69 ++- .../gravity/hbase/schema/HPasteTestCase.scala | 112 +++- 6 files changed, 661 insertions(+), 255 deletions(-) create mode 100644 src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala diff --git a/pom.xml b/pom.xml index 2fba5fe..8ac5046 100644 --- a/pom.xml +++ b/pom.xml @@ -1,252 +1,292 @@ - 4.0.0 - com.gravity - gravity-hpaste - 0.1.24-SNAPSHOT - jar - hpaste - http://github.com/GravityLabs/HPaste - Scala mapper for hbase - 2011 - - Gravity - http://www.gravity.com/ - + 4.0.0 + com.gravity + gravity-hpaste + 0.1.24-SNAPSHOT + jar + hpaste + http://github.com/GravityLabs/HPaste + Scala mapper for hbase + 2011 + + Gravity + http://www.gravity.com/ + - - org.sonatype.oss - oss-parent - 7 - + + org.sonatype.oss + oss-parent + 7 + - - - Lemmsjid - Chris Bissell - chris@gravity.com - http://github.com/Lemmsjid - Gravity - http://www.gravity.com/ - - developer - architect - - -8 - - - erraggy - Robbie Coleman - robbie@gravity.com - http://robbie.robnrob.com/ - Gravity - http://www.gravity.com/ - - developer - - -8 - - http://1.gravatar.com/avatar/dc77b368ec1f077dcc4aca3b9c003d2d - - - - - scm:git:git@github.com:GravityLabs/HPaste.git - scm:git:git@github.com:GravityLabs/HPaste.git - scm:git:git@github.com:GravityLabs/HPaste.git - - - - Apache 2 - http://www.apache.org/licenses/LICENSE-2.0.txt - repo - A business-friendly OSS license - - + + + Lemmsjid + Chris Bissell + chris@gravity.com + http://github.com/Lemmsjid + Gravity + http://www.gravity.com/ + + developer + architect + + -8 + + + erraggy + Robbie Coleman + robbie@gravity.com + http://robbie.robnrob.com/ + Gravity + http://www.gravity.com/ + + developer + + -8 + + http://1.gravatar.com/avatar/dc77b368ec1f077dcc4aca3b9c003d2d + + + + mtrelinski + Michael Trelinski + mtrelinski@gravity.com + https://github.com/michael-trelinski + Gravity + http://www.gravity.com/ + + developer + + -8 + + https://1.gravatar.com/avatar/01a9f612dd1762578eeb4bc3e56443b4 + + + + + scm:git:git@github.com:GravityLabs/HPaste.git + scm:git:git@github.com:GravityLabs/HPaste.git + scm:git:git@github.com:GravityLabs/HPaste.git + + + + Apache 2 + http://www.apache.org/licenses/LICENSE-2.0.txt + repo + A business-friendly OSS license + + - - 1.5 - 1.5 - UTF-8 - 2.9.1 - + + 1.5 + 1.5 + UTF-8 + 2.9.1 + - - - joda-time - joda-time - 1.6.1 - - - org.apache.hadoop - hadoop-core - 0.20.2 - - - org.hbase - asynchbase - 1.4.0 - - - org.slf4j - log4j-over-slf4j - - - org.slf4j - jcl-over-slf4j - - - - + + + joda-time + joda-time + 1.6.1 + + + org.apache.hadoop + hadoop-core + 0.20.2 + + + org.slf4j + slf4j-log4j12 + + + + + org.hbase + asynchbase + 1.4.0 + + + org.slf4j + log4j-over-slf4j + + + org.slf4j + jcl-over-slf4j + + + org.slf4j + slf4j-log4j12 + + + - - org.apache.hbase - hbase - 0.90.4 - - - org.apache.hadoop - hadoop-core - - - org.slf4j - slf4j-log4j12 - - - log4j - log4j - - - org.jruby - jruby-complete - - - - + + org.apache.hbase + hbase + 0.90.4 + + + org.apache.hadoop + hadoop-core + + + org.slf4j + slf4j-log4j12 + + + log4j + log4j + + + org.jruby + jruby-complete + + + + net.sf.trove4j trove4j 3.0.3 + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hbase + hbase + 0.90.4 + test-jar + test + + + org.jruby + jruby-complete + + + org.slf4j + slf4j-log4j12 + + + + + org.apache.hadoop + hadoop-test + 0.20.2 + test + + + org.slf4j + slf4j-log4j12 + + + + + org.scala-lang + scala-library + ${scala.version} - - org.apache.hbase - hbase - 0.90.4 - test-jar - test - - - org.jruby - jruby-complete - - - - - org.apache.hadoop - hadoop-test - 0.20.2 - test - - - org.scala-lang - scala-library - ${scala.version} - - - - - junit - junit - 4.8.1 - test - - - - - - org.apache.maven.plugins - maven-source-plugin - 2.1.2 - - - - jar - - - - - - org.scala-tools - maven-scala-plugin - 2.15.0 - - - - compile - testCompile - - - - -make:transitive - -dependencyfile - ${project.build.directory}/.scala_dependencies - - - - - - - org.apache.maven.plugins - maven-surefire-plugin - 2.6 - - -Xmx1024m - false - true - - **/*Test.* - - - **/*IT.* - - - - - org.apache.maven.plugins - maven-release-plugin - 2.2.2 - - - + + + junit + junit + 4.8.1 + test + + - - - release-sign-artifacts - - - performRelease - true - - - + - - org.apache.maven.plugins - maven-gpg-plugin - 1.4 - - - sign-artifacts - verify - - sign - - - - + + org.apache.maven.plugins + maven-source-plugin + 2.1.2 + + + + jar + + + + + + org.scala-tools + maven-scala-plugin + 2.15.0 + + + + compile + testCompile + + + + -make:transitive + -dependencyfile + ${project.build.directory}/.scala_dependencies + + + + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.6 + + -Xmx1024m + false + true + + **/*Test.* + + + **/*IT.* + + + + + org.apache.maven.plugins + maven-release-plugin + 2.2.2 + - - - + + + + + release-sign-artifacts + + + performRelease + true + + + + + + org.apache.maven.plugins + maven-gpg-plugin + 1.4 + + + sign-artifacts + verify + + sign + + + + + + + + diff --git a/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala b/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala index d27f22b..de0ed66 100644 --- a/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala +++ b/src/main/scala/com/gravity/hbase/mapreduce/mapreduce2.scala @@ -393,8 +393,8 @@ class HJob[S <: SettingsBase](val name: String, tasks: HTask[_, _, _, _]*) { def getMapperFunc[MK, MV, MOK, MOV](idx: Int) = { val task = tasks(idx) - if (task.isInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _]]) { - val tk = task.asInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _]] + if (task.isInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _, _, _]]) { + val tk = task.asInstanceOf[HMapReduceTask[MK, MV, MOK, MOV, _, _, _, _]] tk.mapper } else if (task.isInstanceOf[HMapTask[MK, MV, MOK, MOV]]) { @@ -405,10 +405,10 @@ class HJob[S <: SettingsBase](val name: String, tasks: HTask[_, _, _, _]*) { } } - def getReducerFunc[MOK, MOV, ROK, ROV](idx: Int) = { + def getReducerFunc[MOK, MOV, RIK, RIV, ROK, ROV](idx: Int) = { val task = tasks(idx) - if (task.isInstanceOf[HMapReduceTask[_, _, MOK, MOV, ROK, ROV]]) { - val tk = task.asInstanceOf[HMapReduceTask[_, _, MOK, MOV, ROK, ROV]] + if (task.isInstanceOf[HMapReduceTask[_, _, MOK, MOV, RIK, RIV, ROK, ROV]]) { + val tk = task.asInstanceOf[HMapReduceTask[_, _, MOK, MOV, RIK, RIV, ROK, ROV]] tk.reducer } else { throw new RuntimeException("Unable to find reducer for index " + idx) @@ -1013,11 +1013,11 @@ abstract class HMapper[MK, MV, MOK, MOV] extends Mapper[MK, MV, MOK, MOV] with M } } -abstract class HReducer[MOK, MOV, ROK, ROV] extends Reducer[MOK, MOV, ROK, ROV] with MRWritable[ROK, ROV] { +abstract class HReducer[RIK, RIV, ROK, ROV] extends Reducer[RIK, RIV, ROK, ROV] with MRWritable[ROK, ROV] { type SettingsClass <: SettingsBase - var context: Reducer[MOK, MOV, ROK, ROV]#Context = null + var context: Reducer[RIK, RIV, ROK, ROV]#Context = null var settings: SettingsClass = _ def counter(message: String, count: Long) { @@ -1035,13 +1035,13 @@ abstract class HReducer[MOK, MOV, ROK, ROV] extends Reducer[MOK, MOV, ROK, ROV] def values = context.getValues - override def setup(context: Reducer[MOK, MOV, ROK, ROV]#Context) { + override def setup(context: Reducer[RIK, RIV, ROK, ROV]#Context) { this.context = context settings = Class.forName(context.getConfiguration.get("hpaste.settingsclass")).newInstance().asInstanceOf[SettingsClass] settings.fromSettings(context.getConfiguration) } - override def reduce(key: MOK, values: Iterable[MOV], context: Reducer[MOK, MOV, ROK, ROV]#Context) { + override def reduce(key: RIK, values: Iterable[RIV], context: Reducer[RIK, RIV, ROK, ROV]#Context) { reduce() } @@ -1050,7 +1050,7 @@ abstract class HReducer[MOK, MOV, ROK, ROV] extends Reducer[MOK, MOV, ROK, ROV] } object HMapReduceTask { - def apply[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, ROV: Manifest](name: String, mapper: HMapper[MK, MV, MOK, MOV], reducer: HReducer[MOK, MOV, ROK, ROV]): HMapReduceTask[MK, MV, MOK, MOV, ROK, ROV] = { + def apply[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest](name: String, mapper: HMapper[MK, MV, MOK, MOV], reducer: HReducer[MOK, MOV, ROK, ROV]): HMapReduceTask[MK, MV, MOK, MOV, MOK, MOV, ROK, ROV] = { HMapReduceTask( HTaskID(name), HTaskConfigs(), @@ -1089,13 +1089,13 @@ case class HGroupingTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, RO /** * An HTask that wraps a standard mapper and reducer function. */ -case class HMapReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK: Manifest, ROV: Manifest]( +case class HMapReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest]( id: HTaskID, configs: HTaskConfigsBase = HTaskConfigs(), io: HIO[MK, MV, ROK, ROV] = HIO(), mapper: HMapper[MK, MV, MOK, MOV], - reducer: HReducer[MOK, MOV, ROK, ROV], - combiner: HReducer[MOK, MOV, MOK, MOV] = null) + reducer: HReducer[RIK, RIV, ROK, ROV], + combiner: HReducer[MOV, MOK, RIK, RIV] = null) extends HTask[MK, MV, ROK, ROV](id, configs, io) { @@ -1170,11 +1170,15 @@ case class HMapTask[MK, MV, MOK: Manifest, MOV: Manifest](id: HTaskID, configs: /** * A task for a Mapper / Combiner / Reducer combo */ -case class HMapCombineReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, ROK, ROV](id: HTaskID, configs: HTaskConfigs = HTaskConfigs(), io: HIO[MK, MV, ROK, ROV] = HIO(), mapper: HMapper[MK, MV, MOK, MOV], combiner: HReducer[MOK, MOV, ROK, ROV], reducer: HReducer[MOK, MOV, ROK, ROV]) extends HTask[MK, MV, ROK, ROV](id, configs, io) { +case class HMapCombineReduceTask[MK, MV, MOK: Manifest, MOV: Manifest, RIK: Manifest, RIV: Manifest, ROK: Manifest, ROV: Manifest](id: HTaskID, configs: HTaskConfigs = HTaskConfigs(), io: HIO[MK, MV, ROK, ROV] = HIO(), mapper: HMapper[MK, MV, MOK, MOV], combiner: HReducer[MOK, MOV, RIK, RIV], reducer: HReducer[RIK, RIV, ROK, ROV]) extends HTask[MK, MV, ROK, ROV](id, configs, io) { def decorateJob(job: Job) { job.setMapperClass(mapper.getClass) + job.setMapOutputKeyClass(classManifest[MOK].erasure) job.setMapOutputValueClass(classManifest[MOV].erasure) + job.setOutputKeyClass(classManifest[ROK].erasure) + job.setOutputValueClass(classManifest[ROV].erasure) + job.setReducerClass(reducer.getClass) job.setCombinerClass(combiner.getClass) } @@ -1272,7 +1276,7 @@ class HMapContext[MK, MV, MOK, MOV, S <: SettingsBase](conf: Configuration, coun /** * This is the context object for a Reduce function. It gets passed into the reducer defined in an HTask. */ -class HReduceContext[MOK, MOV, ROK, ROV, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, val context: Reducer[MOK, MOV, ROK, ROV]#Context) extends HContext[S](conf, counter) { +class HReduceContext[RIK, RIV, ROK, ROV, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, val context: Reducer[RIK, RIV, ROK, ROV]#Context) extends HContext[S](conf, counter) { def key = context.getCurrentKey def values = context.getValues @@ -1280,7 +1284,7 @@ class HReduceContext[MOK, MOV, ROK, ROV, S <: SettingsBase](conf: Configuration, def write(key: ROK, value: ROV) {context.write(key, value)} } -class ToTableReduceContext[MOK, MOV, T <: HbaseTable[T, R, _], R, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, context: Reducer[MOK, MOV, NullWritable, Writable]#Context) extends HReduceContext[MOK, MOV, NullWritable, Writable, S](conf, counter, context) { +class ToTableReduceContext[RIK, RIV, T <: HbaseTable[T, R, _], R, S <: SettingsBase](conf: Configuration, counter: (String, Long) => Unit, context: Reducer[RIK, RIV, NullWritable, Writable]#Context) extends HReduceContext[RIK, RIV, NullWritable, Writable, S](conf, counter, context) { def write(operation: OpBase[T, R]) { operation.getOperations.foreach {op => write(NullWritable.get(), op)} } diff --git a/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala b/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala index 3eba5a0..015a6eb 100644 --- a/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala +++ b/src/main/scala/com/gravity/hbase/schema/HbaseTable.scala @@ -111,7 +111,7 @@ abstract class HbaseTable[T <: HbaseTable[T, R, RR], R, RR <: HRow[T, R]](val ta */ def converterByBytes(famBytes: Array[Byte], colBytes: Array[Byte]): KeyValueConvertible[_, _, _] = { - if (colFamLookup.length == 0 || famLookup.length == 0) { + if (colFamLookup.length == 0 && famLookup.length == 0) { throw new RuntimeException("Attempting to lookup 0 length columns and families--HBaseTable is corrupt") } diff --git a/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala b/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala new file mode 100644 index 0000000..5ec0748 --- /dev/null +++ b/src/test/scala/com/gravity/hbase/mapreduce.tests/MapReduceTests.scala @@ -0,0 +1,187 @@ +package com.gravity.hbase.mapreduce.tests + +import org.apache.commons.logging.Log +import org.apache.commons.logging.LogFactory +import com.gravity.hbase.mapreduce._ +import org.apache.hadoop.io.{NullWritable, Text, IntWritable} +import scala.collection.JavaConversions._ +import junit.framework.TestCase +import org.junit._ +import org.apache.hadoop.mapred.MiniMRCluster +import org.apache.hadoop.hdfs.MiniDFSCluster +import java.io.{InputStreamReader, BufferedReader, File} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import com.gravity.hbase.schema.HPasteMapReduceTestCase +import com.gravity.hbase.mapreduce.HMapCombineReduceTask +import com.gravity.hbase.mapreduce.HTaskConfigs +import com.gravity.hbase.mapreduce.BigMemoryConf +import com.gravity.hbase.mapreduce.HIO +import com.gravity.hbase.mapreduce.HPathInput +import com.gravity.hbase.mapreduce.HTaskID +import com.gravity.hbase.mapreduce.ReducerCountConf +import com.gravity.hbase.mapreduce.HPathOutput + +/** + * User: mtrelinski + */ + +object TemporaryIOPaths { + + val NONCE = "com-gravity-hpaste-" + val INPUT = NONCE + "input" + + val ClassicWordCount_OUTPUT = NONCE + "classicwordcount-output" + val ClassicWordCountWithCombiner_OUTPUT = NONCE + "classicwordcountcombiner-output" + val ClassicWordCountWithCombinerDifferentReducer_OUTPUT = NONCE + "wordcountcombinerdiffreducer-output" + +} + +// The Jobs: + +class ClassicWordCount extends HJob[NoSettings] ( + "A Contrived and Classic Word Count", + HMapReduceTask( + HTaskID("AContrivedAndClassicWordCount"), + HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)), + HIO( + HPathInput(TemporaryIOPaths.INPUT :: Nil), + HPathOutput(TemporaryIOPaths.ClassicWordCount_OUTPUT) + ), + new LineToWordsWithCounts, + new WordWithCountAggregator + ) +) + +class ClassicWordCountWithCombiner extends HJob[NoSettings] ( + "Another Contrived and Classic Word Count using a Combiner", + HMapCombineReduceTask( + HTaskID("AnotherContrivedWordCountWithACombiner"), + HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)), + HIO( + HPathInput(TemporaryIOPaths.INPUT :: Nil), + HPathOutput(TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT) + ), + new LineToWordsWithCounts, + new WordWithCountAggregator, + new WordWithCountAggregator + ) +) + +class ClassicWordCountWithCombinerAndDifferentReducer extends HJob[NoSettings] ( + "Another Contrived and Classic Word Count using a Combiner with a Reducer that doesn't match the output K, V types", + HMapCombineReduceTask( + HTaskID("YetAnotherContrivedWordCountWithACombiner"), + HTaskConfigs(ReducerCountConf(2), BigMemoryConf(512, 512, 500, 500)), + HIO( + HPathInput(TemporaryIOPaths.INPUT :: Nil), + HPathOutput(TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT) + ), + new LineToWordsWithCounts, + new WordWithCountAggregator, + new WordWithCountFinalAggregator + ) +) + +// The Transformers and Aggregators: + +class LineToWordsWithCounts extends HMapper[IntWritable, Text, Text, IntWritable] { + + val wordWritable = new Text + val one = new IntWritable(1) + + override def map() { + this.value.toString.split("\\s").foreach { + word => + wordWritable.set(word) + this.write(wordWritable, one) + } + + } + +} + +class WordWithCountAggregator extends HReducer[Text, IntWritable, Text, IntWritable] { + + override def reduce() { + + var total: Int = 0 + val valuesIterator = this.values.iterator() + while(valuesIterator.hasNext) + total = total + valuesIterator.next().get() + + this.write(this.key, new IntWritable(total)) + } + +} + +class WordWithCountFinalAggregator extends HReducer[Text, IntWritable, Text, NullWritable] { + + override def reduce() { + + var total: Int = 0 + val valuesIterator = this.values.iterator() + while(valuesIterator.hasNext) + total = total + valuesIterator.next().get() + + this.write(new Text(this.key.toString + "\t" + total), NullWritable.get()) + } + +} + +// The Tests: + +class MapReduceTests extends HPasteMapReduceTestCase(TemporaryIOPaths.INPUT :: TemporaryIOPaths.ClassicWordCount_OUTPUT :: TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT :: TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT :: Nil) { + + val sentences = "the quick brown fox jumped over the lazy dog" :: "cat and dog" :: Nil + val sentencesWordFrequencies = Map("the" -> 2, "quick" -> 1, "brown" -> 1, "fox" -> 1, "jumped" -> 1, "over" -> 1, "lazy" -> 1, "dog" -> 2, "and" -> 1, "cat" -> 1) + + def verifyOutput(path: String) = { + val output: String = getJobOutput(path) + println("Job Output: \n" + output) + output.split("\n").foreach { + line => + val lineSplit = line.split("\t") + val (word, freq) = (lineSplit(0) -> lineSplit(1)) + Assert.assertEquals( "'" + word + "' should be " + sentencesWordFrequencies(word) + " but got " + freq + " instead", sentencesWordFrequencies(word).toLong, freq.toLong) + } + } + + def writeSentences() { + val stream = getFileSystem().create(new Path(TemporaryIOPaths.INPUT + "/sentences.txt"), true) + sentences.foreach { + sentence => stream.write((sentence + "\n").getBytes()) + } + stream.flush() + stream.sync() + stream.close() + } + + @Test + def testClassicWordCount() { + writeSentences() + val classicWordCount = new ClassicWordCount + val resultOfClassicWordCount = classicWordCount.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCount._1) + verifyOutput(TemporaryIOPaths.ClassicWordCount_OUTPUT) + } + + @Test + def testClassicWordCountWithCombiner() { + writeSentences() + val classicWordCountWithCombiner = new ClassicWordCountWithCombiner + val resultOfClassicWordCountWithCombiner = classicWordCountWithCombiner.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCountWithCombiner._1) + verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombiner_OUTPUT) + } + + @Test + def testClassicWordCountWithCombinerAndDifferentReducer() { + writeSentences() + val classicWordCountWithCombinerAndDifferentReducer = new ClassicWordCountWithCombinerAndDifferentReducer + val resultOfClassicWordCountWithCombinerAndDifferentReducer = classicWordCountWithCombinerAndDifferentReducer.run(new com.gravity.hbase.mapreduce.NoSettings, conf) + Assert.assertTrue("job failed", resultOfClassicWordCountWithCombinerAndDifferentReducer._1) + verifyOutput(TemporaryIOPaths.ClassicWordCountWithCombinerDifferentReducer_OUTPUT) + } + +} diff --git a/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala b/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala index 2cf6706..725aee4 100644 --- a/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala +++ b/src/test/scala/com/gravity/hbase/schema/ExampleSchema.scala @@ -119,7 +119,52 @@ object ExampleSchema extends Schema { class ExampleTableRow(table:ExampleTable,result:DeserializedResult) extends HRow[ExampleTable,String](result,table) + + trait Kitty[T <: HbaseTable[T, R, _], R] { + this: HbaseTable[T, R, _] => + + val kittyCutenessStats = family[String, String, Double]("kcs", rowTtlInSeconds = 604800, compressed = true) + + } + + trait KittyRow[T <: HbaseTable[T, R, RR] with Kitty[T, R], R, RR <: HRow[T, R]] { + this: HRow[T, R] => + + lazy val kittyCutenessStats = family(_.kittyCutenessStats) + + } + + trait AdultCat[T <: HbaseTable[T, R, _], R] { + this: HbaseTable[T, R, _] => + + val catCutenessStats = family[String, String, Double]("ccs", rowTtlInSeconds = 604800, compressed = true) + + } + + trait AdultCatRow[T <: HbaseTable[T, R, RR] with AdultCat[T, R], R, RR <: HRow[T, R]] { + this: HRow[T, R] => + + lazy val catCuteness = family(_.catCutenessStats) + + } + + class CatTable extends HbaseTable[CatTable, String, CatRow](tableName = "cats_by_cuteness", rowKeyClass = classOf[String], logSchemaInconsistencies = false, tableConfig = HbaseTableConfig(maxFileSizeInBytes=1073741824)) + with Kitty[CatTable, String] + with AdultCat[CatTable, String] { + + override def rowBuilder(result: DeserializedResult) = new CatRow(result, this) + + } + + class CatRow(result: DeserializedResult, table: CatTable) extends HRow[CatTable, String](result, table) + with KittyRow[CatTable, String, CatRow] + with AdultCatRow[CatTable, String, CatRow] { + + } + //Register the table (DON'T FORGET TO DO THIS :) ) + + val catTable = table(new CatTable) val ExampleTable = table(new ExampleTable) } @@ -302,7 +347,29 @@ enable 'schema_example'""" ExampleSchema.ExampleTable.query2.withKey("Robbie").withFamilies(_.meta).singleOptionAsync().get.prettyPrint() } - + @Test def testScanCats() { + System.err.println("Filling up table...") + val remusKittyStats = Map("remus" -> 10.0d) + val remusCatStats = Map("remus" -> 9.0d) + val alaistorKittyStats = Map("remus" -> 9.0d) + val alaistorCatStats = Map("remus" -> 8.0d) + + + ExampleSchema.catTable.put("remus").valueMap(_.kittyCutenessStats, remusKittyStats).valueMap(_.catCutenessStats, remusCatStats).execute() + ExampleSchema.catTable.put("alaistor").valueMap(_.kittyCutenessStats, alaistorKittyStats).valueMap(_.catCutenessStats, alaistorCatStats).execute() + + var rows: Int = 0 + ExampleSchema.catTable.query2.withFamilies(_.catCutenessStats, _.kittyCutenessStats).scan { + scanner => + rows = rows + 1 + val cat = scanner.rowid + val catCutenessLevel = scanner.catCuteness.values.head + val kittyCutenessLevel = scanner.kittyCutenessStats.values.head + System.err.println(cat + " has a kitty-level cuteness of " + kittyCutenessLevel + " and an adult cuteness of " + catCutenessLevel) + } + Assert.assertEquals("not enough cats", rows.toInt, 2.toInt) + System.err.println("done with cat scan") + } } diff --git a/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala b/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala index cb54eca..5bb2ccc 100644 --- a/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala +++ b/src/test/scala/com/gravity/hbase/schema/HPasteTestCase.scala @@ -4,6 +4,15 @@ import junit.framework.TestCase import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.hbase.HBaseTestingUtility import scala.collection.mutable.{SynchronizedSet, HashSet} +import org.junit.{After, Before, Test} +import org.apache.hadoop.hdfs.MiniDFSCluster +import org.apache.hadoop.mapred.MiniMRCluster +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import java.io.{File, InputStreamReader, BufferedReader} +import java.net.ServerSocket +import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption +import collection.mutable /* )\._.,--....,'``. .b--. /; _.. \ _\ (`._ ,. @@ -32,7 +41,106 @@ object LocalCluster { } } -class HPasteTestCase(schema:Schema) extends TestCase { +class HPasteTestCase(schema:Schema, name: String) extends TestCase(name) { + + if (schema != null) LocalCluster.initializeSchema(schema) + + def this() = this(null, "none") + + def this(name: String) = this(null, name) + + def this(schema: Schema) = this(schema, "none") + + @Test + def testEliminateWarning() { /* this avoids a warning */ } + +} + +class HPasteMapReduceTestCase(paths: Seq[String], name: String) extends TestCase(name) { + + var dfsCluster: MiniDFSCluster = _ + var mrCluster: MiniMRCluster = _ + var conf: Configuration = _ + + + def this() = this(Seq.empty[String], "none") + + def this(name: String) = this(Seq.empty[String], name) + + def this(paths: Seq[String]) = this(paths, "none") + + def getFileSystem() = dfsCluster.getFileSystem + + def getJobOutput(path: String) = { + val sb: StringBuilder = new StringBuilder + getFileSystem().listStatus(new Path(path)).foreach { + fileStatus => + if (!fileStatus.isDir) { + val stream = new BufferedReader(new InputStreamReader(getFileSystem().open(fileStatus.getPath))) + var currentLine = "" + while ( { + currentLine = stream.readLine(); currentLine + } != null) sb.append(currentLine + "\n") + stream.close() + } + } + sb.toString() + } + + def getRandomPort(): Int = { + val localmachine: ServerSocket = new ServerSocket(0) + val randomPort = localmachine.getLocalPort() + localmachine.close() + return randomPort + } + + @Before + override def setUp() { + super.setUp() + val logs = File.createTempFile("hadoop-logs", "").getName + new File(logs).mkdirs() + System.setProperty("hadoop.log.dir", logs) + System.setProperty("org.apache.commons.logging.Log", "org.apache.commons.logging.impl.NoOpLog") + System.setProperty("javax.xml.parsers.SAXParserFactory", "com.sun.org.apache.xerces.internal.jaxp.SAXParserFactoryImpl") + + val location = File.createTempFile("dfs", "") + location.delete() + val dfsName = new File(location.getName + File.separatorChar + "name") + val dfsData = new File(location.getName + File.separatorChar + "data") + dfsName.mkdirs() + dfsData.mkdirs() + + conf = new Configuration + conf.set("dfs.name.dir", dfsName.getAbsolutePath) + conf.set("dfs.data.dir", dfsData.getAbsolutePath) + + dfsCluster = new MiniDFSCluster(getRandomPort(), conf, 1, true, false, StartupOption.REGULAR, Array[String]("/rack1")) + for (p <- paths) getFileSystem().makeQualified(new Path(p)) + + mrCluster = new MiniMRCluster(1, getFileSystem().getUri().toString(), 1) + } + + @Test + def testEliminateWarning() { /* this avoids a warning */ } + + @After + override def tearDown() { + super.tearDown() + println("stopping services") + if (mrCluster != null) { + println("stopping mapred...") + mrCluster.shutdown() + mrCluster = null + } + if (dfsCluster != null) { + println("stopping datanodes...") + dfsCluster.shutdownDataNodes() + println("stopping hdfs...") + dfsCluster.shutdown() + dfsCluster = null + } + println("done stopping services") + } + - LocalCluster.initializeSchema(schema) } \ No newline at end of file