客户端

予早 2025-10-05 20:55:48
Categories: Tags:

客户端

Elasticsearch8.0版本中Elasticsearch Java API Client客户端的基本使用方法

https://blog.csdn.net/anjiongyi/article/details/123391835

elasticsearch中keyword存储float,查询时返回float问题

https://github.com/elastic/elasticsearch/issues/34222

https://www.elastic.co/guide/en/elasticsearch/client/index.html

ElasticSearch Client

上述部分为理论部分,实际开发中,主要有三种方式可以作为es服务的客户端:

集群与节点操作

# 查看集群健康状态
GET /_cat/health?v
curl -X GET "localhost:9200/_cat/health?v&pretty"
- Green - everything is good (cluster is fully functional)
- Yellow - all data is available but some replicas are not yet allocated (cluster is fully functional)
- Red - some data is not available for whatever reason (cluster is partially functional)

# 查看节点
GET /_cat/nodes?v
curl -X GET "localhost:9200/_cat/nodes?v&pretty"

# 查看索引
GET /_cat/indices?v
curl -X GET "localhost:9200/_cat/indices?v&pretty"

# 查看分片
GET _cat/shards?v
curl -X GET "localhost:9200/_cat/shards?v&pretty"

elasticsearch-py 的线程池封装在 Transport 类中

elasticsearch-py,官方客户端
https://github.com/elastic/elasticsearch-py
https://www.elastic.co/guide/en/elasticsearch/client/python-api/current/getting-started-python.html

elasticsearch-dsl-py,官方高级客户端
https://pypi.org/project/elasticsearch-dsl/
https://github.com/elastic/elasticsearch-dsl-py

Pydastic
Pydastic is an elasticsearch python ORM based on Pydantic.
https://github.com/RamiAwar/pydastic
https://pypi.org/project/pydastic/

elasticsearch-dsl

https://elasticsearch-dsl.readthedocs.io/en/latest/index.html

config

 # Option 1: register a single connection under the alias "default".
 from elasticsearch_dsl import connections
 
 connections.create_connection(alias="default", hosts=['localhost:9200'], timeout=30)
 # Option 2: declare several named connections at once; each is created
 # lazily on first use.
 from elasticsearch_dsl import connections
 
 connections.configure(
     default={'hosts': 'localhost'},
     dev={
         'hosts': ['esdev1.example.com:9200'],
         # NOTE(review): sniff_on_start needs the dev nodes reachable from
         # the client at startup -- confirm in the dev environment.
         'sniff_on_start': True
     }
 )
 import decimal
 
 from elasticsearch import helpers
 from elasticsearch_dsl import Document, Date, Long, Keyword, \
     Q, ValidationException, MetaField
 from elasticsearch_dsl import connections
 
 # Register the "default" connection used by the Document classes below.
 connections.create_connection(alias="default", hosts=['localhost:9200'], timeout=30)
 
 class KeyWord(Keyword):
     _coerce = True
 
     def _deserialize(self, data):
         return decimal.Decimal(data)
 
     def clean(self, data):
         if data is not None:
             data = str(data)
         if data is None and self._required:
             raise ValidationException("Value required for this field.")
         return data
 
 
 class AaaIndex(Document):
     """
     Net value view (净值视图) document: one record per product per
     trading day. Numeric net-value fields are mapped as Keyword
     (strings) to avoid float precision issues in Elasticsearch.
     """
 
     class Meta:
         # strict: reject documents containing fields not declared below.
         dynamic = MetaField('strict')
 
     class Index:
         name = 'net_value_view'
         using = 'default'  # connection alias registered above
 
     id = Long(required=False)
     product_code = Keyword(required=True)
     trading_day = Date(required=True)
     # Net-value figures stored as keyword strings to preserve precision.
     unit_nv = Keyword(required=False)
     accu_nv = Keyword(required=False)
     adju_nv = Keyword(required=False)
     unit_nv_drr = Keyword(required=False)
     adju_nv_drr = Keyword(required=False)
     seven_day_annualized_rr = Keyword(required=False)
     daily_return_per_10000_shares = Keyword(required=False)
     created_time = Date()
     updated_time = Date()
 
 
 # Create the index and mapping in Elasticsearch if they do not exist.
 AaaIndex.init()
 
 # Sample document; note the deliberately mixed value types
 # (Decimal, str, float) exercised by the Keyword fields above.
 d = {"_id": "123456",
      "product_code": "P0001",
      "trading_day": "2020-01-01",
      "unit_nv": decimal.Decimal("1.2"),
      "accu_nv": "1.32",
      "adju_nv": 0.1,
      "unit_nv_drr": decimal.Decimal("1.3"),
      }
 
 
 # print(NetValueView(**d).save())
 
 
 def bulk_insert_ds_data_change_log(logs: list[dict]):
     if not logs:
         return
     actions = [{"_index": AaaIndex.Index.name, "_source": log, "_id": log.pop("_id")} for log in logs]
     res = helpers.bulk(client=connections.get_connection(AaaIndex.Index.using), actions=actions, refresh='wait_for')
     return res
 
 
 # Index the sample document and wait until it is searchable.
 bulk_insert_ds_data_change_log([d])
 # NetValueView().to_dict()
 
 # Fetch by Elasticsearch _id and inspect the deserialized field types.
 # NOTE(review): this id looks environment-specific -- verify it exists.
 f = AaaIndex.get(id="5gVy64sB4K8-J81-B_T5")
 print(type(f.unit_nv), type(f.accu_nv), type(f.adju_nv))
 print(format(f.adju_nv, '.64f'))
 
 # Query by document ids via the DSL search API.
 es = AaaIndex.search()
 es = es.query(Q('ids', **{'values': ["123456"]}))
 a = es.execute()
 print([item.to_dict() for item in a])
 

