GVKun编程网logo

elasticsearch:删除文档后如何释放存储大小(elasticsearch 删除)

13

对于elasticsearch:删除文档后如何释放存储大小感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍elasticsearch删除,并为您提供关于ElasticSearch-文档存储、El

对于elasticsearch:删除文档后如何释放存储大小感兴趣的读者,本文将会是一篇不错的选择,我们将详细介绍elasticsearch 删除,并为您提供关于ElasticSearch - 文档存储、Elasticsearch 2.20 文档篇:更新删除文档、Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana、Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)的有用信息。

本文目录一览:

elasticsearch:删除文档后如何释放存储大小(elasticsearch 删除)

elasticsearch:删除文档后如何释放存储大小(elasticsearch 删除)

在我的elasticsearch服务器上:文档总数:300万,总大小:3.6G然后,我删除了约280万文档:文档总数:约13万,总大小:3.6G

我已删除文件,如何释放文件大小?

答案1

小编典典

删除文档只会将其标记为已删除,因此将不会对其进行搜索。要回收磁盘空间,必须优化索引:

curl -XPOST ''http://localhost:9200/_optimize?only_expunge_deletes=true''

文档:
http :
//www.elasticsearch.org/guide/en/elasticsearch/reference/current/indices-
optimize.html

该文档已移至:https : //www.elastic.co/guide/zh-
CN/elasticsearch/reference/current/indices-
forcemerge.html

更新资料

与Elasticsearch 2.1.x的开始,optimize
已废弃的青睐
forcemerge。API是相同的,只是端点没有更改。

curl -XPOST ''http://localhost:9200/_forcemerge?only_expunge_deletes=true''

ElasticSearch - 文档存储

ElasticSearch - 文档存储

shard

确定shard的公式:

shard = hash(routing) % number_of_primary_shards

routing 默认是文档的 _id ,也可以设置成一个自定义的值。

因此要在创建索引的时候就确定好主分片的数量,并且永远不会改变这个数量,因为如果数量变化了,那么所有之前路由的值都会无效。

每个节点都知道集群中任一文档位置,所以可以直接将请求发到任意节点。当然,为了扩展负载,更好的做法是轮询发送集群中所有的节点。

shard、replica同步

新建、索引或者删除请求

elas_0402.png

步骤:

  1. 户端向 Node 1 发送新建、索引或者删除请求。
  2. 节点使用文档的 _id 确定文档属于shard 0 。请求会被转发到 Node 3(因为shard 0 被分配在 Node 3 上)。
  3. Node 3 在shard 0上面执行请求。如果成功了,它将请求并行转发到 Node 1 和 Node 2 的replica R0上。一旦所有的R0都报告成功, Node 3 将向Node 1报告成功,Node 1向客户端报告成功。

查询请求

elas_0403.png

步骤:

  1. 客户端向 Node 1 发送查询请求。
  2. node 1使用文档的 _id 来确定文档属于shard 0 。replica 0 存在于Node 1、Node 2上。 这种情况下,每次请求都会轮询路由Node 1、Node 2,这里假设路由到Node 2。
  3. Node 2将文档返回给Node 1 ,然后Node 1将文档返回给客户端。

注意:
在文档被检索时,已经被索引的文档可能已经存在于shard上但是还没有复制到replica。即强一致性无法保证,但最终一致性可以保证。

更新请求

POST /{index}/{type}/{id}/_update

elas_0404.png

步骤略。
注意:
Node 3 从P0检索文档,修改 _source 字段中的 JSON ,并且尝试重新索引主分片的文档。 如果文档已经被另一个进程修改(_version冲突),它会重试步骤 3 ,超过 retry_on_conflict 次后放弃。
如果 Node 3 成功地更新文档,它将新版本的文档并行转发(新建、索引或者删除请求的步骤里转发的是请求而不是文档)到 Node 1 和 Node 2 上的R0,重新建立索引。一旦所有R0都返回成功,Node 3 向Node 1返回成功,Node 1向客户端返回成功。

为什么要转发文档而不是请求?
因为转发请求是异步的,不能保证到达replica的顺序,若转发更新请求,则可能以错误的顺序应用更改,导致得到损坏的文档。而转发文档,则replica可以直接根据_version来获取最大版本号的文档。

批量请求(_mget 、_bulk)

elas_0406.png

bulk步骤:

  1. 客户端向 Node 1 发送 bulk 请求。
  2. Node 1 为每个命中的shard创建一个批量请求,并将这些请求并行转发到每个包含该shard的节点。
  3. shard按顺序(请求报文体中的子请求顺序)执行每个操作。当一个子请求操作成功时,shard并行转发新文档(或删除)到replica,然后执行下一个子请求。 一旦所有的子请求的replica向shard报告完成(无论执行是否成功),该shard将向node 1报告,node 1将这些响应收集整理并返回给客户端。

可选的请求参数

  • consistency

    解决分布式脑裂问题,参数的值可以设为 one (只要该index的shard状态 ok 就允许执行_写_操作),all(必须要该index的shard和多个replica的状态没问题才允许执行_写_操作), 或 quorum (默认值)。

    # quorum 即大于一半的分片,注意,number_of_replicas是primary shard对应的replica数量,不是全部replica数量
    int( (primary + number_of_replicas) / 2 ) + 1
    

    注意:新索引默认有 1 个replica,这意味着为了满足consistency至少需要两个活动的node。 这就会阻止我们在单一node上做任何事情。为了避免这个问题,ES要求只有当 number_of_replicas 大于1的时候,consistency策略才会执行。

  • timeout
    如果没有足够的replica,Elasticsearch会等待,直到满足consistency策略。默认最多等待1分钟。 你可以使用 timeout 参数 使它更早终止: 100 是100毫秒,30s 是30秒。

一个Lucene索引在Elasticsearch称作shard。 一个Elasticsearch索引是shard的集合。

Elasticsearch 2.20 文档篇:更新删除文档

Elasticsearch 2.20 文档篇:更新删除文档

    Elasticsearch 的更新文档 API 准许通过脚本操作来更新文档。更新操作从索引中获取文档,执行脚本,然后获得返回结果。它使用版本号来控制文档获取或者重建索引。

 备注:在 Elasticsearch 中的更新操作是完全从新索引文件。

    我们新建一个文档:

请求:PUT http://localhost:9200/test/type1/1?pretty

参数:

{
    "counter" : 1,
    "tags" : ["red"]
}

脚本开启功能

    在最新版本的 Elasticsearch 中,基于安全考虑 (如果用不到,请保持禁用), 默认禁用了动态脚本功能。如果被禁用,在使用脚本的时候则报以下的错误:

scripts of type [inline], operation [update] and lang [groovy] are disabled

    可以用以下方式完全开启动态脚本功能,在 config/elasticsearch.yml 文件,在最后添加以下代码

script.inline: on

script.indexed: on

script.file: on

配置后,重启 Elasticsearch。

下面我们用脚本来更新此文档。

请求:POST http://localhost:9200/test/type1/1/_update?pretty

参数:

{
    "script" : {
        "inline""ctx._source.counter += count",
        "params" : {
            "count" : 4
        }
    }
}

执行完后,我们在查询一下文档内容,可以发现 counter 的值为 5:

{
  "_index" : "test",
  "_type" : "type1",
  "_id" : "1",
  "_version" : 5,
  "found" : true,
  "_source" : {
    "counter" : 5,
    "tags" : [ "red" ]
  }
}

在看下面的更新操作:

请求:POST http://localhost:9200/test/type1/1/_update?pretty

参数:

{
    "script" : {
        "inline""ctx._source.tags += tag",
        "params" : {
            "tag" : "blue"
        }
    }
}

返回的内容为,表示更新成功,我们看一下_version 为 6,比刚才的值增加了 1:

{
  "_index" : "test",
  "_type" : "type1",
  "_id" : "1",
  "_version" : 6,
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  }
}

然后我们在查询一下文档内容:

{
  "_index" : "test",
  "_type" : "type1",
  "_id" : "1",
  "_version" : 6,
  "found" : true,
  "_source" : {
    "counter" : 5,
    "tags" : [ "red""blue" ]
  }
}

在脚本中除了_source 外其他内置参数也可以使用,例如_index, _type, _id, _version, _routing, _parent, _timestamp, _ttl。

下面我们通过脚本增加一列。

请求:POST http://localhost:9200/test/type1/1/_update?pretty

参数:

{
    "script" : "ctx._source.name_of_new_field = \"value_of_new_field\""
}

然后查询此文档:

{
  "_index" : "test",
  "_type" : "type1",
  "_id" : "1",
  "_version" : 7,
  "found" : true,
  "_source" : {
    "counter" : 5,
    "tags" : [ "red""blue" ],
    "name_of_new_field" : "value_of_new_field"
  }
}

从中可以看出,文档中又增加了一列。

删除一列,请求和刚才的一样,参数变为:

{
    "script" : "ctx._source.remove(\"name_of_new_field\")"
}

甚至可以通过表达式来判断做某些事情,例如:下面的示例将删除的文件如果标签字段包含蓝色,否则什么也不做(空):

请求参数:

{
    "script" : {
        "inline""ctx._source.tags.contains(tag) ? ctx.op = \"delete\" : ctx.op = \"none\"",
        "params" : {
            "tag" : "blue"
        }
    }
}

