55 */
66package org .elasticsearch .xpack .upgrade ;
77
8+ import org .apache .logging .log4j .LogManager ;
9+ import org .apache .logging .log4j .Logger ;
10+ import org .elasticsearch .ElasticsearchException ;
811import org .elasticsearch .action .ActionListener ;
912import org .elasticsearch .action .support .master .AcknowledgedResponse ;
1013import org .elasticsearch .client .Client ;
1518import org .elasticsearch .cluster .metadata .IndexMetaData ;
1619import org .elasticsearch .cluster .metadata .MetaData ;
1720import org .elasticsearch .cluster .service .ClusterService ;
21+ import org .elasticsearch .common .Strings ;
1822import org .elasticsearch .common .settings .Settings ;
1923import org .elasticsearch .index .IndexNotFoundException ;
2024import org .elasticsearch .index .reindex .BulkByScrollResponse ;
2529import org .elasticsearch .transport .TransportResponse ;
2630
2731import java .util .function .BiConsumer ;
28- import java .util .function .Consumer ;
2932
3033import static org .elasticsearch .index .IndexSettings .same ;
3134
3942 * - Delete index .{name} and add alias .{name} to .{name}-6
4043 */
4144public class InternalIndexReindexer <T > {
45+ private static final Logger logger = LogManager .getLogger (InternalIndexReindexer .class );
4246
4347 private final Client client ;
4448 private final ClusterService clusterService ;
4549 private final Script transformScript ;
4650 private final String [] types ;
4751 private final int version ;
48- private final Consumer < ActionListener <T >> preUpgrade ;
52+ private final BiConsumer < ClusterState , ActionListener <T >> preUpgrade ;
4953 private final BiConsumer <T , ActionListener <TransportResponse .Empty >> postUpgrade ;
5054
5155 public InternalIndexReindexer (Client client , ClusterService clusterService , int version , Script transformScript , String [] types ,
52- Consumer < ActionListener <T >> preUpgrade ,
56+ BiConsumer < ClusterState , ActionListener <T >> preUpgrade ,
5357 BiConsumer <T , ActionListener <TransportResponse .Empty >> postUpgrade ) {
5458 this .client = client ;
5559 this .clusterService = clusterService ;
@@ -62,7 +66,7 @@ public InternalIndexReindexer(Client client, ClusterService clusterService, int
6266
6367 public void upgrade (TaskId task , String index , ClusterState clusterState , ActionListener <BulkByScrollResponse > listener ) {
6468 ParentTaskAssigningClient parentAwareClient = new ParentTaskAssigningClient (client , task );
65- preUpgrade .accept (ActionListener .wrap (
69+ preUpgrade .accept (clusterState , ActionListener .wrap (
6670 t -> innerUpgrade (parentAwareClient , index , clusterState , ActionListener .wrap (
6771 response -> postUpgrade .accept (t , ActionListener .wrap (
6872 empty -> listener .onResponse (response ),
@@ -76,32 +80,61 @@ public void upgrade(TaskId task, String index, ClusterState clusterState, Action
7680 private void innerUpgrade (ParentTaskAssigningClient parentAwareClient , String index , ClusterState clusterState ,
7781 ActionListener <BulkByScrollResponse > listener ) {
7882 String newIndex = index + "-" + version ;
83+ logger .trace ("upgrading index {} to new index {}" , index , newIndex );
7984 try {
8085 checkMasterAndDataNodeVersion (clusterState );
81- parentAwareClient .admin ().indices ().prepareCreate (newIndex ).execute (ActionListener .wrap (createIndexResponse ->
82- setReadOnlyBlock (index , ActionListener .wrap (setReadOnlyResponse ->
83- reindex (parentAwareClient , index , newIndex , ActionListener .wrap (
84- bulkByScrollResponse -> // Successful completion of reindexing - delete old index
85- removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse ->
86- parentAwareClient .admin ().indices ().prepareAliases ().removeIndex (index )
87- .addAlias (newIndex , index ).execute (ActionListener .wrap (deleteIndexResponse ->
88- listener .onResponse (bulkByScrollResponse ), listener ::onFailure
89- )), listener ::onFailure
90- )),
91- e -> // Something went wrong during reindexing - remove readonly flag and report the error
92- removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse -> {
93- listener .onFailure (e );
94- }, e1 -> {
95- listener .onFailure (e );
96- }))
97- )), listener ::onFailure
98- )), listener ::onFailure
99- ));
86+ parentAwareClient .admin ().indices ().prepareCreate (newIndex ).execute (ActionListener .wrap (createIndexResponse -> {
87+ setReadOnlyBlock (index , ActionListener .wrap (
88+ setReadOnlyResponse -> reindex (parentAwareClient , index , newIndex , ActionListener .wrap (bulkByScrollResponse -> {
89+ if ((bulkByScrollResponse .getBulkFailures () != null
90+ && bulkByScrollResponse .getBulkFailures ().isEmpty () == false )
91+ || (bulkByScrollResponse .getSearchFailures () != null
92+ && bulkByScrollResponse .getSearchFailures ().isEmpty () == false )) {
93+ ElasticsearchException ex = logAndThrowExceptionForFailures (bulkByScrollResponse );
94+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , ex );
95+ } else {
96+ // Successful completion of reindexing - remove read only and delete old index
97+ removeReadOnlyBlock (parentAwareClient , index ,
98+ ActionListener .wrap (unsetReadOnlyResponse -> parentAwareClient .admin ().indices ().prepareAliases ()
99+ .removeIndex (index ).addAlias (newIndex , index )
100+ .execute (ActionListener .wrap (
101+ deleteIndexResponse -> listener .onResponse (bulkByScrollResponse ),
102+ listener ::onFailure )),
103+ listener ::onFailure ));
104+ }
105+ }, e -> {
106+ logger .error ("error occurred while reindexing" , e );
107+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , e );
108+ })), listener ::onFailure ));
109+ }, listener ::onFailure ));
100110 } catch (Exception ex ) {
111+ logger .error ("error occurred while upgrading index" , ex );
112+ removeReadOnlyBlockOnReindexFailure (parentAwareClient , index , listener , ex );
101113 listener .onFailure (ex );
102114 }
103115 }
104116
117+ private void removeReadOnlyBlockOnReindexFailure (ParentTaskAssigningClient parentAwareClient , String index ,
118+ ActionListener <BulkByScrollResponse > listener , Exception ex ) {
119+ removeReadOnlyBlock (parentAwareClient , index , ActionListener .wrap (unsetReadOnlyResponse -> {
120+ listener .onFailure (ex );
121+ }, e1 -> {
122+ listener .onFailure (ex );
123+ }));
124+ }
125+
126+ private ElasticsearchException logAndThrowExceptionForFailures (BulkByScrollResponse bulkByScrollResponse ) {
127+ String bulkFailures = (bulkByScrollResponse .getBulkFailures () != null )
128+ ? Strings .collectionToCommaDelimitedString (bulkByScrollResponse .getBulkFailures ())
129+ : "" ;
130+ String searchFailures = (bulkByScrollResponse .getSearchFailures () != null )
131+ ? Strings .collectionToCommaDelimitedString (bulkByScrollResponse .getSearchFailures ())
132+ : "" ;
133+ logger .error ("error occurred while reindexing, bulk failures [{}], search failures [{}]" , bulkFailures , searchFailures );
134+ return new ElasticsearchException ("error occurred while reindexing, bulk failures [{}], search failures [{}]" , bulkFailures ,
135+ searchFailures );
136+ }
137+
105138 private void checkMasterAndDataNodeVersion (ClusterState clusterState ) {
106139 if (clusterState .nodes ().getMinNodeVersion ().before (Upgrade .UPGRADE_INTRODUCED )) {
107140 throw new IllegalStateException ("All nodes should have at least version [" + Upgrade .UPGRADE_INTRODUCED + "] to upgrade" );
0 commit comments