Describe the bug
Hey everyone! 👋🏻
I was reading through how metrics are exported from arrow-rs to DataFusion, and it appears to happen via `ArrowReaderMetrics`. We initialize this struct in `ParquetOpener` and then update the DataFusion metric inside a `map` over the stream, fetching the value from the `ArrowReaderMetrics` struct and adding it to the DataFusion metric.
The problem I see with this flow: the `map` closure runs every time the stream yields something. So say the stream is polled 3 times — we read the ever-increasing cumulative counter 3 times and add each reading on top of the previous ones.
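To make the compounding concrete, here is a minimal standalone sketch (the array and variable names are invented for illustration, not the actual DataFusion code): a cumulative reader-side counter is re-added on every poll, the way the `map` closure does.

```rust
fn main() {
    // Cumulative reader-side counter as observed after each of 3 polls,
    // standing in for a value read from ArrowReaderMetrics.
    let cumulative_after_poll = [5u64, 10, 15];

    // Buggy pattern: add the *cumulative* value on every poll.
    let mut df_metric = 0u64;
    for c in cumulative_after_poll {
        df_metric += c; // adds 5, then 10, then 15
    }

    // The true total is 15, but the metric reports 30.
    println!("buggy total = {df_metric}");
}
```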
To Reproduce
I noticed this while implementing a different metric on an internal fork, but I can try to put together a reproducer if this is hard to follow.
Expected behavior
For example, suppose it looked like this:
start: the file holds 3 record batches' worth of data, and `df_predicate_cache_inner_records = 0`
- 1st poll: return 1st record batch, `predicate_cache_inner_records` = 5, adding to `df_predicate_cache_inner_records`: 5
- 2nd poll: return 2nd record batch, `predicate_cache_inner_records` = 10, adding to `df_predicate_cache_inner_records`: 15
- 3rd poll: return 3rd record batch, `predicate_cache_inner_records` = 15, adding to `df_predicate_cache_inner_records`: 30
So instead of ending up at 15, the DataFusion metric ends up at 30. In short, we would have wanted to record the metric's value once, at the end of the stream, but because the update is implemented via `map` we get a compounded value of the metric over time.
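One way the walkthrough above could be corrected (a sketch of the idea only, not the actual fix, with invented names): remember the previously observed cumulative value and add only the delta on each poll, so the final metric equals the reader's final count even though the update still runs per poll.

```rust
fn main() {
    // Same cumulative reader-side counter as in the walkthrough above.
    let cumulative_after_poll = [5u64, 10, 15];

    let mut df_metric = 0u64;
    let mut last_seen = 0u64;
    for c in cumulative_after_poll {
        // Add only what is new since the previous poll.
        df_metric += c - last_seen;
        last_seen = c;
    }

    // Now the metric matches the reader's final cumulative count of 15.
    println!("fixed total = {df_metric}");
}
```

Alternatively, the metric could be written once when the stream is exhausted; the delta approach just has the advantage of keeping partial counts visible if the stream ends early.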
Additional context
Please correct me if I am misunderstanding this — if it does indeed sound like a bug, I would love to work on a fix for it :)