四时宝库

程序员的知识宝库

python操作 elasticsearch 工具类

1、安装模块

pip install elasticsearch

2、创建工具包 ,再其下创建工具类

from elasticsearch import Elasticsearch, NotFoundError
from elasticsearch.helpers import bulk

class ElasticsearchUtil:

    def __init__(self, host="localhost", port=9200):
        # 多节点 需要放到 hosts 数组中
        self.es = Elasticsearch(hosts=[f"http://{host}:{port}"])

    def __del__(self):
        self.es.close()

    # 查询版本号
    def get_version(self):
        info = self.es.info()
        print(f"elasticsearch server info: {info}")
        return self.es.info()['version']['number']

    # 查询所有索引
    def get_all_indices(self):
        try:
            indices = self.es.indices.get_alias(index='*')
            return indices
        except Exception as e:
            print(f"获取索引时发生错误: {e}")
            return None

    # 创建索引,指定索引语句
    def create_index(self, index, mappings=None, settings=None):
        try:
            self.es.indices.create(index=index, mappings=mappings, settings=settings)
            return True
        except Exception as e:
            print(f"创建索引 {index} 时发生错误: {e}")
            return False

    # 删除索引。
    def delete_index(self, index):
        try:
            self.es.indices.delete(index=index)
            return True
        except Exception as e:
            print(f"删除索引 {index} 时发生错误: {e}")
        return False

    # 通过 id 查询文档
    def get_document_by_id(self, index, documentId):
        try:
            result = self.es.get(index=index, id=documentId)
            return result['_source']
        except NotFoundError:
            print(f"未找到索引 {index} 中的文档,ID: {documentId}")
            return None
        except Exception as e:
            print(f"获取索引 {index} 中文档时发生错误: {e}")
            return None

    # 通过查询条件检索文档
    def get_document_by_query(self, index, query):
        try:
            result = self.es.search(index=index, body={"query": query})
            return result['hits']['hits']
        except Exception as e:
            print(f"在索引 {index} 中搜索时发生错误: {e}")
            return None

    # 添加文档
    def add_document(self, index, documentId, document):
        try:
            self.es.index(index=index, id=documentId, body=document)
            print(f"文档添加到索引 {index} 成功,文档 ID: {documentId}")
            return True
        except Exception as e:
            print(f"在索引 {index} 时添加文档失败,错误: {e}")
            return False

		# 批量添加文档
    def batch_add_document(self, index, documents):
        try:
            success, _ = bulk(self.es, documents)
            print(f'在索引{index}时批量添加文档数量: {success}')
            return True
        except Exception as e:
            print(f"在索引 {index} 时添加文档失败,错误: {e}")
            return False

    # 更新文档
    def update_document_by_id(self, index, documentId, document):
        try:
            self.es.update(index=index, id=documentId, body={"doc": document})
            return True
        except Exception as e:
            print(f"在索引 {index} 中删除文档错误: {e}")
            return False

    # 通过 id 删除文档
    def delete_document(self, index, documentId):
        try:
            self.es.delete(index=index, id=documentId)
            return True
        except Exception as e:
            print(f"在索引 {index} 中删除文档错误: {e}")
            return False

3、使用工具类测试

from utils.ElasticSearchUtil import ElasticsearchUtil

client = ElasticsearchUtil(host="127.0.0.1", port=9200)

# 查看版本
# client.get_version()

# 获取所有索引
# indices = client.get_all_indices()
# for ind in indices:
#     print(ind)

# 创建一个新的索引
index = "rick"

# mapping 映射
mappings = {
    "properties": {
        "id": {
            "type": "long"
        },
        "name": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        },
        "title": {
            "type": "keyword"
        },
        "content": {
            "type": "text",
        },
        "grade": {
            "type": "long"
        },
        "sex": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        },
        "subject": {
            "type": "text",
            "fields": {
                "keyword": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }
    }
}

# res = client.create_index(index=index, document=doc)
# res = client.delete_index(index=index)

# res = client.create_index(index=index, mappings=doc)
# print(res)


#
# res = client.create_index(index, mappings=mappings)
# print(res)

# 添加文档
# document = {
#     "title": "rick",
#     "content": "测试文档."
# }
# res = client.add_document(index, documentId=1, document=document)
# print(res)

# 更新文档
# document = {
#     "title": "rick~"
# }
# res = client.update_document_by_id(index, documentId=1, document=document)
# print(res)

# 查询文档
# res = client.get_document_by_id(index, documentId=1)
# print(res)

# query = {
#     "match_all": {}
# }
# res = client.get_document_by_query(index, query=query)
# print(res)


# 删除文档
res = client.delete_document(index, documentId=1)
print(res)

发表评论:

控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言
    友情链接