部分文档更新:

    该更新接口还支持更新部分文档,将文档合并到现有文档中(简单的递归合并、对象的内部合并、替换核心的 “键 / 值” 和数组)。例如:

{
    "doc" : {
        "name" : "new_name"
    }
}

更新后,可以发现文档中多了一列 name。

{
  "_index" : "test",
  "_type" : "type1",
  "_id" : "1",
  "_version" : 23,
  "found" : true,
  "_source" : {
    "counter" : 5,
    "tags" : [ "red""blue" ],
    "name" : "new_name"
  }
}

    当文档指定的值与现有的_source 合并。当新的文档和老的文档不一致的时候,文档将会被从新建立索引。当新旧文档一样的时候,则不进行从建索引的操作。可以通过设置 detect_noop 为 false,让任何情况下都从新建立索引,例如下面的更新操作:

{
    "doc" : {
        "name" : "new_name"
    },
    "detect_noop"false
}

删除文档

    删除文档相对比较简单:

请求:DELETE http://localhost:9200/test/type1/1

返回的内容为:

{
    "found"true, 
    "_index""test", 
    "_type""type1", 
    "_id""1", 
    "_version"24, 
    "_shards": {
        "total"2, 
        "successful"1, 
        "failed"0
    }}

则表示删除了此文档。

    赛克蓝德 (secisland) 后续会逐步对 Elasticsearch 的最新版本的各项功能进行分析,近请期待。也欢迎加入 secisland 公众号进行关注。

Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana

Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana

CentOS6.5下安装ElasticSearch6.2.4
(1)配置JDK环境

配置环境变量

export JAVA_HOME="/opt/jdk1.8.0_144"

export PATH="$JAVA_HOME/bin:$PATH"

export CLASSPATH=".:$JAVA_HOME/lib"

(2)安装ElasticSearch6.2.4

下载地址:https://www.elastic.co/cn/downloads/past-releases/elasticsearch-6-2-4

启动报错:

 

解决方式:


bin/elasticsearch -Des.insecure.allow.root=true


 

或者修改bin/elasticsearch,加上ES_JAVA_OPTS属性:

 


ES_JAVA_OPTS="-Des.insecure.allow.root=true"


 

再次启动:

这是出于系统安全考虑设置的条件。由于ElasticSearch可以接收用户输入的脚本并且执行,为了系统安全考 虑,建议创建一个单独的用户用来运行ElasticSearch。

如果没有普通用户就要创建一个普通用户组和普通用户,下面介绍一下怎么创建用户组和普通用户

创建用户组和用户:

 


groupadd esgroup

useradd esuser -g esgroup -p espassword


 

更改elasticsearch文件夹及内部文件的所属用户及组:

 


cd /opt

chown -R esuser:esgroup elasticsearch-6.2.4


 

切换用户并运行:

su esuser

./bin/elasticsearch

再次启动显示已杀死:

 

需要调整JVM的内存大小:

vi bin/elasticsearch

ES_JAVA_OPTS="-Xms512m -Xmx512m"

再次启动:启动成功

如果显示如下类似信息:

 


[INFO ][o.e.c.r.a.DiskThresholdMonitor] [ZAds5FP] low disk watermark [85%] exceeded on [ZAds5FPeTY-ZUKjXd7HJKA][ZAds5FP][/opt/elasticsearch-6.2.4/data/nodes/0] free: 1.2gb[14.2%], replicas will not be assigned to this node


 

需要清理磁盘空间。

 


后台运行:./bin/elasticsearch -d

测试连接:curl 127.0.0.1:9200


 

会看到一下JSON数据:


[root@localhost ~]# curl 127.0.0.1:9200
{
"name" : "rBrMTNx",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "-noR5DxFRsyvAFvAzxl07g",
"version" : {
"number" : "5.1.1",
"build_hash" : "5395e21",
"build_date" : "2016-12-06T12:36:15.409Z",
"build_snapshot" : false,
"lucene_version" : "6.3.0"
},
"tagline" : "You Know, for Search"
}



实现远程访问:
需要对config/elasticsearch.yml进行 配置:
network.host: hadoop-001

再次启动报错:Failed to load settings from [elasticsearch.yml]

这个错就是参数的冒号前后没有加空格,加了之后就好,我找了好久这个问题;

后来在一个外国网站找到了这句话.

 

Exception in thread "main" SettingsException[Failed to load settings from [elasticsearch.yml]]; nested: ElasticsearchParseException[malformed, expected end of settings but encountered additional content starting at line number: [3], column number: [1]]; nested: ParserException[expected ''<document start>'', but found BlockMappingStart

 in ''reader'', line 3, column 1:

    node.rack : r1

    ^

];

