66package org .elasticsearch .xpack .monitoring .exporter .http ;
77
88import org .apache .http .entity .ContentType ;
9- import org .apache .http .nio . entity .NByteArrayEntity ;
9+ import org .apache .http .entity .InputStreamEntity ;
1010import org .apache .logging .log4j .LogManager ;
1111import org .apache .logging .log4j .Logger ;
12- import org .apache .logging .log4j .message .ParameterizedMessage ;
13- import org .apache .logging .log4j .util .Supplier ;
14- import org .apache .lucene .util .BytesRef ;
1512import org .elasticsearch .action .ActionListener ;
1613import org .elasticsearch .client .Request ;
1714import org .elasticsearch .client .Response ;
1815import org .elasticsearch .client .ResponseListener ;
1916import org .elasticsearch .client .RestClient ;
2017import org .elasticsearch .common .bytes .BytesReference ;
2118import org .elasticsearch .common .io .stream .BytesStreamOutput ;
19+ import org .elasticsearch .common .io .stream .StreamOutput ;
2220import org .elasticsearch .common .time .DateFormatter ;
2321import org .elasticsearch .common .util .concurrent .ThreadContext ;
22+ import org .elasticsearch .common .xcontent .ToXContent ;
2423import org .elasticsearch .common .xcontent .XContent ;
2524import org .elasticsearch .common .xcontent .XContentBuilder ;
26- import org .elasticsearch .common .xcontent .XContentHelper ;
2725import org .elasticsearch .common .xcontent .XContentType ;
2826import org .elasticsearch .xpack .core .monitoring .exporter .MonitoringDoc ;
2927import org .elasticsearch .xpack .core .monitoring .exporter .MonitoringTemplateUtils ;
@@ -60,7 +58,7 @@ class HttpExportBulk extends ExportBulk {
6058 /**
6159 * The bytes payload that represents the bulk body is created via {@link #doAdd(Collection)}.
6260 */
63- private byte [] payload = null ;
61+ private BytesReference payload = null ;
6462
6563 HttpExportBulk (final String name , final RestClient client , final Map <String , String > parameters ,
6664 final DateFormatter dateTimeFormatter , final ThreadContext threadContext ) {
@@ -77,12 +75,11 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
7775 if (docs != null && docs .isEmpty () == false ) {
7876 try (BytesStreamOutput payload = new BytesStreamOutput ()) {
7977 for (MonitoringDoc monitoringDoc : docs ) {
80- // any failure caused by an individual doc will be written as an empty byte[], thus not impacting the rest
81- payload .write (toBulkBytes (monitoringDoc ));
78+ writeDocument (monitoringDoc , payload );
8279 }
8380
8481 // store the payload until we flush
85- this .payload = BytesReference . toBytes ( payload .bytes () );
82+ this .payload = payload .bytes ();
8683 }
8784 }
8885 } catch (Exception e ) {
@@ -94,12 +91,19 @@ public void doAdd(Collection<MonitoringDoc> docs) throws ExportException {
9491 public void doFlush (ActionListener <Void > listener ) throws ExportException {
9592 if (payload == null ) {
9693 listener .onFailure (new ExportException ("unable to send documents because none were loaded for export bulk [{}]" , name ));
97- } else if (payload .length != 0 ) {
94+ } else if (payload .length () != 0 ) {
9895 final Request request = new Request ("POST" , "/_bulk" );
9996 for (Map .Entry <String , String > param : params .entrySet ()) {
10097 request .addParameter (param .getKey (), param .getValue ());
10198 }
102- request .setEntity (new NByteArrayEntity (payload , ContentType .APPLICATION_JSON ));
99+ try {
100+ request .setEntity (new InputStreamEntity (payload .streamInput (), payload .length (), ContentType .APPLICATION_JSON ));
101+ } catch (IOException e ) {
102+ listener .onFailure (e );
103+ return ;
104+ }
105+ // null out serialized docs to make things easier on the GC
106+ payload = null ;
103107
104108 client .performRequestAsync (request , new ResponseListener () {
105109 @ Override
@@ -123,51 +127,43 @@ public void onFailure(Exception exception) {
123127 }
124128 }
125129
126- private byte [] toBulkBytes ( final MonitoringDoc doc ) throws IOException {
130+ private void writeDocument ( MonitoringDoc doc , StreamOutput out ) throws IOException {
127131 final XContentType xContentType = XContentType .JSON ;
128132 final XContent xContent = xContentType .xContent ();
129133
130134 final String index = MonitoringTemplateUtils .indexName (formatter , doc .getSystem (), doc .getTimestamp ());
131135 final String id = doc .getId ();
132136
133- try (BytesStreamOutput out = new BytesStreamOutput ()) {
134- try (XContentBuilder builder = new XContentBuilder (xContent , out )) {
135- // Builds the bulk action metadata line
136- builder .startObject ();
137+ try (XContentBuilder builder = new XContentBuilder (xContent , out )) {
138+ // Builds the bulk action metadata line
139+ builder .startObject ();
140+ {
141+ builder .startObject ("index" );
137142 {
138- builder .startObject ("index" );
139- {
140- builder .field ("_index" , index );
141- if (id != null ) {
142- builder .field ("_id" , id );
143- }
143+ builder .field ("_index" , index );
144+ if (id != null ) {
145+ builder .field ("_id" , id );
144146 }
145- builder .endObject ();
146147 }
147148 builder .endObject ();
148149 }
150+ builder .endObject ();
151+ }
149152
150- // Adds action metadata line bulk separator
151- out .write (xContent .streamSeparator ());
152-
153- // Adds the source of the monitoring document
154- final BytesRef source = XContentHelper .toXContent (doc , xContentType , false ).toBytesRef ();
155- out .write (source .bytes , source .offset , source .length );
156-
157- // Adds final bulk separator
158- out .write (xContent .streamSeparator ());
153+ // Adds action metadata line bulk separator
154+ out .write (xContent .streamSeparator ());
159155
160- logger . trace (
161- "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]" ,
162- name , index , id , doc .getType ()
163- );
156+ // Adds the source of the monitoring document
157+ try ( XContentBuilder builder = new XContentBuilder ( xContent , out )) {
158+ doc .toXContent ( builder , ToXContent . EMPTY_PARAMS );
159+ }
164160
165- return BytesReference .toBytes (out .bytes ());
166- } catch (Exception e ) {
167- logger .warn ((Supplier <?>) () -> new ParameterizedMessage ("failed to render document [{}], skipping it [{}]" , doc , name ), e );
161+ // Adds final bulk separator
162+ out .write (xContent .streamSeparator ());
168163
169- return BytesRef .EMPTY_BYTES ;
170- }
164+ logger .trace (
165+ "http exporter [{}] - added index request [index={}, id={}, monitoring data type={}]" ,
166+ name , index , id , doc .getType ()
167+ );
171168 }
172-
173169}
0 commit comments