最近我们搭建了一个新的Elasticsearch(以下简称ES)集群,集群中有3个master节点和6个data节点,集群使用到的ES版本为7.5.2。其中master节点JVM的堆内存设置为2GB,data节点JVM的堆内存设置为30GB,master和data节点都只用作单一的节点角色(即节点不会同时是master和data两种角色)。因为master不存储数据,所以我们给master三台负载的配置都比较低。

master配置不够导致读写出错

我们在操作ES集群的时候使用了elasticsearch-py这个ES的Python操作库,问题在于这个库在操作ES的时候会先根据ES的/_nodes/_all/http接口获取集群所有节点的HTTP接口地址,之后利用这些接口地址对ES进行读写。当然elasticsearch-py会在这些接口地址间做负载均衡以及错误重试等等操作,但是/_nodes/_all/http接口会同时返回master、data以及一些其它角色节点的HTTP接口地址,这就导致elasticsearch-py在后面操作集群的过程中既会读写data节点,也会读写master节点。

因为我们master节点的配置很低,所以一旦在master节点上面进行读写操作,那么master节点的压力尤其是内存的压力就会比较大,经常就会出现circuit_breaking_exception的错误。

解决办法就是利用的elasticsearch-py的host_info_callback参数来过滤要操作的节点,具体使用方式如下

在创建ES连接对象的时候指定回调方法

1
2
3
4
5
6
from elasticsearch import Elasticsearch

es = Elasticsearch(
host_info_callback=not_master_nodes, # 指定操作的回调方法
sniff_on_start=True, sniff_on_connection_fail=True, sniffer_timeout=60
)

之后我们创建not_master_nodes过滤方法如下

1
2
3
4
5
6
7
8
9
def not_master_nodes(node_info, host):
"""
由于master节点的性能较差,所以过滤掉master节点
:param node_info:
:param host:
:return:
"""
roles = node_info.get('roles', [])
return host if 'master' not in roles else None

逻辑非常简单,就是把roles属性中包含了master的节点给剔除即可。

elasticsearch-py会在_get_host_info方法中调用host_info_callback方法,并且在sniff_hosts方法中对不符合要求的节点进行过滤,具体实现逻辑可以参考前面的源码连接。

添加了host_info_callback属性之后,elasticsearch-py就再也不会操作master节点了。这样带来了两个好处

  1. master节点的压力降低,提升了master节点的稳定性
  2. 读写操作也不会因为master节点的内存不足而报错了,提升了读写操作的稳定性

我们把上面的代码上线之后,通过nload -u M命令查看三台master节点的网速,发现三个节点的平均网速均从0.3 MByte/s降到了0.02 MByte/s,网速下降非常明显。此外通过命令netstat -an | grep 9200也看不到任何与master节点的9200端口的连接了,说明此时elasticsearch-py已经不再连接master节点了。

为什么之前从来没有出过这个问题?

我们使用ES已经很久了,为什么之前的1.5.2版本的集群都很正常,但是到了7.5.2版本的集群上就会出现master节点内存不足的错误呢?带着这样的疑问我们也查看了一下1.5.2版本的elasticsearch-py的源码,发现在1.5.2的源码中同样会根据host_info_callback方法来过滤节点,区别在于1.5.2的源码中在创建Transport对象的时候会给host_info_callback参数设置一个默认值:host_info_callback=get_host_info

get_host_info方法在源码中已经定义好了,这里摘录如下

1
2
3
4
5
6
7
8
9
def get_host_info(node_info, host):
attrs = node_info.get('attributes', {})

# ignore master only nodes
if (attrs.get('data', 'true') == 'false' and
attrs.get('client', 'false') == 'false' and
attrs.get('master', 'true') == 'true'):
return None
return host

可以看到在1.5.2版本的elasticsearch-py中,库本身就已经帮我们过滤掉了纯master节点了,也就是说1.5.2是不会读写纯粹的master节点的,这也难怪为什么我们之前从来没有遇到过这个问题了。至于为什么在后续版本中elasticsearch-py把这个特性去掉了,我猜也许是为了避免让库对用户的操作进行过多的干涉吧,因为想不想读master节点这种事情本来也应该交给用户来决定而不是库本身擅自决定的。

应用本身的一些配置

如果使用两个版本的elasticsearch-py