Likely root cause: expected ''<document start>'', but found BlockMappingStart

 in ''reader'', line 3, column 1:

    node.rack : r1

 

这个是行的开头没有加空格,fuck!

Exception in thread "main" SettingsException[Failed to load settings from [elasticsearch.yml]]; nested: ScannerException[while scanning a simple key

 in ''reader'', line 11, column 2:

     discovery.zen.ping.unicast.hosts ... 

     ^

 

参数冒号后加空格,或者是数组中间加空格

例如:

# discovery.zen.minimum_master_nodes: 3

 

再次启动

还是报错

max file descriptors [4096] for elasticsearch process is too low

 

处理第一个错误:

vim /etc/security/limits.conf //文件最后加入

esuser soft nofile 65536

esuser hard nofile 65536

esuser soft nproc 4096

esuser hard nproc 4096

 

处理第二个错误:

进入limits.d目录下修改配置文件。

vim /etc/security/limits.d/20-nproc.conf
修改为 esuser soft nproc 4096

 

注意重新登录生效!!!!!!!!

处理第三个错误:

vim /etc/sysctl.conf

vm.max_map_count=655360

执行以下命令生效:
sysctl -p

关闭防火墙:systemctl stop firewalld.service

启动又又又报错

 

system call filters failed to install; check the logs and fix your configuration or disable sys

 

直接在

config/elasticsearch.yml  末尾加上一句


bootstrap.system_call_filter: false

再次启动成功!

 


安装Head插件


Head是elasticsearch的集群管理工具,可以用于数据的浏览和查询

(1)elasticsearch-head是一款开源软件,被托管在github上面,所以如果我们要使用它,必须先安装git,通过git获取elasticsearch-head

(2)运行elasticsearch-head会用到grunt,而grunt需要npm包管理器,所以nodejs是必须要安装的

nodejs和npm安装:

http://blog.java1234.com/blog/articles/354.html

git安装


yum install -y git


 

 

(3)elasticsearch5.0之后,elasticsearch-head不做为插件放在其plugins目录下了。
使用git拷贝elasticsearch-head到本地

cd ~

git clone git://github.com/mobz/elasticsearch-head.git

(4)安装elasticsearch-head依赖包

[root@localhost local]# npm install -g grunt-cli

[root@localhost _site]# cd /usr/local/elasticsearch-head/

[root@localhost elasticsearch-head]# cnpm install

(5)修改Gruntfile.js

[root@localhost _site]# cd /usr/local/elasticsearch-head/

[root@localhost elasticsearch-head]# vi Gruntfile.js

在connect-->server-->options下面添加:hostname:’*’,允许所有IP可以访问

(6)修改elasticsearch-head默认连接地址
[root@localhost elasticsearch-head]# cd /usr/local/elasticsearch-head/_site/

[root@localhost _site]# vi app.js

将this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://localhost:9200";中的localhost修改成你es的服务器地址

(7)配置elasticsearch允许跨域访问

打开elasticsearch的配置文件elasticsearch.yml,在文件末尾追加下面两行代码即可:

http.cors.enabled: true

http.cors.allow-origin: "*"

(8)打开9100端口

[root@localhost elasticsearch-head]# firewall-cmd --zone=public --add-port=9100/tcp --permanent

重启防火墙

[root@localhost elasticsearch-head]# firewall-cmd --reload

(9)启动elasticsearch

(10)启动elasticsearch-head

 


[root@localhost _site]# cd ~/elasticsearch-head/

[root@localhost elasticsearch-head]# node_modules/grunt/bin/grunt server  或者 npm run start


 

(11)访问elasticsearch-head

关闭防火墙:systemctl stop firewalld.service

浏览器输入网址:hadoop-001:9100/

 

 

安装Kibana
Kibana是一个针对Elasticsearch的开源分析及可视化平台,使用Kibana可以查询、查看并与存储在ES索引的数据进行交互操作,使用Kibana能执行高级的数据分析,并能以图表、表格和地图的形式查看数据

(1)下载Kibana
https://www.elastic.co/downloads/kibana

(2)把下载好的压缩包拷贝到/soft目录下

(3)解压缩,并把解压后的目录移动到/user/local/kibana

(4)编辑kibana配置文件

[root@localhost /]# vi /usr/local/kibana/config/kibana.yml

 

将server.host,elasticsearch.url修改成所在服务器的ip地址

 


 

server.port: 5601 //监听端口

server.host: "hadoo-001" //监听IP地址,建议内网ip

elasticsearch.url: "http:/hadoo-001" //elasticsearch连接kibana的URL,也可以填写192.168.137.188,因为它们是一个集群

 


 

(5)开启5601端口

Kibana的默认端口是5601

开启防火墙:systemctl start firewalld.service

开启5601端口:firewall-cmd --permanent --zone=public --add-port=5601/tcp

重启防火墙:firewall-cmd –reload

(6)启动Kibana

[root@localhost /]# /usr/local/kibana/bin/kibana

浏览器访问:http://192.168.137.188:5601

 

安装中文分词器

一.离线安装

(1)下载中文分词器
https://github.com/medcl/elasticsearch-analysis-ik

下载elasticsearch-analysis-ik-master.zip

(2)解压elasticsearch-analysis-ik-master.zip

unzip elasticsearch-analysis-ik-master.zip

(3)进入elasticsearch-analysis-ik-master,编译源码

mvn clean install -Dmaven.test.skip=true

(4)在es的plugins文件夹下创建目录ik

(5)将编译后生成的elasticsearch-analysis-ik-版本.zip移动到ik下,并解压

(6)解压后的内容移动到ik目录下

二.在线安装

./elasticsearch-plugin install https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.2.4/elasticsearch-analysis-ik-6.2.4.zip

Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)

Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)

Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)

