Elasticsearch shard allocation algorithm


Demystifying shard allocation internals in Elasticsearch

Note: This is intended for people who want to understand the internal workings of Elasticsearch shard allocation.

Shard allocation in ES is provided as a service component named AllocationService. AllocationService is initialized and started by ClusterModule, which runs on the master node and manages cluster-level actions.

AllocationService is initialized here: AllocationService Github link

What does AllocationService do?

This service manages shard allocation across the nodes of a cluster. To do so, AllocationService keeps a set of AllocationDeciders that choose nodes for shard allocation. The class also handles new nodes joining the cluster and the rerouting of shards.

The major methods in this service to focus on, in order to understand how allocation works, are:

1.org.elasticsearch.cluster.routing.allocation.AllocationService#applyStartedShards(ClusterState, List)

This method applies the started shards. Initializing ShardRouting (i.e. shard info) instances that exist in the routing table are provided as a parameter to this method. If the method returns the same ClusterState instance, no change has been made.
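
As a rough illustration of that contract (a sketch, not ES source; allocationService, currentState, startedShards, and publish are placeholder names):

  // Sketch: a master-side caller can rely on reference equality to detect
  // whether applying the started shards actually changed the routing table.
  ClusterState newState = allocationService.applyStartedShards(currentState, startedShards);
  if (newState != currentState) {
      // the routing table changed; the master would publish the new state
      publish(newState); // publish(...) is a placeholder, not an ES API
  }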

2.org.elasticsearch.cluster.routing.allocation.AllocationService#applyFailedShards(ClusterState, List, List)

This method applies the failed shards. Assigned ShardRouting (i.e. shard info) instances that exist in the routing table are provided as a parameter. It also applies a list of allocation ids to remove from the in-sync set for shard copies that have no routing entries in the routing table.

Both of the above methods call the method below, which actually performs the shard allocation.

org.elasticsearch.cluster.routing.allocation.AllocationService#reroute(RoutingAllocation) is the method that processes the provided shards (initializing or failed ones) and tries to find nodes where these shards can be allocated/re-routed to. The reroute method makes the following call to allocate the shards:

  shardsAllocator.allocate(allocation);

Here shardsAllocator is the class where the shard allocation algorithm lives.

Currently ES uses the following shard allocator: org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator
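
BalancedShardsAllocator implements the ShardsAllocator contract. In simplified form, the relevant part of the interface looks like this (a sketch, not the full interface):

  public interface ShardsAllocator {
      // mutate the RoutingAllocation to assign unassigned shards,
      // move shards that can no longer remain, and rebalance the cluster
      void allocate(RoutingAllocation allocation);
  }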

Method: org.elasticsearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.Balancer#decideAllocateUnassigned is where the logic for allocating an unassigned shard resides.

Let’s check out the algorithm used in the decideAllocateUnassigned method. For each shard that needs to be (re)assigned, BalancedShardsAllocator calls a list of deciders (we will focus on the deciders below) for each available node and finds all nodes where the shard can be stored; among those nodes, it then picks the one with the lowest weight.

We can split the allocation approach into three parts: sorting the shards that need to be allocated, finding the list of suitable nodes for each shard, and selecting from those nodes the one with the lowest weight (weight scoring is described below).
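
Conceptually, the per-shard loop looks roughly like this (a simplified sketch of decideAllocateUnassigned, not the actual source; weight() is the function shown in section 3 below):

  // For one unassigned shard: ask the deciders about every node, and among
  // the eligible nodes keep the one whose projected weight is lowest.
  ModelNode bestNode = null;
  float bestWeight = Float.POSITIVE_INFINITY;
  for (ModelNode node : allNodes) {
      Decision decision = allocation.deciders().canAllocate(shard, node.getRoutingNode(), allocation);
      if (decision.type() == Decision.Type.NO) {
          continue; // some decider ruled this node out
      }
      // weight of the node if this shard were added to it
      float nodeWeight = weight(balancer, node, shard.getIndexName(), 1);
      if (nodeWeight < bestWeight) {
          bestWeight = nodeWeight;
          bestNode = node;
      }
  }
  // bestNode == null: no suitable node, the shard stays unassigned for now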

1. Sorting the list of shards that need to be assigned, and the order of processing them

First, before allocating the list of shards, ES sorts them. ES uses the following approach.

Say there are two indices, index1 and index2, each with 2 shards and the replica count set to 2. The order of allocating them would then be (a comparator sketch reproducing this order follows the list):

index1:shard1:primary
index1:shard2:primary

index2:shard1:primary
index2:shard2:primary

index1:shard1:replica1
index1:shard2:replica1

index2:shard1:replica1
index2:shard2:replica1

index1:shard1:replica2
index1:shard2:replica2

index2:shard1:replica2
index2:shard2:replica2

This logic is present here: Github link
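
The ordering above can be reproduced with a small comparator. This is an illustration only, not the ES implementation; ShardKey and replicaRank are made-up names for the sketch (replicaRank 0 = primary, 1 = first replica, and so on):

  import java.util.Comparator;
  import java.util.List;

  public class AllocationOrderSketch {
      // made-up key type for this illustration; not an ES class
      record ShardKey(String index, int shardId, int replicaRank) {}

      static final Comparator<ShardKey> ALLOCATION_ORDER =
              Comparator.comparingInt(ShardKey::replicaRank)  // all primaries, then replica 1, then replica 2
                        .thenComparing(ShardKey::index)       // grouped by index within each rank
                        .thenComparingInt(ShardKey::shardId); // then by shard number

      public static void main(String[] args) {
          List<ShardKey> shards = new java.util.ArrayList<>(List.of(
                  new ShardKey("index2", 1, 1), new ShardKey("index1", 1, 0),
                  new ShardKey("index1", 2, 2), new ShardKey("index2", 2, 0)));
          shards.sort(ALLOCATION_ORDER);
          shards.forEach(System.out::println); // primaries first, in index order
      }
  }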

2. Find suitable nodes where the shards can be placed

In this section, for each shard, we find the list of nodes where the shard can be placed. We run the shard and node info through a list of deciders: if all deciders return YES, the node is considered suitable; if any one of them returns NO, it is not.

The deciders ES currently uses are listed below, in the order in which they are called. If any decider returns NO, we short circuit, i.e. return early, and the node is not suitable for hosting the shard. If all deciders return YES, the node is added to the list of suitable nodes for the given shard, and we move on to the next section to pick one node from that list. (A conceptual sketch of this short-circuit loop follows the decider list.)

MaxRetryAllocationDecider (prevents further retries, i.e. fails the chain early, after a shard allocation has been retried multiple times)

ResizeAllocationDecider (ensures we allocate the shards of a target index for resize operations next to the source primaries)

ReplicaAfterPrimaryActiveAllocationDecider (only allows for a replica to be allocated when the primary is active)

RebalanceOnlyWhenActiveAllocationDecider (allow rebalancing when all shards are active within the shard replication group)

ClusterRebalanceAllocationDecider

ConcurrentRebalanceAllocationDecider (throttles shard allocation/re-allocation based on max concurrent rebalance settings)

EnableAllocationDecider

NodeVersionAllocationDecider (ensures version compatibility, e.g. prevents allocating a shard copy to a node running an older ES version than the node holding the primary)

SnapshotInProgressAllocationDecider (prevents moving a shard that is currently being snapshotted)

RestoreInProgressAllocationDecider (prevents moving a shard that is currently being restored; also prevents shards whose restore failed from being allocated)

FilterAllocationDecider

SameShardAllocationDecider (prevents copies of the same shard from being allocated to the same node)

DiskThresholdDecider (ensures enough disk space is present on the node for the shard)

ThrottlingAllocationDecider

ShardsLimitAllocationDecider

AwarenessAllocationDecider (ensures that a primary and its replicas are not kept together in the same rack/zone, etc.)

This logic is done here: Github link
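
Conceptually, the composite decider (org.elasticsearch.cluster.routing.allocation.decider.AllocationDeciders) combines the individual answers roughly like this (a simplified sketch that ignores debug mode, not the exact source):

  public Decision canAllocate(ShardRouting shard, RoutingNode node, RoutingAllocation allocation) {
      Decision.Multi ret = new Decision.Multi();
      for (AllocationDecider decider : allocations) {
          Decision decision = decider.canAllocate(shard, node, allocation);
          if (decision.type() == Decision.Type.NO) {
              return decision; // short circuit: a single NO rules the node out
          }
          ret.add(decision);   // otherwise collect the decision and keep going
      }
      return ret;              // no decider said NO; the node is suitable
  }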

3. Pick the node with the lowest weight from the list of suitable nodes

Step 2 above gives us the list of suitable nodes where the shard can be placed. If step 2 returns an empty list, i.e. no node is suitable, the shard stays unassigned and will be retried later. For each suitable node, we calculate the weight the node would have if the given shard were added to it, and then pick the node with the lowest resulting weight.

First, these factors are derived from settings. By default indexBalance = 0.55f and shardBalance = 0.45f; they can be changed via the ES settings cluster.routing.allocation.balance.index and cluster.routing.allocation.balance.shard.

theta0 = shardBalance / (indexBalance + shardBalance);
theta1 = indexBalance / (indexBalance + shardBalance);
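
With the defaults, the two factors sum to 1.0, so theta0 = 0.45 / 1.0 = 0.45 and theta1 = 0.55 / 1.0 = 0.55; in other words, the per-index balance term counts slightly more than the total-shard-count term.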

Below is the weight function that is called for each node with the required capacity for the shard; we choose the node whose weight is lowest, thus distributing indices and shards evenly across the available nodes.

In other words, the algorithm below calculates the weight of a node from how many shards in total, and how many shards of the given index, it would hold. We pick the node with the lowest weight so that shards are distributed evenly without overloading any particular node.

private float weight(Balancer balancer, ModelNode node, String index, int numAdditionalShards) {
  // distance from the cluster-wide average number of shards per node, if the shard were added
  final float weightShard = node.numShards() + numAdditionalShards - balancer.avgShardsPerNode();
  // distance from the average number of this index's shards per node, if the shard were added
  final float weightIndex = node.numShards(index) + numAdditionalShards - balancer.avgShardsPerNode(index);
  // blend the two terms using the theta factors derived above
  return theta0 * weightShard + theta1 * weightIndex;
}

This logic is done here: Github link
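
To make the formula concrete, here is a self-contained numeric sketch (not ES code; the node counts and averages are made-up assumptions). Node A holds 4 shards, 2 of them belonging to index "logs"; node B holds 2 shards, 1 of "logs"; the assumed averages are 3 shards per node and 1.5 "logs" shards per node; and we evaluate the weight of placing one more "logs" shard:

  public class WeightExample {
      static final float THETA0 = 0.45f; // shardBalance / (indexBalance + shardBalance)
      static final float THETA1 = 0.55f; // indexBalance / (indexBalance + shardBalance)

      // standalone re-implementation of the weight function above,
      // with the balancer averages passed in as plain numbers
      static float weight(int nodeShards, int nodeIndexShards,
                          float avgShardsPerNode, float avgIndexShardsPerNode,
                          int numAdditionalShards) {
          float weightShard = nodeShards + numAdditionalShards - avgShardsPerNode;
          float weightIndex = nodeIndexShards + numAdditionalShards - avgIndexShardsPerNode;
          return THETA0 * weightShard + THETA1 * weightIndex;
      }

      public static void main(String[] args) {
          // node A: 4 shards total, 2 of index "logs"
          System.out.println("node A: " + weight(4, 2, 3.0f, 1.5f, 1)); // 0.45*2 + 0.55*1.5 = 1.725
          // node B: 2 shards total, 1 of index "logs"
          System.out.println("node B: " + weight(2, 1, 3.0f, 1.5f, 1)); // 0.45*0 + 0.55*0.5 = 0.275
          // node B has the lower weight, so the new "logs" shard goes to node B
      }
  }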
