API Reference

Comprehensive reference documentation for the Elastro Python Client v1.0.0.

ElasticsearchClient

class
from elastro import ElasticsearchClient

Main client class for establishing connections to Elasticsearch clusters. Supports Basic Auth, API Keys, and Elastic Cloud.

__init__

def __init__(hosts=None, cloud_id=None, api_key=None, username=None, password=None, verify_certs=True, ca_certs=None, client_cert=None, client_key=None, ssl_show_warn=True)

Initialize the Elasticsearch client.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| hosts | List[str] | Optional | List of node URLs (e.g. ['http://localhost:9200']). Default: None |
| cloud_id | str | Optional | Elastic Cloud ID. Default: None |
| api_key | str \| Tuple | Optional | API Key for authentication. Default: None |
| username | str | Optional | Basic Auth username. Default: None |
| password | str | Optional | Basic Auth password. Default: None |
| verify_certs | bool | Optional | Verify SSL certificates. Default: True |

Returns

ElasticsearchClient instance

Example

client = ElasticsearchClient(
hosts=["http://localhost:9200"],
username="elastic",
password="changeme"
)

connect

def connect() -> bool

Establishes connection to the cluster and verifies connectivity (pings).

Returns

True if the connection is successful; raises ConnectionError otherwise.

info

def info() -> dict

Returns cluster information.

Returns

Dictionary containing cluster info (version, name, etc).

IndexManager

class
from elastro import IndexManager

Manage Elasticsearch indices: create, update, delete, and check existence.

create

def create(name: str, settings: Dict = None, mappings: Dict = None) -> Dict

Create a new index with optional settings and mappings.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Name of the index |
| settings | dict | Optional | Index settings (shards, replicas, etc). Default: None |
| mappings | dict | Optional | Index mappings (properties). Default: None |

Returns

API response dictionary

Example

im.create(
"logs-2024",
settings={"number_of_shards": 1},
mappings={"properties": {"timestamp": {"type": "date"}}}
)

delete

def delete(name: str) -> Dict

Delete an index.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Name of the index |

Returns

API response dictionary

exists

def exists(name: str) -> bool

Check if an index exists.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Name of the index |

Returns

True if exists, False otherwise

update

def update(name: str, settings: Dict) -> Dict

Update index settings (e.g. number of replicas). Changing static settings often requires the index to be closed first.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Name of the index |
| settings | dict | Required | Settings to update |

Returns

API response dictionary

DocumentManager

class
from elastro import DocumentManager

Perform CRUD and Search operations on documents.

index

def index(index: str, document: Dict, id: str = None, refresh: bool = False) -> Dict

Index a document, creating it if it does not exist or replacing it if it does.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| index | str | Required | Target index |
| document | dict | Required | Document body |
| id | str | Optional | Document ID. If None, auto-generated. Default: None |
| refresh | bool | Optional | Refresh index immediately. Default: False |

Returns

API response with result status

get

def get(index: str, id: str) -> Dict

Retrieve a document by ID.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| index | str | Required | Target index |
| id | str | Required | Document ID |

Returns

The document source dictionary

search

def search(index: str, query: Dict = None, size: int = 10, from_: int = 0, sort: List = None) -> Dict

Execute a search query.

Parameters

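| Name | Type | Required | Description |
| --- | --- | --- | --- |
| index | str | Required | Target index or pattern |
| query | dict | Optional | Query DSL body (match_all if None). Default: None |
| size | int | Optional | Number of hits to return. Default: 10 |
| from_ | int | Optional | Offset of the first hit to return (for pagination). Default: 0 |
| sort | list | Optional | Sort criteria. Default: None |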

Returns

Full search response (hits, total, aggregations)

bulk

def bulk(operations: List[Dict], refresh: bool = False) -> Dict

Execute bulk operations (index, create, delete, update).

Parameters

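| Name | Type | Required | Description |
| --- | --- | --- | --- |
| operations | List[dict] | Required | List of bulk actions/documents |
| refresh | bool | Optional | Refresh affected indices immediately. Default: False |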

Returns

Bulk API response

Example

docs = [
{"index": {"_index": "test", "_id": "1"}},
{"field": "value1"},
{"delete": {"_index": "test", "_id": "2"}}
]
dm.bulk(docs)

IngestEngine

class
from elastro import IngestEngine

Client-side data streaming, pre-flight validation, type coercion, and bulk importing for various formats (CSV, JSON, NDJSON, SQL, Parquet).

ingest

def ingest(source: str | Path, index: str, format: str = 'auto', batch_size: int = 2000, dlq_path: str = None, progress_callback: Callable = None) -> IngestResult

Ingests data from a source into an Elasticsearch index.

Parameters

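| Name | Type | Required | Description |
| --- | --- | --- | --- |
| source | str \| Path | Required | File path, SQL string, or open stream |
| index | str | Required | Target index name |
| format | str | Optional | Format (csv, ndjson, json, sql, parquet, auto). Default: 'auto' |
| batch_size | int | Optional | Number of documents per bulk request. Default: 2000 |
| dlq_path | str | Optional | Path to write failed documents. Default: None |
| progress_callback | Callable | Optional | Callback invoked to report progress (the example below passes a three-argument callable). Default: None |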

Returns

IngestResult with operation statistics

Example

result = ingest_engine.ingest(
source="data.csv",
index="my-index",
format="csv",
batch_size=2000,
progress_callback=lambda r, i, f: print(f"{i} indexed")
)

IlmManager

class
from elastro.core.ilm import IlmManager

Manage Index Lifecycle Management (ILM) policies.

create_policy

def create_policy(name: str, policy: Dict) -> bool

Create or update an ILM policy.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Policy name |
| policy | dict | Required | Policy definition (phases, actions) |

Returns

True if acknowledged

explain_lifecycle

def explain_lifecycle(index: str) -> Dict

Get the current ILM status for an index.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| index | str | Required | Index name |

Returns

Detailed lifecycle explanation

SnapshotManager

class
from elastro.core.snapshot import SnapshotManager

Manage snapshot repositories and backup/restore operations.

create_repository

def create_repository(name: str, repo_type: str, settings: Dict) -> bool

Register a snapshot repository.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| name | str | Required | Repository name |
| repo_type | str | Required | Type (fs, s3, gcs, azure) |
| settings | dict | Required | Repository specific settings |

Returns

True if acknowledged

create_snapshot

def create_snapshot(repository: str, snapshot: str, indices='_all', wait_for_completion=False) -> Dict

Create a snapshot of indices.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| repository | str | Required | Repository name |
| snapshot | str | Required | Snapshot name |
| indices | str \| list | Optional | Indices to backup. Default: '_all' |
| wait_for_completion | bool | Optional | Block until finished. Default: False |

Returns

Snapshot response

restore_snapshot

def restore_snapshot(repository: str, snapshot: str, indices='_all', rename_pattern=None, rename_replacement=None) -> Dict

Restore indices from a snapshot.

Parameters

| Name | Type | Required | Description |
| --- | --- | --- | --- |
| repository | str | Required | Repository name |
| snapshot | str | Required | Snapshot name |
| indices | str | Optional | Indices to restore. Default: '_all' |

Returns

Restore response