elasticsearch-py

helpers.bulk

在 Elasticsearch 中,_op_type 是用于标识执行的操作类型的关键字。主要的 _op_type 可选值包括:

  1. index: 创建新的文档或替换现有文档。
  2. create: 创建新的文档,如果文档已存在则返回错误。
  3. update: 更新现有文档。
  4. delete: 删除现有文档。

这些值代表了针对文档的不同操作类型。在使用 helpers.bulk 方法时,可以通过设置 _op_type 来指定要执行的操作类型。根据具体的操作需求,选择相应的 _op_type 值来进行相应的操作。

import elasticsearch
# import tqdm
from elasticsearch import helpers

# Elasticsearch client against a local node.
es_client = elasticsearch.Elasticsearch(hosts=[{"host": "127.0.0.1", "port": 9200}])
# es = Elasticsearch(['10.10.13.12'], http_auth=('xiao', '123456'), timeout=3600)
# If the cluster requires authentication, pass http_auth=(user, password);
# the request timeout defaults to 10 seconds.



# Create the index; ignore=400 suppresses the "already exists" error.
# NOTE(review): the mapping nests properties under a doc type ("my_doc");
# mapping types were removed in Elasticsearch 7 -- confirm the target
# server version accepts this body.
result = es_client.indices.create(index="user", ignore=400, body={
    "mappings": {
        "my_doc": {
            "properties": {
                "my_id": {"type": "integer"},
                "my_word": {"type": "text"}
            }
        }
    }
})
print(result)

# Check whether the index exists.
result = es_client.indices.exists("user")
print(result)


def keywordSearch(keyword, index):
    """Search *index* for documents whose my_word field matches *keyword*.

    Prints the hit count reported by a direct search, then re-runs the
    same query with helpers.scan and prints every (my_id, my_word) pair.

    :param keyword: text to match against the my_word field
    :param index: name of the index to search
    """
    body = {
        "query": {
            "match": {
                "my_word": keyword
            }
        }
    }

    # Direct search.
    res = es_client.search(index=index, body=body)
    total = res["hits"]["total"]
    # ES 7+ reports total as {"value": n, "relation": ...}; older versions
    # report a plain int. Handle both (the quickstart snippet in this file
    # reads ['total']['value']); the original %d crashed on the dict form.
    if isinstance(total, dict):
        total = total["value"]
    print("共查询到%d条数据" % total)

    # helpers.scan pages through every matching document.
    res = helpers.scan(client=es_client, query=body, index=index)
    res_dict = []
    for item in res:
        temp = item["_source"]
        res_dict.append((temp["my_id"], temp["my_word"]))
    print("共%d条数据" % len(res_dict))
    print(res_dict)


