From cfffddf2fd9b2535d041788776b76bc6c4b212af Mon Sep 17 00:00:00 2001 From: Tim Saucer Date: Thu, 4 Jun 2026 13:02:35 -0400 Subject: [PATCH] feat: expose SessionContext.write_csv, write_json, write_parquet Adds three plan-level writers on SessionContext that mirror the upstream datafusion::execution::context API. Each takes an ExecutionPlan and an output directory path; the plan is executed and its results are written one partition per file inside that directory. These complement the existing DataFrame.write_* methods, which are the right choice when callers need finer control (CSV header, Parquet compression, write options). The new SessionContext methods are the right choice when a caller already holds a physical ExecutionPlan (for example after custom physical optimizer rules or hand-built plans) and just wants the rows materialized. Related to #462. Co-Authored-By: Claude Opus 4.7 (1M context) --- crates/core/src/context.rs | 37 +++++++++++++++++++++++ python/datafusion/context.py | 57 ++++++++++++++++++++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/crates/core/src/context.rs b/crates/core/src/context.rs index da0df751b..c8d5d94ea 100644 --- a/crates/core/src/context.rs +++ b/crates/core/src/context.rs @@ -44,6 +44,7 @@ use datafusion::execution::options::{ArrowReadOptions, ReadOptions}; use datafusion::execution::runtime_env::RuntimeEnvBuilder; use datafusion::execution::session_state::SessionStateBuilder; use datafusion::execution::{FunctionRegistry, TaskContextProvider}; +use datafusion::physical_plan::ExecutionPlan; use datafusion::prelude::{ AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions, }; @@ -1410,6 +1411,42 @@ impl PySessionContext { Ok(PyRecordBatchStream::new(stream)) } + /// Execute an `ExecutionPlan` and write the results to a partitioned CSV file at `path`. + pub fn write_csv( + &self, + plan: PyExecutionPlan, + path: &str, + py: Python, + ) -> PyDataFusionResult<()> { + let plan: Arc = plan.into(); + wait_for_future(py, self.ctx.write_csv(plan, path))??; + Ok(()) + } + + /// Execute an `ExecutionPlan` and write the results to a partitioned newline-delimited JSON file at `path`. + pub fn write_json( + &self, + plan: PyExecutionPlan, + path: &str, + py: Python, + ) -> PyDataFusionResult<()> { + let plan: Arc = plan.into(); + wait_for_future(py, self.ctx.write_json(plan, path))??; + Ok(()) + } + + /// Execute an `ExecutionPlan` and write the results to a partitioned Parquet file at `path`. + pub fn write_parquet( + &self, + plan: PyExecutionPlan, + path: &str, + py: Python, + ) -> PyDataFusionResult<()> { + let plan: Arc = plan.into(); + wait_for_future(py, self.ctx.write_parquet(plan, path, None))??; + Ok(()) + } + pub fn __datafusion_task_context_provider__<'py>( &self, py: Python<'py>, diff --git a/python/datafusion/context.py b/python/datafusion/context.py index accb60f19..738870f5f 100644 --- a/python/datafusion/context.py +++ b/python/datafusion/context.py @@ -1902,6 +1902,63 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream: """Execute the ``plan`` and return the results.""" return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions)) + def write_csv(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None: + """Execute ``plan`` and write the results to a partitioned CSV file. + + ``path`` is treated as a directory; one file per partition is written + inside it. For per-DataFrame writes with options (header control, + write options), use :py:meth:`DataFrame.write_csv` instead. + + Examples: + >>> import tempfile, pathlib + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 2, 3]}) + >>> with tempfile.TemporaryDirectory() as tmp: + ... out = pathlib.Path(tmp) / "out" + ... ctx.write_csv(df.execution_plan(), str(out)) + ... sorted(p.suffix for p in out.iterdir()) + ['.csv'] + """ + self.ctx.write_csv(plan._raw_plan, str(path)) + + def write_json(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None: + """Execute ``plan`` and write the results to a partitioned NDJSON file. + + ``path`` is treated as a directory; one newline-delimited JSON file + per partition is written inside it. For per-DataFrame writes with + options, use :py:meth:`DataFrame.write_json` instead. + + Examples: + >>> import tempfile, pathlib + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 2, 3]}) + >>> with tempfile.TemporaryDirectory() as tmp: + ... out = pathlib.Path(tmp) / "out" + ... ctx.write_json(df.execution_plan(), str(out)) + ... sorted(p.suffix for p in out.iterdir()) + ['.json'] + """ + self.ctx.write_json(plan._raw_plan, str(path)) + + def write_parquet(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None: + """Execute ``plan`` and write the results to a partitioned Parquet file. + + ``path`` is treated as a directory; one Parquet file per partition is + written inside it. For per-DataFrame writes with compression and + writer options, use :py:meth:`DataFrame.write_parquet` instead. + + Examples: + >>> import tempfile, pathlib + >>> ctx = dfn.SessionContext() + >>> df = ctx.from_pydict({"a": [1, 2, 3]}) + >>> with tempfile.TemporaryDirectory() as tmp: + ... out = pathlib.Path(tmp) / "out" + ... ctx.write_parquet(df.execution_plan(), str(out)) + ... sorted(p.suffix for p in out.iterdir()) + ['.parquet'] + """ + self.ctx.write_parquet(plan._raw_plan, str(path)) + @staticmethod def _convert_file_sort_order( file_sort_order: Sequence[Sequence[SortKey]] | None,