0%

es 集群健康状态

es 查询集群健康状态

1
2
3
4
5
6
7
8
9
10
GET /_cat/health?v

epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1488006741 15:12:21 elasticsearch yellow 1 1 1 1 0 0 1 0 - 50.0%

epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1488007113 15:18:33 elasticsearch green 2 2 2 1 0 0 0 0 - 100.0%

epoch timestamp cluster status node.total node.data shards pri relo init unassign pending_tasks max_task_wait_time active_shards_percent
1488007216 15:20:16 elasticsearch yellow 1 1 1 1 0 0 1 0 - 50.0%

集群健康状态 green、yellow、red

  • green:每个索引的primary shard和replica shard都是active状态的
  • yellow:每个索引的primary shard都是active状态的,但是部分replica shard不是active状态,处于不可用的状态
  • red:不是所有索引的primary shard都是active状态的,部分索引有数据丢失了

为什么会会处于yellow状态?

比如当前es只有一台服务启动即只有一个node,此时只启动了一个primary shard,而依据es的 容错机制 replica shard 不予 primary shard 同服务,即单机情况下 集群状态为yellow
也有当集群环境下 一台机器出现故障 处理正常运行的 primary shard 的 replica shard 恰好在故障服务器上 这时也是会显示yellow

es 基础操作

查询集群下的 索引

1
2
3
4
5
GET /_cat/indices?v

health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open student rUm9n9wMRQCCrRDEhqneBg 1 1 1 0 3.1kb 3.1kb

新建索引

1
2
PUT /student?pretty

删除索引

1
2
DELETE /student?pretty

索引的 CURD

  • 新建搜索内容 document
    PUT /index/type/id (这里的id 不添加时,系统会默认一个值)
    {
    “filed”:”value”
    } // 整个大括号 则为 一个document
    注意: 当PUT作为update操作时,需要将所有filed全部写上,否则全覆盖,一般不建议用于update
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
PUT /student/class/1
{
"name":"tom"
"age":15
}

//数据下面有用到
PUT /student/class/2
{
"name":"ajo"
"age":20
}

PUT /student/class/3
{
"name":"jack"
"age":25
}
  • 修改搜索内容
    通过调用api _update 对指定索引下的字段进行更新
    POST /index/type/id/_update (这里的id必填)
    {
    “doc”:{
      "filed":"value" //filed 表示要修改的字段    
    
    }
    }
1
2
3
4
5
6
7
PUT /student/class/1/_update
{
"doc":{
"name":"jack"
}
}

  • 删除指定搜索内容
    删除索引下的一条记录
    1
    2
    DELETE /student/class/1

es 查询操作相关

  • 查询索引下全部的内容(全文搜索)

    1
    2
    3
    4
    5
    6
    7
    GET /student/class/_search
    {
    "query":{
    "match_all":{}
    }
    }

  • 查询 包含指定关键字
    这里搜索通过倒排索引 去搜索 如下:name 为tom 则会通过上面已经新增的document 得到一个列表

id doc
1 tom
2 ajo
doc id
tom 1
o 1,2

即返回结果为 document id 为1、2的数据

1
2
3
4
5
6
7
8
9
GET /student/class/_search
{
"query":{
"match":{
"name":"tom"
}
}
}

  • phrase search 短语查询
    跟全文检索相对应,相反,全文检索会将输入的搜索串拆解开来,去倒排索引里面去一一匹配,只要能匹配上任意一个拆解后的单词,就可以作为结果返回
    phrase search,要求输入的搜索串,必须在指定的字段文本中,完全包含一模一样的,才可以算匹配,才能作为结果返回
1
2
3
4
5
6
7
8
9
GET /student/class/_search
{
"query":{
"match_phrase":{
"name":"tom"
}
}
}

  • 查询条件

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    GET /student/class/_search
    {
    "query":{
    "bool": {
    "filter": [
    {"range": {
    "ago": {
    "gte": 10,//大于等于
    "lte": 20//小于等于
    }
    }}
    ]
    }
    }
    }
  • highlight search(高亮搜索结果)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    GET /ecommerce/product/_search
    {
    "query" : {
    "match" : {
    "producer" : "producer"
    }
    },
    "highlight": {
    "fields" : {
    "producer" : {}
    }
    }
    }

mget批量查询api

mget批量查询可以将多个操作合并一个读操作去执行,可以减少网络请求次数,提升系统性能,当然如果请求数据过多,也会在一定程度上影响了影响速度
mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api,尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍,非常非常之重要

  • 场景1 不同index下 的查询

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    GET /_mget
    {
    "docs" : [
    {
    "_index" : "test_index1",
    "_type" : "test_type",
    "_id" : 1
    },
    {
    "_index" : "test_index2",
    "_type" : "test_type",
    "_id" : 2
    }
    ]
    }
  • 场景2 同一index下 的查询

    1
    2
    3
    4
    GET /test_index/type/_mget
    {
    "ids": [1, 2]
    }

批量写操作 _bulk

bulk api对json的语法有严格要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行,bulk 的语法格式如下

1
2
{"action": {"metadata"}}
{"data"}

bulk 主要支持一下类型的操作:

  • delete: 删除一个文档,只要1个json串就可以了
  • create: PUT /index/type/id/_create,强制创建
  • index:普通的put操作,可以是创建文档,也可以是全量替换文档
  • update:执行的partial update操作
1
2
3
4
5
6
7
8
POST /_bulk
{ "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }}
{ "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }

bulk 操作出现异常

bulk操作中,任意一个操作失败,是不会影响其他的操作,但是会在返回的结果里面告诉异常日志

bulk 性能

bulk request会加载到内存里,如果太大的话,性能反而会下降,需要反复尝试得出一个最佳的bulk size,一般从1000-5000条数据开始,尝试逐渐增加
如果看大小的话,最好控制在5~15MB之间

分页查询

分页主要有两个参数
size:每一页多少条数据
from:表示从第多少条开始分页 起始页为0 为0 与不传from 单传size效果一致

请求方式主要有:

script
1
2
3
GET _search?size=2
GET _search?size=2&from=0 # 返回结果与第一条执行结果一致
GET _search?size=2&from=2

deep paging 问题

集群环境下,当查询的目标数据很多,比如超过几十万甚至几百万数据时,这个时候进行分页,这个时候:

  • step1 client 分页请求 如 _search?size=1000&from=1000000 到 coordinate node ,节点会找到该查询多个对应的shard(假设3shard)
  • step2 请求到达对应的shard 开始执行分页操作 _search?size=1000&from=1000000 取1000条数据 并返回给coordinate node
  • step3 coordinate收到shard返回的总数据条数为3000条,这个时候开始做重排序,默认是通过相关度分数排序,取前1000条数据返回给client

deep paging问题 其实是一个深度查询的问题,如涉及分页查询较深时且数据较大时,非常消耗网络带宽,消耗内存,所以存在性能问题,应尽量避免 deep paging操作

多种搜索方式

query string search的由来:
因为search参数都是以http请求的query string来附带的。
注意,这个查询在生产环境上使用的是不多的。

(1)搜索所有的商品:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
GET /sell/product/_search
返回值:
{
"took": 12,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 1,
"hits": [
{
"_index": "sell",
"_type": "product",
"_id": "2",
"_score": 1,
"_source": {
"name": "oyr yaogao",
"desc": "zheshi oyr yagao",
"price": 3000,
"producer": "oyr yagao producer",
"tags": [
"lengcang",
"baoxian"
]
}
},
{
"_index": "sell",
"_type": "product",
"_id": "1",
"_score": 1,
"_source": {
"name": "gaolujie yagao",
"desc": "zheshi gaolujie yigeyagao",
"price": 30,
"producer": "gaolujie yagao producer",
"tags": [
"meibai",
"fangzhu"
]
}
},
{
"_index": "sell",
"_type": "product",
"_id": "3",
"_score": 1,
"_source": {
"name": "latiao",
"desc": "zheshi latiao yagao",
"price": 15,
"producer": "latiao yagao producer",
"tags": [
"la",
"meiwei"
]
}
}
]
}
}

返回值说明:
took:耗费了多少毫秒
timed_out:是否超时,这里是没有
_shards:数据拆成了5个分片,所以对于搜索请求,会打到所有的primary shard(或者是它的某个replica shard也是可以)
hits.total:查询结果的数量,3个document
hits.max_score:score的含义:就是document对应一个search的相关度的匹配分数,越相关,就越匹配,分数也越高。这里显示的是最大的一个匹配分数
hits.hits:包含了匹配搜索的document的详细数据

(2)搜索商品名称中包含yagao的商品,而且按照售价降序排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
GET /sell/product/_search?q=name:yagao&sort=price:desc
返回值:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": null,
"hits": [
{
"_index": "sell",
"_type": "product",
"_id": "2",
"_score": null,
"_source": {
"name": "oyr yagao",
"desc": "zheshi oyr yagao",
"price": 3000,
"producer": "oyr yagao producer",
"tags": [
"lengcang",
"baoxian"
]
},
"sort": [
3000
]
},
{
"_index": "sell",
"_type": "product",
"_id": "1",
"_score": null,
"_source": {
"name": "gaolujie yagao",
"desc": "zheshi gaolujie yigeyagao",
"price": 30,
"producer": "gaolujie yagao producer",
"tags": [
"meibai",
"fangzhu"
]
},
"sort": [
30
]
},
{
"_index": "sell",
"_type": "product",
"_id": "3",
"_score": null,
"_source": {
"name": "latiao yagao",
"desc": "zheshi latiao yagao",
"price": 15,
"producer": "latiao yagao producer",
"tags": [
"la",
"meiwei"
]
},
"sort": [
15
]
}
]
}
}

