Source code for intake_elasticsearch.elasticsearch_table

from intake.source import base

try:
    from json.decoder import JSONDecodeError
except ImportError:
    # Python 2 has no JSONDecodeError; its json module raises ValueError
    JSONDecodeError = ValueError
from .elasticsearch_seq import ElasticSearchSeqSource


class ElasticSearchTableSource(ElasticSearchSeqSource):
    """
    Data source which executes arbitrary queries on ElasticSearch

    This is the tabular reader: it returns dataframes. Nested return items
    become dict-like objects in the output.

    Parameters
    ----------
    query: str
        Query to execute. Can either be in Lucene single-line format, or a
        JSON structured query (presented as text)
    npartitions: int
        Split query into this many sections. If one, will not split.
    qargs: dict
        Further parameters to pass to the query, such as the set of indexes
        to consider, filtering, ordering. See
        http://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
    es_kwargs: dict
        Settings for the ES connection, e.g., a simple local connection may
        be ``{'host': 'localhost', 'port': 9200}``. Other keywords passed to
        the plugin that take effect here:

        scroll: str
            how long the query stays live for, default ``'100m'``
        size: int
            the paging size when downloading, default 1000
    metadata: dict
        Extra information for this source.
    """
    _dataframe = None
    container = 'dataframe'
    name = 'elasticsearch_table'

    def __init__(self, *args, **kwargs):
        ElasticSearchSeqSource.__init__(self, *args, **kwargs)
        self.part = True

    def _get_schema(self):
        """Get schema from the first 100 hits, or the cached dataframe"""
        import pandas as pd
        if self._dataframe is None:
            # infer dtypes from the first 100 results
            results = self._run_query(end=100)
            df = pd.DataFrame([r['_source'] for r in results['hits']['hits']])
            self._dataframe = df
            self.part = True
        dtype = {k: str(v)
                 for k, v in self._dataframe.dtypes.to_dict().items()}
        shape = (None if self.part else len(self._dataframe), len(dtype))
        return base.Schema(datashape=None,
                           dtype=dtype,
                           shape=shape,
                           npartitions=self.npartitions,
                           extra_metadata=self.metadata)
    def to_dask(self):
        """Turn into dask.dataframe"""
        import dask.dataframe as dd
        from dask import delayed
        self.discover()
        if self.npartitions == 1:
            # a single partition fetches everything in one task
            part = delayed(self._get_partition)()
            return dd.from_delayed([part], meta=self.dtype)
        parts = []
        for slice_id in range(self.npartitions):
            part = delayed(self._get_partition)(slice_id)
            parts.append(part)
        return dd.from_delayed(parts, meta=self.dtype)
    def _get_partition(self, partition=None):
        """
        Download all data, or the given partition of the query

        ES has a hard maximum of 10000 items to fetch in one request;
        beyond that, paging must be used, known to ES as "scroll" (a
        standalone sketch of this appears at the end of this module):
        https://stackoverflow.com/questions/41655913/elk-how-do-i-retrieve-more-than-10000-results-events-in-elastic-search

        Parameters
        ----------
        partition: int or None
            If None, get all data; otherwise, get the specific partition
        """
        import pandas as pd
        results = super(ElasticSearchTableSource, self)._get_partition(
            partition)
        df = pd.DataFrame(results)
        if df.empty:
            df = self._dataframe[:0]
        # real data is now in hand, so the shape is known: rebuild the schema
        self._schema = None
        self.part = False
        self.discover()
        return df

    def _close(self):
        self._dataframe = None
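

# ---------------------------------------------------------------------------
# A minimal sketch of the "scroll" paging referenced in ``_get_partition``
# above, written directly against elasticsearch-py. Not part of the original
# module: the helper name is made up, the index name and query in the usage
# note are hypothetical, and the defaults mirror this source's own
# (scroll='100m', size=1000).
def _scroll_all_hits(es, index, body, scroll='100m', size=1000):
    """Yield every hit for ``body``, paging past the 10000-item cap."""
    resp = es.search(index=index, body=body, scroll=scroll, size=size)
    scroll_id = resp['_scroll_id']
    while resp['hits']['hits']:
        for hit in resp['hits']['hits']:
            yield hit
        # the scroll id keeps a server-side cursor alive between pages
        resp = es.scroll(scroll_id=scroll_id, scroll=scroll)
        scroll_id = resp['_scroll_id']
    # release the server-side cursor once the results are exhausted
    es.clear_scroll(scroll_id=scroll_id)
# e.g.:
#   es = elasticsearch.Elasticsearch([{'host': 'localhost', 'port': 9200}])
#   hits = list(_scroll_all_hits(es, 'myindex',
#                                {'query': {'match_all': {}}}))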
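

# ---------------------------------------------------------------------------
# Usage sketch (illustrative, not part of the original module): driving this
# source by hand. Assumes a local ES server on port 9200 and an index named
# 'myindex', both hypothetical; the argument names follow the class
# docstring above, so check them against your installed version.
if __name__ == '__main__':
    source = ElasticSearchTableSource(
        '{"query": {"match_all": {}}}',    # JSON structured query, as text
        npartitions=2,                     # split the query into two slices
        qargs={'index': 'myindex'},        # hypothetical index to search
        es_kwargs={'host': 'localhost', 'port': 9200})
    print(source.discover())    # dtypes inferred from the first 100 hits
    df = source.read()          # the whole result set as a pandas DataFrame
    ddf = source.to_dask()      # lazy dask.dataframe, one task per partition
    print(df.head())
    print(ddf.npartitions)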