对于elasticsearch:删除文档后如何释放存储大小感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍elasticsearch删除,并为您提供关于ElasticSearch-文档存储、El
对于elasticsearch:删除文档后如何释放存储大小感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍elasticsearch 删除,并为您提供关于ElasticSearch - 文档存储、Elasticsearch 2.20 文档篇:更新删除文档、Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana、Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)的有用信息。
本文目录一览:- elasticsearch:删除文档后如何释放存储大小(elasticsearch 删除)
- ElasticSearch - 文档存储
- Elasticsearch 2.20 文档篇:更新删除文档
- Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana
- Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)
elasticsearch:删除文档后如何释放存储大小(elasticsearch 删除)
在我的elasticsearch服务器上:文档总数:300万,总大小:3.6G然后,我删除了约280万文档:文档总数:约13万,总大小:3.6G
我已删除文件,如何释放文件大小?
答案1
小编典典删除文档只会将其标记为已删除,因此将不会对其进行搜索。要回收磁盘空间,必须优化索引:
curl -XPOST ''http://localhost:9200/_optimize?only_expunge_deletes=true''
文档:
http :
//www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-
optimize.html
该文档已移至:https : //www.elastic.co/guide/zh-
CN/elasticsearch/reference/current/indices-
forcemerge.html
更新资料
与Elasticsearch 2.1.x的开始,optimize
已废弃的青睐forcemerge
。API是相同的,只是端点没有更改。
curl -XPOST ''http://localhost:9200/_forcemerge?only_expunge_deletes=true''
ElasticSearch - 文档存储
shard
确定shard的公式:
shard = hash(routing) % number_of_primary_shards
routing 默认是文档的 _id ,也可以设置成一个自定义的值。
因此要在创建索引的时候就确定好主分片的数量,并且永远不会改变这个数量,因为如果数量变化了,那么所有之前路由的值都会无效。
每个节点都知道集群中任一文档位置,所以可以直接将请求发到任意节点。当然,为了扩展负载,更好的做法是轮询发送集群中所有的节点。
shard、replica同步
新建、索引或者删除请求
步骤:
- 户端向 Node 1 发送新建、索引或者删除请求。
- 节点使用文档的 _id 确定文档属于shard 0 。请求会被转发到 Node 3(因为shard 0 被分配在 Node 3 上)。
- Node 3 在shard 0上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的replica R0上。一旦所有的R0都报告成功, Node 3 将向Node 1报告成功,Node 1向客户端报告成功。
查询请求
步骤:
- 客户端向 Node 1 发送查询请求。
- node 1使用文档的 _id 来确定文档属于shard 0 。replica 0 存在于Node 1、Node 2上。 这种情况下,每次请求都会轮询路由Node 1、Node 2,这里假设路由到Node 2。
- Node 2将文档返回给Node 1 ,然后Node 1将文档返回给客户端。
注意:
在文档被检索时,已经被索引的文档可能已经存在于shard上但是还没有复制到replica。即强一致性无法保证,但最终一致性可以保证。
更新请求
POST /{index}/{type}/{id}/_update
步骤略。
注意:
Node 3 从P0检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。 如果文档已经被另一个进程修改(_version冲突),它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
如果 Node 3 成功地更新文档,它将新版本的文档并行转发(新建、索引或者删除请求的步骤里转发的是请求而不是文档)到 Node 1 和 Node 2 上的R0,重新建立索引。一旦所有R0都返回成功,Node 3 向Node 1返回成功,Node 1向客户端返回成功。
为什么要转发文档而不是请求?
因为转发请求是异步的,不能保证到达replica的顺序,若转发更新请求,则可能以错误的顺序应用更改,导致得到损坏的文档。而转发文档,则replica可以直接根据_version来获取最大版本号的文档。
批量请求(_mget 、_bulk)
bulk步骤:
- 客户端向 Node 1 发送 bulk 请求。
- Node 1 为每个命中的shard创建一个批量请求,并将这些请求并行转发到每个包含该shard的节点。
- shard按顺序(请求报文体中的子请求顺序)执行每个操作。当一个子请求操作成功时,shard并行转发新文档(或删除)到replica,然后执行下一个子请求。 一旦所有的子请求的replica向shard报告完成(无论执行是否成功),该shard将向node 1报告,node 1将这些响应收集整理并返回给客户端。
可选的请求参数
-
consistency
解决分布式脑裂问题,参数的值可以设为 one (只要该index的shard状态 ok 就允许执行_写_操作),all(必须要该index的shard和多个replica的状态没问题才允许执行_写_操作), 或 quorum (默认值)。
# quorum 即大于一半的分片,注意,number_of_replicas是primary shard对应的replica数量,不是全部replica数量 int( (primary + number_of_replicas) / 2 ) + 1
注意:新索引默认有 1 个replica,这意味着为了满足consistency至少需要两个活动的node。 这就会阻止我们在单一node上做任何事情。为了避免这个问题,ES要求只有当 number_of_replicas 大于1的时候,consistency策略才会执行。
-
timeout
如果没有足够的replica,Elasticsearch会等待,直到满足consistency策略。默认最多等待1分钟。 你可以使用 timeout 参数 使它更早终止: 100 是100毫秒,30s 是30秒。
一个Lucene索引在Elasticsearch称作shard。 一个Elasticsearch索引是shard的集合。
Elasticsearch 2.20 文档篇:更新删除文档
Elasticsearch 的更新文档 API 准许通过脚本操作来更新文档。更新操作从索引中获取文档,执行脚本,然后获得返回结果。它使用版本号来控制文档获取或者重建索引。
备注:在 Elasticsearch 中的更新操作是完全从新索引文件。
我们新建一个文档:
请求:PUT http://localhost:9200/test/type1/1?pretty
参数:
{
"counter" : 1,
"tags" : ["red"]
}
脚本开启功能
在最新版本的 Elasticsearch 中,基于安全考虑 (如果用不到,请保持禁用), 默认禁用了动态脚本功能。如果被禁用,在使用脚本的时候则报以下的错误:
scripts of
type
[inline], operation [update] and lang [groovy] are disabled
可以用以下方式完全开启动态脚本功能,在 config/elasticsearch.yml 文件,在最后添加以下代码:
script.inline: on
script.indexed: on
script.file: on
配置后,重启 Elasticsearch。
下面我们用脚本来更新此文档。
请求:POST http://localhost:9200/test/type1/1/_update?pretty
参数:
{
"script" : {
"inline": "ctx._source.counter += count",
"params" : {
"count" : 4
}
}
}
执行完后,我们在查询一下文档内容,可以发现 counter 的值为 5:
{
"_index" : "test",
"_type" : "type1",
"_id" : "1",
"_version" : 5,
"found" : true,
"_source" : {
"counter" : 5,
"tags" : [ "red" ]
}
}
在看下面的更新操作:
请求:POST http://localhost:9200/test/type1/1/_update?pretty
参数:
{
"script" : {
"inline": "ctx._source.tags += tag",
"params" : {
"tag" : "blue"
}
}
}
返回的内容为,表示更新成功,我们看一下_version 为 6,比刚才的值增加了 1:
{
"_index" : "test",
"_type" : "type1",
"_id" : "1",
"_version" : 6,
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
}
}
然后我们在查询一下文档内容:
{
"_index" : "test",
"_type" : "type1",
"_id" : "1",
"_version" : 6,
"found" : true,
"_source" : {
"counter" : 5,
"tags" : [ "red", "blue" ]
}
}
在脚本中除了_source 外其他内置参数也可以使用,例如_index, _type, _id, _version, _routing, _parent, _timestamp, _ttl。
下面我们通过脚本增加一列。
请求:POST http://localhost:9200/test/type1/1/_update?pretty
参数:
{
"script" : "ctx._source.name_of_new_field = \"value_of_new_field\""
}
然后查询此文档:
{
"_index" : "test",
"_type" : "type1",
"_id" : "1",
"_version" : 7,
"found" : true,
"_source" : {
"counter" : 5,
"tags" : [ "red", "blue" ],
"name_of_new_field" : "value_of_new_field"
}
}
从中可以看出,文档中又增加了一列。
删除一列,请求和刚才的一样,参数变为:
{
"script" : "ctx._source.remove(\"name_of_new_field\")"
}
甚至可以通过表达式来判断做某些事情,例如:下面的示例将删除的文件如果标签字段包含蓝色,否则什么也不做(空):
请求参数:
{
"script" : {
"inline": "ctx._source.tags.contains(tag) ? ctx.op = \"delete\" : ctx.op = \"none\"",
"params" : {
"tag" : "blue"
}
}
}
部分文档更新:
该更新接口还支持更新部分文档,将文档合并到现有文档中(简单的递归合并、对象的内部合并、替换核心的 “键 / 值” 和数组)。例如:
{
"doc" : {
"name" : "new_name"
}
}
更新后,可以发现文档中多了一列 name。
{
"_index" : "test",
"_type" : "type1",
"_id" : "1",
"_version" : 23,
"found" : true,
"_source" : {
"counter" : 5,
"tags" : [ "red", "blue" ],
"name" : "new_name"
}
}
当文档指定的值与现有的_source 合并。当新的文档和老的文档不一致的时候,文档将会被从新建立索引。当新旧文档一样的时候,则不进行从建索引的操作。可以通过设置 detect_noop 为 false,让任何情况下都从新建立索引,例如下面的更新操作:
{
"doc" : {
"name" : "new_name"
},
"detect_noop": false
}
删除文档
删除文档相对比较简单:
请求:DELETE http://localhost:9200/test/type1/1
返回的内容为:
{
"found": true,
"_index": "test",
"_type": "type1",
"_id": "1",
"_version": 24,
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}}
则表示删除了此文档。
赛克蓝德 (secisland) 后续会逐步对 Elasticsearch 的最新版本的各项功能进行分析,近请期待。也欢迎加入 secisland 公众号进行关注。
Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana
CentOS6.5下安装ElasticSearch6.2.4
(1)配置JDK环境
配置环境变量
export JAVA_HOME="/opt/jdk1.8.0_144"
export PATH="$JAVA_HOME/bin:$PATH"
export CLASSPATH=".:$JAVA_HOME/lib"
(2)安装ElasticSearch6.2.4
下载地址:https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-2-4
启动报错:
解决方式:
bin/elasticsearch -Des.insecure.allow.root=true
或者修改bin/elasticsearch,加上ES_JAVA_OPTS属性:
ES_JAVA_OPTS="-Des.insecure.allow.root=true"
再次启动:
这是出于系统安全考虑设置的条件。由于ElasticSearch可以接收用户输入的脚本并且执行,为了系统安全考 虑,建议创建一个单独的用户用来运行ElasticSearch。
如果没有普通用户就要创建一个普通用户组和普通用户,下面介绍一下怎么创建用户组和普通用户
创建用户组和用户:
groupadd esgroup
useradd esuser -g esgroup -p espassword
更改elasticsearch文件夹及内部文件的所属用户及组:
cd /opt
chown -R esuser:esgroup elasticsearch-6.2.4
切换用户并运行:
su esuser
./bin/elasticsearch
再次启动显示已杀死:
需要调整JVM的内存大小:
vi bin/elasticsearch
ES_JAVA_OPTS="-Xms512m -Xmx512m"
再次启动:启动成功
如果显示如下类似信息:
[INFO ][o.e.c.r.a.DiskThresholdMonitor] [ZAds5FP] low disk watermark [85%] exceeded on [ZAds5FPeTY-ZUKjXd7HJKA][ZAds5FP][/opt/elasticsearch-6.2.4/data/nodes/0] free: 1.2gb[14.2%], replicas will not be assigned to this node
需要清理磁盘空间。
后台运行:./bin/elasticsearch -d
测试连接:curl 127.0.0.1:9200
会看到一下JSON数据:
[root@localhost ~]# curl 127.0.0.1:9200
{
"name" : "rBrMTNx",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "-noR5DxFRsyvAFvAzxl07g",
"version" : {
"number" : "5.1.1",
"build_hash" : "5395e21",
"build_date" : "2016-12-06T12:36:15.409Z",
"build_snapshot" : false,
"lucene_version" : "6.3.0"
},
"tagline" : "You Know, for Search"
}
实现远程访问:
需要对config/elasticsearch.yml进行 配置:
network.host: hadoop-001
再次启动报错:Failed to load settings from [elasticsearch.yml]
这个错就是参数的冒号前后没有加空格,加了之后就好,我找了好久这个问题;
后来在一个外国网站找到了这句话.
Exception in thread "main" SettingsException[Failed to load settings from [elasticsearch.yml]]; nested: ElasticsearchParseException[malformed, expected end of settings but encountered additional content starting at line number: [3], column number: [1]]; nested: ParserException[expected ''<document start>'', but found BlockMappingStart
in ''reader'', line 3, column 1:
node.rack : r1
^
];
Likely root cause: expected ''<document start>'', but found BlockMappingStart
in ''reader'', line 3, column 1:
node.rack : r1
这个是行的开头没有加空格,fuck!
Exception in thread "main" SettingsException[Failed to load settings from [elasticsearch.yml]]; nested: ScannerException[while scanning a simple key
in ''reader'', line 11, column 2:
discovery.zen.ping.unicast.hosts ...
^
参数冒号后加空格,或者是数组中间加空格
例如:
# discovery.zen.minimum_master_nodes: 3
再次启动
还是报错
max file descriptors [4096] for elasticsearch process is too low
处理第一个错误:
vim /etc/security/limits.conf //文件最后加入
esuser soft nofile 65536
esuser hard nofile 65536
esuser soft nproc 4096
esuser hard nproc 4096
处理第二个错误:
进入limits.d目录下修改配置文件。
vim /etc/security/limits.d/20-nproc.conf
修改为 esuser soft nproc 4096
注意重新登录生效!!!!!!!!
处理第三个错误:
vim /etc/sysctl.conf
vm.max_map_count=655360
执行以下命令生效:
sysctl -p
关闭防火墙:systemctl stop firewalld.service
启动又又又报错
system call filters failed to install; check the logs and fix your configuration or disable sys
直接在
config/elasticsearch.yml 末尾加上一句
bootstrap.system_call_filter: false
再次启动成功!
安装Head插件
Head是elasticsearch的集群管理工具,可以用于数据的浏览和查询
(1)elasticsearch-head是一款开源软件,被托管在github上面,所以如果我们要使用它,必须先安装git,通过git获取elasticsearch-head
(2)运行elasticsearch-head会用到grunt,而grunt需要npm包管理器,所以nodejs是必须要安装的
nodejs和npm安装:
http://blog.java1234.com/blog/articles/354.html
git安装
yum install -y git
(3)elasticsearch5.0之后,elasticsearch-head不做为插件放在其plugins目录下了。
使用git拷贝elasticsearch-head到本地
cd ~
git clone git://github.com/mobz/elasticsearch-head.git
(4)安装elasticsearch-head依赖包
[root@localhost local]# npm install -g grunt-cli
[root@localhost _site]# cd /usr/local/elasticsearch-head/
[root@localhost elasticsearch-head]# cnpm install
(5)修改Gruntfile.js
[root@localhost _site]# cd /usr/local/elasticsearch-head/
[root@localhost elasticsearch-head]# vi Gruntfile.js
在connect-->server-->options下面添加:hostname:’*’,允许所有IP可以访问
(6)修改elasticsearch-head默认连接地址
[root@localhost elasticsearch-head]# cd /usr/local/elasticsearch-head/_site/
[root@localhost _site]# vi app.js
将this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://localhost:9200";中的localhost修改成你es的服务器地址
(7)配置elasticsearch允许跨域访问
打开elasticsearch的配置文件elasticsearch.yml,在文件末尾追加下面两行代码即可:
http.cors.enabled: true
http.cors.allow-origin: "*"
(8)打开9100端口
[root@localhost elasticsearch-head]# firewall-cmd --zone=public --add-port=9100/tcp --permanent
重启防火墙
[root@localhost elasticsearch-head]# firewall-cmd --reload
(9)启动elasticsearch
(10)启动elasticsearch-head
[root@localhost _site]# cd ~/elasticsearch-head/
[root@localhost elasticsearch-head]# node_modules/grunt/bin/grunt server 或者 npm run start
(11)访问elasticsearch-head
关闭防火墙:systemctl stop firewalld.service
浏览器输入网址:hadoop-001:9100/
安装Kibana
Kibana是一个针对Elasticsearch的开源分析及可视化平台,使用Kibana可以查询、查看并与存储在ES索引的数据进行交互操作,使用Kibana能执行高级的数据分析,并能以图表、表格和地图的形式查看数据
(1)下载Kibana
https://www.elastic.co/downloads/kibana
(2)把下载好的压缩包拷贝到/soft目录下
(3)解压缩,并把解压后的目录移动到/user/local/kibana
(4)编辑kibana配置文件
[root@localhost /]# vi /usr/local/kibana/config/kibana.yml
将server.host,elasticsearch.url修改成所在服务器的ip地址
server.port: 5601 //监听端口
server.host: "hadoo-001" //监听IP地址,建议内网ip
elasticsearch.url: "http:/hadoo-001" //elasticsearch连接kibana的URL,也可以填写192.168.137.188,因为它们是一个集群
(5)开启5601端口
Kibana的默认端口是5601
开启防火墙:systemctl start firewalld.service
开启5601端口:firewall-cmd --permanent --zone=public --add-port=5601/tcp
重启防火墙:firewall-cmd –reload
(6)启动Kibana
[root@localhost /]# /usr/local/kibana/bin/kibana
浏览器访问:http://192.168.137.188:5601
安装中文分词器
一.离线安装
(1)下载中文分词器
https://github.com/medcl/elasticsearch-analysis-ik
下载elasticsearch-analysis-ik-master.zip
(2)解压elasticsearch-analysis-ik-master.zip
unzip elasticsearch-analysis-ik-master.zip
(3)进入elasticsearch-analysis-ik-master,编译源码
mvn clean install -Dmaven.test.skip=true
(4)在es的plugins文件夹下创建目录ik
(5)将编译后生成的elasticsearch-analysis-ik-版本.zip移动到ik下,并解压
(6)解压后的内容移动到ik目录下
二.在线安装
./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.2.4/elasticsearch-analysis-ik-6.2.4.zip
Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)
Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)
关于Elasticsearch
安装方式
文档
中文文档
Elasticsearch
Elasticsearch DSL操作文档
本文通过python模块对elasticsearch进行简单的增删改查。
本文测试脚本仓库地址:https://github.com/YangJunJ/studyElasticsearch.git
环境
- Python 3.6.2
- elasticsearch 7.13.1
- elasticsearch-dsl 7.3.0
- elasticsearch 7.13.1
安装下面两个python模块
sudo pip3 install elasticsearch
sudo pip3 install elasticsearch_dsl
建立连接
创建文件elasticsearch_obj.py
from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, UpdateByQuery
class ElasticSearchObj(object):
def __init__(self):
self.user_key = "user_index"
self.friend_key = "friend_index"
self.test_count_key = "count_test_key"
# 初始化一个Elasticsearch实例对象
self.client = Elasticsearch(hosts=[{'host': "127.0.0.1", "port": 9200}])
def search(self, index):
# 返回索引对象结果级
return Search(using=self.client, index=index)
def update(self, index):
# ubq = UpdateByQuery(index=index).using(self.client)
# or
ubq = UpdateByQuery(index=index, using=self.client)
return ubq
创建或者更新文档
Elasticsearch 使用 JSON作为文档的序列化格式,用起来有点项nosql,文档的字段数量也可以像mongodb文档一样,不一致不相同的形式存储
index插入
create插入
创建文件study_insert.py
import random
from elasticsearch_obj import ElasticSearchObj
class CreateNewRecord(ElasticSearchObj):
def index(self, index, doc_id=None, doc_type="_doc", **kwargs):
"""
如果索引不存在,则会创建索引并写入文档
self.client.index(index, body, doc_type=None, id=None, params=None, headers=None)
http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
:param index: 文档索引名
:param doc_id: 当doc_id=None时,通过该方法会向文档中插入一条记录,doc_id!=None时,如果doc_id已经存在则会更新,不存在则插入
:param doc_type: 文档类型,默认为_doc,相同index下的记录值必须一致,如果不相等,则会触发异常
:param kwargs: 文档内容
:return:
"""
return self.client.index(index=index, body=kwargs, doc_type=doc_type, id=doc_id)
def create(self, index, doc_id=None, doc_type="_doc", **kwargs):
"""
http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
用法同index,不同的是create必须传递文档Id,否则会触发异常错误
:param index:
:param doc_id:
:param doc_type:
:param kwargs:
:return:
"""
return self.client.create(index=index, id=doc_id, doc_type=doc_type, body=kwargs)
def test_insert(self):
"""
插入一些记录,方便等会查询时使用
:return:
"""
hobby = ["唱", "跳", "rap", "唱跳rap", "游泳", "打球", "其他"]
for i in range(100):
response = self.index(
index=self.user_key,
id=i,
name="".join(random.sample('zyxwvutsrqponmlkjihgfedcba', random.randint(3, 8))),
age=random.randint(18, 30),
hobby=random.choice(hobby),
height=random.randint(155, 185),
sex=random.choice(["男", "女", "不详"])
)
print(f"{self.user_key}插入{i}条数据, 返回:{response}")
for i in range(100):
response = self.index(
index=self.friend_key,
user_id=i,
friend_id=random.randint(0, 99),
word=random.sample(["拧螺丝", "撸代码", "CRUD", "划水", "偷偷写下bug"], random.randint(1, 3)),
other={"work_place": random.choice(["第一排", "第二排"]), "performance": random.choice(["好", "不好"])}
)
print(f"{self.friend_key}插入{i}条数据, 返回:{response}")
for i in range(20000):
response = self.index(
index=self.test_count_key,
user_id=i,
friend_id=random.randint(0, 99),
weight=random.randint(0, 10000)
)
print(f"{self.test_count_key}插入{i}条数据, 返回:{response}")
if __name__ == "__main__":
test = CreateNewRecord()
test.test_insert()
查询
页查询子句
- 模糊查询:match, 返回与提供的文本、数字、日期或布尔值匹配的文档,包括用于模糊匹配
- 精确查询:term, 该术语必须与字段值完全匹配,包括空格和大写
- 范围查询:range, 返回包含提供范围内的术语的文档
- gt: 大于(可选)
- gte: 大于或等于(可选)
- lt: 小于(可选)
- lte:小于或等于(可选)
- format:用于转换date查询中值的日期格式(可选,字符串)
查询上下文query & 过滤上下文filter 差别
用法可以查看下面脚本
创建文件study_search.py
from elasticsearch_obj import ElasticSearchObj
class Search(ElasticSearchObj):
"""
示例 elasticsearch_dsl.Search 基本操作
查询的字段值中如果包含中文、下划线等其他特殊字符, 字段需要需要加上 __keyword,否则无法匹配, 用法详见示例
介绍两个查询中需要使用到的方法:
elasticsearch_dsl.Search.scan(): 将搜索条件扫描索引,返回所有与条件匹配文档生成器
elasticsearch_dsl.Search.to_dict(): 将hit对象系列化成python字典记录
"""
def get_all_record(self):
"""
示例获取self.key下的所有文档
:return:
"""
# all_record 是 elasticsearch_dsl.search.Search对象,可以通过scan() 方法遍历self.key索引内的所有文档
all_record = self.search(self.user_key)
for record in all_record.scan():
# record 是 elasticsearch_dsl.response.hit.Hit 对象
# 可以通过.属性获取字段值, 比如: record.name
# 如果字段名不存在,则会触发'Hit' object has no attribute 错误
print(f"record.id:{record.id}, record.name:{record.name}")
# 上面的是其中一种方法,它也可以通过to_dict()方法,将记录通过python字典的形式返回
print(record.to_dict())
def print_all_record(self, explain, all_record, max_count=3):
i = 1
for record in all_record.scan():
print(explain, record.to_dict())
if i >= max_count:
print(explain, f"查询数量大于{i},后面的记录不打印了")
break
i += 1
self.cross_line()
@staticmethod
def cross_line():
print('=================================================')
def get_record_by_query(self):
"""
示例query的常用用法
:return:
"""
all_record = self.search(self.user_key)
# 例如要找到爱好中包含rap的用户信息
match_rap = all_record.query("match", hobby="rap")
self.print_all_record("查看所有包含爱好rap的用户:", match_rap)
# 查找爱好中含有rap,并且性别为女的用户信息
match_rap_female = all_record.query("match", hobby="rap").query("term", sex__keyword="女")
# match_rap_female = match_rap.query("term", sex__keyword="女")
self.print_all_record("查看爱好含有rap的所有女用户:", match_rap_female)
# 查找身高大于175的男用户,并且爱好只有唱的用户
man_sing_gt_175 = all_record.query("range", height={"gt": 175}).query(
"term", sex__keyword="男").query(
"term", hobby__keyword="唱")
self.print_all_record("查找身高大于175的男用户,并且爱好只有唱的用户", man_sing_gt_175)
def get_record_by_filter(self):
"""
示例filter的用法
filter 与 query用法一样,这里只举一个简单例子
:return:
"""
all_record = self.search(self.user_key)
# 查找年龄大于等于25的男用户,并且爱好是唱跳rap
all_record = all_record.filter("range", age={"gte": 25}).filter(
"term", sex__keyword="男").filter(
"term", hobby__keyword="唱跳rap")
self.print_all_record("查找年龄大于等于25的男用户,并且爱好是唱跳rap:", all_record)
def get_record_by_query_and_filter(self):
"""
query、filter可以混用
举一个例子
:return:
"""
all_record = self.search(self.user_key)
# 查找年龄在20-25的男性用户
all_record = all_record.query("range", age={"gte": 20, "lte": 25}).filter("term", sex__keyword="男")
self.print_all_record("查找年龄在20-25的男性用户:", all_record)
def get_record_by_exclude(self):
"""
非查询
:return:
"""
all_record = self.search(self.user_key)
# 查找性别不是男生,并且爱好不是唱的用户
records = all_record.exclude("term", sex__keyword="男").exclude("term", hobby__keyword="唱")
self.print_all_record("查找性别不是男生,并且爱好不是唱的用户:", records, 6)
# 查找男性用户中,爱好不是唱且身高大于175的用户
records = all_record.filter("term", sex__keyword="男").exclude(
"term", hobby__keyword="唱").query(
"range", height={"gt": 175})
self.print_all_record("查找男性用户中,爱好不是唱且身高大于175的用户:", records, 6)
def other_search(self):
"""
其他查询:
获取文档数量:.count()
查询结果排序:sort(field)
:return:
"""
all_record = self.search(self.user_key)
# 获取id>=90的文档数量
gte_90 = all_record.filter("range", id={"gte": 90})
print(f"id>=90的文档 存在,数量为:{gte_90.count()}") # id>=90的文档 存在,数量为:10
# 获取id>=300的文档数量
gte_300 = all_record.filter("range", id={"gte": 300})
print(f"id>=300的文档 存在,数量为:{gte_300.count()}") # id>=300的文档 存在,数量为:0
# 排序 获取排序结果后,不能使用san(), 比如:self.search(self.key).sort("-height").scan()
# 使用scan()不会以任何预先确定的顺序返回结果, 详见:
# https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
# 不使用san() 获取记录会有数量10000条的限制,如果查询文档数量大于10000,不使用san()方法便会出现异常
# 下面查询会触发异常:
# all_record = self.search(self.test_count_key)
# for record in all_record[0: 15000]:
# print(record.to_dict())
# 因为test_count_key下数量已经达到20000条,当取15000条数据时会触发:elasticsearch.exceptions.RequestError异常
# 查询id>90, 且按玩家身高排序从大到小排序
gte_90_sort = all_record.sort("-height")
# self.print_all_record("查询id>90, 且按玩家身高排序从大到小排序:", gte_90_sort, 10)
for record in gte_90_sort:
print(record.to_dict())
self.cross_line()
# 查询id>90, 且按玩家身高排序从小到大排序, 如果身高相同,则按id降序排序
gte_90_sort = all_record.filter("range", id={"gte": 90}).sort("height", "-id")
for record in gte_90_sort:
print(record.to_dict())
self.cross_line()
# 分页
# 查询用户用户数据,跳过前五十条后20条
skip_50_search_20 = all_record[50: 70]
i = 1
for record in skip_50_search_20:
print(f"第{i}条数据, 用户id:{record.to_dict()['id']}")
i += 1
if __name__ == "__main__":
search = Search()
# search.get_all_record()
# search.get_record_by_query()
# search.get_record_by_filter()
# search.get_record_by_query_and_filter()
# search.get_record_by_exclude()
search.other_search()
更新
按查询更新文档
编写执行脚本
刷新索引
创建文件study_search.py
详见示例:update_example
import time
from elasticsearch_obj import ElasticSearchObj
class Update(ElasticSearchObj):
def refresh(self, index):
self.client.indices.refresh(index=index)
@staticmethod
def cross_line():
print('=================================================')
def print_all_record(self, explain, all_record):
for record in all_record.scan():
print(explain, record.to_dict())
self.cross_line()
def update_example(self):
# 将user_key id为90记录,name改为”我是来测试的“, height增加50
# 查找user_key id为90更新前的记录
update_before = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=90).script(
source="ctx._source.name=params.name;ctx._source.height+=params.height;",
params={
"name": "我是来测试的",
"height": 50
})
update_response = update_obj.execute() # 请求执行并返回执行结果
print("user_key id为90记录,name改为”我是来测试的“, height增加50 更新返回结果:", update_response)
update_after = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新后的记录1:", update_after)
time.sleep(1)
update_after = self.search(self.user_key).filter("term", id=90)
self.print_all_record("查找user_key id为90更新后的记录2:", update_after)
# 执行结果就不沾出来了,但有执行的可以看出
# 更新后记录1:是更新前的结果; 而更新后记录2:是更新后的结果
# 造成这一现象是因为上一个请求更新在内部并未完全完成,所以在sleep 一秒后便能获取更新后的记录
print(end="\n\n\n")
# 如果需要在更新后立即获取最新文档,又不知道多久能够完成更新, 可以在执行更新后刷新文档
# 看下面示例
# 将user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百
print("user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百")
update_before = self.search(self.user_key).filter("term", id=70)
self.print_all_record("查找user_key id为80更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=70).script(
source="ctx._source.name=params.name;ctx._source.height+=params.height;",
params={
"name": "试试刷新index能否更新后获取最新数据",
"height": 100
})
update_response = update_obj.execute() # 请求执行并返回执行结果
print("user_key id为70记录,name改为”试试刷新index能否更新后获取最新数据“, height增加100 更新返回结果:", update_response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=70)
self.print_all_record("查找user_key id为70更新后的记录1:", update_after)
# 默认情况下,Elasticsearch 每秒都会定期刷新索引, 如果并不需要获取更新后的文档,尽量就不要手动刷新了
# 可以通过更新响应的total跟updated数量是否一致判断记录是否更新成功
# 查询更新会更新所有匹配的文档,查询条件跟上面介绍的查询用法一致
# 例如:将user_key所有age增加一岁
response = self.update(self.user_key).script(source="ctx._source.age+=1;").execute()
print("将user_key所有age增加一岁:", response["total"], response["updated"], "response=", response)
# 此处增加刷新,是因为上一个执行是更新整个user_key,如果还未自动刷新,执行下面示例,或造成并发异常,
# 导致elasticsearch.exceptions.ConflictError异常
self.refresh(self.user_key)
print(end="\n\n\n")
# 特别注意的是,如果script定义的字段,查询的文档存在则会更新,不存在则会在文档中插入字段
print("将user_key下id为1的文档增加一个字段test_field,值为:[1,2,3]")
update_before = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key id为1更新前的记录:", update_before)
update_obj = self.update(self.user_key).filter(
"term", id=1).script(
source='ctx._source.test_field=params.test_field',
params={
"test_field": [1, 2, 3],
})
response = update_obj.execute()
print("将user_key下id为1的文档增加一个字段test_field:", response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key 将user_key下id为1的文档增加一个字段test_field,更新后的记录:", update_after)
# 删除字段
print("将user_key下id为1的文档增加的test_field字段移除")
update_before = self.search(self.user_key).filter("term", id=1)
self.print_all_record("查找user_key id为1更新前的记录:", update_before)
response = self.update(self.user_key).filter(
"term", id=1).script(
source='ctx._source.remove("test_field")').execute()
print("将user_key下id为1的文档增加的test_field字段移除:", response)
self.refresh(self.user_key) # 刷新
update_after = self.search(self.user_key).filter("term", id=1)
self.print_all_record("将user_key下id为1的文档增加的test_field字段移除,更新后的记录:", update_after)
if __name__ == "__main__":
update = Update()
update.update_example()
删除
删除索引
根据查询条件删除文档
创建文件study_delete.py
from elasticsearch_obj import ElasticSearchObj
class StudyDelete(ElasticSearchObj):
def delete_index(self, index):
"""
https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-delete-index.html
删除整个索引
:param index:
:return:
"""
try:
self.client.indices.delete(index=index)
except Exception:
print(f"需要删除的索引:{index}不存在")
def delete_by_query(self):
"""
示例根据查询条件删除文档
:return:
"""
# 将user_key下id大于90的玩家删除
all_record = self.search(self.user_key)
print("查看删除前文档的数量:", all_record.count())
all_record.filter("range", id={"gt": 90}).delete()
self.client.indices.refresh(index=self.user_key)
all_record = self.search(self.user_key)
print(f"查看删除后文档的数量:{all_record.count()}")
if __name__ == "__main__":
study_delete = StudyDelete()
study_delete.delete_index(study_delete.friend_key) # 删除索引friend_key
study_delete.delete_by_query()
我们今天的关于elasticsearch:删除文档后如何释放存储大小和elasticsearch 删除的分享已经告一段落,感谢您的关注,如果您想了解更多关于ElasticSearch - 文档存储、Elasticsearch 2.20 文档篇:更新删除文档、Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana、Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)的相关信息,请在本站查询。
本文标签: