diff --git a/bin/cingest b/bin/cingest index 4ef312c6..26b20d46 100755 --- a/bin/cingest +++ b/bin/cingest @@ -39,6 +39,7 @@ Available applications: manysplits Repeatedly lowers the split threshold on a table to create many splits in order to test split performance bulk Create RFiles in a Map Reduce job and calls importDirectory if successful + corrupt Corrupts the first entry after the minimum row. Use -o test.ci.ingest.row.min to change the minimum. EOF } @@ -81,6 +82,9 @@ case "$1" in fi ci_main="${ci_package}.BulkIngest" ;; + corrupt) + ci_main="${ci_package}.CorruptEntry" + ;; *) echo "Unknown application: $1" print_usage diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java index 94a53bb1..0dc13f59 100644 --- a/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/ContinuousWalk.java @@ -46,6 +46,9 @@ static class BadChecksumException extends RuntimeException { super(msg); } + public BadChecksumException(String msg, Exception nfe) { + super(msg, nfe); + } } public static void main(String[] args) throws Exception { @@ -154,7 +157,7 @@ private static String getPrevRow(Value value) { return null; } - private static int getChecksumOffset(byte[] val) { + static int getChecksumOffset(byte[] val) { if (val[val.length - 1] != ':') { if (val[val.length - 9] != ':') throw new IllegalArgumentException(new String(val, UTF_8)); @@ -169,7 +172,12 @@ static void validate(Key key, Value value) throws BadChecksumException { if (ckOff < 0) return; - long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16); + long storedCksum; + try { + storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16); + } catch (NumberFormatException nfe) { + throw new BadChecksumException("Checksum invalid " + key + " " + value, nfe); + } CRC32 cksum = new CRC32(); diff --git 
a/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java new file mode 100644 index 00000000..bd71fa12 --- /dev/null +++ b/src/main/java/org/apache/accumulo/testing/continuous/CorruptEntry.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.testing.continuous; + +import static java.nio.charset.StandardCharsets.UTF_8; + +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.data.Key; +import org.apache.accumulo.core.data.Mutation; +import org.apache.accumulo.core.data.Range; +import org.apache.hadoop.io.Text; + +/** + * Corrupts the checksum stored in the first entry the scanner returns for the range starting at + * the configured minimum row, then writes the altered value back to the table. + */ +public class CorruptEntry { + public static void main(String[] args) throws Exception { + try (ContinuousEnv env = new ContinuousEnv(args); + AccumuloClient client = env.getAccumuloClient(); + var scanner = client.createScanner(env.getAccumuloTableName()); + var writer = client.createBatchWriter(env.getAccumuloTableName())) { + final long rowMin = env.getRowMin(); + + var startRow = ContinuousIngest.genRow(rowMin); + scanner.setRange(new Range(new Text(startRow), null)); + var iter = scanner.iterator(); + if (iter.hasNext()) { + var entry = iter.next(); + byte[] val = entry.getValue().get(); + int offset = ContinuousWalk.getChecksumOffset(val); + if (offset >= 0) { + // alter each of the 8 checksum characters so the stored checksum changes + for (int i = 0; i < 8; i++) { + if (val[i + offset] == 'f' || val[i + offset] == 'F') { + // in the case of an f hex char, set the hex char to 0 + val[i + offset] = '0'; + } else { + // increment the hex char in the checksum + val[i + offset]++; + } + } + Key key = entry.getKey(); + Mutation m = new Mutation(key.getRow()); + m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier()) + .visibility(key.getColumnVisibility()).put(val); + writer.addMutation(m); + writer.flush(); + System.out.println("Corrupted checksum value on key " + key); + } else { + // negative offset: no checksum to alter (ContinuousWalk.validate skips such values too) + System.out.println("No checksum found on key " + entry.getKey() + ", nothing corrupted"); + } + } else { + throw new IllegalArgumentException("No entry found after " + new String(startRow, UTF_8)); + } + } + } +} diff --git a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java index 281b1834..9259fa02 100644 --- 
a/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java +++ b/src/main/java/org/apache/accumulo/testing/continuous/FlakyBulkBatchWriter.java @@ -187,7 +187,7 @@ public synchronized void flush() throws MutationsRejectedException { long t2 = System.nanoTime(); log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir, - loadPlan.getDestinations().size(), mutations.size(), memUsed, + loadPlan.getDestinations().size(), keysValues.size(), memUsed, TimeUnit.NANOSECONDS.toMillis(t2 - t1)); fileSystem.delete(tmpDir, true); diff --git a/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java new file mode 100644 index 00000000..500e7a5c --- /dev/null +++ b/src/main/java/org/apache/accumulo/testing/continuous/ValidatingIterator.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.accumulo.testing.continuous; + +import java.io.IOException; +import java.util.Collection; + +import org.apache.accumulo.core.data.ByteSequence; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.iterators.WrappingIterator; + +/** + * Validates the checksum on each entry read in the continuous ingest table. After every seek + * and next, the new top entry (if any) is passed to {@link ContinuousWalk#validate}, whose + * {@code BadChecksumException} is left to propagate. + */ +public class ValidatingIterator extends WrappingIterator { + + @Override + public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive) + throws IOException { + super.seek(range, columnFamilies, inclusive); + if (super.hasTop()) { + ContinuousWalk.validate(super.getTopKey(), super.getTopValue()); + } + } + + @Override + public void next() throws IOException { + super.next(); + if (super.hasTop()) { + ContinuousWalk.validate(super.getTopKey(), super.getTopValue()); + } + } +} diff --git a/test/compaction-failures/README.md b/test/compaction-failures/README.md new file mode 100644 index 00000000..1b48b295 --- /dev/null +++ b/test/compaction-failures/README.md @@ -0,0 +1,53 @@ + + +Some scripts for testing different compaction failure scenarios. + +```bash +# start compactors, half of which will always fail any compaction because they are missing an iterator class +./setup-compactors.sh +# starting ingest into table ci1 +./start-ingest.sh ci1 NORMAL +# starting ingest into table ci2 with a non-existent compaction iterator configured, all compactions should fail on this table +./start-ingest.sh ci2 BAD_ITER +# starting ingest into table ci3 with a compaction service that has no compactors running, no compactions should ever run for this table +./start-ingest.sh ci3 BAD_SERVICE +# starting ingest into table ci4, corrupting data in a single tablet such that that tablet can never compact +./start-ingest.sh ci4 BAD_TABLET +``` + +While the tests are running, use the following to monitor files per tablet on a table. 
+ +``` +$ accumulo jshell +Preparing JShell for Apache Accumulo + +Use 'client' to interact with Accumulo + +| Welcome to JShell -- Version 17.0.15 +| For an introduction type: /help intro + +jshell> /open count-file-per-tablet.jshell + +jshell> CFPT.printStats(client, "ci1", 3000) + 0 secs min:20 avg:30.37 max:35 + 3 secs min:20 avg:30.28 max:35 +``` diff --git a/test/compaction-failures/count-file-per-tablet.jshell b/test/compaction-failures/count-file-per-tablet.jshell new file mode 100644 index 00000000..5f3cf40c --- /dev/null +++ b/test/compaction-failures/count-file-per-tablet.jshell @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import com.google.common.collect.Iterators; +import com.google.common.collect.Streams; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.RowIterator; +import org.apache.accumulo.core.data.Range; + +// counts files per tablet for a table over time +public class CFPT { + + // Prints the min, average and max number of file entries per tablet of the table, read from + // the metadata table, prefixed with the seconds elapsed since startTime (epoch millis). + public static void printPerTabletFileStats(AccumuloClient client, String table, long startTime) throws Exception { + var tableId = client.tableOperations().tableIdMap().get(table); + if (tableId == null) { + // without this check the scan range would be built from the string "null" and silently scan a bogus range + throw new IllegalArgumentException("Table not found: " + table); + } + try(var scanner = client.createScanner("accumulo.metadata")) { + scanner.setRange(new Range(tableId+":", tableId+"<")); + scanner.fetchColumnFamily("file"); + RowIterator rowIterator = new RowIterator(scanner); + var stats = Streams.stream(rowIterator).mapToInt(Iterators::size).summaryStatistics(); + long diff = (System.currentTimeMillis() -startTime)/1000; + System.out.printf("%3d secs min:%d avg:%.2f max:%d\n", diff, stats.getMin(),stats.getAverage(),stats.getMax() ); + } + } + + // Prints the per tablet file stats immediately and then every sleep milliseconds until the + // thread is interrupted or an error occurs. + public static void printStats(AccumuloClient client, String table, long sleep) throws Exception { + long startTime = System.currentTimeMillis(); + printPerTabletFileStats(client, table, startTime); + + while(true) { + Thread.sleep(sleep); + printPerTabletFileStats(client, table, startTime); + } + } +} diff --git a/test/compaction-failures/setup-compactors.sh b/test/compaction-failures/setup-compactors.sh new file mode 100755 index 00000000..8c28ae3f --- /dev/null +++ b/test/compaction-failures/setup-compactors.sh @@ -0,0 +1,60 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# configure compaction services +accumulo shell -u root -p secret < logs/coord.out & + +# get the absolute path of the accumulo test non shaded test jar +TEST_JAR=$(readlink -f $(ls ../../target/accumulo-testing-[0-9].*jar)) + +# start 4 compactors with the test iterator on their classpath +for i in $(seq 1 4); do + CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out & +done + +# start four compactors that do not have the test jar on their classpath. Since +# every table configures an iterator w/ the test jar, every compaction on these +# compactors should fail +for i in $(seq 5 8); do + ACCUMULO_SERVICE_INSTANCE=compactor_small_$i accumulo compactor -q small &> logs/compactor-small-$i.out & +done + +# start 4 compactors for the large group w/ the test iterator on the classpath +for i in $(seq 1 4); do + CLASSPATH=$TEST_JAR ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out & +done + +# start 4 compactors for the large group that are missing the iterator used by the test table +for i in $(seq 5 8); do + ACCUMULO_SERVICE_INSTANCE=compactor_large_$i accumulo compactor -q large &> logs/compactor-large-$i.out & +done \ No newline at end of file diff --git a/test/compaction-failures/start-ingest.sh b/test/compaction-failures/start-ingest.sh new file mode 100755 index 00000000..9bebfb67 --- /dev/null +++ b/test/compaction-failures/start-ingest.sh @@ -0,0 +1,73 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation 
(ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +if [ "$#" != "2" ]; then + echo "Usage $0 BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL" + exit 1 +fi + +#accumulo-testing directory +ATD=../.. + +table=$1 +test_type=$2 + +$ATD/bin/cingest createtable -o test.ci.common.accumulo.table=$table + +# setup a compaction time iterator and point the tablet to compaction service w/ external compactors +accumulo shell -u root -p secret < BAD_ITER|BAD_TABLET|BAD_SERVICE|NORMAL" + exit 1 +esac + +mkdir -p logs + +if [ "$test_type" == "NORMAL" ]; then + # start unlimited ingest into the table + $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table &> logs/bulk-$table.log & +else + # limit the amount of data written since tablets can not compact + # would not need to do this if the table.file.pause property existed in 2.1 + $ATD/bin/cingest ingest -o test.ci.ingest.bulk.workdir=/ci_bulk -o test.ci.common.accumulo.table=$table -o test.ci.ingest.client.entries=10000000 &> logs/bulk-$table.log & +fi +