I am trying to import a very large .csv file as follows:

import dask.dataframe as dd
import pandas as pd
#TO DO

dd_subf1_small = dd.read_csv('subf1_small.csv', dtype={'Unnamed: 0': 'float64','oecd_subfield':'object','paperid':'object'}, sep=None, engine = 'python').persist()

but I am getting the following error:

---------------------------------------------------------------------------
ParserError                               Traceback (most recent call last)
Cell In [1], line 5
      2 import pandas as pd
      3 #TO DO
----> 5 dd_subf1_small = dd.read_csv('subf1_small.csv', dtype={'Unnamed: 0': 'float64','oecd_subfield':'object','paperid':'object'}, sep=None, engine = 'python').persist()

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/base.py:288, in DaskMethodsMixin.persist(self, **kwargs)
    249 def persist(self, **kwargs):
    250     """Persist this dask collection into memory
    251 
    252     This turns a lazy Dask collection into a Dask collection with the same
   (...)
    286     dask.base.persist
    287     """
--> 288     (result,) = persist(self, traverse=False, **kwargs)
    289     return result

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/base.py:904, in persist(traverse, optimize_graph, scheduler, *args, **kwargs)
    901     keys.extend(a_keys)
    902     postpersists.append((rebuild, a_keys, state))
--> 904 results = schedule(dsk, keys, **kwargs)
    905 d = dict(zip(keys, results))
    906 results2 = [r({k: d[k] for k in ks}, *s) for r, ks, s in postpersists]

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/threaded.py:89, in get(dsk, keys, cache, num_workers, pool, **kwargs)
     86     elif isinstance(pool, multiprocessing.pool.Pool):
     87         pool = MultiprocessingPoolExecutor(pool)
---> 89 results = get_async(
     90     pool.submit,
     91     pool._max_workers,
     92     dsk,
     93     keys,
     94     cache=cache,
     95     get_id=_thread_get_id,
     96     pack_exception=pack_exception,
     97     **kwargs,
     98 )
    100 # Cleanup pools associated to dead threads
    101 with pools_lock:

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:511, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    509         _execute_task(task, data)  # Re-execute locally
    510     else:
--> 511         raise_exception(exc, tb)
    512 res, worker_id = loads(res_info)
    513 state["cache"][key] = res

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:319, in reraise(exc, tb)
    317 if exc.__traceback__ is not tb:
    318     raise exc.with_traceback(tb)
--> 319 raise exc

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/dataframe/io/csv.py:129, in CSVFunctionWrapper.__call__(self, part)
    126         rest_kwargs["usecols"] = columns
    128 # Call `pandas_read_text`
--> 129 df = pandas_read_text(
    130     self.reader,
    131     block,
    132     self.header,
    133     rest_kwargs,
    134     self.dtypes,
    135     columns,
    136     write_header,
    137     self.enforce,
    138     path_info,
    139 )
    140 if project_after_read:
    141     return df[self.columns]

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/dask/dataframe/io/csv.py:182, in pandas_read_text(reader, b, header, kwargs, dtypes, columns, write_header, enforce, path)
    180 bio.write(b)
    181 bio.seek(0)
--> 182 df = reader(bio, **kwargs)
    183 if dtypes:
    184     coerce_dtypes(df, dtypes)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/util/_decorators.py:311, in deprecate_nonkeyword_arguments.<locals>.decorate.<locals>.wrapper(*args, **kwargs)
    305 if len(args) > num_allow_args:
    306     warnings.warn(
    307         msg.format(arguments=arguments),
    308         FutureWarning,
    309         stacklevel=stacklevel,
    310     )
--> 311 return func(*args, **kwargs)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:678, in read_csv(filepath_or_buffer, sep, delimiter, header, names, index_col, usecols, squeeze, prefix, mangle_dupe_cols, dtype, engine, converters, true_values, false_values, skipinitialspace, skiprows, skipfooter, nrows, na_values, keep_default_na, na_filter, verbose, skip_blank_lines, parse_dates, infer_datetime_format, keep_date_col, date_parser, dayfirst, cache_dates, iterator, chunksize, compression, thousands, decimal, lineterminator, quotechar, quoting, doublequote, escapechar, comment, encoding, encoding_errors, dialect, error_bad_lines, warn_bad_lines, on_bad_lines, delim_whitespace, low_memory, memory_map, float_precision, storage_options)
    663 kwds_defaults = _refine_defaults_read(
    664     dialect,
    665     delimiter,
   (...)
    674     defaults={"delimiter": ","},
    675 )
    676 kwds.update(kwds_defaults)
--> 678 return _read(filepath_or_buffer, kwds)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:581, in _read(filepath_or_buffer, kwds)
    578     return parser
    580 with parser:
--> 581     return parser.read(nrows)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/readers.py:1253, in TextFileReader.read(self, nrows)
   1251 nrows = validate_integer("nrows", nrows)
   1252 try:
-> 1253     index, columns, col_dict = self._engine.read(nrows)
   1254 except Exception:
   1255     self.close()

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:270, in PythonParser.read(self, rows)
    267     indexnamerow = content[0]
    268     content = content[1:]
--> 270 alldata = self._rows_to_cols(content)
    271 data, columns = self._exclude_implicit_index(alldata)
    273 conv_data = self._convert_data(data)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:1013, in PythonParser._rows_to_cols(self, content)
   1007             reason = (
   1008                 "Error could possibly be due to quotes being "
   1009                 "ignored when a multi-char delimiter is used."
   1010             )
   1011             msg += ". " + reason
-> 1013         self._alert_malformed(msg, row_num + 1)
   1015 # see gh-13320
   1016 zipped_content = list(lib.to_object_array(content, min_width=col_len).T)

File ~/opt/anaconda3/envs/bocconi/lib/python3.8/site-packages/pandas/io/parsers/python_parser.py:739, in PythonParser._alert_malformed(self, msg, row_num)
    722 """
    723 Alert a user about a malformed row, depending on value of
    724 `self.on_bad_lines` enum.
   (...)
    736     even though we 0-index internally.
    737 """
    738 if self.on_bad_lines == self.BadLineHandleMethod.ERROR:
--> 739     raise ParserError(msg)
    740 elif self.on_bad_lines == self.BadLineHandleMethod.WARN:
    741     base = f"Skipping line {row_num}: "

ParserError: Expected 3 fields in line 1811036, saw 5

Actually, I don't know what the data look like, as the csv file is 36 GB and I haven't managed to open it. I saw another question where the error was caused by passing header=None, which I am not doing.

How can I avoid the above error?

Thanks!

1 Answer

As the error says, your CSV file probably contains rows with 5 fields instead of the expected 3.

You have two options:

  1. Find those rows and fix or remove them in the file. This might be challenging given how large the file is; see the first sketch below.
  2. Use the parameter on_bad_lines="skip" to let pandas skip them and continue loading the file; see the second sketch below.
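
For option 1, a minimal sketch of how you could locate the malformed rows without loading the whole 36 GB into memory. The filename and the expected field count of 3 come from your error message; the comma delimiter is an assumption, so adjust it to the file's actual separator:

import csv

# Stream through the file and report any row whose field count
# differs from the expected 3. Assumes a comma delimiter.
# Note: line numbers here count CSV records, which can differ from
# physical line numbers if fields contain embedded newlines.
with open('subf1_small.csv', newline='') as f:
    reader = csv.reader(f)
    for line_num, row in enumerate(reader, start=1):
        if len(row) != 3:
            print(line_num, row)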

Learn more about on_bad_lines here: https://pandas.pydata.org/docs/reference/api/pandas.read_csv.html
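
For option 2, a sketch of your dd.read_csv call with bad lines skipped. This assumes pandas >= 1.3, where on_bad_lines replaced the older error_bad_lines; dask forwards the keyword to pandas.read_csv. The sep=',' is an assumption about your file's delimiter:

import dask.dataframe as dd

# Same call as before, but malformed rows are skipped instead of raising.
# With an explicit single-character separator, the default (and faster)
# C engine can be used instead of engine='python'.
dd_subf1_small = dd.read_csv(
    'subf1_small.csv',
    dtype={'Unnamed: 0': 'float64', 'oecd_subfield': 'object', 'paperid': 'object'},
    sep=',',
    on_bad_lines='skip',
).persist()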

Also, I noticed you are using sep=None. With engine='python', that tells pandas to infer the delimiter with csv.Sniffer rather than using the default comma (,), and the sniffer can guess wrong on a messy file. Post an example of 3 lines from the file here so I can assist with that.
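
If you want to see what the file actually looks like without opening all 36 GB, here is a sketch that prints the first few lines and runs the same csv.Sniffer that sep=None uses internally to guess the delimiter:

import csv
from itertools import islice

# Read only the first 3 lines of the huge file.
with open('subf1_small.csv', newline='') as f:
    head = ''.join(islice(f, 3))
print(head)

# Let csv.Sniffer guess the delimiter from that sample, which is
# what sep=None does under the hood. Raises csv.Error if it cannot
# determine one.
dialect = csv.Sniffer().sniff(head)
print(repr(dialect.delimiter))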

1 Comment

Thanks a lot for the reply. Actually, I am using sep=None following the suggestion here: stackoverflow.com/questions/34359598/…. The issue there was a parsing error again, which I simply thought was a separator error.
