Elasticsearch client for Zope3

This package provides an elasticsearch client for Zope3.

README

This package provides an elasticsearch client. Note that we use a different port for our elasticsearch server stub (45299 instead of 9200). See elasticsearch/config for more info:

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers, retryDelay=10, timeout=5)
>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer

ElasticSearchConnectionPool

We need to set up an elasticsearch connection pool:

>>> connectionPool = ElasticSearchConnectionPool(serverPool)

The connection pool stores the connection in a thread local. You can set the re-connection interval, which defaults to 60 seconds:

>>> connectionPool
<ElasticSearchConnectionPool localhost:45299>
>>> connectionPool.reConnectIntervall
60
>>> connectionPool.reConnectIntervall = 30
>>> connectionPool.reConnectIntervall
30

ElasticSearchConnection

Now we can get a connection from the pool; it is persistent and kept in a thread local:

>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>

Such a connection provides a server pool which the connection can choose from. If a server goes down, another server gets used. The connection also balances http connections between all servers:

>>> conn.serverPool
<ServerPool retryDelay:10 localhost:45299>
>>> conn.serverPool.info
'localhost:45299'
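
The following lines are purely illustrative and are not executed as part of this test: they sketch how a pool with more than one server could be configured, assuming a second elasticsearch node were listening on the hypothetical address localhost:45300:

>>> moreServers = ['localhost:45299', 'localhost:45300']
>>> biggerPool = ServerPool(moreServers, retryDelay=10, timeout=5)
>>> failoverConn = ElasticSearchConnectionPool(biggerPool).connection

If the first server becomes unreachable, calls on such a connection would be retried against the remaining alive servers.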

A maxRetries value is also provided. If it is left at its default of None, the connection will retry as many times as there are alive servers, e.g. len(self.serverPool.aliveServers):

>>> conn.maxRetries is None
True

Another property called autoRefresh is responsible for calling refresh implicitly if a previous connection call changed the search index, e.g. as an index call would do:

>>> conn.autoRefresh
False
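
As an illustrative sketch (not executed here, and assuming autoRefresh is a plain writable attribute), it could be switched on so that an index call triggers an implicit refresh, and switched off again:

>>> conn.autoRefresh = True
>>> conn.autoRefresh
True
>>> conn.autoRefresh = False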

There is also a bulk size setting. If we use the bulk option that some methods provide, the bulkMaxSize value makes sure that no more than the given number of items get cached in the connection before they are sent to the server:

>>> conn.bulkMaxSize
400

Mapping Configuration

Our test setup uses a predefined mapping configuration. This, I guess, is the common use case in most projects. I'm not really a friend of dynamic mapping, at least when it comes to migration and legacy data handling. But of course for some use cases dynamic mapping is a nice feature, at least if you have to index crawled data and offer a search over all (_all) fields. Let's test our predefined mappings:

Up till Elasticsearch version 19.1, this would return {}, but now it returns status 404, so our code raises an exception. This will be fixed in elasticsearch 19.5.

>>> conn.getMapping()
{}

As you can see, we don't get a default mapping yet. First we need to index at least one item. Let's index a first job:

>>> job = {'title': u'Wir suchen einen Marketingplaner',
...        'description': u'Wir bieten eine gute Anstellung'}
>>> pprint(conn.index(job, 'testing', 'job', 1))
{u'_id': u'1',
 u'_index': u'testing',
 u'_type': u'job',
 u'_version': 1,
 u'ok': True}
>>> statusRENormalizer.pprint(conn.getMapping())
{u'testing': {u'job': {u'_all': {u'store': u'yes'},
                       u'_id': {u'store': u'yes'},
                       u'_index': {u'enabled': True},
                       u'_type': {u'store': u'yes'},
                       u'properties': {u'__name__': {u'boost': 2.0,
                                                     u'include_in_all': False,
                                                     u'null_value': u'na',
                                                     u'type': u'string'},
                                       u'contact': {u'include_in_all': False,
                                                    u'properties': {u'firstname': {u'include_in_all': False,
                                                                                   u'type': u'string'},
                                                                    u'lastname': {u'include_in_all': False,
                                                                                  u'type': u'string'}}},
                                       u'description': {u'include_in_all': True,
                                                        u'null_value': u'na',
                                                        u'type': u'string'},
                                       u'location': {u'geohash': True,
                                                     u'lat_lon': True,
                                                     u'type': u'geo_point'},
                                       u'published': {u'format': u'date_optional_time',
                                                      u'type': u'date'},
                                       u'requirements': {u'properties': {u'description': {u'type': u'string'},
                                                                         u'name': {u'type': u'string'}}},
                                       u'tags': {u'index_name': u'tag',
                                                 u'type': u'string'},
                                       u'title': {u'boost': 2.0,
                                                  u'include_in_all': True,
                                                  u'null_value': u'na',
                                                  u'type': u'string'}}}}}

Let's define another item with more data and index it:

>>> import datetime
>>> job = {'title': u'Wir suchen einen Buchhalter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung',
...        'requirements': [
...            {'name': u'MBA', 'description': u'MBA Abschluss'}
...        ],
...        'tags': [u'MBA', u'certified'],
...        'published': datetime.datetime(2011, 02, 24, 12, 0, 0),
...        'contact': {
...            'firstname': u'Jessy',
...            'lastname': u'Ineichen',
...        },
...        'location':  [-71.34, 41.12]}
>>> pprint(conn.index(job, 'testing', 'job', 2))
{u'_id': u'2',
 u'_index': u'testing',
 u'_type': u'job',
 u'_version': 1,
 u'ok': True}
>>> import time
>>> time.sleep(1)

get

Now let's get the job from our index by its id. The sleep above gave the index time to refresh first:

>>> statusRENormalizer.pprint(conn.get(2, "testing", "job"))
{u'_id': u'2',
 u'_index': u'testing',
 u'_source': {u'contact': {u'firstname': u'Jessy', u'lastname': u'Ineichen'},
              u'description': u'Wir bieten Ihnen eine gute Anstellung',
              u'location': [..., ...],
              u'published': datetime.datetime(2011, 2, 24, 12, 0),
              u'requirements': [{u'description': u'MBA Abschluss',
                                 u'name': u'MBA'}],
              u'tags': [u'MBA', u'certified'],
              u'title': u'Wir suchen einen Buchhalter'},
 u'_type': u'job',
 u'_version': 1,
 u'exists': True}

Index

This test sets up some sample data in our test setup method. For this test a new elasticsearch instance is started in another sandbox. Check the p01/elasticsearch/test.py file for more info about the sample data and elasticsearch server setup.

We will test whether we can delete an existing index and create it again with the same mapping:

>>> import json
>>> from pprint import pprint
>>> import p01.elasticsearch.testing
>>> statusRENormalizer = p01.elasticsearch.testing.statusRENormalizer

Now let's get a new elasticsearch connection from our testing helper:

>>> conn = p01.elasticsearch.testing.getTestConnection()

Now we are ready to access the elasticsearch server. Check the status:

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 1, u'total': 1},
 u'indices': {u'companies': {u'docs': {u'deleted_docs': 0,
                                       u'max_doc': ...,
                                       u'num_docs': ...},
                             u'flush': {u'total': 0,
                                        u'total_time': u'...',
                                        u'total_time_in_millis': ...},
                             u'index': {u'primary_size': u'...',
                                        u'primary_size_in_bytes': ...,
                                        u'size': u'...',
                                        u'size_in_bytes': ...},
                             u'merges': {u'current': 0,
                                         u'current_docs': 0,
                                         u'current_size': u'0b',
                                         u'current_size_in_bytes': 0,
                                         u'total': 0,
                                         u'total_docs': 0,
                                         u'total_size': u'0b',
                                         u'total_size_in_bytes': 0,
                                         u'total_time': u'...',
                                         u'total_time_in_millis': ...},
                             u'refresh': {u'total': ...,
                                          u'total_time': u'...',
                                          u'total_time_in_millis': ...},
                             u'shards': {u'0': [{u'docs': {u'deleted_docs': 0,
                                                           u'max_doc': ...,
                                                           u'num_docs': ...},
                                                 u'flush': {u'total': 0,
                                                            u'total_time': u'...',
                                                            u'total_time_in_millis': ...},
                                                 u'index': {u'size': u'...',
                                                            u'size_in_bytes': ...},
                                                 u'merges': {u'current': 0,
                                                             u'current_docs': 0,
                                                             u'current_size': u'0b',
                                                             u'current_size_in_bytes': 0,
                                                             u'total': 0,
                                                             u'total_docs': 0,
                                                             u'total_size': u'0b',
                                                             u'total_size_in_bytes': 0,
                                                             u'total_time': u'...',
                                                             u'total_time_in_millis': ...},
                                                 u'refresh': {u'total': ...,
                                                              u'total_time': u'...',
                                                              u'total_time_in_millis': ...},
                                                 u'routing': {u'index': u'companies',
                                                              u'node': u'...',
                                                              u'primary': True,
                                                              u'relocating_node': None,
                                                              u'shard': 0,
                                                              u'state': u'STARTED'},
                                                 u'state': u'STARTED',
                                                 u'translog': {u'id': ...,
                                                               u'operations': 0}}]},
                             u'translog': {u'operations': 0}}},
 u'ok': True}

As you can see, we can inspect the mapping created by our sample data:

>>> pprint(conn.getMapping('companies', 'company'))
{u'company': {u'properties': {u'__name__': {u'type': u'string'},
                              u'city': {u'type': u'string'},
                              u'number': {u'ignore_malformed': False,
                                          u'type': u'long'},
                              u'street': {u'type': u'string'},
                              u'text': {u'type': u'string'},
                              u'zip': {u'type': u'string'}}}}

And search for the sample data that we added with our sample data generator in the test setup:

>>> pprint(conn.search('street').total)
100

deleteIndex

Now we will delete the index:

>>> conn.deleteIndex('companies')
{u'acknowledged': True, u'ok': True}

As you can see there is no index anymore:

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 0, u'total': 0},
 u'indices': {},
 u'ok': True}

createIndex

Now we can create the index again. Let’s get our sample data mapping:

>>> import os.path
>>> import json
>>> import p01.elasticsearch
>>> mFile = os.path.join(os.path.dirname(p01.elasticsearch.__file__),
...     'sample', 'config', 'companies', 'company.json')
>>> f = open(mFile)
>>> data = f.read()
>>> f.close()
>>> mappings = json.loads(data)
>>> pprint(mappings)
{u'company': {u'_all': {u'enabled': True, u'store': u'yes'},
              u'_id': {u'store': u'yes'},
              u'_index': {u'enabled': True},
              u'_source': {u'enabled': False},
              u'_type': {u'store': u'yes'},
              u'properties': {u'__name__': {u'include_in_all': False,
                                            u'index': u'not_analyzed',
                                            u'store': u'yes',
                                            u'type': u'string'},
                              u'_id': {u'include_in_all': False,
                                       u'index': u'no',
                                       u'store': u'yes',
                                       u'type': u'string'},
                              u'city': {u'boost': 1.0,
                                        u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'street': {u'boost': 1.0,
                                          u'include_in_all': True,
                                          u'index': u'not_analyzed',
                                          u'null_value': u'na',
                                          u'store': u'yes',
                                          u'type': u'string'},
                              u'text': {u'boost': 1.0,
                                        u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'zip': {u'boost': 1.0,
                                       u'include_in_all': True,
                                       u'index': u'not_analyzed',
                                       u'null_value': u'na',
                                       u'store': u'yes',
                                       u'type': u'string'}}}}

Now we can create a new index with the given mapping:

>>> conn.createIndex('companies', mappings=mappings)
{u'acknowledged': True, u'ok': True}

As you can see, our index and mapping are back again:

>>> statusRENormalizer.pprint(conn.status())
{u'_shards': {u'failed': 0, u'successful': 1, u'total': 1},
 u'indices': {u'companies': {u'docs': {u'deleted_docs': 0,
                                       u'max_doc': ...,
                                       u'num_docs': ...},
                             u'flush': {u'total': 0,
                                        u'total_time': u'...',
                                        u'total_time_in_millis': ...},
                             u'index': {u'primary_size': u'...',
                                        u'primary_size_in_bytes': ...,
                                        u'size': u'...',
                                        u'size_in_bytes': ...},
                             u'merges': {u'current': 0,
                                         u'current_docs': 0,
                                         u'current_size': u'0b',
                                         u'current_size_in_bytes': 0,
                                         u'total': 0,
                                         u'total_docs': 0,
                                         u'total_size': u'0b',
                                         u'total_size_in_bytes': 0,
                                         u'total_time': u'...',
                                         u'total_time_in_millis': ...},
                             u'refresh': {u'total': ...,
                                          u'total_time': u'...',
                                          u'total_time_in_millis': ...},
                             u'shards': {u'0': [{u'docs': {u'deleted_docs': 0,
                                                           u'max_doc': ...,
                                                           u'num_docs': ...},
                                                 u'flush': {u'total': 0,
                                                            u'total_time': u'...',
                                                            u'total_time_in_millis': ...},
                                                 u'index': {u'size': u'...',
                                                            u'size_in_bytes': ...},
                                                 u'merges': {u'current': 0,
                                                             u'current_docs': 0,
                                                             u'current_size': u'0b',
                                                             u'current_size_in_bytes': 0,
                                                             u'total': 0,
                                                             u'total_docs': 0,
                                                             u'total_size': u'0b',
                                                             u'total_size_in_bytes': 0,
                                                             u'total_time': u'...',
                                                             u'total_time_in_millis': ...},
                                                 u'refresh': {u'total': ...,
                                                              u'total_time': u'...',
                                                              u'total_time_in_millis': ...},
                                                 u'routing': {u'index': u'companies',
                                                              u'node': u'...',
                                                              u'primary': True,
                                                              u'relocating_node': None,
                                                              u'shard': 0,
                                                              u'state': u'STARTED'},
                                                 u'state': u'STARTED',
                                                 u'translog': {u'id': ...,
                                                               u'operations': 0}}]},
                             u'translog': {u'operations': 0}}},
 u'ok': True}
>>> pprint(conn.getMapping('companies', 'company'))
{u'company': {u'_all': {u'store': u'yes'},
              u'_id': {u'store': u'yes'},
              u'_index': {u'enabled': True},
              u'_source': {u'enabled': False},
              u'_type': {u'store': u'yes'},
              u'properties': {u'__name__': {u'include_in_all': False,
                                            u'index': u'not_analyzed',
                                            u'store': u'yes',
                                            u'type': u'string'},
                              u'city': {u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'street': {u'include_in_all': True,
                                          u'index': u'not_analyzed',
                                          u'null_value': u'na',
                                          u'store': u'yes',
                                          u'type': u'string'},
                              u'text': {u'include_in_all': True,
                                        u'index': u'not_analyzed',
                                        u'null_value': u'na',
                                        u'store': u'yes',
                                        u'type': u'string'},
                              u'zip': {u'include_in_all': True,
                                       u'index': u'not_analyzed',
                                       u'null_value': u'na',
                                       u'store': u'yes',
                                       u'type': u'string'}}}}

As you can see the index is empty:

>>> pprint(conn.search('street').total)
0

Mapping

Note: this test will start and run an elasticsearch server on port 45299!

This test experiments with some mapping configurations. Since the elasticsearch documentation is not very clear to me, I try to find out here how the mapping part has to be done.

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool

Set up a connection:

>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)
>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection

Let’s setup a mapping definition:

>>> mapping = {
...     'item': {
...         'properties': {
...             'boolean': {
...                 'type': 'boolean'
...             },
...             'date': {
...                 'type': 'date'
...             },
...             'datetime': {
...                 'type': 'date'
...             },
...             'double': {
...                 'type': 'double'
...             },
...             'float': {
...                 'type': 'float'
...             },
...             'integer': {
...                 'type': 'integer'
...             },
...             'long': {
...                 'type': 'long'
...             },
...             'string': {
...                 'type': 'string',
...                 'null_value' : 'nada'
...             },
...         }
...     }
... }

Now let's add the mapping using our putMapping method and call refresh:

>>> conn.putMapping(mapping, 'test-mapping', 'item')
Traceback (most recent call last):
...
IndexMissingException: [test-mapping] missing

As you can see, there was an exception because our index doesn't exist yet. Let's add our test-mapping index and try again:

>>> conn.createIndex('test-mapping')
{u'acknowledged': True, u'ok': True}
>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}
>>> conn.putMapping(mapping, 'test-mapping', 'item')
{u'acknowledged': True, u'ok': True}
>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': ..., u'total': 10}, u'ok': True}

And get our mapping:

>>> pprint(conn.getMapping('test-mapping', 'item'), width=60)
{u'item': {u'properties': {u'boolean': {u'type': u'boolean'},
                           u'date': {u'format': u'dateOptionalTime',
                                     u'type': u'date'},
                           u'datetime': {u'format': u'dateOptionalTime',
                                         u'type': u'date'},
                           u'double': {u'type': u'double'},
                           u'float': {u'type': u'float'},
                           u'integer': {u'type': u'integer'},
                           u'long': {u'type': u'long'},
                           u'string': {u'null_value': u'nada',
                                       u'type': u'string'}}}}

Now let’s index a new item:

>>> import datetime
>>> doc = {'boolean': True,
...        'datetime': datetime.datetime(2011, 02, 24, 12, 0, 0),
...        'date': datetime.date(2011, 02, 24),
...        'float': float(42),
...        'integer': int(42),
...        'long': long(42*10000000000000000),
...        'string': 'string'}
>>> conn.index(doc, 'test-mapping', 'item', 1)
{u'_type': u'item', u'_id': u'1', u'ok': True, u'_version': 1, u'_index': u'test-mapping'}

refresh the index:

>>> pprint(conn.refresh('test-mapping', 4))
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 10}, u'ok': True}

and search for our indexed items:

>>> response = conn.search('string', 'test-mapping', 'item')
>>> data = response.data
>>> pprint(data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'test-mapping',
                      u'_score': ...,
                      u'_source': {u'boolean': True,
                                   u'date': datetime.datetime(2011, 2, 24, 0, 0),
                                   u'datetime': datetime.datetime(2011, 2, 24, 12, 0),
                                   u'float': 42.0,
                                   u'integer': 42,
                                   u'long': 420000000000000000L,
                                   u'string': u'string'},
                      u'_type': u'item'}],
           u'max_score': ...,
           u'total': 1},
 u'timed_out': False,
 u'took': ...}

Now check our values:

>>> source = data['hits']['hits'][0]['_source']
>>> pprint(source)
{u'boolean': True,
 u'date': datetime.datetime(2011, 2, 24, 0, 0),
 u'datetime': datetime.datetime(2011, 2, 24, 12, 0),
 u'float': 42.0,
 u'integer': 42,
 u'long': 420000000000000000L,
 u'string': u'string'}
>>> isinstance(source['boolean'], bool)
True
>>> isinstance(source['datetime'], datetime.datetime)
True
>>> isinstance(source['date'], datetime.date)
True
>>> isinstance(source['float'], float)
True
>>> isinstance(source['integer'], int)
True
>>> isinstance(source['long'], long)
True
>>> isinstance(source['string'], basestring)
True
>>> isinstance(source['string'], unicode)
True

Note that the date value comes back as a datetime, and that a datetime is of course also a date:

>>> isinstance(source['date'], datetime.datetime)
True
>>> isinstance(source['datetime'], datetime.date)
True

Scan Search Type

Note: this test will start and run an elasticsearch server on port 45299!

Let's just do some simple tests without using a connection pool.

>>> from pprint import pprint
>>> from p01.elasticsearch.connection import ElasticSearchConnection
>>> from p01.elasticsearch.exceptions import ElasticSearchServerException
>>> from p01.elasticsearch.pool import ServerPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)

Now we can create a connection directly from the server pool:

>>> conn = ElasticSearchConnection(serverPool)

Set up a test index and add a few documents:

>>> conn.createIndex('scanning')
{u'acknowledged': True, u'ok': True}
>>> for i in range(1000):
...     _id = unicode(i)
...     doc = {'_id': _id, 'dummy': u'dummy'}
...     ignored = conn.index(doc, 'scanning', 'doc')
>>> conn.refresh('scanning')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}

Let’s show how we can batch large search results with our scan method.

>>> pprint(conn.search('dummy', 'scanning').total)
1000
>>> result = list(conn.scan('dummy', 'scanning'))
>>> len(result)
1000
>>> pprint(sorted(result)[:5])
[{u'_id': u'0',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'0', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'1',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'1', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'10',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'10', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'100',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'100', u'dummy': u'dummy'},
  u'_type': u'doc'},
 {u'_id': u'101',
  u'_index': u'scanning',
  u'_score': 0.0,
  u'_source': {u'_id': u'101', u'dummy': u'dummy'},
  u'_type': u'doc'}]
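
Since scan yields the raw hit dictionaries shown above, a typical next step is to pull out the stored documents. A minimal sketch, assuming the same 1000 sample documents as above:

>>> sources = [hit['_source'] for hit in conn.scan('dummy', 'scanning')]
>>> len(sources)
1000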

Bulk

Note: this test will start and run an elasticsearch server on port 45299!

This test shows how to index items using the bulk concept.

>>> from pprint import pprint
>>> from p01.elasticsearch import interfaces
>>> from p01.elasticsearch.pool import ServerPool
>>> from p01.elasticsearch.pool import ElasticSearchConnectionPool
>>> servers = ['localhost:45299']
>>> serverPool = ServerPool(servers)

Now we can get a connection from the pool; it is persistent and kept in a thread local:

>>> connectionPool = ElasticSearchConnectionPool(serverPool)
>>> conn = connectionPool.connection
>>> conn
<ElasticSearchConnection localhost:45299>

Let's set the bulkMaxSize to 5. This means that after 5 bulk-indexed items the connection will implicitly send the cached items to the server:

>>> conn.bulkMaxSize = 5
>>> conn.bulkMaxSize
5

Let’s bulk index some items:

>>> doc = {'title': u'Wir suchen einen Marketingplaner',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 1)
>>> doc = {'title': u'Wir suchen einen Buchhalter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 2)

Now commit our bulk data, even though we have not yet indexed the full bulkMaxSize amount:

>>> pprint(conn.bulkCommit())
{u'items': [{u'index': {u'_id': u'1',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'2',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}}],
 u'took': ...}
>>> conn.bulkCounter
0

Now we search the items:

>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [], u'max_score': None, u'total': 0},
 u'timed_out': False,
 u'took': ...}

As you can see, the data is not searchable yet because we didn't use the refresh parameter. Let's call refresh now:

>>> conn.refresh('testing')
{u'ok': True, u'_shards': {u'successful': 5, u'failed': 0, u'total': 10}}

and search again:

>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Marketingplaner'},
                      u'_type': u'job'},
                     {u'_id': u'2',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Buchhalter'},
                      u'_type': u'job'}],
           u'max_score': ...,
           u'total': 2},
 u'timed_out': False,
 u'took': ...}

Let’s index more items till we reach the bulkMaxSize:

>>> len(conn.bulkItems)
0
>>> doc = {'title': u'Wir suchen einen Koch',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 3)
>>> conn.bulkCounter
1
>>> doc = {'title': u'Wir suchen einen Sachbearbeiter',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 4)
>>> conn.bulkCounter
2
>>> doc = {'title': u'Wir suchen einen Mechaniker',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 5)
>>> conn.bulkCounter
3
>>> doc = {'title': u'Wir suchen einen Exportfachmann',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> conn.bulkIndex(doc, 'testing', 'job', 6)
>>> conn.bulkCounter
4

Now our bulkMaxSize forces a commit of the cached data:

>>> doc = {'title': u'Wir suchen einen Entwickler',
...        'description': u'Wir bieten Ihnen eine gute Anstellung'}
>>> pprint(conn.bulkIndex(doc, 'testing', 'job', 7))
{u'items': [{u'index': {u'_id': u'3',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'4',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'5',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'6',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}},
            {u'index': {u'_id': u'7',
                        u'_index': u'testing',
                        u'_type': u'job',
                        u'_version': 1,
                        u'ok': True}}],
 u'took': ...}

Just wait until the server refreshes itself, which happens every second by default:

>>> import time
>>> time.sleep(1)
>>> len(conn.bulkItems)
0

As you can see, we have all 7 items indexed:

>>> response = conn.search("Anstellung", 'testing', 'job')
>>> pprint(response.data)
{u'_shards': {u'failed': 0, u'successful': 5, u'total': 5},
 u'hits': {u'hits': [{u'_id': u'1',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Marketingplaner'},
                      u'_type': u'job'},
                     {u'_id': u'6',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Exportfachmann'},
                      u'_type': u'job'},
                     {u'_id': u'2',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Buchhalter'},
                      u'_type': u'job'},
                     {u'_id': u'7',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Entwickler'},
                      u'_type': u'job'},
                     {u'_id': u'4',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Sachbearbeiter'},
                      u'_type': u'job'},
                     {u'_id': u'5',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Mechaniker'},
                      u'_type': u'job'},
                     {u'_id': u'3',
                      u'_index': u'testing',
                      u'_score': ...,
                      u'_source': {u'description': u'Wir bieten Ihnen eine gute Anstellung',
                                   u'title': u'Wir suchen einen Koch'},
                      u'_type': u'job'}],
           u'max_score': ...,
           u'total': 7},
 u'timed_out': False,
 u'took': ...}

CHANGES

0.6.0 (2014-03-24)

  • feature: implemented putTemplate method using a PUT request at the _template endpoint

0.5.2 (2013-06-28)

  • bugfix: improve error handling. Use json response string if no error message is given.

0.5.1 (2012-12-22)

  • implemented put settings (putSettings) method

  • fix tests based on changed elasticsearch 0.20.1 output

  • switch to p01.recipe.setup:importchecker

0.5.0 (2012-11-18)

  • initial release
