Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions bin/cingest
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ Available applications:
manysplits Repeatedly lowers the split threshold on a table to create
many splits in order to test split performance
bulk Create RFiles in a Map Reduce job and calls importDirectory if successful
corrupt Corrupts the first entry after the minimum row. Use -o test.ci.ingest.row.min to change the minimum.
EOF
}

Expand Down Expand Up @@ -81,6 +82,9 @@ case "$1" in
fi
ci_main="${ci_package}.BulkIngest"
;;
corrupt)
ci_main="${ci_package}.CorruptEntry"
;;
*)
echo "Unknown application: $1"
print_usage
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ static class BadChecksumException extends RuntimeException {
super(msg);
}

public BadChecksumException(String msg, Exception nfe) {
super(msg, nfe);
}
}

public static void main(String[] args) throws Exception {
Expand Down Expand Up @@ -154,7 +157,7 @@ private static String getPrevRow(Value value) {
return null;
}

private static int getChecksumOffset(byte[] val) {
static int getChecksumOffset(byte[] val) {
if (val[val.length - 1] != ':') {
if (val[val.length - 9] != ':')
throw new IllegalArgumentException(new String(val, UTF_8));
Expand All @@ -169,7 +172,12 @@ static void validate(Key key, Value value) throws BadChecksumException {
if (ckOff < 0)
return;

long storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
long storedCksum;
try {
storedCksum = Long.parseLong(new String(value.get(), ckOff, 8, UTF_8), 16);
} catch (NumberFormatException nfe) {
throw new BadChecksumException("Checksum invalid " + key + " " + value, nfe);
}

CRC32 cksum = new CRC32();

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.testing.continuous;

import static java.nio.charset.StandardCharsets.UTF_8;

import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.hadoop.io.Text;

/**
 * Utility that corrupts the stored checksum of the first entry found at or after the minimum row
 * (see {@code test.ci.ingest.row.min}). Used to test that scans/compactions detect bad data via
 * {@link ContinuousWalk#validate}.
 */
public class CorruptEntry {
  public static void main(String[] args) throws Exception {
    try (ContinuousEnv env = new ContinuousEnv(args);
        AccumuloClient client = env.getAccumuloClient();
        var scanner = client.createScanner(env.getAccumuloTableName());
        var writer = client.createBatchWriter(env.getAccumuloTableName())) {
      final long rowMin = env.getRowMin();

      var startRow = ContinuousIngest.genRow(rowMin);
      // scan from the configured minimum row to the end of the table
      scanner.setRange(new Range(new Text(startRow), null));
      var iter = scanner.iterator();
      if (iter.hasNext()) {
        var entry = iter.next();
        byte[] val = entry.getValue().get();
        int offset = ContinuousWalk.getChecksumOffset(val);
        if (offset >= 0) {
          // mangle all 8 hex chars of the checksum so it can no longer match
          for (int i = 0; i < 8; i++) {
            if (val[i + offset] == 'f' || val[i + offset] == 'F') {
              // in the case of an f hex char, set the hex char to 0
              val[i + offset] = '0';
            } else {
              // increment the hex char in the checksum. NOTE(review): incrementing '9' yields ':'
              // which is not a hex digit; validate() still flags this as a bad checksum.
              val[i + offset]++;
            }
          }
          Key key = entry.getKey();
          Mutation m = new Mutation(key.getRow());
          m.at().family(key.getColumnFamily()).qualifier(key.getColumnQualifier())
              .visibility(key.getColumnVisibility()).put(val);
          writer.addMutation(m);
          writer.flush();
          System.out.println("Corrupted checksum value on key " + key);
        }
      } else {
        throw new IllegalArgumentException("No entry found after " + new String(startRow, UTF_8));
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ public synchronized void flush() throws MutationsRejectedException {
long t2 = System.nanoTime();

log.debug("Bulk imported dir {} destinations:{} mutations:{} memUsed:{} time:{}ms", tmpDir,
loadPlan.getDestinations().size(), mutations.size(), memUsed,
loadPlan.getDestinations().size(), keysValues.size(), memUsed,
TimeUnit.NANOSECONDS.toMillis(t2 - t1));

fileSystem.delete(tmpDir, true);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.accumulo.testing.continuous;

import java.io.IOException;
import java.util.Collection;

import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.iterators.WrappingIterator;

/**
* Validate the checksum on each entry read in the continuous ingest table
*/
public class ValidatingIterator extends WrappingIterator {

public void seek(Range range, Collection<ByteSequence> columnFamilies, boolean inclusive)
throws IOException {
super.seek(range, columnFamilies, inclusive);
if (super.hasTop()) {
ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
}
}

public void next() throws IOException {
super.next();
if (super.hasTop()) {
ContinuousWalk.validate(super.getTopKey(), super.getTopValue());
}
}
}
53 changes: 53 additions & 0 deletions test/compaction-failures/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
<!--

Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.

-->

Some scripts for testing different compaction failure scenarios.

```bash
# start compactors, half of which will always fail any compaction because they are missing an iterator class
./setup-compactors.sh
# starting ingest into table ci1
./start-ingest.sh ci1 NORMAL
# starting ingest into table ci2 with a non-existent compaction iterator configured, all compactions should fail on this table
./start-ingest.sh ci2 BAD_ITER
# starting ingest into table ci3 with a compaction service that has no compactors running, no compactions should ever run for this table
./start-ingest.sh ci3 BAD_SERVICE
# starting ingest into table ci4, corrupting data in a single tablet such that that tablet can never compact
./start-ingest.sh ci4 BAD_TABLET
```

While tests are running, you can use the following to monitor the number of files per tablet for a table.

```
$ accumulo jshell
Preparing JShell for Apache Accumulo

Use 'client' to interact with Accumulo

| Welcome to JShell -- Version 17.0.15
| For an introduction type: /help intro

jshell> /open count-file-per-tablet.jshell

jshell> CFPT.printStats(client, "ci1", 3000)
0 secs min:20 avg:30.37 max:35
3 secs min:20 avg:30.28 max:35
```
50 changes: 50 additions & 0 deletions test/compaction-failures/count-file-per-tablet.jshell
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import com.google.common.collect.Iterators;
import com.google.common.collect.Streams;
import org.apache.accumulo.core.client.AccumuloClient;
import org.apache.accumulo.core.client.RowIterator;
import org.apache.accumulo.core.data.Range;

// counts files per tablet for a table over time
public class CFPT {

  /**
   * Scans the metadata table for the given table's tablets and prints the elapsed time along with
   * the min/avg/max number of files per tablet.
   */
  public static void printPerTabletFileStats(AccumuloClient client, String table, long startTime) throws Exception {
    String tid = client.tableOperations().tableIdMap().get(table);
    try (var metaScanner = client.createScanner("accumulo.metadata")) {
      metaScanner.setRange(new Range(tid + ":", tid + "<"));
      metaScanner.fetchColumnFamily("file");
      var tabletRows = new RowIterator(metaScanner);
      var fileStats = Streams.stream(tabletRows).mapToInt(Iterators::size).summaryStatistics();
      long elapsedSecs = (System.currentTimeMillis() - startTime) / 1000;
      System.out.printf("%3d secs min:%d avg:%.2f max:%d\n", elapsedSecs, fileStats.getMin(), fileStats.getAverage(), fileStats.getMax());
    }
  }

  /**
   * Prints per-tablet file stats immediately, then again every {@code sleep} milliseconds. Runs
   * until interrupted.
   */
  public static void printStats(AccumuloClient client, String table, long sleep) throws Exception {
    long begin = System.currentTimeMillis();
    for (;;) {
      printPerTabletFileStats(client, table, begin);
      Thread.sleep(sleep);
    }
  }
}
60 changes: 60 additions & 0 deletions test/compaction-failures/setup-compactors.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#


# configure compaction services
accumulo shell -u root -p secret <<EOF
config -s tserver.compaction.major.service.cs1.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
config -s tserver.compaction.major.service.cs1.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"small"},{"name":"large","type":"external","queue":"large"}]
config -s tserver.compaction.major.service.cs2.planner=org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner
config -s tserver.compaction.major.service.cs2.planner.opts.executors=[{"name":"small","type":"external","maxSize":"128M","queue":"emptysmall"},{"name":"large","type":"external","queue":"emptylarge"}]
EOF

mkdir -p logs

# stop any previously started compactors/coordinator
pkill -f "Main compactor"
pkill -f "Main compaction-coord"

ACCUMULO_SERVICE_INSTANCE=coord accumulo compaction-coordinator &> logs/coord.out &

# get the absolute path of the accumulo non-shaded test jar
# (glob directly instead of parsing ls output, and quote to survive paths with spaces)
TEST_JAR=$(readlink -f ../../target/accumulo-testing-[0-9].*jar)

# start 4 compactors with the test iterator on their classpath
for i in $(seq 1 4); do
  CLASSPATH="$TEST_JAR" ACCUMULO_SERVICE_INSTANCE="compactor_small_$i" accumulo compactor -q small &> "logs/compactor-small-$i.out" &
done

# start four compactors that do not have the test jar on their classpath. Since
# every table configures an iterator w/ the test jar, every compaction on these
# compactors should fail
for i in $(seq 5 8); do
  ACCUMULO_SERVICE_INSTANCE="compactor_small_$i" accumulo compactor -q small &> "logs/compactor-small-$i.out" &
done

# start 4 compactors for the large group w/ the test iterator on the classpath
for i in $(seq 1 4); do
  CLASSPATH="$TEST_JAR" ACCUMULO_SERVICE_INSTANCE="compactor_large_$i" accumulo compactor -q large &> "logs/compactor-large-$i.out" &
done

# start 4 compactors for the large group that are missing the iterator used by the test table
for i in $(seq 5 8); do
  ACCUMULO_SERVICE_INSTANCE="compactor_large_$i" accumulo compactor -q large &> "logs/compactor-large-$i.out" &
done
Loading