A D Vishnu Prasad

Director of Cloud Engineering @ Team8Solutions, Freelancer

Cloning Elasticsearch Indices Using the Python Client

Introduction

Automating the cloning of indices from a staging Elasticsearch cluster with a script is a practical way to streamline feature environment setup: it saves time and ensures every environment is created consistently.

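Here are two example scripts for cloning indices from your staging Elasticsearch cluster. Option 1 copies an index to a new index name through a single client using the scroll and bulk APIs; Option 2 clones every index from the staging cluster into a separate feature cluster using reindex-from-remote.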

Script: Option 1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json
import logging
from typing import Dict

from elasticsearch import Elasticsearch

logging.basicConfig(filename="es.log", level=logging.INFO)

# Creates indices in ES Cloud

class EsClient:
    """Small wrapper around an Elasticsearch client for cloning an index.

    ``copy_mapping`` creates the destination index from the source index's
    mappings and settings; ``copy_data`` then copies every document across
    using the scroll and bulk APIs.
    """

    # Values that get_settings returns for an existing index but that are
    # unwanted when creating a new one; they are removed before creation.
    _GENERATED_SETTINGS = ("creation_date", "provided_name", "uuid", "version")

    def __init__(self):
        """Create the underlying client and log the result of a ping."""
        self.es_client = Elasticsearch(
            ["https://es-url"],
            http_auth=("elastic", "dummy")
        )
        logging.info(self.es_client.ping())

    def copy_mapping(self, source_index, destination_index):
        """Create ``destination_index`` with the source's mappings and settings.

        Args:
            source_index: Name of the existing index to read from.
            destination_index: Name of the index to create.

        Returns:
            The response of the ``indices.create`` call.
        """
        # Get the mapping and settings of the source index
        source_mapping = self.es_client.indices.get_mapping(index=source_index)
        source_settings = self.es_client.indices.get_settings(index=source_index)

        # Strip the generated values; a default is supplied so that a missing
        # key does not abort the copy with a KeyError.
        index_settings = source_settings[source_index]["settings"]["index"]
        for key in self._GENERATED_SETTINGS:
            index_settings.pop(key, None)

        # Create the destination index with the mapping and settings of the source index
        return self.es_client.indices.create(
            index=destination_index,
            body={
                "mappings": source_mapping[source_index]["mappings"],
                "settings": source_settings[source_index]["settings"],
            },
        )

    def copy_data(self, source_index, destination_index):
        """Copy every document from ``source_index`` into ``destination_index``.

        Documents keep their ``_id``. The scroll context is always cleared
        when the copy ends, and the destination index is refreshed once after
        the last batch so the copied documents are searchable on return.

        Args:
            source_index: Name of the index to read documents from.
            destination_index: Name of the index to write documents to.
        """
        print(f"Copying data from index {source_index} to {destination_index}")

        # Open a scroll over all documents of the source index
        scroll_size = 1000
        scroll_response = self.es_client.search(
            index=source_index,
            scroll="2m",
            size=scroll_size,
            body={"query": {"match_all": {}}},
        )
        scroll_id = scroll_response["_scroll_id"]
        hits = scroll_response["hits"]["hits"]
        copied_any = False

        try:
            while hits:
                # Bulk bodies alternate an action line and the document source
                bulk_body = []
                for hit in hits:
                    bulk_body.append(
                        {"index": {"_index": destination_index, "_id": hit["_id"]}}
                    )
                    bulk_body.append(hit["_source"])

                bulk_response = self.es_client.bulk(body=bulk_body)
                if bulk_response["errors"]:
                    # Item-level failures do not raise; surface them in the log
                    logging.error(
                        "Bulk indexing into %s reported item errors",
                        destination_index,
                    )
                copied_any = True

                scroll_response = self.es_client.scroll(scroll_id=scroll_id, scroll="2m")
                scroll_id = scroll_response["_scroll_id"]
                hits = scroll_response["hits"]["hits"]
        finally:
            # Release the server-side search context instead of letting it
            # linger until the scroll timeout expires.
            self.es_client.clear_scroll(scroll_id=scroll_id)

        # One refresh at the end instead of forcing a refresh per batch
        if copied_any:
            self.es_client.indices.refresh(index=destination_index)


# Maps each source index name to the destination index it is cloned into.
indices = {"source_index_name": "destination_index_name"}

# Reuse a single client for every index rather than constructing (and
# pinging through) a new connection for each individual call.
client = EsClient()
for source_index, destination_index in indices.items():
    client.copy_mapping(source_index, destination_index)
    client.copy_data(source_index, destination_index)

Script: Option 2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from elasticsearch import Elasticsearch

# Elasticsearch configuration
staging_host = 'staging_cluster_host'
staging_port = 9200

feature_host = 'feature_cluster_host'
feature_port = 9200

# Connect to Elasticsearch clusters
staging_client = Elasticsearch([{'host': staging_host, 'port': staging_port}])
feature_client = Elasticsearch([{'host': feature_host, 'port': feature_port}])

# List all indices in the staging cluster
staging_indices = staging_client.indices.get_alias().keys()

# Clone indices to the feature cluster
for index in staging_indices:
    # Retrieve index settings and mappings from the staging cluster
    index_settings = staging_client.indices.get_settings(index=index)[index]['settings']
    index_mappings = staging_client.indices.get_mapping(index=index)[index]['mappings']

    # index_settings is already the "settings" object, so the generated
    # values live directly under its "index" key. Remove them before creating
    # the new index; the default avoids a KeyError if one is absent.
    for generated_key in ("creation_date", "provided_name", "uuid", "version"):
        index_settings["index"].pop(generated_key, None)

    # Create the index with the same settings and mappings in the feature cluster
    feature_client.indices.create(index=index, body={'settings': index_settings, 'mappings': index_mappings})

    # Reindex data from the staging index to the feature index.
    # The remote "host" must be a single scheme://host:port string; there is
    # no separate "port" field. NOTE(review): assumes the staging cluster is
    # served over plain http and is listed in the feature cluster's
    # reindex.remote.whitelist setting - confirm for your deployment.
    reindex_body = {
        'source': {'remote': {'host': f'http://{staging_host}:{staging_port}'}, 'index': index},
        'dest': {'index': index}
    }
    feature_client.reindex(body=reindex_body, request_timeout=3600)  # Increase timeout if needed

    print(f"Index '{index}' cloned to the feature cluster.")

print("Index cloning completed.")

Comments