在本文中,您将会了解到关于在Elasticsearch中导入和更新数据的新资讯,同时我们还将为您解释elasticsearch如何导入数据的相关在本文中,我们将带你探索在Elasticsearch中导
在本文中,您将会了解到关于在Elasticsearch中导入和更新数据的新资讯,同时我们还将为您解释elasticsearch如何导入数据的相关在本文中,我们将带你探索在Elasticsearch中导入和更新数据的奥秘,分析elasticsearch如何导入数据的特点,并给出一些关于46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中、Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象、Elasticsearch --- 向es中导入数据、elasticsearch 局部更新数据的实用技巧。
本文目录一览:- 在Elasticsearch中导入和更新数据(elasticsearch如何导入数据)
- 46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中
- Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象
- Elasticsearch --- 向es中导入数据
- elasticsearch 局部更新数据
在Elasticsearch中导入和更新数据(elasticsearch如何导入数据)
我们有一个现有的搜索功能,该功能涉及SQL
Server中多个表之间的数据。这给我们的数据库造成了沉重的负担,因此我试图寻找一种更好的方式来搜索这些数据(它不会经常更改)。我与Logstash和Elasticsearch一起工作了大约一个星期,使用包含120万条记录的导入。我的问题本质上是“如何使用“主键”更新现有文档”?
CSV数据文件(以竖线分隔)如下所示:
369|90045|123 ABC ST|LOS ANGELES|CA368|90045|PVKA0010|LA|CA367|90012|20000 Venice Boulvd|Los Angeles|CA365|90045|ABC ST 123|LOS ANGELES|CA363|90045|ADHOCTESTPROPERTY|DALES|CA
我的logstash配置如下所示:
input { stdin { type => "stdin-type" } file { path => ["C:/Data/sample/*"] start_position => "beginning" }}filter { csv { columns => ["property_id","postal_code","address_1","city","state_code"] separator => "|" }}output { elasticsearch { embedded => true index => "samples4" index_type => "sample" }}
然后,elasticsearch中的文档如下所示:
{ "_index": "samples4", "_type": "sample", "_id": "64Dc0_1eQ3uSln_k-4X26A", "_score": 1.4054651, "_source": { "message": [ "369|90045|123 ABC ST|LOS ANGELES|CA\r" ], "@version": "1", "@timestamp": "2014-02-11T22:58:38.365Z", "host": "[host]", "path": "C:/Data/sample/sample.csv", "property_id": "369", "postal_code": "90045", "address_1": "123 ABC ST", "city": "LOS ANGELES", "state_code": "CA"}
我 想 将_id
字段中的唯一ID
替换为的值property_id
。这个想法是,后续数据文件将包含更新。我不需要保留以前的版本,也不会出现我们在文档中添加或删除键的情况。
document_id
elasticsearch输出的设置不会将该字段的值放入其中_id
(它只是放在“
property_id”中,并且仅存储/更新了一个文档)。我知道我在这里想念什么。我只是采取了错误的方法吗?
编辑:工作!
使用@rutter的建议,我将output
配置更新为: ``
output { elasticsearch { embedded => true index => "samples6" index_type => "sample" document_id => "%{property_id}" }}
现在,通过按预期将新文件放入数据文件夹来更新文档。_id
和property_id
是相同的值。 ``
{ "_index": "samples6", "_type": "sample", "_id": "351", "_score": 1, "_source": { "message": [ "351|90045|Easy as 123 ST|LOS ANGELES|CA\r" ], "@version": "1", "@timestamp": "2014-02-12T16:12:52.102Z", "host": "TXDFWL3474", "path": "C:/Data/sample/sample_update_3.csv", "property_id": "351", "postal_code": "90045", "address_1": "Easy as 123 ST", "city": "LOS ANGELES", "state_code": "CA"}
答案1
小编典典从评论转换:
您可以通过发送另一个具有相同ID的文档来覆盖文档…但是对于以前的数据,这可能会有些棘手,因为默认情况下会获得随机ID。
您可以使用输出插件的document_id
field设置ID
,但是它使用文字字符串,而不是字段名称。要使用字段的内容,可以使用sprintf格式的字符串,例如%{property_id}
。
这样的事情,例如:
output { elasticsearch { ... other settings... document_id => "%{property_id}" }}
46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中
【百度云搜索,搜各种资料:http://www.lqkweb.com】
【搜网盘,搜各种资料:http://www.swpan.cn】
前面我们讲到的elasticsearch(搜索引擎)操作,如:增、删、改、查等操作都是用的elasticsearch的语言命令,就像sql命令一样,当然elasticsearch官方也提供了一个python操作elasticsearch(搜索引擎)的接口包,就像sqlalchemy操作数据库一样的ORM框,这样我们操作elasticsearch就不用写命令了,用elasticsearch-dsl-py这个模块来操作,也就是用python的方式操作一个类即可
elasticsearch-dsl-py下载
下载地址:https://github.com/elastic/el...
文档说明:http://elasticsearch-dsl.read...
首先安装好elasticsearch-dsl-py模块
1、elasticsearch-dsl模块使用说明
create_connection(hosts=[''127.0.0.1'']):连接elasticsearch(搜索引擎)服务器方法,可以连接多台服务器
class Meta:设置索引名称和表名称
索引类名称.init(): 生成索引和表以及字段
实例化索引类.save():将数据写入elasticsearch(搜索引擎)
elasticsearch_orm.py 操作elasticsearch(搜索引擎)文件
#!/usr/bin/env python
# -*- coding:utf8 -*-
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
# 更多字段类型见第三百六十四节elasticsearch(搜索引擎)的mapping映射管理
from elasticsearch_dsl.connections import connections # 导入连接elasticsearch(搜索引擎)服务器方法
connections.create_connection(hosts=[''127.0.0.1''])
class lagouType(DocType): # 自定义一个类来继承DocType类
# Text类型需要分词,所以需要知道中文分词器,ik_max_wordwei为中文分词器
title = Text(analyzer="ik_max_word") # 设置,字段名称=字段类型,Text为字符串类型并且可以分词建立倒排索引
description = Text(analyzer="ik_max_word")
keywords = Text(analyzer="ik_max_word")
url = Keyword() # 设置,字段名称=字段类型,Keyword为普通字符串类型,不分词
riqi = Date() # 设置,字段名称=字段类型,Date日期类型
class Meta: # Meta是固定写法
index = "lagou" # 设置索引名称(相当于数据库名称)
doc_type = ''biao'' # 设置表名称
if __name__ == "__main__": # 判断在本代码文件执行才执行里面的方法,其他页面调用的则不执行里面的方法
lagouType.init() # 生成elasticsearch(搜索引擎)的索引,表,字段等信息
# 使用方法说明:
# 在要要操作elasticsearch(搜索引擎)的页面,导入此模块
# lagou = lagouType() #实例化类
# lagou.title = ''值'' #要写入字段=值
# lagou.description = ''值''
# lagou.keywords = ''值''
# lagou.url = ''值''
# lagou.riqi = ''值''
# lagou.save() #将数据写入elasticsearch(搜索引擎)
2、scrapy写入数据到elasticsearch中
爬虫文件
# -*- coding: utf-8 -*-
import scrapy
from scrapy.linkextractors import LinkExtractor
from scrapy.spiders import CrawlSpider, Rule
from adc.items import LagouItem,LagouItemLoader #导入items容器类,和ItemLoader类
import time
class LagouSpider(CrawlSpider): #创建爬虫类
name = ''lagou'' #爬虫名称
allowed_domains = [''www.luyin.org''] #起始域名
start_urls = [''http://www.luyin.org/''] #起始url
custom_settings = {
"AUTOTHROTTLE_ENABLED": True, #覆盖掉settings.py里的相同设置,开启COOKIES
"DOWNLOAD_DELAY":5
}
rules = (
#配置抓取列表页规则
Rule(LinkExtractor(allow=(''ggwa/.*'')), follow=True),
#配置抓取内容页规则
Rule(LinkExtractor(allow=(''post/\d+.html.*'')), callback=''parse_job'', follow=True),
)
def parse_job(self, response): #回调函数,注意:因为CrawlS模板的源码创建了parse回调函数,所以切记我们不能创建parse名称的函数
atime = time.localtime(time.time()) #获取系统当前时间
dqatime = "{0}-{1}-{2} {3}:{4}:{5}".format(
atime.tm_year,
atime.tm_mon,
atime.tm_mday,
atime.tm_hour,
atime.tm_min,
atime.tm_sec
) # 将格式化时间日期,单独取出来拼接成一个完整日期
url = response.url
item_loader = LagouItemLoader(LagouItem(), response=response) # 将数据填充进items.py文件的LagouItem
item_loader.add_xpath(''title'', ''/html/head/title/text()'')
item_loader.add_xpath(''description'', ''/html/head/meta[@name="Description"]/@content'')
item_loader.add_xpath(''keywords'', ''/html/head/meta[@name="keywords"]/@content'')
item_loader.add_value(''url'', url)
item_loader.add_value(''riqi'', dqatime)
article_item = item_loader.load_item()
yield article_item
items.py文件
# -*- coding: utf-8 -*-
# Define here the models for your scraped items
#
# See documentation in:
# http://doc.scrapy.org/en/latest/topics/items.html
#items.py,文件是专门用于,接收爬虫获取到的数据信息的,就相当于是容器文件
import scrapy
from scrapy.loader.processors import MapCompose,TakeFirst
from scrapy.loader import ItemLoader #导入ItemLoader类也就加载items容器类填充数据
from adc.models.elasticsearch_orm import lagouType #导入elasticsearch操作模块
class LagouItemLoader(ItemLoader): #自定义Loader继承ItemLoader类,在爬虫页面调用这个类填充数据到Item类
default_output_processor = TakeFirst() #默认利用ItemLoader类,加载items容器类填充数据,是列表类型,可以通过TakeFirst()方法,获取到列表里的内容
def tianjia(value): #自定义数据预处理函数
return value #将处理后的数据返给Item
class LagouItem(scrapy.Item): #设置爬虫获取到的信息容器类
title = scrapy.Field( #接收爬虫获取到的title信息
input_processor=MapCompose(tianjia), #将数据预处理函数名称传入MapCompose方法里处理,数据预处理函数的形式参数value会自动接收字段title
)
description = scrapy.Field()
keywords = scrapy.Field()
url = scrapy.Field()
riqi = scrapy.Field()
def save_to_es(self):
lagou = lagouType() # 实例化elasticsearch(搜索引擎对象)
lagou.title = self[''title''] # 字段名称=值
lagou.description = self[''description'']
lagou.keywords = self[''keywords'']
lagou.url = self[''url'']
lagou.riqi = self[''riqi'']
lagou.save() # 将数据写入elasticsearch(搜索引擎对象)
return
pipelines.py文件
# -*- coding: utf-8 -*-
# Define your item pipelines here
#
# Don''t forget to add your pipeline to the ITEM_PIPELINES setting
# See: http://doc.scrapy.org/en/latest/topics/item-pipeline.html
from adc.models.elasticsearch_orm import lagouType #导入elasticsearch操作模块
class AdcPipeline(object):
def process_item(self, item, spider):
#也可以在这里将数据写入elasticsearch搜索引擎,这里的缺点是统一处理
# lagou = lagouType()
# lagou.title = item[''title'']
# lagou.description = item[''description'']
# lagou.keywords = item[''keywords'']
# lagou.url = item[''url'']
# lagou.riqi = item[''riqi'']
# lagou.save()
item.save_to_es() #执行items.py文件的save_to_es方法将数据写入elasticsearch搜索引擎
return item
settings.py文件,注册pipelines
# Configure item pipelines
# See http://scrapy.readthedocs.org/en/latest/topics/item-pipeline.html
ITEM_PIPELINES = {
''adc.pipelines.AdcPipeline'': 300,
}
main.py爬虫启动文件
#!/usr/bin/env python
# -*- coding:utf8 -*-
from scrapy.cmdline import execute #导入执行scrapy命令方法
import sys
import os
sys.path.append(os.path.join(os.getcwd())) #给Python解释器,添加模块新路径 ,将main.py文件所在目录添加到Python解释器
execute([''scrapy'', ''crawl'', ''lagou'', ''--nolog'']) #执行scrapy命令
# execute([''scrapy'', ''crawl'', ''lagou'']) #执行scrapy命令
运行爬虫
写入elasticsearch(搜索引擎)情况
补充:elasticsearch-dsl 的 增删改查
#!/usr/bin/env python
# -*- coding:utf8 -*-
from datetime import datetime
from elasticsearch_dsl import DocType, Date, Nested, Boolean, \
analyzer, InnerObjectWrapper, Completion, Keyword, Text, Integer
# 更多字段类型见第三百六十四节elasticsearch(搜索引擎)的mapping映射管理
from elasticsearch_dsl.connections import connections # 导入连接elasticsearch(搜索引擎)服务器方法
connections.create_connection(hosts=[''127.0.0.1''])
class lagouType(DocType): # 自定义一个类来继承DocType类
# Text类型需要分词,所以需要知道中文分词器,ik_max_wordwei为中文分词器
title = Text(analyzer="ik_max_word") # 设置,字段名称=字段类型,Text为字符串类型并且可以分词建立倒排索引
description = Text(analyzer="ik_max_word")
keywords = Text(analyzer="ik_max_word")
url = Keyword() # 设置,字段名称=字段类型,Keyword为普通字符串类型,不分词
riqi = Date() # 设置,字段名称=字段类型,Date日期类型
class Meta: # Meta是固定写法
index = "lagou" # 设置索引名称(相当于数据库名称)
doc_type = ''biao'' # 设置表名称
if __name__ == "__main__": # 判断在本代码文件执行才执行里面的方法,其他页面调用的则不执行里面的方法
lagouType.init() # 生成elasticsearch(搜索引擎)的索引,表,字段等信息
# 使用方法说明:
# 在要要操作elasticsearch(搜索引擎)的页面,导入此模块
# lagou = lagouType() #实例化类
# lagou.title = ''值'' #要写入字段=值
# lagou.description = ''值''
# lagou.keywords = ''值''
# lagou.url = ''值''
# lagou.riqi = ''值''
# lagou.save() #将数据写入elasticsearch(搜索引擎)
1新增数据
from adc.models.elasticsearch_orm import lagouType #导入刚才配置的elasticsearch操作模块
lagou = lagouType() # 实例化elasticsearch(搜索引擎对象)
lagou._id = 1 #自定义ID,很重要,以后都是根据ID来操作
lagou.title = self[''title''] # 字段名称=值
lagou.description = self[''description'']
lagou.keywords = self[''keywords'']
lagou.url = self[''url'']
lagou.riqi = self[''riqi'']
lagou.save() # 将数据写入elasticsearch(搜索引擎对象)
2删除指定数据
from adc.models.elasticsearch_orm import lagouType #导入刚才配置的elasticsearch操作模块
sousuo_orm = lagouType() # 实例化
sousuo_orm.get(id=1).delete() # 删除id等于1的数据
3修改指定的数据
from adc.models.elasticsearch_orm import lagouType #导入刚才配置的elasticsearch操作模块
sousuo_orm = lagouType() # 实例化
sousuo_orm.get(id=1).update(title=''123456789'') # 修改id等于1的数据
以上全部使用elasticsearch-dsl模块
注意下面使用的原生elasticsearch模块
删除指定使用,就是相当于删除指定数据库
使用原生elasticsearch模块删除指定索引
from elasticsearch import Elasticsearch # 导入原生的elasticsearch(搜索引擎)接口
client = Elasticsearch(hosts=settings.Elasticsearch_hosts) # 连接原生的elasticsearch
# 使用原生elasticsearch模块删除指定索引
#要做容错处理,如果索引不存在会报错
try:
client.indices.delete(index=''jxiou_zuopin'')
except Exception as e:
pass
原生查询
from elasticsearch import Elasticsearch # 导入原生的elasticsearch(搜索引擎)接口
client = Elasticsearch(hosts=Elasticsearch_hosts) # 连接原生的elasticsearch
response = client.search( # 原生的elasticsearch接口的search()方法,就是搜索,可以支持原生elasticsearch语句查询
index="jxiou_zuopin", # 设置索引名称
doc_type="zuopin", # 设置表名称
body={ # 书写elasticsearch语句
"query": {
"multi_match": { # multi_match查询
"query": sousuoci, # 查询关键词
"fields": ["title"] # 查询字段
}
},
"from": (page - 1) * tiaoshu, # 从第几条开始获取
"size": tiaoshu, # 获取多少条数据
"highlight": { # 查询关键词高亮处理
"pre_tags": [''<span >''], # 高亮开始标签
"post_tags": [''</span>''], # 高亮结束标签
"fields": { # 高亮设置
"title": {} # 高亮字段
}
}
}
)
# 开始获取数据
total_nums = response["hits"]["total"] # 获取查询结果的总条数
hit_list = [] # 设置一个列表来储存搜索到的信息,返回给html页面
for hit in response["hits"]["hits"]: # 循环查询到的结果
hit_dict = {} # 设置一个字典来储存循环结果
if "title" in hit["highlight"]: # 判断title字段,如果高亮字段有类容
hit_dict["title"] = "".join(hit["highlight"]["title"]) # 获取高亮里的title
else:
hit_dict["title"] = hit["_source"]["title"] # 否则获取不是高亮里的title
hit_dict["id"] = hit["_source"]["nid"] # 获取返回nid
# 加密样音地址
hit_dict["yangsrc"] = jia_mi(str(hit["_source"]["yangsrc"])) # 获取返回yangsrc
hit_list.append(hit_dict)
Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象
您需要使用发件箱模式,请参见https://debezium.io/documentation/reference/1.2/configuration/outbox-event-router.html
或者您可以使用聚合对象,请参见 https://github.com/debezium/debezium-examples/tree/master/jpa-aggregations https://github.com/debezium/debezium-examples/tree/master/kstreams-fk-join
Elasticsearch --- 向es中导入数据
一.从文件导入(用django启动)
def zi_dr(request):
f = open(''c.txt'', ''r'', encoding=''utf-8'')
action = [
{
"_index": "c12",
"_type": "doc",
"_source": {
"title": i.strip(), # 去掉 空
}
} for i in f]
s = time.time()
helpers.bulk(es, action)
print(time.time() - s)
return HttpResponse("ok")
二. 从数据库中导入(用django启动)
def es2(request):
query_obj = models.Article.objects.all()
action = (
{
"_index": "s18",
"_type": "doc",
"_source": {
"title": i.title,
"summary": i.summary,
"a_url": i.a_url,
"img_url": i.img_url,
"tags": i.tags
}
} for i in query_obj)
# print(action, next(action))
import time
s = time.time()
helpers.bulk(es, action)
print(time.time() - s)
return HttpResponse(''OK'')
elasticsearch 局部更新数据
局部变量的修改 查看文档信息:
duanlshdeMacBook-Pro:bin duanlsh$ curl -i -Xget ''http://localhost:9200/website/blog/1?pretty''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 195
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 5,
"found" : true,
"_source" : {
"title" : "this is my external blog",
"tags" : [
"test"
]
}
}
新增某一个参数文档信息
curl -i -XPOST ''http://localhost:9200/website/blog/_update?pretty'' -d ''{"doc":{"views":0}}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 208
{
"_index" : "website",
"_type" : "blog",
"_id" : "_update",
"_version" : 2,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"created" : false
}
修改字段:
(1)
duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XPOST ''http://localhost:9200/website/blog/1/_update?pretty'' -d ''{"script":"ctx._source.views+=1"}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 181
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 7,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
}
}
(2)
duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XGET ''http://localhost:9200/website/blog/1?pretty''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 214
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 10,
"found" : true,
"_source" : {
"title" : "this is my external blog",
"tags" : [
"test"
],
"views" : 10
}
}
删除字段:其中 remove 中的字段必须的为 \" 转译一下
duanlshdeMacBook-Pro:bin duanlsh$ curl -i -XPOST ''http://localhost:9200/website/blog/1/_update?pretty'' -d ''{"script":"ctx._source.remove(\"views\")"}''
HTTP/1.1 200 OK
content-type: application/json; charset=UTF-8
content-length: 182
{
"_index" : "website",
"_type" : "blog",
"_id" : "1",
"_version" : 11,
"result" : "updated",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
}
}
retry_on_conflict 表示重试;
elasticsearch 中如果有多个操作修改某一个值的时候,会检查版本信息,如果版本信息不一致,则设置 retry_on_conflict=5 重新获取几次信息,然后进行更新;类似数据中的乐观锁
关于在Elasticsearch中导入和更新数据和elasticsearch如何导入数据的介绍现已完结,谢谢您的耐心阅读,如果想了解更多关于46、elasticsearch(搜索引擎)scrapy写入数据到elasticsearch中、Debezium Postgres和ElasticSearch-在ElasticSearch中存储复杂对象、Elasticsearch --- 向es中导入数据、elasticsearch 局部更新数据的相关知识,请在本站寻找。
本文标签: