Source code for intake_elasticsearch.elasticsearch_seq

from intake.source import base
import json
from . import __version__
try:
    from json.decoder import JSONDecodeError
except ImportError:
    JSONDecodeError = ValueError


class ElasticSearchSeqSource(base.DataSource):
    """
    Data source which executes arbitrary queries on ElasticSearch

    This is the sequential reader: it will return a list of dictionaries.

    Parameters
    ----------
    query: str
        Query to execute. Can either be in Lucene single-line format, or a
        JSON structured query (presented as text)
    npartitions: int
        Split the query into this many sections. If one, the query will not
        be split.
    qargs: dict
        Further parameters to pass to the query, such as the set of indexes
        to consider, filtering, ordering. See
        http://elasticsearch-py.readthedocs.io/en/master/api.html#elasticsearch.Elasticsearch.search
    es_kwargs: dict
        Settings for the ES connection, e.g., a simple local connection may
        be ``{'host': 'localhost', 'port': 9200}``.
        Other keywords passed to the plugin end up here; those that matter are:

        scroll: str
            how long the query is live for, default ``'100m'``
        size: int
            the paging size when downloading, default 1000.
    metadata: dict
        Extra information for this source.
    """
    name = 'elasticsearch_seq'
    container = 'python'
    version = __version__
    partition_access = False

    def __init__(self, query, npartitions=1, qargs={}, metadata={},
                 **es_kwargs):
        from elasticsearch import Elasticsearch
        self._query = query
        self._qargs = qargs
        self._scroll = es_kwargs.pop('scroll', '100m')
        self._size = es_kwargs.pop('size', 1000)  # default page size
        self._es_kwargs = es_kwargs
        self._dataframe = None
        self.es = Elasticsearch([es_kwargs])  # maybe should be (more) global?

        super(ElasticSearchSeqSource, self).__init__(metadata=metadata)
        self.npartitions = npartitions

    def _run_query(self, size=None, end=None, slice_id=None, slice_max=None):
        """Execute query on ES

        Parameters
        ----------
        size: int
            Number of objects per page
        end: int
            Cut query down to this number of results, useful for getting a
            sample
        slice_id, slice_max: int
            If given, this is one of slice_max partitions.
        """
        if size is None:
            size = self._size
        if end is not None:
            size = min(end, size)
        slice_dict = None
        if slice_id is not None:
            slice_dict = {'slice': {'id': slice_id, 'max': slice_max}}
        try:
            q = json.loads(self._query)
            if 'query' not in q:
                q = {'query': q}
            if slice_dict:
                q.update(slice_dict)
            s = self.es.search(body=q, size=size, scroll=self._scroll,
                               **self._qargs)
        except (JSONDecodeError, TypeError):
            s = self.es.search(body=slice_dict, q=self._query, size=size,
                               scroll=self._scroll, **self._qargs)
        sid = s['_scroll_id']
        scroll_size = s['hits']['total']
        while scroll_size > len(s['hits']['hits']):
            page = self.es.scroll(scroll_id=sid, scroll=self._scroll)
            sid = page['_scroll_id']
            s['hits']['hits'].extend(page['hits']['hits'])
            if end is not None and len(s['hits']['hits']) > end:
                break
        self.es.clear_scroll(scroll_id=sid)
        return s

    def read(self):
        """Read all data in one go"""
        return self._get_partition()

    def to_dask(self):
        """Form partitions into a dask.bag"""
        import dask.bag as db
        from dask import delayed
        self.discover()
        parts = []
        if self.npartitions == 1:
            part = delayed(self._get_partition)()
            return db.from_delayed([part])
        for slice_id in range(self.npartitions):
            parts.append(
                delayed(self._get_partition)(slice_id))
        return db.from_delayed(parts)

    def _get_schema(self, retry=2):
        return base.Schema(datashape=None,
                           dtype=None,
                           shape=None,
                           npartitions=self.npartitions,
                           extra_metadata={})

    def _get_partition(self, partition=None):
        """
        Download all data, or a specific partition slice of the query

        Parameters
        ----------
        partition: int or None
            If None, get all data; otherwise, get the specific partition
        """
        slice_id = partition
        results = self._run_query(slice_id=slice_id,
                                  slice_max=self.npartitions)
        return [r['_source'] for r in results['hits']['hits']]
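
A minimal usage sketch of the class above, for orientation only: the index name
'my-index' and the local host/port are assumptions for illustration, not part of
the source listing.

# Hypothetical setup: a local ElasticSearch node on localhost:9200 holding an
# index called 'my-index'.
from intake_elasticsearch.elasticsearch_seq import ElasticSearchSeqSource

source = ElasticSearchSeqSource(
    query='{"match_all": {}}',       # JSON query as text; a Lucene string also works
    npartitions=4,                   # split the scroll into 4 slices for dask
    qargs={'index': 'my-index'},     # extra arguments forwarded to es.search()
    host='localhost', port=9200,     # collected into es_kwargs for the connection
    size=500,                        # paging size per scroll request
)

records = source.read()              # list of '_source' dicts, fetched in one go
bag = source.to_dask()               # dask.bag with one partition per slice
print(len(records), bag.npartitions)

Note that ``host``, ``port`` and ``size`` all land in ``es_kwargs``: ``size`` and
``scroll`` are popped off for paging, and the remainder is handed to the
``Elasticsearch`` constructor.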