1、安装模块
pip install elasticsearch
2、创建工具包 ,再其下创建工具类
from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import bulk
class ElasticsearchUtil:
def __init__(self, host="localhost", port=9200):
# 多节点 需要放到 hosts 数组中
self.es = Elasticsearch(hosts=[f"http://{host}:{port}"])
def __del__(self):
self.es.close()
# 查询版本号
def get_version(self):
info = self.es.info()
print(f"elasticsearch server info: {info}")
return self.es.info()['version']['number']
# 查询所有索引
def get_all_indices(self):
try:
indices = self.es.indices.get_alias(index='*')
return indices
except Exception as e:
print(f"获取索引时发生错误: {e}")
return None
# 创建索引,指定索引语句
def create_index(self, index, mappings=None, settings=None):
try:
self.es.indices.create(index=index, mappings=mappings, settings=settings)
return True
except Exception as e:
print(f"创建索引 {index} 时发生错误: {e}")
return False
# 删除索引。
def delete_index(self, index):
try:
self.es.indices.delete(index=index)
return True
except Exception as e:
print(f"删除索引 {index} 时发生错误: {e}")
return False
# 通过 id 查询文档
def get_document_by_id(self, index, documentId):
try:
result = self.es.get(index=index, id=documentId)
return result['_source']
except NotFoundError:
print(f"未找到索引 {index} 中的文档,ID: {documentId}")
return None
except Exception as e:
print(f"获取索引 {index} 中文档时发生错误: {e}")
return None
# 通过查询条件检索文档
def get_document_by_query(self, index, query):
try:
result = self.es.search(index=index, body={"query": query})
return result['hits']['hits']
except Exception as e:
print(f"在索引 {index} 中搜索时发生错误: {e}")
return None
# 添加文档
def add_document(self, index, documentId, document):
try:
self.es.index(index=index, id=documentId, body=document)
print(f"文档添加到索引 {index} 成功,文档 ID: {documentId}")
return True
except Exception as e:
print(f"在索引 {index} 时添加文档失败,错误: {e}")
return False
# 批量添加文档
def batch_add_document(self, index, documents):
try:
success, _ = bulk(self.es, documents)
print(f'在索引{index}时批量添加文档数量: {success}')
return True
except Exception as e:
print(f"在索引 {index} 时添加文档失败,错误: {e}")
return False
# 更新文档
def update_document_by_id(self, index, documentId, document):
try:
self.es.update(index=index, id=documentId, body={"doc": document})
return True
except Exception as e:
print(f"在索引 {index} 中删除文档错误: {e}")
return False
# 通过 id 删除文档
def delete_document(self, index, documentId):
try:
self.es.delete(index=index, id=documentId)
return True
except Exception as e:
print(f"在索引 {index} 中删除文档错误: {e}")
return False
3、使用工具类测试
from utils.ElasticSearchUtil import ElasticsearchUtil
client = ElasticsearchUtil(host="127.0.0.1", port=9200)
# 查看版本
# client.get_version()
# 获取所有索引
# indices = client.get_all_indices()
# for ind in indices:
# print(ind)
# 创建一个新的索引
index = "rick"
# mapping 映射
mappings = {
"properties": {
"id": {
"type": "long"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"title": {
"type": "keyword"
},
"content": {
"type": "text",
},
"grade": {
"type": "long"
},
"sex": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"subject": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
# res = client.create_index(index=index, document=doc)
# res = client.delete_index(index=index)
# res = client.create_index(index=index, mappings=doc)
# print(res)
#
# res = client.create_index(index, mappings=mappings)
# print(res)
# 添加文档
# document = {
# "title": "rick",
# "content": "测试文档."
# }
# res = client.add_document(index, documentId=1, document=document)
# print(res)
# 更新文档
# document = {
# "title": "rick~"
# }
# res = client.update_document_by_id(index, documentId=1, document=document)
# print(res)
# 查询文档
# res = client.get_document_by_id(index, documentId=1)
# print(res)
# query = {
# "match_all": {}
# }
# res = client.get_document_by_query(index, query=query)
# print(res)
# 删除文档
res = client.delete_document(index, documentId=1)
print(res)