关于Elasticsearch
安装方式
文档
中文文档
Elasticsearch
Elasticsearch DSL操作文档


本文通过python模块对elasticsearch进行简单的增删改查。
本文测试脚本仓库地址:https://github.com/YangJunJ/studyElasticsearch.git

环境

  • Python 3.6.2
  • elasticsearch 7.13.1
  • elasticsearch-dsl 7.3.0
  • elasticsearch 7.13.1

安装下面两个python模块

sudo pip3 install elasticsearch
sudo pip3 install elasticsearch_dsl



建立连接

创建文件elasticsearch_obj.py

from elasticsearch import Elasticsearch
from elasticsearch_dsl import Search, UpdateByQuery


class ElasticSearchObj(object):
    def __init__(self):
        self.user_key = "user_index"
        self.friend_key = "friend_index"
        self.test_count_key = "count_test_key"
        # 初始化一个Elasticsearch实例对象
        self.client = Elasticsearch(hosts=[{'host': "127.0.0.1", "port": 9200}])

    def search(self, index):
        # 返回索引对象结果级
        return Search(using=self.client, index=index)

    def update(self, index):
        # ubq = UpdateByQuery(index=index).using(self.client)
        # or
        ubq = UpdateByQuery(index=index, using=self.client)
        return ubq




创建或者更新文档

Elasticsearch 使用 JSON作为文档的序列化格式,用起来有点项nosql,文档的字段数量也可以像mongodb文档一样,不一致不相同的形式存储
index插入
create插入
创建文件study_insert.py

import random
from elasticsearch_obj import ElasticSearchObj


class CreateNewRecord(ElasticSearchObj):
    def index(self, index, doc_id=None, doc_type="_doc", **kwargs):
        """
        如果索引不存在,则会创建索引并写入文档
        self.client.index(index, body, doc_type=None, id=None, params=None, headers=None)
        http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
        :param index: 文档索引名
        :param doc_id: 当doc_id=None时,通过该方法会向文档中插入一条记录,doc_id!=None时,如果doc_id已经存在则会更新,不存在则插入
        :param doc_type: 文档类型,默认为_doc,相同index下的记录值必须一致,如果不相等,则会触发异常
        :param kwargs: 文档内容
        :return:
        """
        return self.client.index(index=index, body=kwargs, doc_type=doc_type, id=doc_id)

    def create(self, index, doc_id=None, doc_type="_doc", **kwargs):
        """
        http://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html
        用法同index,不同的是create必须传递文档Id,否则会触发异常错误
        :param index:
        :param doc_id:
        :param doc_type:
        :param kwargs:
        :return:
        """
        return self.client.create(index=index, id=doc_id, doc_type=doc_type, body=kwargs)

    def test_insert(self):
        """
        插入一些记录,方便等会查询时使用
        :return:
        """
        hobby = ["唱", "跳", "rap", "唱跳rap", "游泳", "打球", "其他"]
        for i in range(100):
            response = self.index(
                index=self.user_key,
                id=i,
                name="".join(random.sample('zyxwvutsrqponmlkjihgfedcba', random.randint(3, 8))),
                age=random.randint(18, 30),
                hobby=random.choice(hobby),
                height=random.randint(155, 185),
                sex=random.choice(["男", "女", "不详"])
            )
            print(f"{self.user_key}插入{i}条数据, 返回:{response}")

        for i in range(100):
            response = self.index(
                index=self.friend_key,
                user_id=i,
                friend_id=random.randint(0, 99),
                word=random.sample(["拧螺丝", "撸代码", "CRUD", "划水", "偷偷写下bug"], random.randint(1, 3)),
                other={"work_place": random.choice(["第一排", "第二排"]), "performance": random.choice(["好", "不好"])}
            )
            print(f"{self.friend_key}插入{i}条数据, 返回:{response}")

        for i in range(20000):
            response = self.index(
                index=self.test_count_key,
                user_id=i,
                friend_id=random.randint(0, 99),
                weight=random.randint(0, 10000)
            )
            print(f"{self.test_count_key}插入{i}条数据, 返回:{response}")


if __name__ == "__main__":
    test = CreateNewRecord()
    test.test_insert()



查询

页查询子句
  • 模糊查询:match, 返回与提供的文本、数字、日期或布尔值匹配的文档,包括用于模糊匹配
  • 精确查询:term, 该术语必须与字段值完全匹配,包括空格和大写
  • 范围查询:range, 返回包含提供范围内的术语的文档
    • gt: 大于(可选)
    • gte: 大于或等于(可选)
    • lt: 小于(可选)
    • lte:小于或等于(可选)
    • format:用于转换date查询中值的日期格式(可选,字符串)

查询上下文query & 过滤上下文filter 差别

用法可以查看下面脚本
创建文件study_search.py

from elasticsearch_obj import ElasticSearchObj


class Search(ElasticSearchObj):
    """
        示例 elasticsearch_dsl.Search 基本操作

        查询的字段值中如果包含中文、下划线等其他特殊字符, 字段需要需要加上 __keyword,否则无法匹配, 用法详见示例


        介绍两个查询中需要使用到的方法:
            elasticsearch_dsl.Search.scan(): 将搜索条件扫描索引,返回所有与条件匹配文档生成器
            elasticsearch_dsl.Search.to_dict(): 将hit对象系列化成python字典记录
    """
    def get_all_record(self):
        """
        示例获取self.key下的所有文档
        :return:
        """
        # all_record 是 elasticsearch_dsl.search.Search对象,可以通过scan() 方法遍历self.key索引内的所有文档
        all_record = self.search(self.user_key)
        for record in all_record.scan():
            # record 是 elasticsearch_dsl.response.hit.Hit 对象
            # 可以通过.属性获取字段值, 比如: record.name
            # 如果字段名不存在,则会触发'Hit' object has no attribute 错误
            print(f"record.id:{record.id}, record.name:{record.name}")

            # 上面的是其中一种方法,它也可以通过to_dict()方法,将记录通过python字典的形式返回
            print(record.to_dict())

    def print_all_record(self, explain, all_record, max_count=3):
        i = 1
        for record in all_record.scan():
            print(explain, record.to_dict())
            if i >= max_count:
                print(explain, f"查询数量大于{i},后面的记录不打印了")
                break
            i += 1
        self.cross_line()

    @staticmethod
    def cross_line():
        print('=================================================')

    def get_record_by_query(self):
        """
        示例query的常用用法
        :return:
        """
        all_record = self.search(self.user_key)

        # 例如要找到爱好中包含rap的用户信息
        match_rap = all_record.query("match", hobby="rap")
        self.print_all_record("查看所有包含爱好rap的用户:", match_rap)

        # 查找爱好中含有rap,并且性别为女的用户信息
        match_rap_female = all_record.query("match", hobby="rap").query("term", sex__keyword="女")
        # match_rap_female = match_rap.query("term", sex__keyword="女")
        self.print_all_record("查看爱好含有rap的所有女用户:", match_rap_female)

        # 查找身高大于175的男用户,并且爱好只有唱的用户
        man_sing_gt_175 = all_record.query("range", height={"gt": 175}).query(
            "term", sex__keyword="男").query(
            "term", hobby__keyword="唱")
        self.print_all_record("查找身高大于175的男用户,并且爱好只有唱的用户", man_sing_gt_175)

    def get_record_by_filter(self):
        """
        示例filter的用法
        filter 与 query用法一样,这里只举一个简单例子
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找年龄大于等于25的男用户,并且爱好是唱跳rap
        all_record = all_record.filter("range", age={"gte": 25}).filter(
            "term", sex__keyword="男").filter(
            "term", hobby__keyword="唱跳rap")
        self.print_all_record("查找年龄大于等于25的男用户,并且爱好是唱跳rap:", all_record)

    def get_record_by_query_and_filter(self):
        """
        query、filter可以混用
        举一个例子
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找年龄在20-25的男性用户
        all_record = all_record.query("range", age={"gte": 20, "lte": 25}).filter("term", sex__keyword="男")
        self.print_all_record("查找年龄在20-25的男性用户:", all_record)

    def get_record_by_exclude(self):
        """
        非查询
        :return:
        """
        all_record = self.search(self.user_key)

        # 查找性别不是男生,并且爱好不是唱的用户
        records = all_record.exclude("term", sex__keyword="男").exclude("term", hobby__keyword="唱")
        self.print_all_record("查找性别不是男生,并且爱好不是唱的用户:", records, 6)

        # 查找男性用户中,爱好不是唱且身高大于175的用户
        records = all_record.filter("term", sex__keyword="男").exclude(
            "term", hobby__keyword="唱").query(
            "range", height={"gt": 175})
        self.print_all_record("查找男性用户中,爱好不是唱且身高大于175的用户:", records, 6)

    def other_search(self):
        """
        其他查询:
            获取文档数量:.count()
            查询结果排序:sort(field)
        :return:
        """
        all_record = self.search(self.user_key)

        # 获取id>=90的文档数量
        gte_90 = all_record.filter("range", id={"gte": 90})
        print(f"id>=90的文档 存在,数量为:{gte_90.count()}")  # id>=90的文档 存在,数量为:10

        # 获取id>=300的文档数量
        gte_300 = all_record.filter("range", id={"gte": 300})
        print(f"id>=300的文档 存在,数量为:{gte_300.count()}")  # id>=300的文档 存在,数量为:0

        #  排序 获取排序结果后,不能使用san(), 比如:self.search(self.key).sort("-height").scan()
        #  使用scan()不会以任何预先确定的顺序返回结果, 详见:
            # https://elasticsearch-py.readthedocs.io/en/master/helpers.html#elasticsearch.helpers.scan
        # 不使用san() 获取记录会有数量10000条的限制,如果查询文档数量大于10000,不使用san()方法便会出现异常
        # 下面查询会触发异常:
        #     all_record = self.search(self.test_count_key)
        #     for record in all_record[0: 15000]:
        #         print(record.to_dict())
        #  因为test_count_key下数量已经达到20000条,当取15000条数据时会触发:elasticsearch.exceptions.RequestError异常
        #  查询id>90, 且按玩家身高排序从大到小排序
        gte_90_sort = all_record.sort("-height")
        # self.print_all_record("查询id>90, 且按玩家身高排序从大到小排序:", gte_90_sort, 10)
        for record in gte_90_sort:
            print(record.to_dict())
        self.cross_line()
        #  查询id>90, 且按玩家身高排序从小到大排序, 如果身高相同,则按id降序排序
        gte_90_sort = all_record.filter("range", id={"gte": 90}).sort("height", "-id")
        for record in gte_90_sort:
            print(record.to_dict())
        self.cross_line()

        # 分页
        # 查询用户用户数据,跳过前五十条后20条
        skip_50_search_20 = all_record[50: 70]
        i = 1
        for record in skip_50_search_20:
            print(f"第{i}条数据, 用户id:{record.to_dict()['id']}")
            i += 1


