application of simple optimizer rule produces incorrect results (DF 49 regression) #17510

@wkalt

Describe the bug

The following code produces an incorrect result on DataFusion 49.0.2 but behaves correctly on 48.0.1. Both execution plans should return the same row count (10), but on DF 49.0.2 one of them returns too few rows. The wrong result is tied to execution order rather than to which plan is run: if the order of the two executions is flipped, it is still the second execution that returns the incorrect result.

use arrow_array::{RecordBatch, UInt32Array};
use arrow_schema::{DataType, Field, Schema};
use datafusion::datasource::memory::MemTable;
use datafusion::physical_optimizer::PhysicalOptimizerRule;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::*;
use datafusion_common::Result as DFResult;
use std::sync::Arc;

#[derive(Debug)]
struct NoOpOptimizer;

impl PhysicalOptimizerRule for NoOpOptimizer {
    fn optimize(
        &self,
        plan: Arc<dyn ExecutionPlan>,
        _config: &datafusion_common::config::ConfigOptions,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        Ok(plan)
    }

    fn name(&self) -> &str {
        "NoOpOptimizer"
    }

    fn schema_check(&self) -> bool {
        true
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Create simple test data
    let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::UInt32, false)]));

    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(UInt32Array::from((0..20).collect::<Vec<u32>>()))],
    )?;

    let ctx = SessionContext::new();
    let provider = MemTable::try_new(schema.clone(), vec![vec![batch]])?;
    ctx.register_table("test", Arc::new(provider))?;

    // Create a simple plan with a sort and a limit.
    let df = ctx
        .table("test")
        .await?
        .sort(vec![col("id").sort(true, true)])?
        .limit(0, Some(10))?;

    let logical_plan = df.into_optimized_plan()?;
    let plan = ctx.state().create_physical_plan(&logical_plan).await?;

    // Apply no-op optimizer
    let optimizer = NoOpOptimizer;
    let optimized_plan = optimizer.optimize(plan.clone(), &Default::default())?;

    let task_ctx = ctx.task_ctx();

    let original_stream = plan.execute(0, task_ctx.clone())?;
    let original_batches = datafusion::physical_plan::common::collect(original_stream).await?;
    let original_count: usize = original_batches.iter().map(|b| b.num_rows()).sum();

    let optimized_stream = optimized_plan.execute(0, task_ctx)?;
    let optimized_batches = datafusion::physical_plan::common::collect(optimized_stream).await?;
    let optimized_count: usize = optimized_batches.iter().map(|b| b.num_rows()).sum();

    // Result mismatch on DF 49
    println!("Expected: 10 rows (limit in query)");
    println!("Optimized plan: {} rows", optimized_count);
    println!("Original plan:  {} rows", original_count);

    Ok(())
}

Cargo.toml:

[package]
name = "datafusion-regression-test"
version = "0.1.0"
edition = "2021"

[workspace]

[dependencies]
arrow-array = "55"
arrow-schema = "55"
datafusion = "49.0.2"
datafusion-common = "49.0.2"
tokio = { version = "1", features = ["full"] }

DF 49.0.2:

Expected: 10 rows (limit in query)
Optimized plan: 9 rows
Original plan:  10 rows

DF 48.0.1:

Expected: 10 rows (limit in query)
Optimized plan: 10 rows
Original plan:  10 rows
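
One detail of the reproduction that may help triage: `NoOpOptimizer::optimize` returns its input `Arc` unchanged, so `plan` and `optimized_plan` are two handles to the same plan object, and the program in effect executes one plan instance twice. The std-only sketch below illustrates that point with hypothetical stand-ins (`ToyPlan`, `CountingPlan`, `noop_optimize`, and `demo` are not DataFusion APIs). Whether DataFusion 49 actually keeps per-plan state across executions is an assumption here, not something this report confirms.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical stand-in for `dyn ExecutionPlan`; not a DataFusion type.
trait ToyPlan {
    /// Pretend to run the plan; returns how many times this instance has run.
    fn execute(&self) -> usize;
}

/// Hypothetical plan node that records its executions via interior mutability.
struct CountingPlan {
    runs: AtomicUsize,
}

impl ToyPlan for CountingPlan {
    fn execute(&self) -> usize {
        // fetch_add returns the previous value, so add 1 for the current run.
        self.runs.fetch_add(1, Ordering::SeqCst) + 1
    }
}

/// Mirrors `NoOpOptimizer::optimize`: the input Arc is handed back untouched.
fn noop_optimize(plan: Arc<dyn ToyPlan>) -> Arc<dyn ToyPlan> {
    plan
}

/// Returns (same allocation?, counter seen by first run, counter seen by second run).
fn demo() -> (bool, usize, usize) {
    let plan: Arc<dyn ToyPlan> = Arc::new(CountingPlan {
        runs: AtomicUsize::new(0),
    });
    let optimized = noop_optimize(plan.clone());

    // Both handles point at the same allocation.
    let same = Arc::ptr_eq(&plan, &optimized);

    // Running through either handle touches the same internal state.
    let first = plan.execute();
    let second = optimized.execute();
    (same, first, second)
}

fn main() {
    let (same, first, second) = demo();
    println!("same allocation: {same}");
    println!("run counter after original: {first}, after optimized: {second}");
}
```

In the toy version the second run observes state left behind by the first, whichever handle it goes through. If something similar applies to the real plan, creating a fresh physical plan with `create_physical_plan` for each execution would be one way to check it.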

To Reproduce

No response

Expected behavior

No response

Additional context

No response


Labels

bug, help wanted
