Internal Data Ingestion

Dask contains internal tools for extensible data ingestion in the dask.bytes package. These functions are developer-focused rather than for direct consumption by users. They power user-facing functions like ``dd.read_csv`` and ``db.read_text``, which are probably more useful for most users.

read_bytes(urlpath[, delimiter, not_zero, ...]) – Convert path to a list of delayed values
open_files(urlpath[, compression, mode, ...]) – Given path return dask.delayed file-like objects
open_text_files(urlpath[, compression, ...]) – Given path return dask.delayed file-like objects in text mode

These functions are extensible in their output formats (bytes, file objects), their input locations (file system, S3, HDFS), line delimiters, and compression formats.

These functions provide data as dask.delayed objects. These objects either point to blocks of bytes (read_bytes) or open file objects (open_files, open_text_files). They can point to data on different storage systems when the path is prefixed with a protocol like s3:// or hdfs://, and they handle the compression formats listed in the dask.bytes.compression module.
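
A minimal sketch of using them directly (the local filenames here are hypothetical):

    from dask.bytes import read_bytes, open_files

    # read_bytes returns a header sample plus delayed values that each
    # materialize a block of bytes when computed
    sample, blocks = read_bytes('data/2015-01-01.csv', delimiter=b'\n')

    # open_files returns delayed values that compute to file-like objects
    files = open_files('data/2015-*-*.csv')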

These functions are not used for all data sources. Some data sources like HDF5 are quite particular and receive custom treatment.

Delimiters

The read_bytes function takes a path (or globstring of paths) and produces a sample of the first file and a list of delayed objects for every file. If passed a delimiter such as delimiter=b'\n', it ensures that the blocks of bytes start directly after a delimiter and end directly before a delimiter. This allows other functions, like pd.read_csv, to operate on these delayed values with expected behavior.

These delimiters are useful both for typical line-based formats (log files, CSV, JSON) as well as other delimited formats like Avro, which may separate logical chunks by a complex sentinel string.
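
For instance, under the assumption that each block of a newline-delimited CSV file can be parsed on its own, a sketch of this pattern might look like the following (the filename is illustrative, and real code would treat the header row of the first block specially):

    import io
    import pandas as pd
    from dask import delayed, compute
    from dask.bytes import read_bytes

    sample, blocks = read_bytes('2015-01-01.csv', delimiter=b'\n')

    def parse(block):
        # each block is a bytestring of whole lines, so pandas can parse it
        return pd.read_csv(io.BytesIO(block), header=None)

    # blocks are delayed byte values; wrap the parser with delayed and compute
    frames = compute(*[delayed(parse)(b) for b in blocks])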

Locations

These functions dispatch to other functions that handle different storage backends, like S3 and HDFS. These storage backends register themselves with protocols and so are called whenever the path is prepended with a string like the following:

s3://bucket/keys-*.csv

The various backends accept optional extra keywords detailing authentication and other parameters; see the documentation on remote data services.
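
For example, a sketch of passing backend-specific options through **kwargs, assuming an s3fs-backed S3 store that accepts an anonymous-access flag (the bucket name is made up):

    from dask.bytes import read_bytes

    # extra keywords are forwarded to the storage backend, here S3
    sample, blocks = read_bytes('s3://my-bucket/2015-*-*.csv',
                                delimiter=b'\n',
                                anon=True)  # anonymous (public) access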

Compression

These functions support widely available compression technologies like gzip, bz2, xz, snappy, and lz4. More compression formats can easily be added by inserting functions into the dictionaries available in the dask.bytes.compression module. This can be done at runtime and need not be added directly to the codebase.
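
A sketch of such runtime registration, assuming the module exposes compress and decompress dictionaries keyed by format name (the 'zstd' entry and the zstandard package are illustrative, not built in):

    import zstandard
    from dask.bytes import compression

    def zstd_compress(data):
        return zstandard.ZstdCompressor().compress(data)

    def zstd_decompress(data):
        return zstandard.ZstdDecompressor().decompress(data)

    # register a new format at runtime
    compression.compress['zstd'] = zstd_compress
    compression.decompress['zstd'] = zstd_decompress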

However, not all compression technologies are available for all functions. In particular, compression formats like gzip do not support efficient random access, so they are useful for streaming with open_files but not for read_bytes, which splits files at arbitrary byte offsets.

Functions

dask.bytes.read_bytes(urlpath, delimiter=None, not_zero=False, blocksize=134217728, sample=True, compression=None, **kwargs)

Convert path to a list of delayed values

The path may be a filename like '2015-01-01.csv' or a globstring like '2015-*-*.csv'.

The path may be preceded by a protocol, like s3:// or hdfs:// if those libraries are installed.

This cleanly breaks data by a delimiter if given, so that block boundaries start directly after a delimiter and end on the delimiter.

Parameters:
  • urlpath (string) – Absolute or relative filepath, URL (may include protocols like s3://), or globstring pointing to data.
  • delimiter (bytes) – An optional delimiter, like b'\n' on which to split blocks of bytes.
  • not_zero (bool) – Force seek of start-of-file delimiter, discarding header.
  • blocksize (int (=128MB)) – Chunk size in bytes
  • compression (string or None) – String like ‘gzip’ or ‘xz’. Must support efficient random access.
  • sample (bool or int) – Whether or not to return a header sample. If an integer is given it is used as sample size, otherwise the default sample size is 10kB.
  • **kwargs (dict) – Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Examples

>>> sample, blocks = read_bytes('2015-*-*.csv', delimiter=b'\n')  
>>> sample, blocks = read_bytes('s3://bucket/2015-*-*.csv', delimiter=b'\n')  

Returns:
  • A sample header and a list of dask.Delayed objects, or a list of lists of delayed objects if urlpath is a globstring.
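
When the path is a globstring, a sketch of flattening the per-file lists of blocks (the paths are illustrative):

    from itertools import chain
    from dask.bytes import read_bytes

    sample, blocks_per_file = read_bytes('2015-*-*.csv', delimiter=b'\n')

    # one inner list of delayed byte blocks per matching file
    all_blocks = list(chain.from_iterable(blocks_per_file))
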
dask.bytes.open_files(urlpath, compression=None, mode='rb', encoding='utf8', errors=None, name_function=None, num=1, **kwargs)

Given path return dask.delayed file-like objects

Parameters:
  • urlpath (string) – Absolute or relative filepath, URL (may include protocols like s3://), or globstring pointing to data.
  • compression (string) – Compression to use. See dask.bytes.compression.files for options.
  • mode ('rb', 'wt', etc.) – Mode with which to open the files.
  • encoding (str) – For text mode only
  • errors (None or str) – Passed to TextIOWrapper in text mode
  • name_function (function or None) – If opening a set of files for writing, those files do not yet exist, so their names are generated by formatting the urlpath for each sequence number.
  • num (int [1]) – In writing mode, the number of files we expect to create (passed to name_function).
  • **kwargs (dict) – Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Examples

>>> files = open_files('2015-*-*.csv')  
>>> files = open_files('s3://bucket/2015-*-*.csv.gz', compression='gzip')  

Returns:
  List of dask.delayed objects that compute to file-like objects
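
A sketch of consuming the result, assuming, as described above, that each delayed object computes to an open file-like object (the line-counting function is just for illustration):

    from dask import delayed, compute
    from dask.bytes import open_files

    files = open_files('s3://bucket/2015-*-*.csv.gz', compression='gzip')

    def count_lines(f):
        # f is an open (decompressed) binary file object once computed
        return sum(1 for _ in f)

    line_counts = compute(*[delayed(count_lines)(f) for f in files])
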
dask.bytes.open_text_files(urlpath, compression=None, mode='rt', encoding='utf8', errors='strict', **kwargs)

Given path return dask.delayed file-like objects in text mode

Parameters:
  • urlpath (string) – Absolute or relative filepath, URL (may include protocols like s3://), or globstring pointing to data.
  • encoding (string) – Text encoding used to decode the files.
  • errors (string) – How decoding errors are handled; passed to TextIOWrapper.
  • compression (string) – Compression to use. See dask.bytes.compression.files for options.
  • **kwargs (dict) – Extra options that make sense to a particular storage connection, e.g. host, port, username, password, etc.

Examples

>>> files = open_text_files('2015-*-*.csv', encoding='utf-8')  
>>> files = open_text_files('s3://bucket/2015-*-*.csv')  

Returns:
  List of dask.delayed objects that compute to text file-like objects