if __name__ == "__main__":
    search = Search()
    # search.get_all_record()
    # search.get_record_by_query()
    # search.get_record_by_filter()
    # search.get_record_by_query_and_filter()
    # search.get_record_by_exclude()
    search.other_search()



更新

按查询更新文档
编写执行脚本
刷新索引

创建文件study_search.py
详见示例:update_example

import time

from elasticsearch_obj import ElasticSearchObj


class Update(ElasticSearchObj):
    def refresh(self, index):
        self.client.indices.refresh(index=index)

    @staticmethod
    def cross_line():
        print('=================================================')

    def print_all_record(self, explain, all_record):
        for record in all_record.scan():
            print(explain, record.to_dict())
        self.cross_line()

    def update_example(self):
        # 将user_key id为90记录,name改为”我是来测试的“, height增加50
        # 查找user_key id为90更新前的记录
        update_before = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新前的记录:", update_before)

        update_obj = self.update(self.user_key).filter(
            "term", id=90).script(
            source="ctx._source.name=params.name;ctx._source.height+=params.height;",
            params={
                "name": "我是来测试的",
                "height": 50
            })

        update_response = update_obj.execute()  # 请求执行并返回执行结果
        print("user_key id为90记录,name改为”我是来测试的“, height增加50 更新返回结果:", update_response)

        update_after = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新后的记录1:", update_after)

        time.sleep(1)
        update_after = self.search(self.user_key).filter("term", id=90)
        self.print_all_record("查找user_key id为90更新后的记录2:", update_after)

        # 执行结果就不沾出来了,但有执行的可以看出
        # 更新后记录1:是更新前的结果; 而更新后记录2:是更新后的结果
        # 造成这一现象是因为上一个请求更新在内部并未完全完成,所以在sleep 一秒后便能获取更新后的记录
        print(end="\n\n\n")
        # 如果需要在更新后立即获取最新文档,又不知道多久能够完成更新, 可以在执行更新后刷新文档
        # 看下面示例
        # 将user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百
        print("user_key 下id为70的文档,name改为”试试刷新index能否更新后获取最新数据“,height增加一百")
        update_before = self.search(self.user_key).filter("term", id=70)
        self.print_all_record("查找user_key id为80更新前的记录:", update_before)
        update_obj = self.update(self.user_key).filter(
            "term", id=70).script(
            source="ctx._source.name=params.name;ctx._source.height+=params.height;",
            params={
                "name": "试试刷新index能否更新后获取最新数据",
                "height": 100
            })

        update_response = update_obj.execute()  # 请求执行并返回执行结果
        print("user_key id为70记录,name改为”试试刷新index能否更新后获取最新数据“, height增加100 更新返回结果:", update_response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=70)
        self.print_all_record("查找user_key id为70更新后的记录1:", update_after)

        # 默认情况下,Elasticsearch 每秒都会定期刷新索引, 如果并不需要获取更新后的文档,尽量就不要手动刷新了
        # 可以通过更新响应的total跟updated数量是否一致判断记录是否更新成功
        # 查询更新会更新所有匹配的文档,查询条件跟上面介绍的查询用法一致
        # 例如:将user_key所有age增加一岁
        response = self.update(self.user_key).script(source="ctx._source.age+=1;").execute()
        print("将user_key所有age增加一岁:", response["total"], response["updated"], "response=", response)

        # 此处增加刷新,是因为上一个执行是更新整个user_key,如果还未自动刷新,执行下面示例,或造成并发异常,
        # 导致elasticsearch.exceptions.ConflictError异常
        self.refresh(self.user_key)
        print(end="\n\n\n")
        # 特别注意的是,如果script定义的字段,查询的文档存在则会更新,不存在则会在文档中插入字段
        print("将user_key下id为1的文档增加一个字段test_field,值为:[1,2,3]")
        update_before = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key id为1更新前的记录:", update_before)
        update_obj = self.update(self.user_key).filter(
            "term", id=1).script(
            source='ctx._source.test_field=params.test_field',
            params={
                "test_field": [1, 2, 3],
            })
        response = update_obj.execute()
        print("将user_key下id为1的文档增加一个字段test_field:", response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key 将user_key下id为1的文档增加一个字段test_field,更新后的记录:", update_after)

        # 删除字段
        print("将user_key下id为1的文档增加的test_field字段移除")
        update_before = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("查找user_key id为1更新前的记录:", update_before)
        response = self.update(self.user_key).filter(
            "term", id=1).script(
            source='ctx._source.remove("test_field")').execute()
        print("将user_key下id为1的文档增加的test_field字段移除:", response)
        self.refresh(self.user_key)  # 刷新
        update_after = self.search(self.user_key).filter("term", id=1)
        self.print_all_record("将user_key下id为1的文档增加的test_field字段移除,更新后的记录:", update_after)


if __name__ == "__main__":
    update = Update()
    update.update_example()




删除

删除索引
根据查询条件删除文档
创建文件study_delete.py

from elasticsearch_obj import ElasticSearchObj


class StudyDelete(ElasticSearchObj):
    def delete_index(self, index):
        """
        https://www.elastic.co/guide/en/elasticsearch/reference/7.9/indices-delete-index.html
        删除整个索引
        :param index:
        :return:
        """
        try:
            self.client.indices.delete(index=index)
        except Exception:
            print(f"需要删除的索引:{index}不存在")

    def delete_by_query(self):
        """
        示例根据查询条件删除文档
        :return:
        """

        # 将user_key下id大于90的玩家删除
        all_record = self.search(self.user_key)
        print("查看删除前文档的数量:", all_record.count())
        all_record.filter("range", id={"gt": 90}).delete()
        self.client.indices.refresh(index=self.user_key)
        all_record = self.search(self.user_key)
        print(f"查看删除后文档的数量:{all_record.count()}")


if __name__ == "__main__":
    study_delete = StudyDelete()
    study_delete.delete_index(study_delete.friend_key)  # 删除索引friend_key
    study_delete.delete_by_query()


我们今天的关于elasticsearch:删除文档后如何释放存储大小elasticsearch 删除的分享已经告一段落,感谢您的关注,如果您想了解更多关于ElasticSearch - 文档存储、Elasticsearch 2.20 文档篇:更新删除文档、Elasticsearch CentOS6.5下安装ElasticSearch6.2.4+elasticsearch-head+Kibana、Elasticsearch python操作实践(elasticsearch 、Elasticsearch DSL)的相关信息,请在本站查询。

本文标签: