Distributed Query Optimization: Implementing Adaptive Cost-Based Query Planning for Large-Scale Time Series Aggregations #252

@Montana

Background

While working with Timely at scale (billions of metrics across distributed Accumulo clusters), I've observed that complex aggregation queries with multiple grouping dimensions can exhibit suboptimal performance. The current query execution appears to use a fixed strategy regardless of data distribution, cardinality, or temporal density.

Problem Statement

Large-scale analytical queries with any of the following characteristics tend to perform poorly:

  • Multi-dimensional groupings (e.g., GROUP BY host, service, datacenter)
  • Long time ranges with sparse data
  • High-cardinality tag combinations
  • Complex aggregation functions (percentiles, rates, derivatives)

Such queries often cause either tablet hotspots or inefficient data shuffling across the cluster.
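To put a number on "high-cardinality": a GROUP BY can never produce more groups than the product of the grouped tags' cardinalities, nor more than the number of distinct series actually present, because every group contains at least one series. The helper below is an illustrative sketch (`GroupCardinality` and `upperBound` are hypothetical names, not Timely APIs) that computes this bound with a saturating multiply so large products cannot overflow a `long`.

```java
import java.util.List;
import java.util.Map;

/** Illustrative helper (not part of Timely): upper bound on GROUP BY output size. */
final class GroupCardinality {
    private GroupCardinality() {}

    /**
     * Returns an upper bound on the number of groups produced by grouping on groupByTags.
     * Tags absent from tagCardinality count as single-valued. The bound never exceeds
     * distinctSeries, since every group must contain at least one series.
     */
    static long upperBound(List<String> groupByTags, Map<String, Long> tagCardinality,
                           long distinctSeries) {
        if (groupByTags.isEmpty()) {
            return 1; // a global aggregate yields a single group
        }
        long bound = 1;
        for (String tag : groupByTags) {
            long c = tagCardinality.getOrDefault(tag, 1L);
            if (c > 0 && bound > distinctSeries / c) {
                // bound * c would exceed the cap (and might overflow), so saturate.
                bound = distinctSeries;
            } else {
                bound = Math.min(bound * c, distinctSeries);
            }
        }
        return bound;
    }
}
```

With the cardinalities from the test case later in this issue (host=10,000, service=1,000, datacenter=10) and 1,000,000 distinct series, grouping by datacenter and service gives at most 10,000 groups, while grouping by host and service is capped at the 1,000,000 series.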

Proposed Enhancement: Adaptive Query Planner

Implement a cost-based query planner that collects data-distribution statistics (tag cardinality, series count, temporal density) for the queried metric and time range and uses them to choose an execution strategy per query.

1. Statistics Collection Layer

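import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;

// Distribution, TimelyConfiguration, MetricAdapter and parseTagString are not shown here.
public class MetricStatisticsCollector {

    private final Connector connector;
    private final String statsTable;

    public MetricStatisticsCollector(Connector connector, String statsTable) {
        this.connector = connector;
        this.statsTable = statsTable;
    }

    // Fields are package-private so the planner (same package) can read them.
    public static class MetricStats {
        final String metric;
        final long totalDataPoints;
        final long uniqueTagCombinations;
        final Map<String, Long> tagCardinality;
        final long avgPointsPerSeries;
        final Distribution temporalDensity;

        public MetricStats(String metric, long totalDataPoints, long uniqueTagCombinations,
                           Map<String, Long> tagCardinality, long avgPointsPerSeries,
                           Distribution temporalDensity) {
            this.metric = metric;
            this.totalDataPoints = totalDataPoints;
            this.uniqueTagCombinations = uniqueTagCombinations;
            this.tagCardinality = tagCardinality;
            this.avgPointsPerSeries = avgPointsPerSeries;
            this.temporalDensity = temporalDensity;
        }
    }

    // Reads every entry for the metric in [startTime, endTime], so collecting
    // statistics this way costs roughly as much as a full scan of the query range.
    public MetricStats collectStats(String metric, long startTime, long endTime)
            throws TableNotFoundException {

        Scanner scanner = connector.createScanner(
            TimelyConfiguration.METRICS_TABLE,
            Authorizations.EMPTY
        );

        long dataPoints = 0;
        Set<String> uniqueSeries = new HashSet<>();
        Map<String, Set<String>> tagValues = new HashMap<>();

        try {
            scanner.setRange(new Range(
                MetricAdapter.encodeRowId(metric, startTime),
                MetricAdapter.encodeRowId(metric, endTime)
            ));

            for (Entry<Key, Value> entry : scanner) {
                dataPoints++;

                // The column family carries the series' tag string.
                String tags = entry.getKey().getColumnFamily().toString();
                uniqueSeries.add(tags);

                // Track the distinct values seen for each tag key.
                parseTagString(tags).forEach((tagKey, tagValue) ->
                    tagValues.computeIfAbsent(tagKey, k -> new HashSet<>()).add(tagValue));
            }
        } finally {
            scanner.close();
        }

        Map<String, Long> cardinality = tagValues.entrySet().stream()
            .collect(Collectors.toMap(
                Map.Entry::getKey,
                e -> (long) e.getValue().size()
            ));

        long avgPoints = uniqueSeries.isEmpty() ? 0 : dataPoints / uniqueSeries.size();

        return new MetricStats(
            metric,
            dataPoints,
            uniqueSeries.size(),
            cardinality,
            avgPoints,
            calculateTemporalDensity(dataPoints, startTime, endTime)
        );
    }

    private Distribution calculateTemporalDensity(long points, long start, long end) {
        long durationMs = end - start;
        // An empty or inverted range would otherwise divide by zero.
        double pointsPerSecond = durationMs <= 0 ? 0.0 : (points * 1000.0) / durationMs;
        return new Distribution(pointsPerSecond);
    }
}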
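The statistics pass can be checked without a cluster by running the same logic over an in-memory list holding one tag string per data point. This is a sketch under two assumptions: tags are encoded as comma-separated `key=value` pairs (the issue's `parseTagString` is not shown), and an empty time range yields a density of zero rather than a division by zero. `StatsPass` is a hypothetical name.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** In-memory model of the statistics pass; input is one tag string per data point. */
final class StatsPass {
    final long dataPoints;
    final long uniqueSeries;
    final Map<String, Long> tagCardinality = new HashMap<>();
    final long avgPointsPerSeries;
    final double pointsPerSecond;

    StatsPass(List<String> tagStringPerPoint, long startMs, long endMs) {
        Set<String> series = new HashSet<>();
        Map<String, Set<String>> values = new HashMap<>();
        for (String tags : tagStringPerPoint) {
            series.add(tags); // one series per distinct tag string
            // Assumed encoding: "k1=v1,k2=v2".
            for (String pair : tags.split(",")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    values.computeIfAbsent(pair.substring(0, eq), k -> new HashSet<>())
                          .add(pair.substring(eq + 1));
                }
            }
        }
        values.forEach((k, v) -> tagCardinality.put(k, (long) v.size()));
        dataPoints = tagStringPerPoint.size();
        uniqueSeries = series.size();
        // Integer division, as in collectStats.
        avgPointsPerSeries = uniqueSeries == 0 ? 0 : dataPoints / uniqueSeries;
        long durationMs = endMs - startMs;
        pointsPerSecond = durationMs <= 0 ? 0.0 : dataPoints * 1000.0 / durationMs;
    }
}
```

For four points over two seconds spread across three series, this gives 2.0 points per second and an integer average of 1 point per series.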

2. Cost-Based Query Planner

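import java.util.List;

import org.apache.accumulo.core.client.TableNotFoundException;

public class AdaptiveQueryPlanner {

    private final MetricStatisticsCollector statsCollector;

    public AdaptiveQueryPlanner(MetricStatisticsCollector statsCollector) {
        this.statsCollector = statsCollector;
    }

    public enum ExecutionStrategy {
        TABLET_PARALLEL,
        SEQUENTIAL_AGGREGATE,
        PUSH_DOWN_AGGREGATE,
        HYBRID_APPROACH
    }

    public static class QueryPlan {
        private final ExecutionStrategy strategy;
        private final int parallelism;
        private final boolean useServerSideAggregation;
        private final List<String> indexHints;
        private final double estimatedCost;

        public QueryPlan(ExecutionStrategy strategy, int parallelism,
                         boolean useServerSideAggregation, List<String> indexHints,
                         double estimatedCost) {
            this.strategy = strategy;
            this.parallelism = parallelism;
            this.useServerSideAggregation = useServerSideAggregation;
            this.indexHints = indexHints;
            this.estimatedCost = estimatedCost;
        }

        public ExecutionStrategy getStrategy() { return strategy; }
        public int getParallelism() { return parallelism; }
        public boolean isUseServerSideAggregation() { return useServerSideAggregation; }
        public List<String> getIndexHints() { return indexHints; }
        public double getEstimatedCost() { return estimatedCost; }
    }

    public QueryPlan planQuery(TimelyQuery query) throws TableNotFoundException {
        MetricStats stats = statsCollector.collectStats(
            query.getMetric(),
            query.getStartTime(),
            query.getEndTime()
        );

        double scanCost = estimateScanCost(stats, query);
        double aggregationCost = estimateAggregationCost(stats, query);
        double networkCost = estimateNetworkCost(stats, query);

        ExecutionStrategy strategy;
        int parallelism = 1;
        boolean serverSideAgg = false;

        // Strategy selection is rule-based; the summed cost is reported, not compared.
        if (stats.uniqueTagCombinations > 100_000 &&
            query.getGroupByTags().size() <= 2) {

            // Many series, few output groups: aggregate inside the tablet servers.
            strategy = ExecutionStrategy.PUSH_DOWN_AGGREGATE;
            serverSideAgg = true;
            parallelism = (int) Math.min(
                stats.uniqueTagCombinations / 10_000,
                Runtime.getRuntime().availableProcessors()
            );

        } else if (stats.temporalDensity.getMean() < 0.1) {
            // Sparse data (under 0.1 points/s): parallelism is not worth its overhead.
            strategy = ExecutionStrategy.SEQUENTIAL_AGGREGATE;
            parallelism = 1;

        } else if (stats.avgPointsPerSeries > 1000 &&
                   query.getGroupByTags().size() > 2) {

            // Dense series with many grouping dimensions: scan tablets in parallel.
            strategy = ExecutionStrategy.TABLET_PARALLEL;
            parallelism = estimateOptimalParallelism(stats);

        } else {
            strategy = ExecutionStrategy.HYBRID_APPROACH;
            parallelism = 4;
            serverSideAgg = shouldUseServerSideAgg(stats, query);
        }

        double estimatedCost = scanCost + aggregationCost + networkCost;

        return new QueryPlan(
            strategy,
            parallelism,
            serverSideAgg,
            generateIndexHints(stats, query),
            estimatedCost
        );
    }

    // Fixed overhead plus a per-data-point read cost.
    private double estimateScanCost(MetricStats stats, TimelyQuery query) {
        return 100.0 + (stats.totalDataPoints * 0.001);
    }

    // Proportional to the number of groups (product of group-by tag cardinalities).
    private double estimateAggregationCost(MetricStats stats, TimelyQuery query) {
        long groupByCardinality = query.getGroupByTags().stream()
            .map(tag -> stats.tagCardinality.getOrDefault(tag, 1L))
            .reduce(1L, (a, b) -> a * b);

        return groupByCardinality * 0.01;
    }

    // Proportional to the rows shipped back to the client.
    private double estimateNetworkCost(MetricStats stats, TimelyQuery query) {
        long estimatedResultRows = stats.uniqueTagCombinations /
                                   (query.getGroupByTags().isEmpty() ? 1 : 10);
        return estimatedResultRows * 0.1;
    }

    // Grows with sqrt(data size), at least 2 and at most twice the core count.
    private int estimateOptimalParallelism(MetricStats stats) {
        int cpuCores = Runtime.getRuntime().availableProcessors();
        int dataBasedParallelism = (int) Math.sqrt(stats.totalDataPoints / 10000);
        return Math.min(cpuCores * 2, Math.max(2, dataBasedParallelism));
    }

    // Aggregate server-side when it shrinks the data by more than 100x.
    private boolean shouldUseServerSideAgg(MetricStats stats, TimelyQuery query) {
        double reductionRatio = (double) stats.uniqueTagCombinations /
                               Math.max(1, estimateResultSetSize(stats, query));
        return reductionRatio > 100;
    }

    // Product of group-by cardinalities, capped at the number of distinct series.
    private long estimateResultSetSize(MetricStats stats, TimelyQuery query) {
        if (query.getGroupByTags().isEmpty()) {
            return 1;
        }

        return query.getGroupByTags().stream()
            .map(tag -> stats.tagCardinality.getOrDefault(tag, 1L))
            .reduce(1L, (a, b) -> Math.min(a * b, stats.uniqueTagCombinations));
    }
}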
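The planner's formulas are simple enough to check by hand. The standalone class below copies the cost coefficients and decision thresholds from `AdaptiveQueryPlanner` into pure functions with no Accumulo or Timely dependencies (`PlannerMath` and its method names are hypothetical), which makes the rules easy to unit-test in isolation.

```java
import java.util.List;
import java.util.Map;

/** Standalone copy of the planner's cost formulas and decision thresholds. */
final class PlannerMath {
    enum Strategy { TABLET_PARALLEL, SEQUENTIAL_AGGREGATE, PUSH_DOWN_AGGREGATE, HYBRID_APPROACH }

    private PlannerMath() {}

    static double scanCost(long totalDataPoints) {
        return 100.0 + totalDataPoints * 0.001;
    }

    static double aggregationCost(List<String> groupBy, Map<String, Long> cardinality) {
        long groups = 1;
        for (String tag : groupBy) {
            groups *= cardinality.getOrDefault(tag, 1L); // unknown tags count as 1
        }
        return groups * 0.01;
    }

    static double networkCost(long uniqueTagCombinations, List<String> groupBy) {
        long rows = uniqueTagCombinations / (groupBy.isEmpty() ? 1 : 10);
        return rows * 0.1;
    }

    /** Same rule order as planQuery: push-down, then sparse, then tablet-parallel. */
    static Strategy choose(long uniqueTagCombinations, int groupByCount,
                           double meanPointsPerSecond, long avgPointsPerSeries) {
        if (uniqueTagCombinations > 100_000 && groupByCount <= 2) {
            return Strategy.PUSH_DOWN_AGGREGATE;
        } else if (meanPointsPerSecond < 0.1) {
            return Strategy.SEQUENTIAL_AGGREGATE;
        } else if (avgPointsPerSeries > 1000 && groupByCount > 2) {
            return Strategy.TABLET_PARALLEL;
        }
        return Strategy.HYBRID_APPROACH;
    }
}
```

For the statistics in the test case below (100,000,000 points, 1,000,000 tag combinations, grouping by datacenter and service), the estimate is 100,100 for the scan, 100 for aggregation (10 × 1,000 groups × 0.01) and 10,000 for the network, 110,200 in total, so the scan term dominates.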

3. Execution Engine with Strategy Pattern

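import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutorService;
import java.util.function.BinaryOperator;

import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.security.Authorizations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// executeSequential, executeHybrid and the helpers splitIntoTabletRanges,
// extractGroupKey, buildResult and combinerFor (hypothetical) are not shown.
public class AdaptiveQueryExecutor {

    private static final Logger logger = LoggerFactory.getLogger(AdaptiveQueryExecutor.class);

    private final Connector connector;
    private final AdaptiveQueryPlanner planner;
    private final ExecutorService executorService;

    public QueryResult executeQuery(TimelyQuery query)
            throws TableNotFoundException {

        QueryPlan plan = planner.planQuery(query);

        logger.info("Executing query with strategy: {} (estimated cost: {})",
                   plan.getStrategy(), plan.getEstimatedCost());

        switch (plan.getStrategy()) {
            case TABLET_PARALLEL:
                return executeTabletParallel(query, plan);

            case PUSH_DOWN_AGGREGATE:
                return executePushDownAggregate(query, plan);

            case SEQUENTIAL_AGGREGATE:
                return executeSequential(query, plan);

            case HYBRID_APPROACH:
                return executeHybrid(query, plan);

            default:
                throw new IllegalStateException(
                    "Unknown strategy: " + plan.getStrategy()
                );
        }
    }

    private QueryResult executeTabletParallel(
            TimelyQuery query, QueryPlan plan)
            throws TableNotFoundException {

        // The BatchScanner fetches the ranges with plan.getParallelism() query threads.
        BatchScanner batchScanner = connector.createBatchScanner(
            TimelyConfiguration.METRICS_TABLE,
            Authorizations.EMPTY,
            plan.getParallelism()
        );

        Map<String, AggregationState> results = new HashMap<>();
        try {
            List<Range> ranges = splitIntoTabletRanges(
                query.getMetric(),
                query.getStartTime(),
                query.getEndTime(),
                plan.getParallelism()
            );
            batchScanner.setRanges(ranges);

            // Aggregate on this thread: the scanner already parallelizes I/O, and one
            // executor task per entry would create a task per data point and race on
            // AggregationState.update().
            for (Entry<Key, Value> entry : batchScanner) {
                String groupKey = extractGroupKey(entry, query.getGroupByTags());
                results.computeIfAbsent(groupKey, k -> new AggregationState())
                       .update(entry);
            }
        } finally {
            batchScanner.close();
        }
        return buildResult(results, query);
    }

    private QueryResult executePushDownAggregate(
            TimelyQuery query, QueryPlan plan)
            throws TableNotFoundException {

        BatchScanner scanner = connector.createBatchScanner(
            TimelyConfiguration.METRICS_TABLE,
            Authorizations.EMPTY,
            plan.getParallelism()
        );

        IteratorSetting iterSetting = new IteratorSetting(
            100,
            "aggregation",
            GroupingAggregationIterator.class
        );
        iterSetting.addOption("groupBy",
            String.join(",", query.getGroupByTags()));
        iterSetting.addOption("aggregation",
            query.getAggregationType().name());

        // The iterator sees one tablet's data at a time, so a group can be returned
        // once per tablet. combinerFor maps e.g. SUM to Double::sum and MAX to
        // Math::max; averages and percentiles need mergeable partial state.
        BinaryOperator<Double> combiner = combinerFor(query.getAggregationType());

        Map<String, Double> results = new HashMap<>();
        try {
            // Restrict the scan to the query's metric and time range.
            scanner.setRanges(Collections.singletonList(new Range(
                MetricAdapter.encodeRowId(query.getMetric(), query.getStartTime()),
                MetricAdapter.encodeRowId(query.getMetric(), query.getEndTime())
            )));
            scanner.addScanIterator(iterSetting);

            for (Entry<Key, Value> entry : scanner) {
                results.merge(
                    entry.getKey().getColumnQualifier().toString(),
                    ByteBuffer.wrap(entry.getValue().get()).getDouble(),
                    combiner
                );
            }
        } finally {
            scanner.close();
        }
        return new QueryResult(results);
    }
}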
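`GroupingAggregationIterator` runs inside the tablet servers, and an Accumulo iterator only processes data from the tablet it is running on, so a group whose series span several tablets comes back once per tablet and the partial results have to be combined on the client. A minimal sketch of that merge in plain Java with no Accumulo dependency (`PartialMerge` and `SumCount` are hypothetical names):

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

/** Client-side merge of per-tablet partial aggregates (illustrative only). */
final class PartialMerge {
    private PartialMerge() {}

    /** Mergeable partial state for AVG: an average of averages is wrong, so keep sum and count. */
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        SumCount plus(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        double mean() {
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    /** Folds partial results that share a group key using the given combiner. */
    static <V> Map<String, V> merge(List<Map<String, V>> perTablet, BinaryOperator<V> combiner) {
        Map<String, V> merged = new HashMap<>();
        for (Map<String, V> partial : perTablet) {
            partial.forEach((group, value) -> merged.merge(group, value, combiner));
        }
        return merged;
    }
}
```

SUM and MAX partials merge directly, whereas AVG (and percentiles) require the server-side iterator to emit mergeable partial state, such as the sum/count pair above, instead of a finished value: partial averages of 5.0 over 2 points and 2.0 over 1 point merge to 4.0, not 3.5.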

4. Performance Monitoring and Auto-Tuning

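import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// generateQueryHash and calculateActualCost are not shown.
public class QueryPerformanceMonitor {

    private static final Logger logger =
        LoggerFactory.getLogger(QueryPerformanceMonitor.class);

    // Most recent metrics per query hash (entries are overwritten, never evicted).
    private final ConcurrentHashMap<String, QueryMetrics> metricsCache =
        new ConcurrentHashMap<>();

    public static class QueryMetrics {
        private final String queryHash;
        private final ExecutionStrategy strategy;
        private final long executionTimeMs;
        private final long rowsScanned;
        private final long rowsReturned;
        private final double actualCost;

        public QueryMetrics(String queryHash, ExecutionStrategy strategy,
                            long executionTimeMs, long rowsScanned,
                            long rowsReturned, double actualCost) {
            this.queryHash = queryHash;
            this.strategy = strategy;
            this.executionTimeMs = executionTimeMs;
            this.rowsScanned = rowsScanned;
            this.rowsReturned = rowsReturned;
            this.actualCost = actualCost;
        }

        // Fraction of scanned rows that reached the result. Aggregations naturally
        // return far fewer rows than they scan, so this is low for most of them.
        public double getEfficiency() {
            return (double) rowsReturned / Math.max(1, rowsScanned);
        }
    }

    public void recordExecution(
            TimelyQuery query,
            QueryPlan plan,
            QueryResult result,
            long executionTimeMs) {

        String queryHash = generateQueryHash(query);

        QueryMetrics metrics = new QueryMetrics(
            queryHash,
            plan.getStrategy(),
            executionTimeMs,
            result.getRowsScanned(),
            result.getRowsReturned(),
            calculateActualCost(executionTimeMs, result)
        );

        metricsCache.put(queryHash, metrics);

        // calculateActualCost must use the same units as the planner's estimate
        // for this comparison to be meaningful.
        if (metrics.actualCost > plan.getEstimatedCost() * 1.5) {
            logger.warn("Query {} exceeded cost estimate by {}%",
                       queryHash,
                       ((metrics.actualCost / plan.getEstimatedCost()) - 1) * 100);

            adjustCostModel(query, plan, metrics);
        }
    }

    private void adjustCostModel(
            TimelyQuery query,
            QueryPlan plan,
            QueryMetrics metrics) {

        // Currently only logs; no cost-model parameters are updated.
        if (metrics.getEfficiency() < 0.1) {
            logger.info("Low efficiency ({}) for query {}, suggesting strategy change",
                        metrics.getEfficiency(), metrics.queryHash);
        }
    }
}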
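`adjustCostModel` above only logs. One possible way to close the feedback loop, sketched here as an assumption rather than part of the proposal, is to keep a per-strategy correction factor updated as an exponentially weighted moving average of actual/estimated cost, and scale future estimates by it. `CostCalibrator` is a hypothetical class; its `Strategy` enum mirrors `ExecutionStrategy` only so the sketch compiles on its own.

```java
import java.util.EnumMap;
import java.util.Map;

/** Hypothetical feedback store: one multiplicative correction factor per strategy. */
final class CostCalibrator {
    // Mirrors AdaptiveQueryPlanner.ExecutionStrategy so this sketch is self-contained.
    enum Strategy { TABLET_PARALLEL, SEQUENTIAL_AGGREGATE, PUSH_DOWN_AGGREGATE, HYBRID_APPROACH }

    private final double alpha; // weight of the newest observation, in (0, 1]
    private final Map<Strategy, Double> factor = new EnumMap<>(Strategy.class);

    CostCalibrator(double alpha) {
        if (!(alpha > 0 && alpha <= 1)) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.alpha = alpha;
    }

    /** Records one execution; both costs must be expressed in the same units. */
    synchronized void record(Strategy strategy, double estimatedCost, double actualCost) {
        if (estimatedCost <= 0) {
            return; // ratio undefined, skip the sample
        }
        double ratio = actualCost / estimatedCost;
        double old = factor.getOrDefault(strategy, 1.0); // start from "estimates are accurate"
        factor.put(strategy, old + alpha * (ratio - old));
    }

    /** Scales a raw estimate by how far off this strategy's estimates have been. */
    synchronized double corrected(Strategy strategy, double estimatedCost) {
        return estimatedCost * factor.getOrDefault(strategy, 1.0);
    }
}
```

With `alpha = 0.5`, two executions that each cost twice their estimate move the factor from 1.0 to 1.5 and then 1.75. The corrected costs only influence planning if the planner compares them across candidate strategies; the threshold rules in `planQuery` do not do that today.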

Benefits

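  1. Automatic Performance Optimization: each query's execution strategy is chosen from the metric's data statistics instead of being fixed, reducing manual tuning
  2. Reduced Hotspots: parallelism sized to the data spreads work across tablet servers rather than overloading a single one
  3. Lower Latency: matching the strategy to the data should reduce execution time for the query shapes listed in the problem statement
  4. Better Resource Utilization: the cost model accounts for scan, aggregation, and network costs
  5. Self-Learning: recorded execution metrics flag queries whose actual cost exceeds the estimate, providing the input needed to recalibrate the cost model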

Testing Strategy

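import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

@Test
public void testQueryPlannerWithHighCardinality() throws Exception {
    MetricStats stats = new MetricStats(
        "test.metric",
        100_000_000L,                  // total data points
        1_000_000L,                    // unique tag combinations
        Map.of("host", 10_000L, "service", 1000L, "datacenter", 10L),
        100L,                          // average points per series
        new Distribution(10.0)         // points per second
    );

    // Stub the collector (Mockito) so the planner sees these statistics
    // instead of scanning a live table.
    MetricStatisticsCollector statsCollector = mock(MetricStatisticsCollector.class);
    when(statsCollector.collectStats(anyString(), anyLong(), anyLong())).thenReturn(stats);
    AdaptiveQueryPlanner planner = new AdaptiveQueryPlanner(statsCollector);

    TimelyQuery query = new TimelyQuery()
        .metric("test.metric")
        .groupBy("datacenter", "service")
        .aggregate(AggregationType.SUM);

    QueryPlan plan = planner.planQuery(query);

    // 1,000,000 combinations (> 100,000) with two group-by tags selects push-down.
    assertEquals(ExecutionStrategy.PUSH_DOWN_AGGREGATE, plan.getStrategy());
    assertTrue(plan.isUseServerSideAggregation());
}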

Michael.