我们的应用现在在同时读写1.5.2和7.5.2这两个版本的集群,因为使用两个版本的elasticsearch-py会导致包冲突,我们的解决办法是把这两个版本的elasticsearch-py的源码直接复制到我们应用的源码中,两个版本的源码分别放在应用中的elasticsearch1和elasticsearch7这两个模块的文件夹中,之后想要调用时直接使用import elasticsearch1import elasticsearch7导入模块即可。

如何配置seed hosts

我们现在已经知道了elasticsearch-py操作ES分为两个步骤

  1. 通过配置的seed hosts访问ES集群,根据ES提供的接口获取到ES集群所有节点的HTTP接口,之后根据需要剔除掉一些节点(这一步在1.5.2和7.5.2中有所差异),最终得到一个符合我们需要的ES集群的节点HTTP接口列表(在elasticsearch-py中这一步操作对应的方法叫做sniff_hosts
  2. 通过第一步拿到的节点列表来对ES集群进行真正的读写操作

由此我们可以知道我们配置的seed hosts并不是一定会作为真正的读写节点的,真正读写的节点会在第一步操作中通过接口获取并进行判断得到的。所以我们现在在设置seed hosts时会把seed hosts设置为所有的master节点地址,这样的好处在于master节点基本上不会更换,而data节点可能会频繁的变更(例如更换硬盘、增加配置等等),使用master节点作为seed hosts就保证了可以在data节点变更时不再需要修改配置。

总结

其实本文核心过程就是这几部分

  1. 设置seed hosts
  2. 以seed hosts为基础,根据sniff_hosts方法拿到集群的全部节点
  3. 对拿到的全部节点进行过滤,过滤之后剩下的就是我们想要的节点
  4. 通过过滤之后的节点对集群进行真正的操作

参考

https://discuss.elastic.co/t/how-to-only-query-on-data-nodes-by-elasticsearch-py/249293
https://github.com/elastic/elasticsearch-py/issues/1378

2020-09-24补充

后来经过研究发现其实在elasticsearch-py的7.5.2版本的源码中也是定义了get_host_info方法的,摘录如下

1
2
3
4
5
def get_host_info(node_info, host):
# ignore master only nodes
if node_info.get("roles", []) == ["master"]:
return None
return host

上面的逻辑表示,只要一个节点的roles是["master"]那么默认就会被剔除。只是我调用ES的接口/_nodes/_all/http查看了一下7.5.2集群的配置,发现我的master节点的roles是["master", "ml"],还附带了一个ml的角色,因此不能匹配上面的条件,导致我的master被保留了下来。

知道了原理之后,我们就知道了解决办法了。除了像上面重写过滤方法之外,我们也可以把master节点的ml的角色设置为false,根据ES文档,我们只需要在master节点中添加如下设置即可

node.ml: false

2020-11-03补充

通过总结我们可以得到一个最佳实践

对于集群本身,我们先创建三个master节点,它们的配置如下

discovery.seed_hosts: [{{ alias.MasterIPList }}]
cluster.initial_master_nodes: [{{ alias.MasterIPList }}]

集群在刚刚启动的时候master节点的seed_hosts和initial_master_nodes都需要设置为master节点的地址列表。在集群启动完毕之后,我们可以添加data节点或者ingest节点等等其它的节点,它们只需要把seed_hosts设置为当前的master节点即可

discovery.seed_hosts: [{{ alias.MasterIPList }}]

而对于客户端,同样我们在设置ES地址的时候也只需要设置master节点的地址就可以了

1
2
3
{
"elasticsearch_hosts": [{{ alias.MasterIPList }}]
}

es的Python客户端会使用master的节点地址作为种子,通过种子获取了data节点的地址之后再在data节点上面进行真正的读写操作。

附上我们目前使用的elasticsearch.yml和jvm.options配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
# ======================== Elasticsearch Configuration =========================
#
# NOTE: Elasticsearch comes with reasonable defaults for most settings.
# Before you set out to tweak and tune the configuration, make sure you
# understand what are you trying to accomplish and the consequences.
#
# The primary way of configuring a node is via this file. This template lists
# the most important settings you may want to configure for a production cluster.
#
# Please consult the documentation for further information on configuration options:
# https://www.elastic.co/guide/en/elasticsearch/reference/index.html
#
# ---------------------------------- Cluster -----------------------------------
#
# Use a descriptive name for your cluster:
#
cluster.name: {{ attrs.t_clustername }}
#
# ------------------------------------ Node ------------------------------------
#
# Use a descriptive name for the node:
#
node.name: {{ cHost.name }}
#
# Add custom attributes to the node:
#
# node.attr.rack: r1
#
node.master: {{ attrs.t_is_master }}
#
node.data: {{ attrs.t_is_data }}
#
node.ingest: {{ attrs.t_is_ingest }}
#
node.ml: false
#
# ----------------------------------- Paths ------------------------------------
#
# Path to directory where to store the data (separate multiple locations by comma):
#
path.data: /home/elasticsearch/data
#
# Path to log files:
#
path.logs: /home/elasticsearch/elasticsearch/logs
#
# ----------------------------------- Memory -----------------------------------
#
# Lock the memory on startup:
#
#bootstrap.memory_lock: true
bootstrap.system_call_filter: false
#
# Make sure that the heap size is set to about half the memory available
# on the system and that the owner of the process is allowed to use this
# limit.
#
# Elasticsearch performs poorly when the system is swapping the memory.
#
# ---------------------------------- Network -----------------------------------
#
# Set the bind address to a specific IP (IPv4 or IPv6):
#
network.host: {{ cHost.ip }}
#
# Set a custom port for HTTP:
#
http.port: {{ attrs.t_server_port }}
#
# For more information, consult the network module documentation.
#
# --------------------------------- Discovery ----------------------------------
#
# Pass an initial list of hosts to perform discovery when this node is started:
# The default list of hosts is ["127.0.0.1", "[::1]"]
#
discovery.seed_hosts: [{{ alias.MasterIPList }}]
#
# Bootstrap the cluster using an initial set of master-eligible nodes:
#
# cluster.initial_master_nodes: [{{ alias.MasterIPList }}]
#
# For more information, consult the discovery and cluster formation module documentation.
#
# ---------------------------------- Gateway -----------------------------------
#
# Block initial recovery after a full cluster restart until N nodes are started:
#
#gateway.recover_after_nodes: 3
#
# For more information, consult the gateway module documentation.
#
# ---------------------------------- Various -----------------------------------
#
# Require explicit names when deleting indices:
#
#action.destructive_requires_name: true

path.repo: ["/home/elasticsearch/ES_backup/7-5-2"]
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
## JVM configuration

################################################################
## IMPORTANT: JVM heap size
################################################################
##
## You should always set the min and max JVM heap
## size to the same value. For example, to set
## the heap to 4 GB, set:
##
## -Xms4g
## -Xmx4g
##
## See https://www.elastic.co/guide/en/elasticsearch/reference/current/heap-size.html
## for more information
##
################################################################

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

-Xms{{attrs.t_heap_size}}
-Xmx{{attrs.t_heap_size}}

################################################################
## Expert settings
################################################################
##
## All settings below this section are considered
## expert settings. Don't tamper with them unless
## you understand what you are doing
##
################################################################

## GC configuration
-XX:+UseConcMarkSweepGC
-XX:CMSInitiatingOccupancyFraction=75
-XX:+UseCMSInitiatingOccupancyOnly

## G1GC Configuration
# NOTE: G1GC is only supported on JDK version 10 or later.
# To use G1GC uncomment the lines below.
# 10-:-XX:-UseConcMarkSweepGC
# 10-:-XX:-UseCMSInitiatingOccupancyOnly
# 10-:-XX:+UseG1GC
# 10-:-XX:G1ReservePercent=25
# 10-:-XX:InitiatingHeapOccupancyPercent=30

## JVM temporary directory
-Djava.io.tmpdir=${ES_TMPDIR}

## heap dumps

# generate a heap dump when an allocation from the Java heap fails
# heap dumps are created in the working directory of the JVM
-XX:+HeapDumpOnOutOfMemoryError

# specify an alternative path for heap dumps; ensure the directory exists and
# has sufficient space
-XX:HeapDumpPath=data

# specify an alternative path for JVM fatal error logs
-XX:ErrorFile=logs/hs_err_pid%p.log

## JDK 8 GC logging
8:-XX:+PrintGCDetails
8:-XX:+PrintGCDateStamps
8:-XX:+PrintTenuringDistribution
8:-XX:+PrintGCApplicationStoppedTime
8:-Xloggc:logs/gc.log
8:-XX:+UseGCLogFileRotation
8:-XX:NumberOfGCLogFiles=32
8:-XX:GCLogFileSize=64m

# JDK 9+ GC logging
9-:-Xlog:gc*:file=logs/gc.log:t,tags,level:filecount=32,filesize=64m