GH-48636: [C++][Parquet] Improve parquet reading using multi threads #50158
OmBiradar wants to merge 1 commit into
Thanks for opening a pull request! If this is not a minor PR, could you open an issue for this pull request on GitHub? https://github.com/apache/arrow/issues/new/choose Opening GitHub issues ahead of time contributes to the Openness of the Apache Arrow project. Then could you also rename the pull request title in the following format? or See also:
Pull request overview
Note
Copilot was unable to run its full agentic suite in this review.
Updates Parquet testing submodule pins and introduces optional parallelism when loading/building nested struct children readers to improve throughput when use_threads() is enabled.
Changes:
- Bump `testing` and `cpp/submodules/parquet-testing` submodule commits.
- Use `::arrow::internal::OptionalParallelFor` to parallelize `StructReader::LoadBatch`.
- Parallelize `StructReader::BuildArray` child array construction before converting chunks to single arrays.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| testing | Updates submodule commit SHA. |
| cpp/submodules/parquet-testing | Updates submodule commit SHA for parquet-testing. |
| cpp/src/parquet/arrow/reader.cc | Adds optional parallel execution for struct child LoadBatch and BuildArray. |
```cpp
return ::arrow::internal::OptionalParallelFor(
    ctx_->reader_properties->use_threads(), static_cast<int>(children_.size()),
    [&](int i) { return children_[i]->LoadBatch(records_to_read); });
```
```cpp
template <class FUNCTION>
Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
                           Executor* executor = internal::GetCpuThreadPool()) {
  if (use_threads) {
    return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
  } else {
    for (int i = 0; i < num_tasks; ++i) {
      RETURN_NOT_OK(func(i));
    }
    return Status::OK();
  }
}
```

At the end, it's always cast to an int, so truncating is unavoidable.
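Since `OptionalParallelFor` takes `int num_tasks`, a `size_t` child count has to be narrowed somewhere. If silent truncation is a concern, one option is to make the narrowing explicit and report when the count does not fit. A minimal sketch, assuming C++17 and a hypothetical helper `CheckedSizeToInt` (not an Arrow function):

```cpp
#include <cstddef>
#include <limits>
#include <optional>

// Hypothetical helper, not part of Arrow: narrows a container size to int.
// Returns std::nullopt instead of truncating when the value does not fit.
inline std::optional<int> CheckedSizeToInt(std::size_t n) {
  constexpr auto kMax = static_cast<std::size_t>(std::numeric_limits<int>::max());
  if (n > kMax) {
    return std::nullopt;
  }
  return static_cast<int>(n);
}
```

A caller could turn the empty result into an error status before calling the parallel loop. In practice a struct with more than `INT_MAX` children is unlikely, so this is a defensive choice rather than a required one.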
```cpp
std::shared_ptr<ChunkedArray> field;
RETURN_NOT_OK(child->BuildArray(validity_io.values_read, &field));
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ArrayData> array_data, ChunksToSingle(*field));
const int num_children = static_cast<int>(children_.size());
```
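Building children in parallel is only safe if the tasks do not write to shared state. One common way to arrange that is to size the output container up front and let task `i` write only to slot `i`. The following is a rough sketch of that pattern in standard C++, not the code from this PR: `BuildChildSketch` and `BuildChildrenSketch` are hypothetical names, the "child array" is just an `int`, and raw `std::thread` stands in for Arrow's CPU thread pool.

```cpp
#include <cstddef>
#include <thread>
#include <vector>

// Hypothetical stand-in for building one child array: it just squares i.
inline int BuildChildSketch(int i) { return i * i; }

// Runs one task per child. Each task writes only to results[i], so no two
// threads touch the same element and no mutex is needed. The vector is sized
// before any thread starts, so it never reallocates while tasks run.
inline std::vector<int> BuildChildrenSketch(int num_children, bool use_threads) {
  if (num_children <= 0) return {};
  std::vector<int> results(static_cast<std::size_t>(num_children));
  if (!use_threads) {
    // Sequential fallback, mirroring the use_threads == false branch.
    for (int i = 0; i < num_children; ++i) results[i] = BuildChildSketch(i);
    return results;
  }
  std::vector<std::thread> workers;
  workers.reserve(results.size());
  for (int i = 0; i < num_children; ++i) {
    workers.emplace_back([&results, i] { results[i] = BuildChildSketch(i); });
  }
  for (std::thread& t : workers) t.join();  // wait for every child to finish
  return results;
}
```

Both branches produce the same output in the same order, which is the property the sequential post-processing step (converting chunks to single arrays) relies on.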
```cpp
template <class FUNCTION>
Status OptionalParallelFor(bool use_threads, int num_tasks, FUNCTION&& func,
                           Executor* executor = internal::GetCpuThreadPool()) {
  if (use_threads) {
    return ParallelFor(num_tasks, std::forward<FUNCTION>(func), executor);
  } else {
    for (int i = 0; i < num_tasks; ++i) {
      RETURN_NOT_OK(func(i));
    }
    return Status::OK();
  }
}
```

At the end, it's always cast to an int, so truncating is unavoidable.
Signed-off-by: OmBiradar <ombiradar04@gmail.com>
Force-pushed from fdade64 to 3512d99.
TBH, I don't think this is a good approach; we've tried it in the past. The main gotcha is that read costs vary significantly across columns by nature. For example, strings take longer to decompress and decode, while integers are smaller and faster. If the file is on a cloud object store, most of the time is spent blocked waiting for I/O, which may exhaust the thread pool if the file has many columns.
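The cost-skew concern can be made concrete with a standard scheduling bound: when each column is one indivisible task, wall time cannot drop below the slowest column, nor below the total work divided by the number of workers. A rough sketch, using a hypothetical function name and made-up relative costs rather than measurements from this PR; it covers only CPU cost skew, not threads blocked on I/O:

```cpp
#include <algorithm>
#include <numeric>
#include <vector>

// Lower bound on wall time when each column is a single indivisible task.
// costs:   per-column cost in arbitrary units (illustrative values only).
// workers: number of threads available.
inline double MakespanLowerBound(const std::vector<double>& costs, int workers) {
  if (costs.empty() || workers <= 0) return 0.0;
  const double total = std::accumulate(costs.begin(), costs.end(), 0.0);
  const double slowest = *std::max_element(costs.begin(), costs.end());
  // No schedule can beat the slowest task or a perfectly even split.
  return std::max(slowest, total / workers);
}
```

For example, with relative costs {8, 1, 1, 1, 1} (one string column, four integer columns) and 4 workers, the bound is 8 against 12 units of sequential work, so the speedup is at most 1.5x no matter how many threads are added.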
Yes, I hit the same I/O limitation while testing the changes. However, this path is only triggered when the user asks for multi-threaded reading. When the file is close to the processor, i.e. in memory, there is a significant improvement in the normal case. For an edge case such as a heavy column (in terms of compute time), the delay is inevitable unless a more complex system loads and builds a single column using multiple threads. I would say this change is not the best case, but it gives workable results?
Rationale for this change
Currently, Parquet structs are read sequentially, even when the user sets the `use_threads` option. This PR aims to bridge this gap by making struct reading truly parallel.
What changes are included in this PR?
Changes to the `LoadBatch` and `BuildArray` functions of `StructReader` in the Parquet reader to enable multi-threaded reads of structs in Parquet files.
Are these changes tested?
Are there any user-facing changes?
This is purely a performance-related PR. No changes to the user-facing API are needed.