使用Python对ElasticSearch获取数据及操作,供大家参考,具体内容如下
Version
Python :2.7
ElasticSearch:6.3
代码:
#!/usr/bin/env python # -*- coding: utf-8 -*- """ @Time : 2018/7/4 @Author : LiuXueWen @Site : @File : ElasticSearchOperation.py @Software: PyCharm @Description: 对elasticsearch数据的操作,包括获取数据,发送数据 """ import elasticsearch import json import Util_Ini_Operation class elasticsearch_data(): def __init__(self,hosts,username,password,maxsize,is_ssl): # 初始化ini操作脚本,获取配置文件 try: # 判断请求方式是否ssl加密 if is_ssl == "true": # 获取证书地址 cert_pem = Util_Ini_Operation.get_ini("config.ini").get_key_value("certs","certs") es_ssl = elasticsearch.Elasticsearch( # 地址 hosts=hosts, # 用户名密码 http_auth=(username,password), # 开启ssl use_ssl=True, # 确认有加密证书 verify_certs=True, # 对应的加密证书地址 client_cert=cert_pem ) self.es = es_ssl elif is_ssl == "false": # 创建普通类型的ES客户端 es_ordinary = elasticsearch.Elasticsearch(hosts, http_auth=(username, password), maxsize=int(maxsize)) self.es = es_ordinary except Exception as e: print(e) def query_data(self,keywords_list,date): gte = "now-"+str(date) query_data = { # 查询语句 "query": { "bool": { "must": [ { "query_string": { "query": keywords_list, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gte, "lte": "now", "format": "epoch_millis" } } } ], "must_not": [] } } } return query_data # 从es获取数据 def get_datas_by_query(self,index_name,keywords,param,date): ''' :param index_name: 索引名称 :param keywords: 关键字词,数组 :param param: 需要数据条件,例如_source :param date: 过去时间范围,字符串格式,例如过去30分钟内数据,"30m" :return: all_datas 返回查询到的所有数据(已经过param过滤) ''' all_datas = [] # 遍历所有的查询条件 for keywords_list in keywords: # DSL语句 query_data = self.query_data(keywords_list,date) res = self.es.search( index=index_name, body=query_data ) for hit in res['hits']['hits']: # 获取指定的内容 response = hit[param] # 添加所有数据到数据集中 all_datas.append(response) # 返回所有数据内容 return all_datas # 当索引不存在创建索引 def create_index(self,index_name): ''' :param index_name: 索引名称 :return:如果创建成功返回创建结果信息,试过已经存在创建新的index失败返回index的名称 ''' # 获取索引的映射 # index_mapping = IndexMapping.index_mapping # # 判断索引是否存在 # if self.es.indices.exists(index=index_name) is not True: # # 创建索引 # res = self.es.indices.create(index=index_name,body=index_mapping) # # 返回结果 # return res # else: # # 返回索引名称 # return index_name pass # 插入指定的单条数据内容 def insert_single_data(self,index_name,doc_type,data): ''' :param index_name: 索引名称 :param doc_type: 文档类型 :param data: 需要插入的数据内容 :return: 执行结果 ''' res = self.es.index(index=index_name,doc_type=doc_type,body=data) return res # 向ES中新增数据,批量插入 def insert_datas(self,index_name): ''' :desc 通过读取指定的文件内容获取需要插入的数据集 :param index_name: 索引名称 :return: 插入成功的数据条数 ''' insert_datas = [] # 判断插入数据的索引是否存在 self.createIndex(index_name=index_name) # 获取插入数据的文件地址 data_file_path = self.ini.get_key_value("datafile","datafilepath") # 获取需要插入的数据集 with open(data_file_path,"r+") as data_file: # 获取文件所有数据 data_lines = data_file.readlines() for data_line in data_lines: # string to json data_line = json.loads(data_line) insert_datas.append(data_line) # 批量处理 res = self.es.bulk(index=index_name,body=insert_datas,raise_on_error=True) return res # 从ES中在指定的索引中删除指定数据(根据id判断) def delete_data_by_id(self,index_name,doc_type,id): ''' :param index_name: 索引名称 :param index_type: 文档类型 :param id: 唯一标识id :return: 删除结果信息 ''' res = self.es.delete(index=index_name,doc_type=doc_type,id=id) return res # 根据条件删除数据 def delete_data_by_query(self,index_name,doc_type,param,gt_time,lt_time): ''' :param index_name:索引名称,为空查询所有索引 :param doc_type:文档类型,为空查询所有文档类型 :param param:过滤条件值 :param gt_time:时间范围,大于该时间 :param lt_time:时间范围,小于该时间 :return:执行条件删除后的结果信息 ''' # DSL语句 query_data = { # 查询语句 "query": { "bool": { "must": [ { "query_string": { "query": param, "analyze_wildcard": True } }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name,doc_type=doc_type,body=query_data,_source=True) return res # 指定index中删除指定时间段内的全部数据 def delete_all_datas(self,index_name,doc_type,gt_time,lt_time): ''' :param index_name:索引名称,为空查询所有索引 :param doc_type:文档类型,为空查询所有文档类型 :param gt_time:时间范围,大于该时间 :param lt_time:时间范围,小于该时间 :return:执行条件删除后的结果信息 ''' # DSL语句 query_data = { # 查询语句 "query": { "bool": { "must": [ { "match_all": {} }, { "range": { "@timestamp": { "gte": gt_time, "lte": lt_time, "format": "epoch_millis" } } } ], "must_not": [] } } } res = self.es.delete_by_query(index=index_name, doc_type=doc_type, body=query_data, _source=True) return res # 修改ES中指定的数据 def update_data_by_id(self,index_name,doc_type,id,data): ''' :param index_name: 索引名称 :param doc_type: 文档类型,为空表示所有类型 :param id: 文档唯一标识编号 :param data: 更新的数据 :return: 更新结果信息 ''' res = self.es.update(index=index_name,doc_type=doc_type,id=id,body=data) return res
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
广告合作:本站广告合作请联系QQ:858582 申请时备注:广告合作(否则不回)
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
免责声明:本站资源来自互联网收集,仅供用于学习和交流,请遵循相关法律法规,本站一切资源不代表本站立场,如有侵权、后门、不妥请联系本站删除!
暂无评论...
稳了!魔兽国服回归的3条重磅消息!官宣时间再确认!
昨天有一位朋友在大神群里分享,自己亚服账号被封号之后居然弹出了国服的封号信息对话框。
这里面让他访问的是一个国服的战网网址,com.cn和后面的zh都非常明白地表明这就是国服战网。
而他在复制这个网址并且进行登录之后,确实是网易的网址,也就是我们熟悉的停服之后国服发布的暴雪游戏产品运营到期开放退款的说明。这是一件比较奇怪的事情,因为以前都没有出现这样的情况,现在突然提示跳转到国服战网的网址,是不是说明了简体中文客户端已经开始进行更新了呢?
更新日志
2024年11月26日
2024年11月26日
- 凤飞飞《我们的主题曲》飞跃制作[正版原抓WAV+CUE]
- 刘嘉亮《亮情歌2》[WAV+CUE][1G]
- 红馆40·谭咏麟《歌者恋歌浓情30年演唱会》3CD[低速原抓WAV+CUE][1.8G]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[320K/MP3][193.25MB]
- 【轻音乐】曼托凡尼乐团《精选辑》2CD.1998[FLAC+CUE整轨]
- 邝美云《心中有爱》1989年香港DMIJP版1MTO东芝首版[WAV+CUE]
- 群星《情叹-发烧女声DSD》天籁女声发烧碟[WAV+CUE]
- 刘纬武《睡眠宝宝竖琴童谣 吉卜力工作室 白噪音安抚》[FLAC/分轨][748.03MB]
- 理想混蛋《Origin Sessions》[320K/MP3][37.47MB]
- 公馆青少年《我其实一点都不酷》[320K/MP3][78.78MB]
- 群星《情叹-发烧男声DSD》最值得珍藏的完美男声[WAV+CUE]
- 群星《国韵飘香·贵妃醉酒HQCD黑胶王》2CD[WAV]
- 卫兰《DAUGHTER》【低速原抓WAV+CUE】
- 公馆青少年《我其实一点都不酷》[FLAC/分轨][398.22MB]
- ZWEI《迟暮的花 (Explicit)》[320K/MP3][57.16MB]