Tracking Reindex Progress & Performance
Reindexing is a stateful, resource-intensive operation that directly impacts cluster stability, shard allocation, and Index Lifecycle Management (ILM) policy execution. Tracking reindex progress and performance is not a passive monitoring exercise; it is an active control loop that dictates when to scale bulk workers, throttle ingestion, and trigger downstream cache warming or alias swaps. In production environments, a _reindex request should run asynchronously (wait_for_completion=false): it returns a task identifier immediately, and that task must then be polled through the Tasks API to extract actionable telemetry.
At the core of this observability layer are the total, created, updated, deleted, version_conflicts, noops, and batches counters exposed by the Tasks API. These metrics, combined with throttled_millis and requests_per_second, form the baseline for capacity planning. For log analytics teams managing high-throughput time-series indices, the delta between total and the sum of created, updated, and deleted on a finished task shows how many documents were not written; subtracting version_conflicts and noops from that delta separates intentional skips from genuine drops, while batches shows how many scroll batches (and therefore bulk requests) the task has worked through. When integrated into broader Automated Reindexing Pipelines & Workflows, these counters become the primary signal for orchestrating ILM rollovers, mapping migrations, and cross-cluster replication syncs. Without precise telemetry, reindex operations risk silent data loss, uncontrolled heap pressure, or cascading shard relocation storms.
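As a rough illustration of how these counters turn into a progress figure, the sketch below works on a plain dictionary shaped like the status object of a reindex task. The helper name summarize_status and the sample numbers are illustrative assumptions, not part of any client library.

```python
def summarize_status(status: dict) -> dict:
    """Derive progress figures from a reindex task status dictionary."""
    total = status.get("total", 0)
    # Documents that resulted in a write on the destination index.
    written = status.get("created", 0) + status.get("updated", 0) + status.get("deleted", 0)
    # Documents that were seen but intentionally not written.
    skipped = status.get("version_conflicts", 0) + status.get("noops", 0)
    handled = written + skipped
    return {
        "written": written,
        "skipped": skipped,
        "remaining": max(total - handled, 0),
        "progress_pct": round(handled / total * 100, 1) if total else 0.0,
    }


# Hypothetical sample values, for illustration only.
sample = {"total": 1000, "created": 600, "updated": 50, "deleted": 0,
          "version_conflicts": 30, "noops": 20, "batches": 7}
print(summarize_status(sample))
```

Counting skipped documents as handled keeps the percentage from stalling below 100 when conflicts are set to proceed.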
```mermaid
flowchart LR
    A["Submit _reindex async"] --> B["Poll task status"]
    B --> C{"throttled > 15%?"}
    C -->|"yes, limit is the bottleneck"| D["reindex_rethrottle up"]
    C -->|"no, cluster-bound"| E["reindex_rethrottle down"]
    D --> B
    E --> B
    B --> F{"completed?"}
    F -->|"no"| B
    F -->|"yes"| G["Check response.failures, swap alias"]
```
Asynchronous Task Initialization & Conflict Handling
Effective tracking begins at the configuration layer. Invoke the _reindex API with wait_for_completion=false so that it immediately returns a task identifier, enabling non-blocking execution. Set requests_per_second to your calculated throughput ceiling, and set slices to parallelize the work into sub-tasks. Each slice runs as a child task with its own status, which is critical for petabyte-scale clusters where single-threaded reindexing becomes a bottleneck.
When initializing the job, explicitly define conflict resolution behavior. The default, conflicts: "abort", halts the task on the first version conflict, while conflicts: "proceed" counts each conflict in version_conflicts and continues execution. For production data migrations, configure conflicts: "proceed" and write any entries from the response's failures array to a dead-letter index for later reconciliation, as detailed in Resolving Document Conflicts During Reindex.
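Since _reindex only reports failures in its response, writing them to a dead-letter index has to happen on the client side. The following is a minimal sketch of that step, assuming each failure entry carries index, id, status, and cause fields (verify this against the responses your cluster actually returns). The function name and the dead-letter index name are hypothetical.

```python
from typing import Iterable, Iterator


def dead_letter_docs(failures: Iterable[dict], dlq_index: str = "reindex-dead-letter") -> Iterator[dict]:
    """Turn reindex failure entries into documents for a dead-letter index.

    The field names read here (index, id, status, cause) are assumptions about
    the failure entries; missing fields fall back to None.
    """
    for failure in failures:
        cause = failure.get("cause") or {}
        yield {
            "_index": dlq_index,  # hypothetical index name
            "_source": {
                "source_index": failure.get("index"),
                "doc_id": failure.get("id"),
                "http_status": failure.get("status"),
                "error_type": cause.get("type"),
                "error_reason": cause.get("reason"),
            },
        }


# Hypothetical failure entry, for illustration only.
sample_failures = [{
    "index": "logs-2023.01-v2", "id": "abc123", "status": 400,
    "cause": {"type": "mapper_parsing_exception", "reason": "failed to parse field"},
}]
for doc in dead_letter_docs(sample_failures):
    print(doc["_source"]["doc_id"], doc["_source"]["error_type"])
```

The yielded dictionaries use the _index and _source keys that the Python client's bulk helpers accept, so they could be handed to one of those helpers for indexing.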
```
POST _reindex?wait_for_completion=false&requests_per_second=50&slices=auto
{
  "source": {
    "index": "logs-2023.01",
    "size": 5000,
    "query": { "range": { "@timestamp": { "gte": "2023-01-01" } } }
  },
  "dest": {
    "index": "logs-2023.01-v2",
    "op_type": "create"
  },
  "conflicts": "proceed"
}
```
Telemetry Extraction & Metric Mapping
Once the task is running, poll GET _tasks/<task_id> to extract the task.status object. The response structure maps directly to operational thresholds:
| Metric | Production Interpretation | Action Trigger |
|---|---|---|
| total | Estimated document count from source query | Baseline for progress % calculation |
| created / updated | Successfully ingested documents | Primary throughput indicator |
| deleted | Documents the reindex script marked for deletion via ctx.op = "delete" | Track lifecycle pruning |
| batches | Number of scroll batches pulled from the source, each sent as one bulk request | High values + low throughput = bulk queue saturation |
| throttled_millis | Time spent sleeping to honor requests_per_second | > 15% of elapsed time = RPS ceiling too low |
| failures | Array of document- and shard-level errors, returned in the response once the task completes | Immediate investigation required |
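To make the table concrete, the sketch below derives throughput and the throttle share from one polled status. The function name evaluate_task and the throttle_limit parameter are assumptions for illustration; the default of 0.15 follows the 15% threshold used in the polling script, and the sample values are invented.

```python
def evaluate_task(status: dict, running_time_in_nanos: int, throttle_limit: float = 0.15) -> dict:
    """Map raw reindex counters to the indicators described in the table."""
    elapsed_s = running_time_in_nanos / 1_000_000_000
    written = status.get("created", 0) + status.get("updated", 0) + status.get("deleted", 0)
    batches = status.get("batches", 0)
    throttled_s = status.get("throttled_millis", 0) / 1000
    throttle_ratio = throttled_s / elapsed_s if elapsed_s > 0 else 0.0
    return {
        "docs_per_sec": written / elapsed_s if elapsed_s > 0 else 0.0,
        "docs_per_batch": written / batches if batches else 0.0,
        "throttle_ratio": throttle_ratio,
        # A large share of time asleep means the configured limit is the bottleneck.
        "rps_ceiling_too_low": throttle_ratio > throttle_limit,
    }


# Hypothetical sample: 100 seconds of runtime, 30 of them spent throttled.
sample_status = {"created": 9000, "updated": 1000, "batches": 2, "throttled_millis": 30_000}
print(evaluate_task(sample_status, running_time_in_nanos=100_000_000_000))
```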
For operators preferring UI-based validation, Monitoring Reindex Task Status with Kibana Dev Tools provides a structured interface for real-time counter visualization. However, programmatic polling remains mandatory for automated ILM transitions.
Production-Ready Python v8+ Orchestration
The official elasticsearch v8+ client supports async/await patterns through AsyncElasticsearch (installed with the elasticsearch[async] extra) and returns structured responses, eliminating legacy parsing overhead. Configure the client with explicit timeout thresholds and certificate verification. The following script implements a resilient polling loop, calculates progress in real time, and adjusts requests_per_second through the rethrottle API according to the share of elapsed time the task spends throttled.
```python
import asyncio
import logging

from elasticsearch import AsyncElasticsearch
from elasticsearch.exceptions import ApiError

logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
logger = logging.getLogger(__name__)


async def orchestrate_reindex(es: AsyncElasticsearch, source_idx: str, dest_idx: str, base_rps: int = 100):
    # Initiate the async task; the call returns immediately with a task ID.
    init_resp = await es.reindex(
        source={"index": source_idx, "size": 5000},
        dest={"index": dest_idx, "op_type": "create"},
        conflicts="proceed",
        requests_per_second=base_rps,
        wait_for_completion=False,
        slices="auto",
    )
    task_id = init_resp["task"]
    logger.info(f"Reindex task initiated: {task_id}")

    current_rps = base_rps
    poll_interval = 2.0

    while True:
        try:
            # With slices, the parent status only reflects slices that have finished;
            # poll the child tasks if finer-grained progress is needed.
            task = await es.tasks.get(task_id=task_id)
            status = task["task"]["status"]
            completed = task.get("completed", False)

            total = status.get("total", 0)
            # Count every document the task is done with, including skipped ones,
            # so progress can reach 100% when conflicts are set to proceed.
            processed = sum(
                status.get(key, 0)
                for key in ("created", "updated", "deleted", "version_conflicts", "noops")
            )
            throttled = status.get("throttled_millis", 0)
            batches = status.get("batches", 0)
            progress = (processed / total * 100) if total > 0 else 0

            logger.info(
                f"[{task_id}] {progress:.1f}% | "
                f"Processed: {processed}/{total} | Batches: {batches} | "
                f"Throttled: {throttled}ms | RPS: {current_rps}"
            )

            if completed:
                failures = task.get("response", {}).get("failures", [])
                if failures:
                    logger.warning(f"Task completed with {len(failures)} failures. Inspect response for shard errors.")
                else:
                    logger.info("Reindex task completed successfully.")
                break

            # running_time_in_nanos lives on the inner task object.
            elapsed_ms = task["task"]["running_time_in_nanos"] / 1_000_000
            throttle_ratio = throttled / elapsed_ms if elapsed_ms > 0 else 0

            # throttled_millis is time the task slept to honor requests_per_second.
            # A high share means the limit itself is the bottleneck, so raise it;
            # a near-zero share means the cluster cannot reach the limit, so lower it
            # to leave headroom. Skip tuning until at least one batch has been reported.
            # Rethrottle a running reindex via reindex_rethrottle (there is no tasks.update).
            if batches > 0 and throttle_ratio > 0.15 and current_rps < 500:
                current_rps = min(500, int(current_rps * 1.1))
                await es.reindex_rethrottle(task_id=task_id, requests_per_second=current_rps)
                logger.info(f"Limit-bound. Scaled RPS up to {current_rps}")
            elif batches > 0 and throttle_ratio < 0.05 and current_rps > 25:
                current_rps = max(25, int(current_rps * 0.8))
                await es.reindex_rethrottle(task_id=task_id, requests_per_second=current_rps)
                logger.info(f"Cluster-bound. Adjusted RPS down to {current_rps}")

            await asyncio.sleep(poll_interval)
        except ApiError as e:
            if e.status_code == 404:
                logger.warning("Task ID not found. Verifying via tasks.list...")
                break
            raise


# Usage:
# async def main():
#     async with AsyncElasticsearch("https://cluster:9200", api_key="...", verify_certs=True) as es:
#         await orchestrate_reindex(es, "source-index", "dest-index")
# asyncio.run(main())
```
Threshold Tuning & Bulk Queue Management
Static requests_per_second values rarely survive production traffic spikes. The size parameter in the _reindex source sets the scroll batch size, and therefore the number of documents in each bulk request. For time-series logs, a size of 5000 to 10000 typically optimizes network round-trips without triggering es_rejected_execution_exception on coordinating nodes. Monitor the batches counter: if batches increases linearly but created/updated stalls, your bulk queue is saturated. Reduce size by 30% and, for the duration of the ingestion, raise refresh_interval on the destination index to 30s or set it to -1 to disable refreshes.
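The saturation check and the 30% reduction described above can be sketched as two small helpers that compare consecutive status samples. Both function names and the floor of 500 documents are assumptions for illustration.

```python
def batches_without_progress(previous: dict, current: dict) -> bool:
    """True when batches advanced between two polls but no documents were written."""
    def written(status: dict) -> int:
        return status.get("created", 0) + status.get("updated", 0)

    more_batches = current.get("batches", 0) > previous.get("batches", 0)
    return more_batches and written(current) == written(previous)


def reduced_batch_size(size: int, reduce_pct: int = 30, floor: int = 500) -> int:
    """Shrink the batch size by reduce_pct percent, never going below floor."""
    return max(floor, size * (100 - reduce_pct) // 100)


# Hypothetical consecutive samples, for illustration only.
earlier = {"batches": 10, "created": 50_000, "updated": 0}
later = {"batches": 14, "created": 50_000, "updated": 0}
if batches_without_progress(earlier, later):
    print("saturated; next batch size:", reduced_batch_size(5000))
```

The batch size of a running task cannot be changed through rethrottling, so the reduced value would apply to the next _reindex submission.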
When scaling across multiple data nodes, align slice count with primary shard count. Over-slicing (slices > primary_shards * 2) fragments the bulk queue and increases JVM heap churn. Refer to Designing Batch Reindex Workflows for shard-aware partitioning strategies. Always validate thread pool utilization via GET _nodes/stats/thread_pool during peak reindex windows.
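As a sketch of both checks, the helpers below cap a requested slice count at twice the primary shard count and pull write-pool rejections out of a GET _nodes/stats/thread_pool response. The function names are hypothetical, and the sample response is trimmed to the fields being read.

```python
def clamp_slices(requested: int, primary_shards: int) -> int:
    """Keep the slice count between 1 and twice the primary shard count."""
    return max(1, min(requested, primary_shards * 2))


def write_rejections(nodes_stats: dict) -> dict:
    """Map each node name to the rejected count of its write thread pool."""
    return {
        node.get("name", node_id): node.get("thread_pool", {}).get("write", {}).get("rejected", 0)
        for node_id, node in nodes_stats.get("nodes", {}).items()
    }


# Hypothetical, trimmed response body, for illustration only.
sample_stats = {
    "nodes": {
        "n1": {"name": "data-1", "thread_pool": {"write": {"queue": 180, "rejected": 12}}},
        "n2": {"name": "data-2", "thread_pool": {"write": {"queue": 3, "rejected": 0}}},
    }
}
print(clamp_slices(requested=24, primary_shards=5))
print(write_rejections(sample_stats))
```

Because the rejected counter is cumulative per node, comparing two samples taken during the reindex window is more telling than a single reading.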
Real-World Debugging & Failure Recovery
Stalled or Zombie Tasks
If progress plateaus for >10 minutes, check for shard allocation failures or mapping conflicts:
```
GET _tasks/<task_id>?detailed=true&human
```
If the task is unresponsive, cancel it cleanly to release bulk queue resources:
```
POST _tasks/<task_id>/_cancel
```
Heap Pressure & Circuit Breakers
High throttled_millis combined with parent circuit breaker trips indicates insufficient heap for bulk indexing. Mitigate by:
- Lowering requests_per_second to 25–50
- Disabling replica allocation during reindex: PUT _cluster/settings {"transient":{"cluster.routing.allocation.enable":"primaries"}}
- Re-enabling replicas post-completion
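To confirm that the parent breaker is actually tripping before applying these mitigations, the breaker section of the node stats (GET _nodes/stats/breaker) can be summarized per node. This is a minimal sketch; the function name is hypothetical and the sample response is trimmed to the fields being read.

```python
def parent_breaker_report(breaker_stats: dict) -> dict:
    """Summarize parent circuit breaker trips and heap usage per node."""
    report = {}
    for node_id, node in breaker_stats.get("nodes", {}).items():
        parent = node.get("breakers", {}).get("parent", {})
        limit = parent.get("limit_size_in_bytes", 0)
        used = parent.get("estimated_size_in_bytes", 0)
        report[node.get("name", node_id)] = {
            "tripped": parent.get("tripped", 0),
            "usage_pct": round(used / limit * 100, 1) if limit else 0.0,
        }
    return report


# Hypothetical, trimmed response body, for illustration only.
sample_breakers = {
    "nodes": {
        "n1": {"name": "data-1", "breakers": {"parent": {
            "limit_size_in_bytes": 1000, "estimated_size_in_bytes": 950, "tripped": 4}}},
    }
}
print(parent_breaker_report(sample_breakers))
```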
Silent Document Drops
A non-zero delta between total and the documents actually written often stems from malformed documents or strict mapping enforcement; rule out intentional skips first by checking version_conflicts and noops. Inspect the task's response.failures array: _reindex does not auto-route failures to another index, and ignore_unavailable only skips missing or closed source indices, so it does not capture malformed-document errors. Fix the destination mapping or pre-process the offending documents with an ingest pipeline, then cross-reference dropped _id values against source queries to isolate schema drift.
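A short sketch of that triage: group the failure entries by error type, then diff the document IDs seen on each side. It assumes each failure entry exposes a cause object with a type field, and that both ID collections have already been fetched by whatever means suits the cluster; the function names are hypothetical.

```python
from collections import Counter
from typing import Iterable


def failure_breakdown(failures: Iterable[dict]) -> Counter:
    """Count failures by error type (assumes a cause.type field on each entry)."""
    return Counter((f.get("cause") or {}).get("type", "unknown") for f in failures)


def missing_ids(source_ids: Iterable[str], dest_ids: Iterable[str]) -> list:
    """Return the sorted IDs present in the source but absent from the destination."""
    return sorted(set(source_ids) - set(dest_ids))


# Hypothetical data, for illustration only.
sample_failures = [
    {"id": "a1", "cause": {"type": "mapper_parsing_exception"}},
    {"id": "a2", "cause": {"type": "mapper_parsing_exception"}},
    {"id": "a3", "cause": {"type": "strict_dynamic_mapping_exception"}},
]
print(failure_breakdown(sample_failures).most_common())
print(missing_ids(["a1", "a2", "a3", "a4"], ["a4"]))
```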
ILM Integration & Downstream Triggers
Once the polling loop confirms completed: true and failures: [], the observability layer transitions to ILM orchestration. Swap aliases atomically:
```
POST _aliases
{
  "actions": [
    { "remove": { "index": "logs-current", "alias": "logs-read" } },
    { "add": { "index": "logs-current-v2", "alias": "logs-read" } }
  ]
}
```
Trigger cache warming by executing a lightweight match_all query against the new index to populate the filesystem cache. Finally, attach the target ILM policy to the destination index to resume automated rollovers and phase transitions. By treating telemetry as a control signal rather than a dashboard metric, engineering teams eliminate manual handoffs and guarantee deterministic index lifecycle progression.
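The gate between polling and the alias swap can be sketched as a pure function that refuses to build the swap actions unless the stored task document shows completion without failures. The function name is an assumption; the returned list has the same shape as the actions array of the _aliases request.

```python
def alias_swap_actions(task_doc: dict, old_index: str, new_index: str, alias: str) -> list:
    """Build atomic alias swap actions, but only for a cleanly completed task."""
    if not task_doc.get("completed", False):
        raise RuntimeError("reindex task has not completed")
    failures = task_doc.get("response", {}).get("failures", [])
    if failures:
        raise RuntimeError(f"reindex finished with {len(failures)} failures")
    return [
        {"remove": {"index": old_index, "alias": alias}},
        {"add": {"index": new_index, "alias": alias}},
    ]


# Hypothetical completed task document, for illustration only.
finished = {"completed": True, "response": {"failures": []}}
print(alias_swap_actions(finished, "logs-current", "logs-current-v2", "logs-read"))
```

Raising instead of returning an empty list keeps a failed or unfinished reindex from silently skipping the swap in an automated pipeline.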