6666
6767public class EnrichPlugin extends Plugin implements ActionPlugin , IngestPlugin {
6868
69- static final Setting <Integer > ENRICH_FETCH_SIZE_SETTING =
70- Setting .intSetting ("enrich.fetch_size" , 10000 , 1 , 1000000 , Setting .Property .NodeScope );
71-
72- static final Setting <Integer > ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS =
73- Setting .intSetting ("enrich.max_concurrent_policy_executions" , 50 , 1 , Setting .Property .NodeScope );
74-
75- static final Setting <TimeValue > ENRICH_CLEANUP_PERIOD =
76- Setting .timeSetting ("enrich.cleanup_period" , new TimeValue (15 , TimeUnit .MINUTES ), Setting .Property .NodeScope );
77-
78- public static final Setting <Integer > COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS =
79- Setting .intSetting ("enrich.coordinator_proxy.max_concurrent_requests" , 8 , 1 , 10000 , Setting .Property .NodeScope );
80-
81- public static final Setting <Integer > COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST =
82- Setting .intSetting ("enrich.coordinator_proxy.max_lookups_per_request" , 128 , 1 , 10000 , Setting .Property .NodeScope );
83-
84- static final Setting <Integer > ENRICH_MAX_FORCE_MERGE_ATTEMPTS =
85- Setting .intSetting ("enrich.max_force_merge_attempts" , 3 , 1 , 10 , Setting .Property .NodeScope );
69+ static final Setting <Integer > ENRICH_FETCH_SIZE_SETTING = Setting .intSetting (
70+ "enrich.fetch_size" ,
71+ 10000 ,
72+ 1 ,
73+ 1000000 ,
74+ Setting .Property .NodeScope
75+ );
76+
77+ static final Setting <Integer > ENRICH_MAX_CONCURRENT_POLICY_EXECUTIONS = Setting .intSetting (
78+ "enrich.max_concurrent_policy_executions" ,
79+ 50 ,
80+ 1 ,
81+ Setting .Property .NodeScope
82+ );
83+
84+ static final Setting <TimeValue > ENRICH_CLEANUP_PERIOD = Setting .timeSetting (
85+ "enrich.cleanup_period" ,
86+ new TimeValue (15 , TimeUnit .MINUTES ),
87+ Setting .Property .NodeScope
88+ );
89+
90+ public static final Setting <Integer > COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS = Setting .intSetting (
91+ "enrich.coordinator_proxy.max_concurrent_requests" ,
92+ 8 ,
93+ 1 ,
94+ 10000 ,
95+ Setting .Property .NodeScope
96+ );
97+
98+ public static final Setting <Integer > COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST = Setting .intSetting (
99+ "enrich.coordinator_proxy.max_lookups_per_request" ,
100+ 128 ,
101+ 1 ,
102+ 10000 ,
103+ Setting .Property .NodeScope
104+ );
105+
106+ static final Setting <Integer > ENRICH_MAX_FORCE_MERGE_ATTEMPTS = Setting .intSetting (
107+ "enrich.max_force_merge_attempts" ,
108+ 3 ,
109+ 1 ,
110+ 10 ,
111+ Setting .Property .NodeScope
112+ );
86113
87114 private static final String QUEUE_CAPACITY_SETTING_NAME = "enrich.coordinator_proxy.queue_capacity" ;
88- public static final Setting <Integer > COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting <>(QUEUE_CAPACITY_SETTING_NAME ,
89- settings -> {
90- int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS .get (settings );
91- int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST .get (settings );
92- return String .valueOf (maxConcurrentRequests * maxLookupsPerRequest );
93- },
94- val -> Setting .parseInt (val , 1 , Integer .MAX_VALUE , QUEUE_CAPACITY_SETTING_NAME ),
95- Setting .Property .NodeScope );
115+ public static final Setting <Integer > COORDINATOR_PROXY_QUEUE_CAPACITY = new Setting <>(QUEUE_CAPACITY_SETTING_NAME , settings -> {
116+ int maxConcurrentRequests = COORDINATOR_PROXY_MAX_CONCURRENT_REQUESTS .get (settings );
117+ int maxLookupsPerRequest = COORDINATOR_PROXY_MAX_LOOKUPS_PER_REQUEST .get (settings );
118+ return String .valueOf (maxConcurrentRequests * maxLookupsPerRequest );
119+ }, val -> Setting .parseInt (val , 1 , Integer .MAX_VALUE , QUEUE_CAPACITY_SETTING_NAME ), Setting .Property .NodeScope );
96120
97121 private final Settings settings ;
98122 private final Boolean enabled ;
@@ -113,13 +137,13 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
113137 return Map .of (EnrichProcessorFactory .TYPE , factory );
114138 }
115139
116- protected XPackLicenseState getLicenseState () { return XPackPlugin .getSharedLicenseState (); }
140+ protected XPackLicenseState getLicenseState () {
141+ return XPackPlugin .getSharedLicenseState ();
142+ }
117143
118144 public List <ActionHandler <? extends ActionRequest , ? extends ActionResponse >> getActions () {
119145 if (enabled == false ) {
120- return List .of (
121- new ActionHandler <>(XPackInfoFeatureAction .ENRICH , EnrichInfoTransportAction .class )
122- );
146+ return List .of (new ActionHandler <>(XPackInfoFeatureAction .ENRICH , EnrichInfoTransportAction .class ));
123147 }
124148
125149 return List .of (
@@ -135,10 +159,15 @@ public Map<String, Processor.Factory> getProcessors(Processor.Parameters paramet
135159 );
136160 }
137161
138- public List <RestHandler > getRestHandlers (Settings settings , RestController restController , ClusterSettings clusterSettings ,
139- IndexScopedSettings indexScopedSettings , SettingsFilter settingsFilter ,
140- IndexNameExpressionResolver indexNameExpressionResolver ,
141- Supplier <DiscoveryNodes > nodesInCluster ) {
162+ public List <RestHandler > getRestHandlers (
163+ Settings settings ,
164+ RestController restController ,
165+ ClusterSettings clusterSettings ,
166+ IndexScopedSettings indexScopedSettings ,
167+ SettingsFilter settingsFilter ,
168+ IndexNameExpressionResolver indexNameExpressionResolver ,
169+ Supplier <DiscoveryNodes > nodesInCluster
170+ ) {
142171 if (enabled == false ) {
143172 return List .of ();
144173 }
@@ -153,31 +182,42 @@ public List<RestHandler> getRestHandlers(Settings settings, RestController restC
153182 }
154183
155184 @ Override
156- public Collection <Object > createComponents (Client client , ClusterService clusterService , ThreadPool threadPool ,
157- ResourceWatcherService resourceWatcherService , ScriptService scriptService ,
158- NamedXContentRegistry xContentRegistry , Environment environment ,
159- NodeEnvironment nodeEnvironment , NamedWriteableRegistry namedWriteableRegistry ) {
185+ public Collection <Object > createComponents (
186+ Client client ,
187+ ClusterService clusterService ,
188+ ThreadPool threadPool ,
189+ ResourceWatcherService resourceWatcherService ,
190+ ScriptService scriptService ,
191+ NamedXContentRegistry xContentRegistry ,
192+ Environment environment ,
193+ NodeEnvironment nodeEnvironment ,
194+ NamedWriteableRegistry namedWriteableRegistry
195+ ) {
160196 if (enabled == false ) {
161197 return List .of ();
162198 }
163199
164200 EnrichPolicyLocks enrichPolicyLocks = new EnrichPolicyLocks ();
165- EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService (settings , client ,
166- clusterService , threadPool , enrichPolicyLocks );
167- enrichPolicyMaintenanceService .initialize ();
168- return List .of (
169- enrichPolicyLocks ,
170- new EnrichCoordinatorProxyAction .Coordinator (client , settings ),
171- enrichPolicyMaintenanceService
201+ EnrichPolicyMaintenanceService enrichPolicyMaintenanceService = new EnrichPolicyMaintenanceService (
202+ settings ,
203+ client ,
204+ clusterService ,
205+ threadPool ,
206+ enrichPolicyLocks
172207 );
208+ enrichPolicyMaintenanceService .initialize ();
209+ return List .of (enrichPolicyLocks , new EnrichCoordinatorProxyAction .Coordinator (client , settings ), enrichPolicyMaintenanceService );
173210 }
174211
175212 @ Override
176213 public List <NamedWriteableRegistry .Entry > getNamedWriteables () {
177214 return List .of (
178215 new NamedWriteableRegistry .Entry (MetaData .Custom .class , EnrichMetadata .TYPE , EnrichMetadata ::new ),
179- new NamedWriteableRegistry .Entry (NamedDiff .class , EnrichMetadata .TYPE ,
180- in -> EnrichMetadata .readDiffFrom (MetaData .Custom .class , EnrichMetadata .TYPE , in ))
216+ new NamedWriteableRegistry .Entry (
217+ NamedDiff .class ,
218+ EnrichMetadata .TYPE ,
219+ in -> EnrichMetadata .readDiffFrom (MetaData .Custom .class , EnrichMetadata .TYPE , in )
220+ )
181221 );
182222 }
183223
0 commit comments