keywordSearch("三","user")
# Insert data
import elasticsearch

es_client = elasticsearch.Elasticsearch()

# The document body is a plain dict.
body = {
    "id": "",
    "name": "",
    "age": "",
    "gender": "",
    "id_card": "",
    "address": "",
    "contact": {
        "phone": "",
        "email": "",
        "QQ": "",
        "Wechat": ""
    }

}

# create() requires an explicit id; ignore=409 suppresses the conflict
# error raised when a document with that id already exists.
# NOTE(review): doc_type was removed in Elasticsearch 7+ -- confirm the
# client/server versions this snippet targets.
result = es_client.create(index='user', doc_type='dc', id=1, body=body,ignore=409)
print(result)

# index() needs no id; Elasticsearch generates one automatically.
result = es_client.index(index='user', doc_type='dc', body=body)

print(result)

# Update data
# index() can perform both operations for us: it inserts when the
# document does not exist and fully replaces it when it does (upsert).
# Two approaches are shown below.

body = {
    "id": "",
    "name": "",
    "age": "",
    "gender": "",
    "birthday": "",
    "id_card": "",
    "address": "",
    "contact": {
        "phone": "",
        "email": "",
        "QQ": "",
        "Wechat": ""
    }

}
# update() applies a partial update to the existing document id=1.
result = es_client.update(index='user', doc_type='dc', id=1, body=body)

print(result)

# index() with an explicit id performs a full replace (upsert).
result = es_client.index(index='user', doc_type='dc', body=body,id=1)
print(result)
import elasticsearch
import json

# Update an index mapping.
es_client = elasticsearch.Elasticsearch()
# BUG FIX: the original wrote {"properties": {""}} -- {""} is a *set*
# literal containing an empty string, not a dict, and is not
# JSON-serializable. "properties" must map field names to definitions;
# fill in the fields to add, e.g. "title": {"type": "text"}.
mapping = {
    "properties": {
    }
}

result = es_client.indices.put_mapping(index='news', doc_type='politics', body=mapping)

print(result)







# Query documents: no body means match everything in the index/type.
result = es_client.search(index='news', doc_type='politics')

print(result)

# DSL query: full-text match on the title field.
dsl = {

    'query': {

        'match': {

            'title': '中国 领事馆'

        }

    }

}


result = es_client.search(index='news', doc_type='politics', body=dsl)

# ensure_ascii=False keeps the Chinese text readable in the output.
print(json.dumps(result, indent=2, ensure_ascii=False))
import elasticsearch

# Elasticsearch client
es_client = elasticsearch.Elasticsearch()

# Index operations

# Create an index
result = es_client.indices.create(index='user', ignore=400)
print(result)
# Status 400 means the index already exists; ignore=400 swallows that error.

# Delete the index; 400 (bad request) and 404 (not found) are ignored.
result = es_client.indices.delete(index='user', ignore=[400, 404])
print(result)



# Document operations

# elasticsearch-py quick start (the snippet below uses the plain client,
# not elasticsearch-dsl)

import elasticsearch
from datetime import datetime
from elasticsearch import Elasticsearch
print(elasticsearch.__version__)

# Create the ES client (8.x-style host dict with an explicit scheme).
es_client = Elasticsearch(hosts=[{"scheme": "http", "host": "127.0.0.1", "port": 9200}])

doc = {
    'author': 'kimchy',
    'text': 'Elasticsearch: cool. bonsai cool.',
    'timestamp': datetime.now(),
}

# Index (upsert) the document under an explicit id.
resp = es_client.index(index="test-index", id="1", document=doc)
print(resp['result'])

# Fetch it back by id.
resp = es_client.get(index="test-index", id="1")
print(resp['_source'])

# Refresh so the document is visible to search immediately.
es_client.indices.refresh(index="test-index")

# match_all search; hits.total is an object ({'value': n, ...}) in ES 7+.
resp = es_client.search(index="test-index", query={"match_all": {}})
print("Got %d Hits:" % resp['hits']['total']['value'])
for hit in resp['hits']['hits']:
    print("%(timestamp)s %(author)s: %(text)s" % hit["_source"])