query DSL

DSL:Domain Specified Language,特定领域的语言
参数是放在http request body中的。
http request body:请求体,可以用json的格式来构建查询语法,比较方便,可以构建各种复杂的语法,比query string search肯定强大多了

(1)查询所有的商品

1
2
3
4
5
6
GET /sell/product/_search
{
"query": {
"match_all": {}
}
}

(2)查询名称包含yagao的商品,同时按照价格降序排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
GET /sell/product/_search
{
"query": {
"match": {
"name": "yagao"
}
}
, "sort": [
{
"price": "desc"
}
]
}


GET /sell/product/_search
{
"query": {
"match": {
"name": "yagao"
}
},
"sort": [
{
"price": {
"order": "desc"
}
}
]
}

(3)分页查询商品,总共3条商品,假设每页就显示1条商品,现在显示第2页,所以就查出来第2个商品

1
2
3
4
5
6
7
8
GET /sell/product/_search
{
"query": {
"match_all": {}
},
"from": 1, // 从第一条开始查,并不包含第一条
"size": 1 // 查一条数据
}

(4)指定要查询出来商品的名称和价格就可以,也就是具体要显示哪些field。

1
2
3
4
5
6
7
GET /sell/product/_search
{
"query": {
"match_all": {}
},
"_source": ["name", "price"]
}

query filter

(1)搜索商品名称包含yagao,而且售价大于25元的商品

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /sell/product/_search
{
"query": {
"bool": {
"must": {
"match": {
"name": "yagao"
}
},
"filter": {
"range": {
"price": {
"gt": 25
}
}
}
}
}
}

全文检索

1
2
3
4
5
6
7
8
GET /sell/product/_search
{
"query": {
"match": {
"producer": "oyr yagao producer"
}
}
}

producer这个字段,会先被分词拆解
yagao
producer
然后一个个去匹配文档中producer对应的倒排索引

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
返回值:
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0.7594807,
"hits": [
{
"_index": "sell",
"_type": "product",
"_id": "2",
"_score": 0.7594807,
"_source": {
"name": "oyr yagao",
"desc": "zheshi oyr yagao",
"price": 3000,
"producer": "oyr yagao producer",
"tags": [
"lengcang",
"baoxian"
]
}
},
{
"_index": "sell",
"_type": "product",
"_id": "1",
"_score": 0.5063205,
"_source": {
"name": "gaolujie yagao",
"desc": "zheshi gaolujie yigeyagao",
"price": 30,
"producer": "gaolujie yagao producer",
"tags": [
"meibai",
"fangzhu"
]
}
},
{
"_index": "sell",
"_type": "product",
"_id": "3",
"_score": 0.5063205,
"_source": {
"name": "latiao yagao",
"desc": "zheshi latiao yagao",
"price": 15,
"producer": "latiao yagao producer",
"tags": [
"la",
"meiwei"
]
}
}
]
}
}

短语搜索
跟全文检索相对应,相反,全文检索会将输入的搜索串拆解开来,去倒排索引里面去一一匹配,只要能匹配上任意一个拆解后的单词,就可以作为结果返回
phrase search,要求输入的搜索串,必须在指定的字段文本中,完全包含一模一样的,才可以算匹配,才能作为结果返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
GET /sell/product/_search
{
"query": {
"match_phrase": {
"producer": "oyr yagao producer"
}
}
}
返回值:
{
"took": 15,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 1,
"max_score": 0.7594808,
"hits": [
{
"_index": "sell",
"_type": "product",
"_id": "2",
"_score": 0.7594808,
"_source": {
"name": "oyr yagao",
"desc": "zheshi oyr yagao",
"price": 3000,
"producer": "oyr yagao producer",
"tags": [
"lengcang",
"baoxian"
]
}
}
]
}
}

聚合查询

(1)计算每个tag下的商品数量
先将文本field的fielddata属性设置为true,不然执行聚合查询会报错。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
PUT /sell/_mapping/product
{
"properties": {
"tags":{
"type": "text",
"fielddata": true
}
}
}

查询:
GET /sell/product/_search
{
"size": 0,
"aggs": {
"group_by_tags": {
"terms": {
"field": "tags"
}
}
}
}

类似于数据库sql:

1
Select tags, count(*) from product group by tags

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meibai",
"doc_count": 2
},
{
"key": "meiwei",
"doc_count": 2
},
{
"key": "fangzhu",
"doc_count": 1
}
]
}
}
}

(2)对名称中包含yagao的商品,计算每个tag下的商品数量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
GET /sell/product/_search
{
"size": 0,
"query": {
"match": {
"name": "yagao"
}
},
"aggs": {
"group_by_tags": {
"terms": {
"field": "tags"
}
}
}
}

类似于数据库sql:

1
Select tags, count(*) from product where name='yagao' group by tags

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meibai",
"doc_count": 2
},
{
"key": "meiwei",
"doc_count": 2
},
{
"key": "fangzhu",
"doc_count": 1
}
]
}
}
}

(3)先分组,再算每组的平均值,计算每个tag下的商品的平均价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
GET /sell/product/_search
{
"size": 0,
"aggs": {
"group_by_tags": {
"terms": {
"field": "tags"
},
"aggs": {
"avg_price": {
"avg": {"field": "price"}
}
}
}
}
}

类似于数据库sql:

1
Select tags, count(*), avg(price) from product group by tags

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meibai",
"doc_count": 2,
"avg_price": {
"value": 1515
}
},
{
"key": "meiwei",
"doc_count": 2,
"avg_price": {
"value": 1507.5
}
},
{
"key": "fangzhu",
"doc_count": 1,
"avg_price": {
"value": 30
}
}
]
}
}
}

(4)计算每个tag下的商品的平均价格,并且按照平均价格降序排序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
GET /sell/product/_search
{
"size": 0,
"aggs": {
"group_by_tags": {
"terms": {
"field": "tags",
"order": {
"avg_price": "desc"
}
},
"aggs": {
"avg_price": {
"avg": {"field": "price"}
}
}
}
}
}

类似于数据库sql:

1
Select tags, count(*), avg(price) as avg_price from product group by tags order by avg_price

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
{
"took": 2,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meibai",
"doc_count": 2,
"avg_price": {
"value": 1515
}
},
{
"key": "meiwei",
"doc_count": 2,
"avg_price": {
"value": 1507.5
}
},
{
"key": "fangzhu",
"doc_count": 1,
"avg_price": {
"value": 30
}
}
]
}
}
}

(5)按照指定的价格范围区间进行分组,然后在每组内再按照tag进行分组,最后再计算每组的平均价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
GET /sell/product/_search
{
"size": 0,
"aggs": {
"group_by_price": {
"range": {
"field": "price",
"ranges": [
{
"from": 0,
"to": 20
},
{
"from": 20,
"to": 10000
}
]
},
"aggs": {
"group_by_tags": {
"terms": {
"field": "tags"
},
"aggs": {
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}
}
}

返回值:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
{
"took": 3,
"timed_out": false,
"_shards": {
"total": 5,
"successful": 5,
"failed": 0
},
"hits": {
"total": 3,
"max_score": 0,
"hits": []
},
"aggregations": {
"group_by_price": {
"buckets": [
{
"key": "0.0-20.0",
"from": 0,
"to": 20,
"doc_count": 1,
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meiwei",
"doc_count": 1,
"avg_price": {
"value": 15
}
}
]
}
},
{
"key": "20.0-10000.0",
"from": 20,
"to": 10000,
"doc_count": 2,
"group_by_tags": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "meibai",
"doc_count": 2,
"avg_price": {
"value": 1515
}
},
{
"key": "fangzhu",
"doc_count": 1,
"avg_price": {
"value": 30
}
},
{
"key": "meiwei",
"doc_count": 1,
"avg_price": {
"value": 3000
}
}
]
}
}
]
}
}
}

filter 与 query的深入比较

filter与query示例

script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
PUT /company/employee/2
{
"address": {
"country": "china",
"province": "jiangsu",
"city": "nanjing"
},
"name": "tom",
"age": 30,
"join_date": "2016-01-01"
}

PUT /company/employee/3
{
"address": {
"country": "china",
"province": "shanxi",
"city": "xian"
},
"name": "marry",
"age": 35,
"join_date": "2015-01-01"
}

#搜索请求:年龄必须大于等于30,同时join_date必须是2016-01-01
GET /company/employee/_search
{
"query": {
"bool": {
"must": [
{
"match": {
"join_date": "2016-01-01"
}
}
],
"filter": {
"range": {
"age": {
"gte": 30
}
}
}
}
}
}

filter 与 query对比

  • filter: 仅仅只是按搜索条件过滤出所需要的数据而已,不涉及相关度分数的计算,对相关度没有影响
  • query: 会去计算每个document相对搜索条件的相关度,并按相关度进行排序

总结:

  • 如果是在进行搜索,需要将最匹配搜索条件的数据先返回,那么用query。
  • 如果只是想根据搜索条件筛选出一部分数据,那么用filter
  • 如果需要将符合条件的document排名靠前,用query包含,如果想其他的条件不影响到前面的document排序则用filter过滤

filter与query性能

  • filter: 不需要计算相关度分数,不需要按照相关度分数进行排序,同时还有内置的自动cache最常使用filter的数据
  • query: 相反,要计算相关度分数,按照分数进行排序,而且无法cache结果

单执行filter需注意

script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 参数 constant_score 必传 否则会出现异常
GET /student/class/_search
{
"query": {
"constant_score": {
"filter": {
"range": {
"mark": {
"gte": 30
}
}
}
}
}
}

几种query的搜索语法

match all

查询所有
script
1
2
3
4
5
6
7
#查询所有index的内容
GET /_search
{
"query": {
"match_all": {}
}
}
script
1
2
3
4
5
6
7
#查询指定index下的所有内容
GET student/class/_search
{
"query":{
"match_all": {}
}
}

match

匹配某一个filed是否包含某个文本,会触发分词,相当于是 full text(全文检索)
script
1
2
3
4
5
6
7
8
9
# 查询所有index 指定field是否包含查询内容
GET /_search
{
"query":{
"match": {
"title": "学习"
}
}
}
script
1
2
3
4
5
6
7
8
9
# 查询指定index 指定field是否包含查询内容
GET student/class/_search
{
"query":{
"match": {
"title": "学习"
}
}
}

multi match

查询多个field是否包含查询内容
script
1
2
3
4
5
6
7
8
9
10
# 查询所有index 指定的fields下是否包含查询内容
GET /_search
{
"query":{
"multi_match":{
"query":"学习",
"fields":["title","desc"]
}
}
}
script
1
2
3
4
5
6
7
8
9
10
11
# 查询指定index 指定的fields下是否包含查询内容
GET student/class/_search
{
"query":{
"multi_match":{
"query":"学习",
"fields":["title","desc"]
}
}
}

range query

查询field是否在指定的范围值内,放query里会对相关度产生影响,放filter里面无影响
script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# 查询所有index 指定的field是否在查询内容范围内
GET /_search
{
"query":{
"range":{
"mark":{
#"gt":80, 大于
#"lt":120 小于
"gte":80, #大于等于
"lte":120 #小于等于

}
}
}
}
script
1
2
3
4
5
6
7
8
GET student/class/_search
{
"query":{
"range":{
"mark":{"gte":80}
}
}
}

term query

把查询的数据当成 exact value(精准匹配)进行查询。
这里查询的内容是要精准匹配field内容
需要建立索引的时候,指定field不分词才能查询到
或者field已经是最小分词单位

script
1
2
3
4
5
6
7
8
GET /_search
{
"query":{
"term": {
"name": "八"
}
}
}
script
1
2
3
4
5
6
7
8
GET student/class/_search
{
"query":{
"term": {
"name": "八"
}
}
}

terms query

原理与term一致,不过对指定的field可以查询指定多个搜索词
script
1
2
3
4
5
6
7
8
GET student/class/_search
{
"query":{
"terms": {
"name": ["八","九"]
}
}
}

组合查询

参数 bool 多条件组合查询参数,bool中可以使用 must、 must_not 、should 来组合查询条件 ,bool 可嵌套
一下参数在与match、multi match、term、terms一起使用时注意查询方式是exact value or full value

  • must: 需要满足条件 ==或like
  • must_not: 不需要在满足条件内的 !=或 not like
  • should: should中的两个条件至少满足一个就可以,should下有多个条件时注意加参数 minimum_should_match
  • filter
script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 查询学习成绩在30~100的学生信息,成绩倒叙,备注含学习,李四作弊取消成绩
GET student/class/_search?sort=mark:desc
{
"query":{
"bool": {
"must_not": [
{"match": {
"name": "李四"
}}
],
"minimum_should_match":2, # should下满足几个条件
"should": [
{"match":{"title":"委员"}}
,
{"match":{"desc":"突出"}}
],
"filter": {
"bool": {
"must":
[
{"range":{"mark": {"gte":20,"lte":100}}},
{"match":{"desc": "学习"}}
]
}

}
}
}
}

