11package org .apn .elasticsearch ;
22
3- import java .io .Closeable ;
4- import java .io .IOException ;
5- import java .util .List ;
6- import java .util .Map ;
7- import java .util .Objects ;
8-
93import org .apache .commons .logging .Log ;
104import org .apache .commons .logging .LogFactory ;
115import org .apache .http .HttpHost ;
126import org .apache .http .HttpStatus ;
137import org .apache .http .util .Asserts ;
14- import org .apn .elasticsearch .utils .Constants ;
158import org .elasticsearch .ElasticsearchException ;
169import org .elasticsearch .action .DocWriteRequest ;
1710import org .elasticsearch .action .DocWriteResponse ;
18- import org .elasticsearch .action .admin .indices .create .CreateIndexRequest ;
1911import org .elasticsearch .action .admin .indices .delete .DeleteIndexRequest ;
20- import org .elasticsearch .action .admin .indices .get .GetIndexRequest ;
2112import org .elasticsearch .action .admin .indices .refresh .RefreshRequest ;
2213import org .elasticsearch .action .admin .indices .refresh .RefreshResponse ;
2314import org .elasticsearch .action .bulk .BulkItemResponse ;
3324import org .elasticsearch .client .RequestOptions ;
3425import org .elasticsearch .client .RestClient ;
3526import org .elasticsearch .client .RestHighLevelClient ;
27+ import org .elasticsearch .client .indices .CreateIndexRequest ;
28+ import org .elasticsearch .client .indices .GetIndexRequest ;
3629import org .elasticsearch .common .xcontent .XContentType ;
3730import org .elasticsearch .index .query .QueryBuilders ;
3831import org .elasticsearch .search .SearchHit ;
3932import org .elasticsearch .search .builder .SearchSourceBuilder ;
4033
34+ import java .io .Closeable ;
35+ import java .io .IOException ;
36+ import java .util .List ;
37+ import java .util .Map ;
38+ import java .util .Objects ;
39+
4140/**
4241 * The Class deals with Elasticsearch common usecases via
4342 * {@link RestHighLevelClient}</code>.
44- *
45- * @author amit.nema
4643 *
44+ * @author amit.nema
4745 */
4846public class RestHighLevelClientApp implements Closeable {
4947
50- private static final Log LOGGER = LogFactory .getLog (RestHighLevelClientApp .class );
51- private final RestHighLevelClient client ;
52-
53- public RestHighLevelClientApp (final HttpHost [] hosts ) {
54- this .client = new RestHighLevelClient (RestClient .builder (hosts ));
55- }
56-
57- public RestHighLevelClientApp (final RestHighLevelClient client ) {
58- this .client = client ;
59- }
60-
61- public RestHighLevelClient getClient () {
62- return client ;
63- }
64-
65- @ Override
66- public void close () throws IOException {
67- if (Objects .nonNull (client ))
68- client .close ();
69- }
70-
71- public boolean refreshIndex (final String index ) {
72- Asserts .notNull (index , "No index defined for refresh()" );
73- try {
74- final RefreshResponse response = client .indices ().refresh (new RefreshRequest (index ),
75- RequestOptions .DEFAULT );
76- return response .getStatus ().getStatus () == HttpStatus .SC_OK ;
77- } catch (final IOException e ) {
78- throw new ElasticsearchException ("failed to refresh index: " + index , e );
79- }
80- }
81-
82- public void findById (final String index , final String id ) throws IOException {
83- final SearchRequest searchRequest = new SearchRequest (index );
84- searchRequest .types (Constants .DEFAULT_TYPE );
85- final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder ();
86- searchSourceBuilder .query (QueryBuilders .matchQuery ("_id" , id ));
87- searchRequest .source (searchSourceBuilder );
88- LOGGER .info ("***** source *****\n " + searchRequest .source () + "\n ***** ***** *****" );
89- final SearchResponse response = client .search (searchRequest , RequestOptions .DEFAULT );
90- final SearchHit [] searchHits = response .getHits ().getHits ();
91- for (final SearchHit searchHit : searchHits ) {
92- LOGGER .info (searchHit .getSourceAsMap ());
93- }
94- }
95-
96- public void bulkIndex (final String index , final String idKey , final List <Map <String , Object >> source )
97- throws IOException {
98- final BulkRequest request = new BulkRequest ();
99- source .forEach (doc -> request
100- .add (new IndexRequest (index , Constants .DEFAULT_TYPE , Objects .toString (doc .get (idKey ))).source (doc )));
101-
102- // multiple requests can be added
103- request .add (new DeleteRequest (index , Constants .DEFAULT_TYPE , "4" ));
104-
105- request .timeout ("2m" );
106- final BulkResponse bulkResponse = client .bulk (request , RequestOptions .DEFAULT );
107- checkBulkResponse (bulkResponse );
108- }
109-
110- private void checkBulkResponse (final BulkResponse bulkResponse ) {
111- for (final BulkItemResponse bulkItemResponse : bulkResponse ) {
112- final DocWriteResponse itemResponse = bulkItemResponse .getResponse ();
113-
114- if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .INDEX
115- || bulkItemResponse .getOpType () == DocWriteRequest .OpType .CREATE ) {
116- final IndexResponse indexResponse = (IndexResponse ) itemResponse ;
117- LOGGER .info (indexResponse );
118- } else if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .UPDATE ) {
119- final UpdateResponse updateResponse = (UpdateResponse ) itemResponse ;
120- LOGGER .info (updateResponse );
121- } else if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .DELETE ) {
122- final DeleteResponse deleteResponse = (DeleteResponse ) itemResponse ;
123- LOGGER .info (deleteResponse );
124- }
125- }
126- }
127-
128- public boolean deleteIndex (final String index ) throws IOException {
129- Asserts .notNull (client , "Client must not be empty. Please call init()." );
130- final DeleteIndexRequest request = new DeleteIndexRequest (index );
131- return indexExists (index ) && client .indices ().delete (request , RequestOptions .DEFAULT ).isAcknowledged ();
132- }
133-
134- public boolean indexExists (final String index ) {
135- final GetIndexRequest request = new GetIndexRequest ();
136- request .indices (index );
137- try {
138- return client .indices ().exists (request , RequestOptions .DEFAULT );
139- } catch (final IOException e ) {
140- throw new ElasticsearchException ("Error while for indexExists request: " + request .toString (), e );
141- }
142- }
143-
144- public boolean createIndex (final String index , final String settings ) throws IOException {
145- Asserts .notNull (client , "Client must not be empty. Please call init()." );
146- final CreateIndexRequest request = new CreateIndexRequest (index );
147- if (Objects .nonNull (settings )) {
148- request .settings (String .valueOf (settings ), XContentType .JSON );
149- }
150- return !indexExists (index ) && client .indices ().create (request , RequestOptions .DEFAULT ).isAcknowledged ();
151- }
48+ private static final Log LOGGER = LogFactory .getLog (RestHighLevelClientApp .class );
49+ private final RestHighLevelClient client ;
50+
51+ public RestHighLevelClientApp (final HttpHost [] hosts ) {
52+ this .client = new RestHighLevelClient (RestClient .builder (hosts ));
53+ }
54+
55+ public RestHighLevelClientApp (final RestHighLevelClient client ) {
56+ this .client = client ;
57+ }
58+
59+ public RestHighLevelClient getClient () {
60+ return client ;
61+ }
62+
63+ @ Override
64+ public void close () throws IOException {
65+ if (Objects .nonNull (client ))
66+ client .close ();
67+ }
68+
69+ public boolean refreshIndex (final String index ) {
70+ Asserts .notNull (index , "No index defined for refresh()" );
71+ try {
72+ final RefreshResponse response = client .indices ().refresh (new RefreshRequest (index ),
73+ RequestOptions .DEFAULT );
74+ return response .getStatus ().getStatus () == HttpStatus .SC_OK ;
75+ } catch (final IOException e ) {
76+ throw new ElasticsearchException ("failed to refresh index: " + index , e );
77+ }
78+ }
79+
80+ public void findById (final String index , final String id ) throws IOException {
81+ final SearchRequest searchRequest = new SearchRequest (index );
82+ final SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder ();
83+ searchSourceBuilder .query (QueryBuilders .matchQuery ("_id" , id ));
84+ searchRequest .source (searchSourceBuilder );
85+ LOGGER .info ("***** source *****\n " + searchRequest .source () + "\n ***** ***** *****" );
86+ final SearchResponse response = client .search (searchRequest , RequestOptions .DEFAULT );
87+ final SearchHit [] searchHits = response .getHits ().getHits ();
88+ for (final SearchHit searchHit : searchHits ) {
89+ LOGGER .info (searchHit .getSourceAsMap ());
90+ }
91+ }
92+
93+ public void bulkIndex (final String index , final String idKey , final List <Map <String , Object >> source )
94+ throws IOException {
95+ final BulkRequest request = new BulkRequest ();
96+ source .forEach (doc -> request .add (new IndexRequest (index ).id (Objects .toString (doc .get (idKey ))).source (doc )));
97+
98+ // multiple requests can be added
99+ request .add (new DeleteRequest (index ).id ("4" ));
100+
101+ request .timeout ("2m" );
102+ final BulkResponse bulkResponse = client .bulk (request , RequestOptions .DEFAULT );
103+ checkBulkResponse (bulkResponse );
104+ }
105+
106+ private void checkBulkResponse (final BulkResponse bulkResponse ) {
107+ for (final BulkItemResponse bulkItemResponse : bulkResponse ) {
108+ final DocWriteResponse itemResponse = bulkItemResponse .getResponse ();
109+
110+ if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .INDEX
111+ || bulkItemResponse .getOpType () == DocWriteRequest .OpType .CREATE ) {
112+ final IndexResponse indexResponse = (IndexResponse ) itemResponse ;
113+ LOGGER .info (indexResponse );
114+ } else if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .UPDATE ) {
115+ final UpdateResponse updateResponse = (UpdateResponse ) itemResponse ;
116+ LOGGER .info (updateResponse );
117+ } else if (bulkItemResponse .getOpType () == DocWriteRequest .OpType .DELETE ) {
118+ final DeleteResponse deleteResponse = (DeleteResponse ) itemResponse ;
119+ LOGGER .info (deleteResponse );
120+ }
121+ }
122+ }
123+
124+ public boolean deleteIndex (final String index ) throws IOException {
125+ Asserts .notNull (client , "Client must not be empty. Please call init()." );
126+ final DeleteIndexRequest request = new DeleteIndexRequest (index );
127+ return indexExists (index ) && client .indices ().delete (request , RequestOptions .DEFAULT ).isAcknowledged ();
128+ }
129+
130+ public boolean indexExists (final String index ) {
131+ final GetIndexRequest request = new GetIndexRequest (index );
132+ try {
133+ return client .indices ().exists (request , RequestOptions .DEFAULT );
134+ } catch (final IOException e ) {
135+ throw new ElasticsearchException ("Error while for indexExists request: " + request .toString (), e );
136+ }
137+ }
138+
139+ public boolean createIndex (final String index , final String settings ) throws IOException {
140+ Asserts .notNull (client , "Client must not be empty. Please call init()." );
141+ final CreateIndexRequest request = new CreateIndexRequest (index );
142+ if (Objects .nonNull (settings )) {
143+ request .settings (settings , XContentType .JSON );
144+ }
145+ return !indexExists (index ) && client .indices ().create (request , RequestOptions .DEFAULT ).isAcknowledged ();
146+ }
152147
153148}
0 commit comments