Merged
4 changes: 2 additions & 2 deletions asv_bench/benchmarks/io/csv.py
@@ -206,7 +206,7 @@ def time_read_csv(self, bad_date_value):
class ReadCSVSkipRows(BaseIO):

fname = "__test__.csv"
params = ([None, 10000], ["c", "python"])
params = ([None, 10000], ["c", "python", "pyarrow"])
param_names = ["skiprows", "engine"]

def setup(self, skiprows, engine):
@@ -320,7 +320,7 @@ def time_read_csv_python_engine(self, sep, decimal, float_precision):


class ReadCSVEngine(StringIORewind):
params = ["c", "python"]
params = ["c", "python", "pyarrow"]
param_names = ["engine"]

def setup(self, engine):
29 changes: 21 additions & 8 deletions doc/source/user_guide/io.rst
@@ -160,9 +160,14 @@ dtype : Type name or dict of column -> type, default ``None``
(unsupported with ``engine='python'``). Use ``str`` or ``object`` together
with suitable ``na_values`` settings to preserve and
not interpret dtype.
engine : {``'c'``, ``'python'``}
Parser engine to use. The C engine is faster while the Python engine is
currently more feature-complete.
engine : {``'c'``, ``'python'``, ``'pyarrow'``}
Parser engine to use. The C and pyarrow engines are faster, while the python engine
is currently more feature-complete. Multithreading is currently only supported by
the pyarrow engine.

.. versionadded:: 1.4.0
The "pyarrow" engine was added as an *experimental* engine, and some features
are unsupported, or may not work correctly, with this engine.
converters : dict, default ``None``
Dict of functions for converting values in certain columns. Keys can either be
integers or column labels.
@@ -1622,11 +1627,19 @@ Specifying ``iterator=True`` will also return the ``TextFileReader`` object:
Specifying the parser engine
''''''''''''''''''''''''''''

Under the hood pandas uses a fast and efficient parser implemented in C as well
as a Python implementation which is currently more feature-complete. Where
possible pandas uses the C parser (specified as ``engine='c'``), but may fall
back to Python if C-unsupported options are specified. Currently, C-unsupported
options include:
pandas currently supports three engines: the C engine, the python engine, and an experimental
pyarrow engine (which requires the ``pyarrow`` package). In general, the pyarrow engine is fastest
on larger workloads, and is equivalent in speed to the C engine on most other workloads.
The python engine tends to be slower than the pyarrow and C engines on most workloads. However,
the pyarrow engine is much less robust than the C engine, which in turn lacks a few features
compared to the python engine.

Where possible, pandas uses the C parser (specified as ``engine='c'``), but it may fall
back to Python if C-unsupported options are specified. If pyarrow-unsupported options are
specified while using ``engine='pyarrow'``, the parser raises an error
(a full list of unsupported options is available at ``pandas.io.parsers._pyarrow_unsupported``).
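
For example, a minimal sketch of opting into the pyarrow engine (assumes
``pyarrow`` is installed; the inline data is illustrative only):

.. code-block:: python

   from io import StringIO

   import pandas as pd

   data = "a,b,c\n1,2,3\n4,5,6"

   df = pd.read_csv(StringIO(data), engine="pyarrow")

   # pyarrow-unsupported options raise instead of falling back, e.g.:
   # pd.read_csv(StringIO(data), engine="pyarrow", skipfooter=1)  # ValueError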

Currently, C-unsupported options include:

* ``sep`` other than a single character (e.g. regex separators)
* ``skipfooter``
9 changes: 6 additions & 3 deletions doc/source/whatsnew/v1.4.0.rst
@@ -63,10 +63,13 @@ In Pandas 2.0, :class:`NumericIndex` will become the default numeric index type

See :ref:`here <advanced.numericindex>` for more.

.. _whatsnew_140.enhancements.enhancement2:
.. _whatsnew_140.enhancements.pyarrow_csv_engine:

enhancement2
^^^^^^^^^^^^
Multithreaded CSV reading with a new CSV Engine based on pyarrow
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

:func:`pandas.read_csv` now accepts ``engine="pyarrow"`` as an argument, allowing for faster CSV parsing on multicore machines
with pyarrow installed. See the :doc:`I/O docs </user_guide/io>` for more info. (:issue:`23697`)
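
A minimal sketch of the new keyword (``"large.csv"`` is a placeholder path;
``pyarrow`` must be installed):

.. code-block:: python

   import pandas as pd

   # parsing is delegated to pyarrow.csv, which can read with multiple threads
   df = pd.read_csv("large.csv", engine="pyarrow")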

.. _whatsnew_140.enhancements.other:

107 changes: 107 additions & 0 deletions pandas/io/parsers/arrow_parser_wrapper.py
@@ -0,0 +1,107 @@
from __future__ import annotations

from pandas.compat._optional import import_optional_dependency

from pandas.core.dtypes.inference import is_integer

from pandas.io.common import get_handle
from pandas.io.parsers.base_parser import ParserBase


class ArrowParserWrapper(ParserBase):
"""
Wrapper for the pyarrow engine for read_csv()
"""

def __init__(self, src, **kwds):
self.kwds = kwds
self.src = src

ParserBase.__init__(self, kwds)

self._parse_kwds()

def _parse_kwds(self):
encoding: str | None = self.kwds.get("encoding")
self.encoding = "utf-8" if encoding is None else encoding

self.usecols, self.usecols_dtype = self._validate_usecols_arg(
self.kwds["usecols"]
)
na_values = self.kwds["na_values"]
if isinstance(na_values, dict):
raise ValueError(
"The pyarrow engine doesn't support passing a dict for na_values"
)
        self.na_values = list(na_values)

def _get_pyarrow_options(self):
# rename some arguments to pass to pyarrow
mapping = {
"usecols": "include_columns",
"na_values": "null_values",
"escapechar": "escape_char",
"skip_blank_lines": "ignore_empty_lines",
}
for pandas_name, pyarrow_name in mapping.items():
if pandas_name in self.kwds and self.kwds.get(pandas_name) is not None:
self.kwds[pyarrow_name] = self.kwds.pop(pandas_name)

self.parse_options = {
option_name: option_value
for option_name, option_value in self.kwds.items()
if option_value is not None
and option_name
in ("delimiter", "quote_char", "escape_char", "ignore_empty_lines")
}
self.convert_options = {
option_name: option_value
for option_name, option_value in self.kwds.items()
if option_value is not None
and option_name
in ("include_columns", "null_values", "true_values", "false_values")
}
        self.read_options = {
            "autogenerate_column_names": self.header is None,
            # skip the rows above the header row; when there is no header,
            # fall back to the user-supplied ``skiprows``
            "skip_rows": self.header
            if self.header is not None
            else self.kwds["skiprows"],
        }

def _finalize_output(self, frame):
num_cols = len(frame.columns)
if self.header is None:
if self.names is None:
if self.prefix is not None:
self.names = [f"{self.prefix}{i}" for i in range(num_cols)]
                else:
                    # no header and no names given: use positional labels
                    self.names = range(num_cols)
frame.columns = self.names
# we only need the frame not the names
frame.columns, frame = self._do_date_conversions(frame.columns, frame)
if self.index_col is not None:
for i, item in enumerate(self.index_col):
if is_integer(item):
self.index_col[i] = frame.columns[item]
frame.set_index(self.index_col, drop=True, inplace=True)

if self.kwds.get("dtype") is not None:
frame = frame.astype(self.kwds.get("dtype"))
return frame

def read(self):
pyarrow_csv = import_optional_dependency("pyarrow.csv")
self._get_pyarrow_options()

with get_handle(
self.src, "rb", encoding=self.encoding, is_text=False
) as handles:
table = pyarrow_csv.read_csv(
handles.handle,
read_options=pyarrow_csv.ReadOptions(**self.read_options),
parse_options=pyarrow_csv.ParseOptions(**self.parse_options),
convert_options=pyarrow_csv.ConvertOptions(**self.convert_options),
)

frame = table.to_pandas()
return self._finalize_output(frame)
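
For context, a minimal sketch of what ``ArrowParserWrapper.read()`` delegates to
(the path and option values below are illustrative only, not taken from the PR):

import pyarrow.csv as pyarrow_csv

# read the CSV into an Arrow table, then hand it to pandas
table = pyarrow_csv.read_csv(
    "data.csv",
    read_options=pyarrow_csv.ReadOptions(autogenerate_column_names=False),
    parse_options=pyarrow_csv.ParseOptions(delimiter=","),
    convert_options=pyarrow_csv.ConvertOptions(null_values=["NA"]),
)
frame = table.to_pandas()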
119 changes: 95 additions & 24 deletions pandas/io/parsers/readers.py
@@ -42,6 +42,7 @@
from pandas.core.indexes.api import RangeIndex

from pandas.io.common import validate_header_arg
from pandas.io.parsers.arrow_parser_wrapper import ArrowParserWrapper
from pandas.io.parsers.base_parser import (
ParserBase,
is_index_col,
@@ -143,9 +144,14 @@
to preserve and not interpret dtype.
If converters are specified, they will be applied INSTEAD
of dtype conversion.
engine : {{'c', 'python'}}, optional
Parser engine to use. The C engine is faster while the python engine is
currently more feature-complete.
engine : {{'c', 'python', 'pyarrow'}}, optional
Parser engine to use. The C and pyarrow engines are faster, while the python engine
is currently more feature-complete. Multithreading is currently only supported by
the pyarrow engine.

.. versionadded:: 1.4.0
The "pyarrow" engine was added as an *experimental* engine, and some features
are unsupported, or may not work correctly, with this engine.
converters : dict, optional
Dict of functions for converting values in certain columns. Keys can either
be integers or column labels.
@@ -406,6 +412,33 @@

_c_unsupported = {"skipfooter"}
_python_unsupported = {"low_memory", "float_precision"}
_pyarrow_unsupported = {
"skipfooter",
"float_precision",
"chunksize",
"comment",
"nrows",
"thousands",
"memory_map",
"dialect",
"warn_bad_lines",
"error_bad_lines",
# TODO(1.4)
# This doesn't error properly ATM, fix for release
# but not blocker for initial PR
# "on_bad_lines",
"delim_whitespace",
"quoting",
"lineterminator",
"converters",
"decimal",
"iterator",
"dayfirst",
"infer_datetime_format",
"verbose",
"skipinitialspace",
"low_memory",
}

_deprecated_defaults: dict[str, Any] = {"error_bad_lines": None, "warn_bad_lines": None}
_deprecated_args: set[str] = {"error_bad_lines", "warn_bad_lines"}
@@ -472,7 +505,20 @@ def _read(filepath_or_buffer: FilePathOrBuffer, kwds):

# Extract some of the arguments (pass chunksize on).
iterator = kwds.get("iterator", False)
chunksize = validate_integer("chunksize", kwds.get("chunksize", None), 1)
chunksize = kwds.get("chunksize", None)
if kwds.get("engine") == "pyarrow":
if iterator:
raise ValueError(
"The 'iterator' option is not supported with the 'pyarrow' engine"
)

if chunksize is not None:
raise ValueError(
"The 'chunksize' option is not supported with the 'pyarrow' engine"
)
else:
chunksize = validate_integer("chunksize", kwds.get("chunksize", None), 1)

nrows = kwds.get("nrows", None)

# Check for duplicates in names.
@@ -785,6 +831,10 @@ def __init__(self, f, engine=None, **kwds):

dialect = _extract_dialect(kwds)
if dialect is not None:
if engine == "pyarrow":
raise ValueError(
"The 'dialect' option is not supported with the 'pyarrow' engine"
)
kwds = _merge_with_dialect_properties(dialect, kwds)

if kwds.get("header", "infer") == "infer":
@@ -823,7 +873,17 @@ def _get_options_with_defaults(self, engine):
value = kwds.get(argname, default)

# see gh-12935
if argname == "mangle_dupe_cols" and not value:
if (
engine == "pyarrow"
and argname in _pyarrow_unsupported
and value != default
):
raise ValueError(
f"The {repr(argname)} option is not supported with the "
f"'pyarrow' engine"
)
elif argname == "mangle_dupe_cols" and value is False:
# GH12935
raise ValueError("Setting mangle_dupe_cols=False is not supported yet")
else:
options[argname] = value
@@ -878,7 +938,7 @@ def _clean_options(self, options, engine):
delim_whitespace = options["delim_whitespace"]

if sep is None and not delim_whitespace:
if engine == "c":
if engine in ("c", "pyarrow"):
fallback_reason = (
"the 'c' engine does not support "
"sep=None with delim_whitespace=False"
@@ -891,7 +951,7 @@
elif engine not in ("python", "python-fwf"):
# wait until regex engine integrated
fallback_reason = (
"the 'c' engine does not support "
f"the '{engine}' engine does not support "
"regex separators (separators > 1 char and "
r"different from '\s+' are interpreted as regex)"
)
@@ -910,7 +970,7 @@
if not encodeable and engine not in ("python", "python-fwf"):
fallback_reason = (
f"the separator encoded in {encoding} "
"is > 1 char long, and the 'c' engine "
f"is > 1 char long, and the '{engine}' engine "
"does not support such separators"
)
engine = "python"
@@ -925,7 +985,7 @@
fallback_reason = (
"ord(quotechar) > 127, meaning the "
"quotechar is larger than one byte, "
"and the 'c' engine does not support such quotechars"
f"and the '{engine}' engine does not support such quotechars"
)
engine = "python"

@@ -1001,8 +1061,15 @@ def _clean_options(self, options, engine):
na_values, na_fvalues = _clean_na_values(na_values, keep_default_na)

# handle skiprows; this is internally handled by the
# c-engine, so only need for python parsers
if engine != "c":
# c-engine, so only need for python and pyarrow parsers
if engine == "pyarrow":
if not is_integer(skiprows) and skiprows is not None:
# pyarrow expects skiprows to be passed as an integer
raise ValueError(
"skiprows argument must be an integer when using "
"engine='pyarrow'"
)
else:
if is_integer(skiprows):
skiprows = list(range(skiprows))
if skiprows is None:
@@ -1030,6 +1097,7 @@ def _make_engine(self, engine="c"):
mapping: dict[str, type[ParserBase]] = {
"c": CParserWrapper,
"python": PythonParser,
"pyarrow": ArrowParserWrapper,
"python-fwf": FixedWidthFieldParser,
}
if engine not in mapping:
@@ -1043,22 +1111,25 @@ def _failover_to_python(self):
raise AbstractMethodError(self)

def read(self, nrows=None):
nrows = validate_integer("nrows", nrows)
index, columns, col_dict = self._engine.read(nrows)

if index is None:
if col_dict:
# Any column is actually fine:
new_rows = len(next(iter(col_dict.values())))
index = RangeIndex(self._currow, self._currow + new_rows)
else:
new_rows = 0
if self.engine == "pyarrow":
df = self._engine.read()
else:
new_rows = len(index)
nrows = validate_integer("nrows", nrows)
index, columns, col_dict = self._engine.read(nrows)

if index is None:
if col_dict:
# Any column is actually fine:
new_rows = len(next(iter(col_dict.values())))
index = RangeIndex(self._currow, self._currow + new_rows)
else:
new_rows = 0
else:
new_rows = len(index)

df = DataFrame(col_dict, columns=columns, index=index)
df = DataFrame(col_dict, columns=columns, index=index)

self._currow += new_rows
self._currow += new_rows

if self.squeeze and len(df.columns) == 1:
return df[df.columns[0]].copy()