校验不合法的搜索

特别复杂庞大的搜索下,比如你一下子写了上百行的搜索,这个时候可以先用validate api去验证一下,搜索是否合法

script
1
2
3
4
5
6
7
#格式: GET index/type/_validate/query?explain
GET student/class/_validate/query?explain
{
"query":{
"match_all": {}
}
}

返回结果

script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
{
"_shards" : {
"total" : 1,
"successful" : 1,
"failed" : 0
},
"valid" : true,
"explanations" : [
{
"index" : "student",
"valid" : true,
"explanation" : "+*:* #*:*"
}
]
}

什么是 elasticsearch?

它是一个分布式、可扩展、实时的搜索与数据分析引擎。
它能从项目一开始就赋予你的数据以搜索、分析和探索的能力,这是通常没有预料到的。
以下 简称 es

es在Windows环境下的安装

1、安装JDK,至少1.8.0_73以上版本,java -version
2、下载和解压缩Elasticsearch安装包,目录结构,安装包链接,提取码:bepq
3、启动Elasticsearch:bin\elasticsearch.bat,es本身特点之一就是开箱即用,如果是中小型应用,数据量少,操作不是很复杂,直接启动就可以用了出现started就是启动成功了。
es-windows启动
4、检查ES是否启动成功:http://localhost:9200/?pretty

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
name: node名称
cluster_name: 集群名称(默认的集群名称就是elasticsearch)
version.number: 5.2.0,es版本号
{
"name" : "4onsTYV",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "nKZ9VK_vQdSQ1J0Dx9gx1Q",
"version" : {
"number" : "5.2.0",
"build_hash" : "24e05b9",
"build_date" : "2017-01-24T19:52:35.800Z",
"build_snapshot" : false,
"lucene_version" : "6.4.0"
},
"tagline" : "You Know, for Search"
}

5、修改集群名称:elasticsearch.yml
6、下载和解压缩Kibana安装包,使用里面的开发界面,去操作elasticsearch,作为我们学习es知识点的一个主要的界面入口
7、启动Kibana:bin\kibana.bat
8、访问http://localhost:5601 进入Dev Tools界面
9、在Dev Tools界面上运行 GET _cluster/health

端口号9300与9200区别

区别:  
9300端口:ES节点之间通讯使用  
9200端口:ES节点和外部通讯使用  

9300是TCP协议端口号,ES集群之间通讯的端口号  
9200端口号,暴露ES Restful接口端口号

lucene 与 es 的关系

lucene 是最先进、功能最强大的搜索库。
如果直接基于 lucene 开发,非常复杂,即便写一些简单的功能,也要写大量的 Java 代码,需要深入理解原理。

es 基于lucene, 隐蔽了 lucene的复杂性,提供了restful api 和 java api接口(还有其他语言的api接口)
特点:

  • 分布式的文档存储引擎
  • 分布式的搜索引擎和分析引擎
  • 分布式 支持PB级数据

es 的核心概念

near realtime(准实时)、cluster集群、node节点、  
document&field、index、shard、replica

Near realtime 准实时

  • 从写入数据到数据可以被搜索有一个时间延迟(大概1s)
  • 基于es执行搜索和分析可以达到秒级

Node 节点

Node 是集群中的一个节点,节点也有一个名称,默认是随机分配的。
默认节点会去加入一个名称为 elasticsearch 的集群。
如果直接启动一堆节点,那么它们会自动组成一个 elasticsearch 集群,当然一个节点也可以组成 elasticsearch 集群。

cluster 集群

集群包含多个节点,每个节点属于哪个集群可以通过 elasticsearch.yml 配置文件来决定
中小型应用开始一个集群一个节点也正常

Document & field

文档是 es 中最小的数据单元,一个 document 可以是一条客户数据、一条商品分类数据、一条订单数据。
通常用 json 数据结构来表示。每个 index 下的 type,都可以存储多条 document。
一个 document 里面有多个 field,每个 field 就是一个数据字段。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
{
"product_id": "1",
"product_name": "iPhone X",
"product_desc": "苹果手机",
"category_id": "2",
"category_name": "电子产品"
}
```
### document -文档
es集群中document 有点类似于 DB中的表,而document中的field则对应DB中的表字段 所以es依据这个结构特性,适应在NOSQL中使用

### document查询内部原理
* client 请求到任意一个node 使其成为coordinate node
* coordinate node 对document进行路由,将请求转发对应的node,此时会使用round-robin随机轮询算法,在所有的primary shard 和replica shard中随机取一个,让读请求负载均衡
* 接收请求shard对应的node 会将响应结果返回coordinate node
* coordinate node 再返回的client

特殊场景可能无法读取到document:document还在建立索引过程中,可能只在primary shard上有,replica没有,此时round-robin刚好随机指定了replica,从而导致document无法正常读取到,但是在其索引建立完成后,primary 和 replica可以正常读取

![document读请求内部原理](https://kubpang.gitee.io/sourceFile/elasticsearch/document读请求内部原理.png)

## index - 索引
索引包含了一堆有相似结构的文档数据,比如商品索引。
一个索引包含<font color=red>很多 document</font>,一个索引就代表了一类相似或者相同的 document,操作时<font color=red>index能是小写,可以包含下划线</font>。

## type - 类型
每个索引里可以有一个或者多个 type,type 是 index 的一个逻辑分类,比如商品 index 下有多个 type:日化商品 type、电器商品 type、生鲜商品 type。
每个 type 下的 document 的 field 可能不太一样。
<font color=red>注意:</font> 6.x 只有一个type 7.x后 type取消


## mapping
index的type的元数据,每个type都有一个自己的mapping,决定了数据类型,建立倒排索引的行为,还有进行搜索的行为,简称为mapping
dynamic mapping:自动建立index,创建type,以及type对应的mapping,mapping中包含了每个field对应的数据类型,以及分词等设置
es 在自动建立mapping的时候,对不同的field设置了data type,而不同data type的 分词、搜索等行为是不一致的,所以会导致 在在_search时,_all_field 和 指定字段的查询方式返回的结果可能不一致
查看mapping
```shell script
#GET /index/_mapping/?pretty
GET /student/_mapping/?pretty

精准匹配与全文搜索的对比

  • exact value: 精准匹配
    只有搜索内容与查询内容一致时才可以被查询出来
  • full text: 全文检索 不是说单纯的只是匹配完整的一个值,而是可以对值进行拆分词语后(分词)进行匹配,也可以通过缩写、时态、大小写、同义词等进行匹配
    • 缩写 vs. 全程:cn vs. china
    • 格式转化:like liked likes
    • 大小写:Tom vs tom
    • 同义词:like vs love

mapping的核心数据类型

字符串类型:string
整形:byte,short,integer,long
浮点类型:float,double
布尔类型:boolean
日期类型:date

dynamic mapping

true or false –> boolean
123 –> long
123.45 –> double
2017-01-01 –> date
“hello world” –> string/text

如何建立索引

analyzed :建立分词
not_analyzed: 不建立分词
no:不被索引和搜索

修改mapping

只能建立index是手动建立mapping,或者新增field mapping,但是不能 update field mapping

script
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#6.x 版本正常运行 7.x 版本执行错误
#新建field mapping
PUT /website
{
"mappings": {
"article": {
"properties": {
"author_id": {
"type": "long"
},
"title": {
"type": "text",
"analyzer": "english"
},
"content": {
"type": "text"
},
"post_date": {
"type": "date"
},
"publisher_id": {
"type": "text",
"index": "not_analyzed"
}
}
}
}
}

#7.x 去掉了type
PUT /website
{
"mappings": {
"properties": {
"author_id": {
"type": "long"
},
"title": {
"type": "text",
"analyzer": "english"
},
"content": {
"type": "text"
},
"post_date": {
"type": "date"
},
"publisher_id": {
"type": "text",
"index": false
}
}
}
}

PUT /website/_mapping/article
{
"properties" : {
"new_field" : {
"type" : "string",
"index": "not_analyzed"
}
}
}

# 修改field mapping 异常
PUT /website
{
"mappings": {
"article": {
"properties": {
"author_id": {
"type": "text"
}
}
}
}
}

{
"error": {
"root_cause": [
{
"type": "index_already_exists_exception",
"reason": "index [website/co1dgJ-uTYGBEEOOL8GsQQ] already exists",
"index_uuid": "co1dgJ-uTYGBEEOOL8GsQQ",
"index": "website"
}
],
"type": "index_already_exists_exception",
"reason": "index [website/co1dgJ-uTYGBEEOOL8GsQQ] already exists",
"index_uuid": "co1dgJ-uTYGBEEOOL8GsQQ",
"index": "website"
},
"status": 400
}

mapping 总结

  • 往es里面直接插入数据,es会自动建立索引,同时建立type以及对应的mapping
  • mapping中就自动定义了每个field的数据类型
  • 不同的数据类型(比如说text和date),可能有的是exact value,有的是full text
  • exact value,在建立倒排索引的时候,分词的时候,是将整个值一起作为一个关键词建立到倒排索引中的;full text,会经历各种各样的处理,分词,normaliztion(时态转换,同义词转换,大小写转换),才会建立到倒排索引中
  • 同时呢,exact value和full text类型的field就决定了,在一个搜索过来的时候,对exact value field或者是full text field进行搜索的行为也是不一样的,会跟建立倒排索引的行为保持一致;比如说exact value搜索的时候,就是直接按照整个值进行匹配,full text query string,也会进行分词和normalization再去倒排索引中去搜索
  • 可以用es的dynamic mapping,让其自动建立mapping,包括自动设置数据类型;也可以提前手动创建index和type的mapping,自己对各个field进行设置,包括数据类型,包括索引行为,包括分词器,等等

shard & replica

  • index包含多个shard

  • 每个shard都是一个最小工作单元,承载部分数据,每个 shard 都是一个 lucene 实例。,有完整的建立索引和处理请求的能力。

  • 增减节点时,shard会自动在nodes中负载均衡(尽量保证每个节点都是一样的负载)

  • primary shard和replica shard,每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard

  • replica shard是primary shard的副本,负责容错,以及承担读请求负载

  • primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改,primary shard的数量是不能的修改的。

  • primary shard的默认数量是5,replica默认是1,默认有10个shard,5个primary shard,5个replica shard(每个primary shard都对应一个replica shard)

  • primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上

replica - 副本 (replica shard 简称 replica)

当服务出现宕机时,shard可能会丢失,因此可以为每个shard创建多个replica副本。
replica 可以在shard出现故障时提供备用服务,保障数据不会丢失,多个replica可以提升搜索的吞吐量和性能。
primary shard(建立索引时一次设置,不能修改,默认 5 个),replica shard(随时修改数量,默认 1 个),默认每个索引 10 个 shard,5 个 primary shard,5个 replica shard,最小的高可用配置,是 2 台服务器。

es 集群 结构图

es 核心数据 与 db的比较

es db
index
type
document 一行数据

资源路径:https://shishan100.gitee.io/docs/#/./docs/high-concurrency/es-introduction

为什么要使用注解编程

注解编程可以使得代码更简洁,提供开发效率。  
替换xml这种配置形式,简化配置

例如 开发一个User类:

1
public class User(){....}

添加注解@Component后如下:

1
2
@Component
public class User(){....}

该形式等价于在spring配置文件中的

1
<bean id="user" class="com.study.User"></bean>

容器相关注解

组件注册

@Configuration

作用在类上,表示这个是一个配置类,相当于配置文件xml一样

@Bean

作用在方法上,和 bean 标签功能一致,返回值是class,方法名默认是id,也可以指定

@ComponentScan

作用在类上,配置扫描包,可以使用excludeFilters或includeFilters来过滤或指定包含哪个类

使用excludeFilters:过滤掉哪些类

1
2
3
@ComponentScan(value = {"com.study"}, excludeFilters={
@Filter(type=FilterType.ANNOTATION, value={Controller.class, Service.class})
}) //配置扫描包

使用includeFilters:包含哪些类

1
2
3
@ComponentScan(value = {"com.study"}, includeFilters={
@Filter(type=FilterType.ANNOTATION, value={Service.class})
}, useDefaultFilters=false) //配置扫描包

@ComponentScans

配置扫描包,可以设置多个值

1
2
3
4
5
@ComponentScans({
@ComponentScan(value = {"com.study"}, excludeFilters={
@Filter(type=FilterType.ANNOTATION, value={Controller.class, Service.class})
}) //配置扫描包
})

自动过滤规则:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class MyTypeFilter implements TypeFilter{

public boolean match(MetadataReader metadataReader, MetadataReaderFactory metadataReaderFactory) throws IOException {
//注解信息
AnnotationMetadata annotationMetadata = metadataReader.getAnnotationMetadata();
//类信息
ClassMetadata classMetadata = metadataReader.getClassMetadata();
//路径信息
Resource resource = metadataReader.getResource();
//获取类名
String className = classMetadata.getClassName();
System.out.println(className);
if(className.contains("er")){
return true;
}
return false;
}
}
1
2
3
4
5
6
@ComponentScans({
@ComponentScan(value = {"com.study"}, excludeFilters={
@Filter(type=FilterType.ANNOTATION, value={Controller.class, Service.class}),
@Filter(type=FilterType.CUSTOM, value={MyTypeFilter.class}) //使用自定义的过滤规则
}) //配置扫描包
})

@Scope

作用在方法上,相当于 bean 中的scope属性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
* prototype 多实例,当要用到时才会创建实例
* singleton 单例(默认),在spring容器创建时就创建实例了
* request 同一个请求创建一个实例(很少用到)
* session 同一个session创建一个实例(很少用到)
* @return
*/
@Scope("prototype")
@Bean
public Student student(){
System.out.println("创建student对象");
return new Student("张三", 15);
}

@Lazy

懒加载,作用单实例bean中,原来的单实例bean在spring容器加载后会自动创建实例,加了此注解后,会在第一次使用到此bean时才会创建。

1
2
3
4
5
6
@Lazy
@Bean
public Student student(){
System.out.println("创建student对象");
return new Student("张三", 15);
}

@Conditional

可以作用在类或方法上。
作用在方法上: 如果条件返回true,则创建bean
作用在类上: 如果返回true,此类的注册bean才会生效。

1
2
3
4
5
6
7
8
9
10
11
   @Conditional({WindowsCondition.class})
@Bean("windows")
public Student student2(){
return new Student("my is windows", 60);
}

