GVKun编程网logo

在Elasticsearch中导入和更新数据(elasticsearch如何导入数据)

9

在本文中,您将会了解到关于在Elasticsearch中导入和更新数据的新资讯,同时我们还将为您解释elasticsearch如何导入数据的相关在本文中,我们将带你探索在Elasticsearch中导

在本文中,您将会了解到关于在Elasticsearch中导入和更新数据的新资讯,同时我们还将为您解释elasticsearch如何导入数据的相关在本文中,我们将带你探索在Elasticsearch中导入和更新数据的奥秘,分析elasticsearch如何导入数据的特点,并给出一些关于46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中、Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象、Elasticsearch --- 向es中导入数据、elasticsearch 局部更新数据的实用技巧。

本文目录一览:

在Elasticsearch中导入和更新数据(elasticsearch如何导入数据)

在Elasticsearch中导入和更新数据(elasticsearch如何导入数据)

我们有一个现有的搜索功能,该功能涉及SQL
Server中多个表之间的数据。这给我们的数据库造成了沉重的负担,因此我试图寻找一种更好的方式来搜索这些数据(它不会经常更改)。我与Logstash和Elasticsearch一起工作了大约一个星期,使用包含120万条记录的导入。我的问题本质上是“如何使用“主键”更新现有文档”?

CSV数据文件(以竖线分隔)如下所示:

369|90045|123 ABC ST|LOS ANGELES|CA368|90045|PVKA0010|LA|CA367|90012|20000 Venice Boulvd|Los Angeles|CA365|90045|ABC ST 123|LOS ANGELES|CA363|90045|ADHOCTESTPROPERTY|DALES|CA

我的logstash配置如下所示:

input {  stdin {    type => "stdin-type"  }  file {    path => ["C:/Data/sample/*"]    start_position => "beginning"  }}filter {  csv {    columns => ["property_id","postal_code","address_1","city","state_code"]    separator => "|"  }}output {  elasticsearch {    embedded => true    index => "samples4"    index_type => "sample"  }}

然后,elasticsearch中的文档如下所示:

{   "_index": "samples4",   "_type": "sample",   "_id": "64Dc0_1eQ3uSln_k-4X26A",   "_score": 1.4054651,   "_source": {   "message": [      "369|90045|123 ABC ST|LOS ANGELES|CA\r"   ],   "@version": "1",   "@timestamp": "2014-02-11T22:58:38.365Z",   "host": "[host]",   "path": "C:/Data/sample/sample.csv",   "property_id": "369",   "postal_code": "90045",   "address_1": "123 ABC ST",   "city": "LOS ANGELES",   "state_code": "CA"}

_id字段中的唯一ID
替换为的值property_id。这个想法是,后续数据文件将包含更新。我不需要保留以前的版本,也不会出现我们在文档中添加或删除键的情况。

document_idelasticsearch输出的设置不会将该字段的值放入其中_id(它只是放在“
property_id”中,并且仅存储/更新了一个文档)。我知道我在这里想念什么。我只是采取了错误的方法吗?

编辑:工作!

使用@rutter的建议,我将output配置更新为: ``

output {  elasticsearch {    embedded => true    index => "samples6"    index_type => "sample"    document_id => "%{property_id}"  }}

现在,通过按预期将新文件放入数据文件夹来更新文档。_idproperty_id是相同的值。 ``

{   "_index": "samples6",   "_type": "sample",   "_id": "351",   "_score": 1,   "_source": {   "message": [      "351|90045|Easy as 123 ST|LOS ANGELES|CA\r"   ],   "@version": "1",   "@timestamp": "2014-02-12T16:12:52.102Z",   "host": "TXDFWL3474",   "path": "C:/Data/sample/sample_update_3.csv",   "property_id": "351",   "postal_code": "90045",   "address_1": "Easy as 123 ST",   "city": "LOS ANGELES",   "state_code": "CA"}

答案1

小编典典

从评论转换:

您可以通过发送另一个具有相同ID的文档来覆盖文档…但是对于以前的数据,这可能会有些棘手,因为默认情况下会获得随机ID。

您可以使用输出插件的document_idfield设置ID
,但是它使用文字字符串,而不是字段名称。要使用字段的内容,可以使用sprintf格式的字符串,例如%{property_id}

这样的事情,例如:

output {  elasticsearch {    ... other settings...    document_id => "%{property_id}"  }}

46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中

46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中

【百度云搜索,搜各种资料:http://www.lqkweb.com】
【搜网盘,搜各种资料:http://www.swpan.cn】

前面我们讲到的elasticsearch(搜索引擎)操作,如:增、删、改、查等操作都是用的elasticsearch的语言命令,就像sql命令一样,当然elasticsearch官方也提供了一个python操作elasticsearch(搜索引擎)的接口包,就像sqlalchemy操作数据库一样的ORM框,这样我们操作elasticsearch就不用写命令了,用elasticsearch-dsl-py这个模块来操作,也就是用python的方式操作一个类即可

elasticsearch-dsl-py下载

下载地址:https://github.com/elastic/el...

文档说明:http://elasticsearch-dsl.read...

首先安装好elasticsearch-dsl-py模块

1、elasticsearch-dsl模块使用说明

create_connection(hosts=[''127.0.0.1'']):连接elasticsearch(搜索引擎)服务器方法,可以连接多台服务器
class Meta:设置索引名称和表名称
索引类名称.init(): 生成索引和表以及字段
实例化索引类.save():将数据写入elasticsearch(搜索引擎)

elasticsearch_orm.py 操作elasticsearch(搜索引擎)文件

#!/usr/bin/env python
# -*- coding:utf8 -*-
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer

# 更多字段类型见第三百六十四节elasticsearch(搜索引擎)的mapping映射管理

from elasticsearch_dsl.connections import connections       # 导入连接elasticsearch(搜索引擎)服务器方法
connections.create_connection(hosts=[''127.0.0.1''])

class lagouType(DocType):                                                   # 自定义一个类来继承DocType类
    # Text类型需要分词,所以需要知道中文分词器,ik_max_wordwei为中文分词器
    title = Text(analyzer="ik_max_word")                                    # 设置,字段名称=字段类型,Text为字符串类型并且可以分词建立倒排索引
    description = Text(analyzer="ik_max_word")
    keywords = Text(analyzer="ik_max_word")
    url = Keyword()                                                         # 设置,字段名称=字段类型,Keyword为普通字符串类型,不分词
    riqi = Date()                                                           # 设置,字段名称=字段类型,Date日期类型

    class Meta:                                                             # Meta是固定写法
        index = "lagou"                                                     # 设置索引名称(相当于数据库名称)
        doc_type = ''biao''                                                   # 设置表名称

if __name__ == "__main__":          # 判断在本代码文件执行才执行里面的方法,其他页面调用的则不执行里面的方法
    lagouType.init()                # 生成elasticsearch(搜索引擎)的索引,表,字段等信息

# 使用方法说明:
# 在要要操作elasticsearch(搜索引擎)的页面,导入此模块
# lagou = lagouType()           #实例化类
# lagou.title = ''值''            #要写入字段=值
# lagou.description = ''值''
# lagou.keywords = ''值''
# lagou.url = ''值''
# lagou.riqi = ''值''
# lagou.save()                  #将数据写入elasticsearch(搜索引擎)

2、scrapy写入数据到elasticsearch中

爬虫文件

# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from adc.items import LagouItem,LagouItemLoader  #导入items容器类,和ItemLoader类
import time

class LagouSpider(CrawlSpider):                     #创建爬虫类
    name = ''lagou''                                  #爬虫名称
    allowed_domains = [''www.luyin.org'']             #起始域名
    start_urls = [''http://www.luyin.org/'']          #起始url

    custom_settings = {
        "AUTOTHROTTLE_ENABLED": True,                             #覆盖掉settings.py里的相同设置,开启COOKIES
        "DOWNLOAD_DELAY":5
    }

    rules = (
        #配置抓取列表页规则
        Rule(LinkExtractor(allow=(''ggwa/.*'')), follow=True),

        #配置抓取内容页规则
        Rule(LinkExtractor(allow=(''post/\d+.html.*'')), callback=''parse_job'', follow=True),
    )

    def parse_job(self, response):                  #回调函数,注意:因为CrawlS模板的源码创建了parse回调函数,所以切记我们不能创建parse名称的函数
        atime = time.localtime(time.time())         #获取系统当前时间
        dqatime = "{0}-{1}-{2} {3}:{4}:{5}".format(
            atime.tm_year,
            atime.tm_mon,
            atime.tm_mday,
            atime.tm_hour,
            atime.tm_min,
            atime.tm_sec
        )  # 将格式化时间日期,单独取出来拼接成一个完整日期

        url = response.url

        item_loader = LagouItemLoader(LagouItem(), response=response)   # 将数据填充进items.py文件的LagouItem
        item_loader.add_xpath(''title'', ''/html/head/title/text()'')
        item_loader.add_xpath(''description'', ''/html/head/meta[@name="Description"]/@content'')
        item_loader.add_xpath(''keywords'', ''/html/head/meta[@name="keywords"]/@content'')
        item_loader.add_value(''url'', url)
        item_loader.add_value(''riqi'', dqatime)
        article_item = item_loader.load_item()
yield article_item

items.py文件

# -*- coding: utf-8 -*-

# Define here the models for your scraped items
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/items.html
#items.py,文件是专门用于,接收爬虫获取到的数据信息的,就相当于是容器文件

import scrapy
from scrapy.loader.processors import MapCompose,TakeFirst
from scrapy.loader import ItemLoader                #导入ItemLoader类也就加载items容器类填充数据
from adc.models.elasticsearch_orm import lagouType  #导入elasticsearch操作模块

class LagouItemLoader(ItemLoader):                  #自定义Loader继承ItemLoader类,在爬虫页面调用这个类填充数据到Item类
    default_output_processor = TakeFirst()          #默认利用ItemLoader类,加载items容器类填充数据,是列表类型,可以通过TakeFirst()方法,获取到列表里的内容

def tianjia(value):                                 #自定义数据预处理函数
    return value                                    #将处理后的数据返给Item

class LagouItem(scrapy.Item):                       #设置爬虫获取到的信息容器类
    title = scrapy.Field(                           #接收爬虫获取到的title信息
        input_processor=MapCompose(tianjia),        #将数据预处理函数名称传入MapCompose方法里处理,数据预处理函数的形式参数value会自动接收字段title
    )
    description = scrapy.Field()
    keywords = scrapy.Field()
    url = scrapy.Field()
    riqi = scrapy.Field()

    def save_to_es(self):
        lagou = lagouType()                         # 实例化elasticsearch(搜索引擎对象)
        lagou.title = self[''title'']                 # 字段名称=值
        lagou.description = self[''description'']
        lagou.keywords = self[''keywords'']
        lagou.url = self[''url'']
        lagou.riqi = self[''riqi'']
        lagou.save()                                # 将数据写入elasticsearch(搜索引擎对象)
        return

pipelines.py文件

# -*- coding: utf-8 -*-

# Define your item pipelines here
#
# Don''t forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from adc.models.elasticsearch_orm import lagouType  #导入elasticsearch操作模块

class AdcPipeline(object):
    def process_item(self, item, spider):

        #也可以在这里将数据写入elasticsearch搜索引擎,这里的缺点是统一处理
        # lagou = lagouType()
        # lagou.title = item[''title'']
        # lagou.description = item[''description'']
        # lagou.keywords = item[''keywords'']
        # lagou.url = item[''url'']
        # lagou.riqi = item[''riqi'']
        # lagou.save()
        item.save_to_es()       #执行items.py文件的save_to_es方法将数据写入elasticsearch搜索引擎
        return item

settings.py文件,注册pipelines

# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
   ''adc.pipelines.AdcPipeline'': 300,
}

main.py爬虫启动文件

#!/usr/bin/env python
# -*- coding:utf8 -*-

from scrapy.cmdline import execute  #导入执行scrapy命令方法
import sys
import os

sys.path.append(os.path.join(os.getcwd())) #给Python解释器,添加模块新路径 ,将main.py文件所在目录添加到Python解释器

execute([''scrapy'', ''crawl'', ''lagou'', ''--nolog''])  #执行scrapy命令

# execute([''scrapy'', ''crawl'', ''lagou''])  #执行scrapy命令

运行爬虫

image

写入elasticsearch(搜索引擎)情况

image

补充:elasticsearch-dsl  的 增删改查

#!/usr/bin/env python
# -*- coding:utf8 -*-
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
    analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer

# 更多字段类型见第三百六十四节elasticsearch(搜索引擎)的mapping映射管理

from elasticsearch_dsl.connections import connections       # 导入连接elasticsearch(搜索引擎)服务器方法
connections.create_connection(hosts=[''127.0.0.1''])

class lagouType(DocType):                                                   # 自定义一个类来继承DocType类
    # Text类型需要分词,所以需要知道中文分词器,ik_max_wordwei为中文分词器
    title = Text(analyzer="ik_max_word")                                    # 设置,字段名称=字段类型,Text为字符串类型并且可以分词建立倒排索引
    description = Text(analyzer="ik_max_word")
    keywords = Text(analyzer="ik_max_word")
    url = Keyword()                                                         # 设置,字段名称=字段类型,Keyword为普通字符串类型,不分词
    riqi = Date()                                                           # 设置,字段名称=字段类型,Date日期类型

    class Meta:                                                             # Meta是固定写法
        index = "lagou"                                                     # 设置索引名称(相当于数据库名称)
        doc_type = ''biao''                                                   # 设置表名称

if __name__ == "__main__":          # 判断在本代码文件执行才执行里面的方法,其他页面调用的则不执行里面的方法
    lagouType.init()                # 生成elasticsearch(搜索引擎)的索引,表,字段等信息

# 使用方法说明:
# 在要要操作elasticsearch(搜索引擎)的页面,导入此模块
# lagou = lagouType()           #实例化类
# lagou.title = ''值''            #要写入字段=值
# lagou.description = ''值''
# lagou.keywords = ''值''
# lagou.url = ''值''
# lagou.riqi = ''值''
# lagou.save()                  #将数据写入elasticsearch(搜索引擎)

1新增数据

from adc.models.elasticsearch_orm import lagouType  #导入刚才配置的elasticsearch操作模块

     lagou = lagouType()                         # 实例化elasticsearch(搜索引擎对象)
     lagou._id = 1             #自定义ID,很重要,以后都是根据ID来操作

        lagou.title = self[''title'']                 # 字段名称=值
        lagou.description = self[''description'']
        lagou.keywords = self[''keywords'']
        lagou.url = self[''url'']
        lagou.riqi = self[''riqi'']
        lagou.save()                                # 将数据写入elasticsearch(搜索引擎对象)

2删除指定数据

from adc.models.elasticsearch_orm import lagouType  #导入刚才配置的elasticsearch操作模块
sousuo_orm = lagouType()                    # 实例化
sousuo_orm.get(id=1).delete()               # 删除id等于1的数据

3修改指定的数据

from adc.models.elasticsearch_orm import lagouType  #导入刚才配置的elasticsearch操作模块

sousuo_orm = lagouType()                           # 实例化
sousuo_orm.get(id=1).update(title=''123456789'')     # 修改id等于1的数据

以上全部使用elasticsearch-dsl模块

注意下面使用的原生elasticsearch模块

删除指定使用,就是相当于删除指定数据库

使用原生elasticsearch模块删除指定索引

from elasticsearch import Elasticsearch                                     # 导入原生的elasticsearch(搜索引擎)接口
client = Elasticsearch(hosts=settings.Elasticsearch_hosts)                  # 连接原生的elasticsearch

# 使用原生elasticsearch模块删除指定索引
#要做容错处理,如果索引不存在会报错
            try:
                client.indices.delete(index=''jxiou_zuopin'')
            except Exception as e:
                pass

原生查询

from elasticsearch import Elasticsearch                 # 导入原生的elasticsearch(搜索引擎)接口
            client = Elasticsearch(hosts=Elasticsearch_hosts)       # 连接原生的elasticsearch

response = client.search(                               # 原生的elasticsearch接口的search()方法,就是搜索,可以支持原生elasticsearch语句查询
                index="jxiou_zuopin",                               # 设置索引名称
                doc_type="zuopin",                                  # 设置表名称
                body={                                              # 书写elasticsearch语句
                    "query": {
                        "multi_match": {                            # multi_match查询
                            "query": sousuoci,                      # 查询关键词
                            "fields": ["title"]                     # 查询字段
                        }
                    },
                    "from": (page - 1) * tiaoshu,                   # 从第几条开始获取
                    "size": tiaoshu,                                # 获取多少条数据
                    "highlight": {                                  # 查询关键词高亮处理
                        "pre_tags": [''<span >''],    # 高亮开始标签
                        "post_tags": [''</span>''],                   # 高亮结束标签
                        "fields": {                                 # 高亮设置
                            "title": {}                             # 高亮字段
                        }
                    }
                }
            )
            # 开始获取数据
            total_nums = response["hits"]["total"]                  # 获取查询结果的总条数

            hit_list = []                                           # 设置一个列表来储存搜索到的信息,返回给html页面

            for hit in response["hits"]["hits"]:                                # 循环查询到的结果
                hit_dict = {}                                                   # 设置一个字典来储存循环结果
                if "title" in hit["highlight"]:                                 # 判断title字段,如果高亮字段有类容
                    hit_dict["title"] = "".join(hit["highlight"]["title"])      # 获取高亮里的title
                else:
                    hit_dict["title"] = hit["_source"]["title"]                 # 否则获取不是高亮里的title

                hit_dict["id"] = hit["_source"]["nid"]                          # 获取返回nid

                # 加密样音地址
                hit_dict["yangsrc"] = jia_mi(str(hit["_source"]["yangsrc"]))    # 获取返回yangsrc

                hit_list.append(hit_dict)

Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象

Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象

您需要使用发件箱模式,请参见https://debezium.io/documentation/reference/1.2/configuration/outbox-event-router.html

或者您可以使用聚合对象,请参见 https://github.com/debezium/debezium-examples/tree/master/jpa-aggregations https://github.com/debezium/debezium-examples/tree/master/kstreams-fk-join

Elasticsearch --- 向es中导入数据

Elasticsearch --- 向es中导入数据

一.从文件导入(用django启动)

def zi_dr(request):
    f = open(''c.txt'', ''r'', encoding=''utf-8'')

    action = [
        {
            "_index": "c12",
            "_type": "doc",
            "_source": {
                "title": i.strip(),    #   去掉 空

            }
        } for i in f]

    s = time.time()
    helpers.bulk(es, action)
    print(time.time() - s)

    return HttpResponse("ok")

 

二. 从数据库中导入(用django启动)



def es2(request):

  query_obj = models.Article.objects.all() action = ( { "_index": "s18", "_type": "doc", "_source": { "title": i.title, "summary": i.summary, "a_url": i.a_url, "img_url": i.img_url, "tags": i.tags } } for i in query_obj) # print(action, next(action)) import time s = time.time() helpers.bulk(es, action) print(time.time() - s) return HttpResponse(''OK'')

 

elasticsearch 局部更新数据

elasticsearch 局部更新数据

局部变量的修改 查看文档信息:

duanlshdeMacBook-Pro:bin duanlsh$ curl -i -Xget ''http://localhost:9200/website/blog/1?pretty''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 195

{
  "_index" : "website",
  "_type" : "blog",
  "_id" : "1",
  "_version" : 5,
  "found" : true,
  "_source" : {
    "title" : "this is my external blog",
    "tags" : [
      "test"
    ]
  }
}

新增某一个参数文档信息

curl -i -XPOST ''http://localhost:9200/website/blog/_update?pretty'' -d ''{"doc":{"views":0}}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 208

{
  "_index" : "website",
  "_type" : "blog",
  "_id" : "_update",
  "_version" : 2,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "created" : false
}

修改字段:

(1)

duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XPOST ''http://localhost:9200/website/blog/1/_update?pretty'' -d ''{"script":"ctx._source.views+=1"}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 181

{
  "_index" : "website",
  "_type" : "blog",
  "_id" : "1",
  "_version" : 7,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  }
}

(2)

duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XGET ''http://localhost:9200/website/blog/1?pretty''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 214

{
  "_index" : "website",
  "_type" : "blog",
  "_id" : "1",
  "_version" : 10,
  "found" : true,
  "_source" : {
    "title" : "this is my external blog",
    "tags" : [
      "test"
    ],
    "views" : 10
  }
}

删除字段:其中 remove 中的字段必须的为 \" 转译一下

duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XPOST ''http://localhost:9200/website/blog/1/_update?pretty'' -d ''{"script":"ctx._source.remove(\"views\")"}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 182

{
  "_index" : "website",
  "_type" : "blog",
  "_id" : "1",
  "_version" : 11,
  "result" : "updated",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  }
}

retry_on_conflict 表示重试;

elasticsearch 中如果有多个操作修改某一个值的时候,会检查版本信息,如果版本信息不一致,则设置 retry_on_conflict=5 重新获取几次信息,然后进行更新;类似数据中的乐观锁

关于在Elasticsearch中导入和更新数据elasticsearch如何导入数据的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中、Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象、Elasticsearch --- 向es中导入数据、elasticsearch 局部更新数据的相关知识,请在本站寻找。

本文标签: