Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions crates/core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion::execution::options::{ArrowReadOptions, ReadOptions};
use datafusion::execution::runtime_env::RuntimeEnvBuilder;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::{FunctionRegistry, TaskContextProvider};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, DataFrame, JsonReadOptions, ParquetReadOptions,
};
Expand Down Expand Up @@ -1410,6 +1411,42 @@ impl PySessionContext {
Ok(PyRecordBatchStream::new(stream))
}

/// Execute an `ExecutionPlan` and write the results to a partitioned CSV file at `path`.
pub fn write_csv(
&self,
plan: PyExecutionPlan,
path: &str,
py: Python,
) -> PyDataFusionResult<()> {
let plan: Arc<dyn ExecutionPlan> = plan.into();
wait_for_future(py, self.ctx.write_csv(plan, path))??;
Ok(())
}

/// Execute an `ExecutionPlan` and write the results to a partitioned newline-delimited JSON file at `path`.
pub fn write_json(
&self,
plan: PyExecutionPlan,
path: &str,
py: Python,
) -> PyDataFusionResult<()> {
let plan: Arc<dyn ExecutionPlan> = plan.into();
wait_for_future(py, self.ctx.write_json(plan, path))??;
Ok(())
}

/// Execute an `ExecutionPlan` and write the results to a partitioned Parquet file at `path`.
pub fn write_parquet(
&self,
plan: PyExecutionPlan,
path: &str,
py: Python,
) -> PyDataFusionResult<()> {
let plan: Arc<dyn ExecutionPlan> = plan.into();
wait_for_future(py, self.ctx.write_parquet(plan, path, None))??;
Ok(())
}

pub fn __datafusion_task_context_provider__<'py>(
&self,
py: Python<'py>,
Expand Down
57 changes: 57 additions & 0 deletions python/datafusion/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -1902,6 +1902,63 @@ def execute(self, plan: ExecutionPlan, partitions: int) -> RecordBatchStream:
"""Execute the ``plan`` and return the results."""
return RecordBatchStream(self.ctx.execute(plan._raw_plan, partitions))

def write_csv(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
"""Execute ``plan`` and write the results to a partitioned CSV file.

``path`` is treated as a directory; one file per partition is written
inside it. For per-DataFrame writes with options (header control,
write options), use :py:meth:`DataFrame.write_csv` instead.

Examples:
>>> import tempfile, pathlib
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> with tempfile.TemporaryDirectory() as tmp:
... out = pathlib.Path(tmp) / "out"
... ctx.write_csv(df.execution_plan(), str(out))
... sorted(p.suffix for p in out.iterdir())
['.csv']
"""
self.ctx.write_csv(plan._raw_plan, str(path))

def write_json(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
"""Execute ``plan`` and write the results to a partitioned NDJSON file.

``path`` is treated as a directory; one newline-delimited JSON file
per partition is written inside it. For per-DataFrame writes with
options, use :py:meth:`DataFrame.write_json` instead.

Examples:
>>> import tempfile, pathlib
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> with tempfile.TemporaryDirectory() as tmp:
... out = pathlib.Path(tmp) / "out"
... ctx.write_json(df.execution_plan(), str(out))
... sorted(p.suffix for p in out.iterdir())
['.json']
"""
self.ctx.write_json(plan._raw_plan, str(path))

def write_parquet(self, plan: ExecutionPlan, path: str | pathlib.Path) -> None:
"""Execute ``plan`` and write the results to a partitioned Parquet file.

``path`` is treated as a directory; one Parquet file per partition is
written inside it. For per-DataFrame writes with compression and
writer options, use :py:meth:`DataFrame.write_parquet` instead.

Examples:
>>> import tempfile, pathlib
>>> ctx = dfn.SessionContext()
>>> df = ctx.from_pydict({"a": [1, 2, 3]})
>>> with tempfile.TemporaryDirectory() as tmp:
... out = pathlib.Path(tmp) / "out"
... ctx.write_parquet(df.execution_plan(), str(out))
... sorted(p.suffix for p in out.iterdir())
['.parquet']
"""
self.ctx.write_parquet(plan._raw_plan, str(path))

@staticmethod
def _convert_file_sort_order(
file_sort_order: Sequence[Sequence[SortKey]] | None,
Expand Down
Loading