博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Elasitcsearch High Level Rest Client学习笔记(三)批量api
阅读量:6690 次
发布时间:2019-06-25

本文共 5317 字,大约阅读时间需要 17 分钟。

  hot3.png

Bulk Request

BulkRequest可以在一起从请求执行批量添加、更新和删除,至少需要添加一个操作

BulkRequest request = new BulkRequest(); //创建BulkRequestrequest.add(new IndexRequest("posts", "doc", "1")  //添加操作        .source(XContentType.JSON,"field", "foo"));request.add(new IndexRequest("posts", "doc", "2")  //添加操作        .source(XContentType.JSON,"field", "bar"));request.add(new IndexRequest("posts", "doc", "3")  //添加操作        .source(XContentType.JSON,"field", "baz"));

注意:每次只支持一种encoded,否则会报错

可以在同一个BulkRequest中添加不同类型操作

BulkRequest request = new BulkRequest();request.add(new DeleteRequest("posts", "doc", "3")); request.add(new UpdateRequest("posts", "doc", "2")         .doc(XContentType.JSON,"other", "test"));request.add(new IndexRequest("posts", "doc", "4")          .source(XContentType.JSON,"field", "baz"));

可选参数

超时时间设置

request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");

 

刷新策略

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");

 

设置副本shard活跃验证,执行index、update、delete操作前必须有多少个副本shard活跃

request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);

调用方式

同步

BulkResponse bulkResponse = client.bulk(request);

异步

client.bulkAsync(request, new ActionListener
() { @Override public void onResponse(BulkResponse bulkResponse) { //成功 } @Override public void onFailure(Exception e) { //失败 }});

响应对象

响应对象包括操作信息,并且可以便利每一个结果

for (BulkItemResponse bulkItemResponse : bulkResponse) {     DocWriteResponse itemResponse = bulkItemResponse.getResponse();     if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {     //index操作        IndexResponse indexResponse = (IndexResponse) itemResponse;    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {     //update操作        UpdateResponse updateResponse = (UpdateResponse) itemResponse;    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {     //delete操作        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;    }}

BulkResponce提供方法快速查看操作是否失败

if (bulkResponse.hasFailures()) {     //todo}

BulkProcessor

RestHighLevelClient:执行BulkRequest并且返回BulkResponse

BulkProcessor.Listener:在bulk请求前后执行,并且可以处理失败情况

BulkProcessor.Listener listener = new BulkProcessor.Listener() {     @Override    public void beforeBulk(long executionId, BulkRequest request) {        //bulk请求前执行    }    @Override    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {        //bulk请求后执行    }    @Override    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {        //失败后执行    }};BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build();  //BulkProcessor通过 BulkProcessor.Builder build()方法构建, RestHighLevelClient.bulkAsync() 用来执行bulk请求

BulkProcessor.Builder提供方法使BulkProcessor调整请求参数

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);builder.setBulkActions(500); //按照数量批量处理(默认1000,-1禁用) builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); //按照大小批量处理builder.setConcurrentRequests(0); //并发处理线程builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //设置flush索引周期builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //回退策略,等待1秒并重试3次, BackoffPolicy.noBackoff()  BackoffPolicy.constantBackoff()  BackoffPolicy.exponentialBackoff()  查看更多选项

添加请求

IndexRequest one = new IndexRequest("posts", "doc", "1").        source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");IndexRequest two = new IndexRequest("posts", "doc", "2")        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");IndexRequest three = new IndexRequest("posts", "doc", "3")        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");bulkProcessor.add(one);bulkProcessor.add(two);bulkProcessor.add(three);

BulkProcessor通过 BulkProcessor.Listener 监控请求, BulkProcessor.Listener 提供方法接受BulkRequest和BulkResponse

BulkProcessor.Listener listener = new BulkProcessor.Listener() {    @Override    public void beforeBulk(long executionId, BulkRequest request) {        int numberOfActions = request.numberOfActions(); //在每个execution前执行,可以获知每次执行多少个操作        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);    }    @Override    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {        if (response.hasFailures()) {  //在每个execution后执行,可以获知是否包含错误            logger.warn("Bulk [{}] executed with failures", executionId);        } else {            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());        }    }    @Override    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {        logger.error("Failed to execute bulk", failure); //发生错误时执行    }};

批量请求执行后需要关闭BulkProcessor。两种关闭方式选其一

awaitClose(),所有请求被处理后或者等待时间结束后关闭,返回ture表明所有请求已经完成,false说明等待时间结束后请求并未执行结束

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close(),立即关闭BulkProcessor

bulkProcessor.close();

关闭processor之前,所有已经被添加的请求会被提交执行,并且不能再向其中添加请求

转载于:https://my.oschina.net/muziH/blog/1859001

你可能感兴趣的文章
【springmvc+mybatis项目实战】杰信商贸-26.出货表修饰+下载
查看>>
【Android开发】图形图像处理技术-旋转、缩放、倾斜和平移图像
查看>>
简易Java爬虫制作
查看>>
linux中生成考核用的GPT分区结构样例(二)
查看>>
我的友情链接
查看>>
编辑vi 查看网卡命令
查看>>
常见的内存错误及其对策
查看>>
C语言:冒泡法排序一组数,如何优化?
查看>>
分享16个javascript&jQuery的MVC教程
查看>>
阿里云域名配置与解析
查看>>
容器是实现操作系统虚拟化的一种途径
查看>>
详细介绍Java中的堆、栈和常量池
查看>>
Go环境变量
查看>>
用Doxygen优化Inkpad的模块关系
查看>>
Delphi 数据类型列表
查看>>
eclipse 创建maven Web项目
查看>>
Angular之父子路由
查看>>
llvm每日谈之三 如何创建一个LLVM工程
查看>>
高性能Web服务之tomcat基础应用详解(一)
查看>>
处理logs,进行分析web展示。
查看>>