@Conditional({LinuxCondition.class})
@Bean("linux")
public Student student3(){
return new Student("my is linux", 50);
}
1
2
3
4
5
6
7
8
9
10
11
//判断是否是Linux系统
public class LinuxCondition implements Condition {
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
Environment environment = context.getEnvironment();
String name = environment.getProperty("os.name");
if(name.contains("linux")){
return true;
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24

//判断是否是windows系统
public class WindowsCondition implements Condition{

/**
* context:程序上下文
* metadata:注解信息
*/
public boolean matches(ConditionContext context, AnnotatedTypeMetadata metadata) {
//bean工厂
ConfigurableListableBeanFactory beanFactory = context.getBeanFactory();
//程序运行环境
Environment environment = context.getEnvironment();
//bean注册信息
BeanDefinitionRegistry registry = context.getRegistry();

String name = environment.getProperty("os.name");
if(name.contains("Windows")){
return true;
}
return false;
}

}

@Import

作用:快速的给容器导入一个组件

  • 使用方法1:

    1
    2
    @Import({要导入到容器中的组件}),容器中就会自动注册这个组件,id是类的全路径
    @Import({RedisUtils.class})
  • 使用方法2:
    ImportSelector:返回需要导入的组件的全类名数组,这个在springboot中底层很常用

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    //实现ImportSelector接口
    public class MyImportSelector implements ImportSelector{

    //importingClassMetadata:注解信息
    public String[] selectImports(AnnotationMetadata importingClassMetadata) {
    return new String[]{"com.oyr.bean.Red", "com.oyr.bean.Yellow"};
    }

    }
    @Import({ MyImportSelector.class})
  • 使用方法3:
    ImportBeanDefinitionRegistrar:手动注册bean到容器中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    //实现ImportBeanDefinitionRegistrar接口
    public class MyImportBeanDefinitionRegistrar implements ImportBeanDefinitionRegistrar{

    /**
    * importingClassMetadata:注解信息
    * registry:bean注册对象
    */
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
    //是否有id为red Bean
    boolean b1 = registry.containsBeanDefinition("com.oyr.bean.Red");
    //是否有id为yellow bean
    boolean b2 = registry.containsBeanDefinition("com.oyr.bean.Yellow");
    if(b1 && b2){
    registry.registerBeanDefinition("blue", new RootBeanDefinition(Blue.class));
    }
    }

    }

容器注册逐渐总结:

  1. 包扫描 + 逐渐标注注解(@Controller/ @Service/ @Repository/ @Componnet)
  2. Bean 导入的第三方包里面的组件
  3. import 快速的给融资导入一个组件
    • @Import 需要导入容器中的bean,容器就会自动注解这个Bean,id是类的全路径
    • ImportSelector: 返回需要导入的组件的全类名数组
    • ImportBeanDefinitionRegistrar 手动注册bean到容器中
  4. 使用spring提供的FactoryBean(goncBean)
    • 默认获取到的是工厂bean调用getObject()创建的对象
    • 要获取到工厂bean本身,我们要在id前面加一个&

Bean 生命周期

指定初始化方法和销毁方法

  • 对象的创建:
    • 单实例: 在容器启动的时候创建对象
    • 多实例: 在每次获取对象时创建
  • 初始化:
    • 对象创建完成,并赋值好,调用初始化方法
  • 销毁:
    • 单实例在容器关闭的时候销毁
    • 多实例不会调用销毁方法,spring并不销毁,需要自己调用销毁方法。
  1. 使用@Bean指定初始化方法和销毁方法

    1
    2
    3
    4
    @Bean(initMethod="init", destroyMethod="destroy")
    public Color color(){
    return new Color();
    }
  2. 通过让bena实现InitializingBean(定义初始化逻辑), DisposableBean(定义销毁逻辑)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    public class Color2 implements InitializingBean, DisposableBean{

    public void destroy() throws Exception {
    // TODO Auto-generated method stub
    System.out.println("destroy");
    }

    public void afterPropertiesSet() throws Exception {
    // TODO Auto-generated method stub
    System.out.println("afterPropertiesSet");
    }
    }
  3. 使用JSR250
    @PostConstruct:在bean创建完成并且属性赋值完成:来执行初始化方法
    @PreDestroy:在容器销毁bena之前通知我们进行销毁工作

BeanPostProcessor:bean的前后置处理器
在bean初始化前后进行一些处理工作
postProcessBeforeInitialization:在初始化之前工作
postProcessAfterInitialization:在初始化之后进行工作

spring底层对BeanPostProcessor的使用:
Bean赋值,注入其他组件,@AutoWitred,生命周期注解功能,@Async,等等
都是使用BeanPostProcessor实现的。

线程池作用

java中的线程池是运用场景最多的并发框架,几乎所有需要异步或并发执行任务的程序都可以使用线程池。在并发过程中,合理使用线程池可以带来3个好处

  1. 降低资源消耗: 通过重复利用已创建的线程降低创建和销毁造成的消耗

  2. 提高响应速度: 当任务到达时,任务可以不需要等到线程创建就能立即执行。

  3. 提高线程的可管理性: 线程是稀缺资源,如果无限量的创建,不经不会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。

Executor 框架

Executor 框架是java5之后引进的,Executor框架不仅包过了线程池的管理,还提供了线程工厂、队列以及拒绝策略等,Executor框架让开发编程变的更加简单。

Executor框架

Executor 结构主要包含任务、任务的执行和异步结果的计算

  1. 任务(Runnable/Callable): 包括执行任务需要实现的Runnable接口或Callable接口。

    • Runnable接口或Callable接口实现类都可以被ThreadPoolExecutor 或 ScheduledThreadPoolExecutor执行。
  2. 任务的执行(Executor): 包过任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口。

    • Executor框架 有两个关键类实现了ExecutorService接口(ThreadPoolExecutor 和 SchecduledThreadPoolExecutor)。
  3. 异步结果的计算(Future): 包括接口Future 和实现了Future接口的FutureTasklei。

在 Executor 使用过程中,主线程首先要创建实现 Runnable 或者 Callable 接口的任务对象。工具类 Executors 可以把一个 Runnable 对象封装为一个 Callable 对象(Executors.callable(Runnable task) 或 Executors.callable(Runnable task,Object resule))。

如果执行 ExecutorService.submit(),ExecutorService 将返回一个实现 Future 接口的对象(FutureTask)。
由于 FutureTask 实现了 Runnable,我们也可以创建 FutureTask,然后直接交给 ExecutorService 执行。最后,主线程可以执行 FutureTask.get() 方法来等待任务执行完成。主线程也可以执行 FutureTask.cancel(boolean mayInterruptIfRunning)来取消此任务的执行。

ThreadPoolExecutor

核心参数

  • corePoolSize: 核心线程数。
    • 定义了最小可以同时运行的线程数据
  • maximumPoolSize: 最大线程数
    • 当队列中存放的任务达到队列容量时,当前可以同时运行的线程数量便为最大线程数。
  • wordQueue: 任务队列
    • 当新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到了则放入队列中。

常见参数

  • keepAliveTime: 多余线程存活时间
    • 当线程池中线程舒朗大于 corePoolSize时,如果没有新的任务提交,核心线程外的线程不会立即销毁,而是等待,知道等待的时候超过了keepAliveTime才会被销毁
  • unit: 时间单位
    • keepAliveTime参数的时间单位
  • threadFactory: 线程工厂
    • executor创建新线程时用到,可以用来给线程命名,查看当前创建线程数量,设置优先级,设置是否后台运行等。
  • handler: 饱和策略
    ThreadPoolExecutor饱和策略

如果当前同时运行的线程数量达到最大线程数量并且队列液晶被放满时,ThreadPoolTaskExecutor定义了一些策略:

  • ThreadPoolExecutor.AbortPolicy: 抛抛出 RejectedExecutionException 来拒绝新任务的处理。
  • ThreadPoolExecutor.CallerRunsPolicy: 调用执行自己的线程运行任务,也就是直接在调用 execute 方法的线程中运行被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。另外,这个策略喜欢增加队列容量。如果您的应用程序可以承受此延迟并且你不能任务丢弃任何一个任务请求的话,你可以选择这个策略。
  • ThreadPoolExecutor.DiscardPolicy: 不处理新任务,直接丢弃掉。
  • ThreadPoolExecutor.DiscardOldestPolicy: 此策略将丢弃最早的未处理的任务请求。

阿里建议用 ThreadPoolExecutor 构造函数的方式去创建线程池,不建议用 Executors 去创建,弊端如下:

  1. FixedThreadPool 和 SingleThreadExecutor : 允许请求的队列长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM。
  2. CachedThreadPool 和 ScheduledThreadPool : 允许创建的线程数量为 Integer.MAX_VALUE ,可能会创建大量线程,从而导致 OOM。

线程池执行原理

什么是锁

在计算机科学中,锁(lock)或互斥(mutex)是一种同步机制,用于在有许多执行线程的环境中强制对资源的访问限制。锁旨在强制实施互斥排他、并发控制策略。

锁的三个概念

  1. 锁开销(lock overhead): 锁占用内存空间、cpu初始化和销毁锁、获取和释放锁的时间。程序使用的锁越多,相应的锁开销也就越大。

  2. 锁竞争(lock contention): 一个进程或一个线程尝试获取另一个进程或线程持有的锁是,就会发生竞争,锁粒度越小,发生锁竞争的可能性就越小。锁的粒度有表锁、页锁、行锁、字段锁、字段的一部分锁

  3. 死锁(deadlock): 指两个或两个以上的进程在执行过程中,由于竞争资源或者由于彼此通讯而造成的一种阻塞现象,若无外力作用,它们都无法推进下去。
    死锁的规范定义:集合中的每一个进程都在等待只能由本集合中的其他进程才能引发的事件,那么该组进程是死锁的。
    死锁图解

Monitor 原理(核心)

java中每个对象都会携带一个monitor对象,存在在对象头(指针)中,sychronized就是通过它来实现同步锁的,这也是java中任何对象都可以作为锁的原因。同时也是 notify/notifyAll/wait等方法存在于顶级对象Object中的原因。

在HotSpot JVM中,Monitor由ObjectMonitor实现,她的数据结构分为三个区域:

Monitor的数据结构

  • 进入区(Entry Set): 存放处理阻塞状态的线程。表示线程要求获取对象的锁。
    • 如果对象未被锁住,则成为拥有者。
    • 否则 则进入等待区。 一旦对象锁被其他线程释放,立即参与竞争。
  • 等待区(Wait Set): 存放等待状态的线程。
    • 表示线程通过对象的wait方法,释放了对象的锁,并在等待区等待被唤醒。
  • 拥有者(Owner): 指向持有monitor对象的线程,表示某一线程成功竞争到对象锁。

当多个线程同时访问一段同步代码时,首先会进入 EntrySet,当线程获取到对象的 Monitor 后进入 Owner 区域并把 owner 变量设置为当前线程,同时 Monitor 中的计数器加1。若线程调用 wait() 方法,将释放当前持有的 monitor,owner 恢复为 null,计数器减 1,同时该线程进入 WaitSet 等待被唤醒。

MarkWord 原理(核心)

注意:整个对象头的描述结构的长度并不是固定不变的,首先在 32 位操作系统和 64 位操作系统中就有结构长度上的差异。另外在启用的对象指针压缩和没有启用对象指针压缩的情况下,整个对象头的长度也不一样:64 位平台下,原生对象头大小为 16 字节,压缩后为 12 字节。

前面有说明对象头会存放monitor对象的指针,对象头的结构主要三部分:

  • MarkWord: 保存对象当前锁机制的记录信息。
  • klassPointer: 指向mataspace中该类的元数据,用来表示是那个类的实例。
  • 数组长度: 只有数组形式的对象会有这个区域,用来表示数组长度。

当对象被sychronized关键字当成同步锁时,围绕这个锁的一系列操作都和MarkWord有关。Mark Word 在 32 位 JVM 中的长度是 32bit,在 64 位 JVM 中长度是 64bit。Mark Word 在不同的锁状态下存储的内容不同,在 32 位 JVM 中是这么存的:
MarkWord 32位JVM中存储的内容

其中无锁和偏向锁的锁标志位都是 01,只是在前面的 1bit 区分了这是无锁状态还是偏向锁状态。Epoch 是指偏向锁的时间戳。

锁的种类

java 主流锁

  1. 偏向锁/轻量级锁/重量级锁
  • 偏向锁
    自始至终,偏向锁都是不存在竞争的。因为其只是打个标记而已。一个对象初始化且没有线程获取它的锁时,即为可偏向。当第一个线程访问并尝试获取锁时,其会将这个线程记录下来。后续如果尝试获取锁的线程是其拥有者,即可直接获得锁,开销很小,性能最好。(锁标志位为01,是否偏向锁是 1)

  • 轻量级锁
    如果只有短时间的锁竞争,直接可通过CAS就可以解决而不需要完全互斥的重量级锁。轻量级锁是指在锁为偏向锁时,被另一个线程访问(已获取锁,说明存在竞争),那么偏向锁为自动升级轻量级锁,这时,等待的线程会通过自旋的方式尝试获取锁,这样就不会造成阻塞。(锁标志位为00)

  • 重量级锁
    重量级锁是互斥锁,是利用操作系统的同步机制实现的,所以开销大。一般为多个线程之间长时间的锁竞争,轻量级锁自旋一定的次数后(默认10次,jdk7以后,默认开启自旋,可以使用-XX:PreBlockSpin参数来设置自旋锁等待的次数)锁膨胀为重量级锁。重量级锁会让其他等待获取锁的线程直接进入阻塞状态。(锁标志位为10)

  • 锁的升级:无锁 -> 偏向锁 -> 轻量级锁 -> 重量级锁
  1. 可重入锁/非可重入锁
  • 可重入锁指:是线程持有了对象的锁,能在不释放锁的情况下,再次获得这把锁。
    • 可重入锁最典型的就是ReentrantLock了,是Lock接口最主要的一个实现类
  • 不可重入锁:指的是虽然线程当前持有了对象的锁,但是想再次获取锁必须先释放才能获取。
  1. 共享锁/独占锁
  • 共享锁: 一个对象的锁,能被多个线程同时获取。
    • 读写锁中的读锁即为共享锁。读锁可以被同时读,可以同时被多个线程持有
  • 独占锁: 一个对象的锁,同一时间只能被一个线程使用。
    • 读写锁中的写锁即为独占锁,写锁最多只能同时被一个线程持有。
  1. 公平锁/非公平锁
  • 公平锁:公平锁是指当线程拿不到对象锁时,会进去等待队列,而等待队列中,等待时间长则优先测试获取锁,先进先出。
    AQS中线程2进入等待队列 则提现了公平锁
    线程2的公平锁

  • 非公平锁:非公平锁则会忽略掉等待队列中的线程,直接去尝试获取锁,发生插队现象。
    线程2的不公平锁

  • 如果用默认的构建函数来创建ReentrantLock对象,默认的锁策略就是非公平的,如果想构建的ReentrantLock实现公平锁策略

    1
    2
    //构建ReentrantLock的时候 传入true 即代表ReentrantLock是公平锁策略
    ReentrantLock lock = new ReentrantLock(true);
  1. 悲观锁/乐观锁
  • 悲观锁: 在获取对对象时,必须先获取锁,以达到独占的状态。

  • 乐观锁: 与悲观锁相反,在获取对象前不要求先获得锁,往往利用CAS,在不独占对象的情况下,实现对象的修改。

  1. 自旋锁/非自旋锁
  • 自旋锁: 指线程在获取不到锁的时候,不直接阻塞或释放CPU资源,而是开始利用循环,不停的尝试获取锁,就像是线程在 “自我旋转” 所以叫自旋,轻量级锁中常用到。

  • 非自旋锁: 指线程在获取不到锁的时候,线程直接放弃,或去进行其他操作,如:阻塞、排队等。

  1. 可中断锁/不可中断锁
  • 可中断锁: java中sychronized关键字修饰的锁就是不可中断的,一旦线程申请了锁,只能等拿到锁后才能进行其他的逻辑处理

  • 不可中断锁: ReentrantLock 是一种典型的可中断锁,例如lockInterruptibly 方式在获取锁的过程中,突然不想获取了,那么也可以在中断之后去做其他的事情,不需等到获取锁才能离开

什么是AQS

AQS Abstract Queued Synchronizer 抽象队列同步器

AQS定义了一套多线程访问共享资源的同步器框架,许多同步类实现都依赖于它一个

AQS对象内部的有一个核心变量state, int类型,代表加锁状态。初始化状态下state为0
AQS 内部还有一个关键变量,用来记录当前加锁的事哪个线程,init下这个变量为null

java并发与AQS的关系

java 并发api的使用 简单如

1
2
3
4
5
6
7
8
9
ReentrantLock lock = new ReentrantLock();

lock.lock(); //加锁

//...业务逻辑

lock.unlock();//释放锁

//以上代码:初始化一个lock 对象,然后加锁 和释放锁
以reentrantLock为例, 与AQS的关系主要是因为:
java并发包下很多API都是基于AQS来实现 加锁和释放锁的等功能的,
AQS 是java并发包的基础类。

ReentrantLock-重入锁

可重入锁即是 可以对一个ReentrantLock对象多次的执行 lock()与unlock()操作。
也就是可以对一个锁加锁解锁多次

每次线程可重入加锁一次,会判断一下 当前加锁的线程如果是自己,那么就线程就可重入多次加锁,每次加锁都是将state的 值累加1,其他不变化。

ReentrantLock的加锁和释放锁的底层原理

当一个ReentrantLock 尝试对一个对象进行lock 操作时 主要有以下操作
  1. 线程1通过调用ReentranLock的lock()来参数进行加锁,这里的加锁过程是直接通过CAS操作将 state 由0 变为1。
    如果之前没有线程尝试过获取锁,那么state肯定为0,此时线程1就可以加锁成功 线程1加锁成功后,就可以设置AQS的加锁线程变量设置为自己

线程1尝试加锁

从上面的图中可以简单的看出 ReentrantLock 其实就是一个外层API,
内核中实现的锁机制都是用来的AQS实现的。
  1. 线程1加锁了后,线程2跑过来加锁时,会通过CAS判断state是否为0,为1则代表了当前对象有线程加锁了。紧接着会去判断,加锁线程是否为自己,是自己则获取锁成功,而当前是线程1获取,则线程2 获取锁失败

线程2尝试加锁

接着 线程2 会将自己放入AQS的一个线程的等待队列中等待,
当线程1是否锁后,可以再次尝试去加锁

AQS加锁失败等待队列

线程1在执行完业务逻辑后,就会释放锁,释放锁的过程很简单,就是将AQS的state值逐步减1,当state为0时,则彻底释放锁,同时设置加锁线程变量为null
线程1释放锁

  1. 接下来就是从等待队列中唤醒线程2尝试重新加锁。
    线程2开始重复步骤2的操作加锁,加锁成功后,将state设置1,并将加锁线程变量设置为自己

线程2唤醒后尝试加锁

总结

本文主要介绍了AQS的作用、ReentrantLock以及AQS与java并发的关系
AQS 其实就是一个java并发的基础组件,用来实现各种锁、各种同步组件。  
它包含了:state变量、加锁线程变量、等待队列等并发中的核心组件。

[来源参考]:https://shishan100.gitee.io/docs/#/./docs/page/page3

前提

当前文章代码演示,是基于amqp原生或spring整合的。

持久化

在生产过程中,难免会发生服务器宕机的事情,RabbitMQ也不例外,可能由于某种特殊情况下的异常而导致RabbitMQ宕机从而重启,那么这个时候对于消息队列里的数据,包括交换机、队列以及队列中存在消息恢复就显得尤为重要了。
RabbitMQ本身带有持久化机制,包括交换机、队列以及消息的持久化。持久化的主要机制就是将信息写入磁盘,当RabbtiMQ服务宕机重启后,从磁盘中读取存入的持久化信息,恢复数据。

交换机持久化

概念

交换机可以有两个状态:持久(durable)、暂存(transient)。
持久化的交换机会在消息代理(broker)重启后依旧存在,而暂存的交换机则不会(它们需要在代理再次上线后重新被声明)。

注意:并不是所有的应用场景都需要持久化的交换机。

交换机持久化实现

1
2
3
// 声明交换机,并且指定持久化
boolean exchangeDurable = true;
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, exchangeDurable, false, null);

队列持久化

概念

持久化队列(Durable queues)会被存储在磁盘上,当消息代理(broker)重启的时候,它依旧存在。没有被持久化的队列称作暂存队列(Transient queues)。
持久化的队列并不会使得路由到它的消息也具有持久性。倘若消息代理挂掉了,重新启动,那么在重启的过程中持久化队列会被重新声明,无论怎样,只有经过持久化的消息才能被重新恢复。

注意:并不是所有的应用场景都需要持久化的队列。

队列持久化实现

1
2
3
// 声明队列,并且指定持久化
boolean durable = true;
channel.queueDeclare(QUEUE_NAME, durable, false, false, null);

消息持久化

概念

消息能够以持久化的方式发布,RabbitMQ 会将此消息存储在磁盘上。如果服务器重启,持久化消息不会丢失。
将消息发送给一个持久化的交换机或者路由给一个持久化的队列,并不会使得此消息具有持久化性质:它完全取决与消息本身的持久模式(persistence mode)。将消息以持久化方式发布时,会对性能造成一定的影响(就像数据库操作一样,健壮性的存在必定造成一些性能牺牲)。

注意:RabbitMQ的消息是依附于队列存在的,所以想要消息持久化,那么前提是队列也要持久化。

消息持久化实现

要想让消息实现持久化需要在消息生产者修改代码,在发送消息时添加属性:MessageProperties.PERSISTENT_TEXT_PLAIN

1
2
3
// 发送消息,并且指定持久化
// channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, "hello".getBytes());

消息确认(producer ack)

消息确认是针对消息生产者的。

概念

前提知识点:rabbitmq整个消息投递的路径为 producer -> rabbitmq broker -> exchange -> queue -> consumer。

在生产环境中由于一些不明原因,导致 rabbitmq 重启,在 RabbitMQ 重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复。
思考:如何才能进行 RabbitMQ 的消息可靠投递呢?

消息确认包含两部分。
第一部分:用来确认生产者是否成功将消息发送到 exchange。
第二部分:用来确认 exchange 路由消息给 queue 的过程中,消息是否成功投递。

RabbitMQ 为我们提供了两种方式来确保消息的投递:

  • confirm 确认模式
    • 对应消息确认第一部分
  • return 退回模式
    • 对应消息确认第二部分
阅读全文 »