Skip to content

Commit 237f20f

Browse files
committed
CSPL-3551 Update of conf files when queue name or type change
1 parent 4b064a6 commit 237f20f

File tree

5 files changed

+89
-31
lines changed

5 files changed

+89
-31
lines changed

pkg/splunk/client/enterprise.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1004,3 +1004,29 @@ func (c *SplunkClient) UpdateConfFile(fileName, property string, propertyKVList
10041004
err = c.Do(request, expectedStatus, nil)
10051005
return err
10061006
}
1007+
1008+
// Deletes conf files properties
1009+
func (c *SplunkClient) DeleteConfFileProperty(fileName, property string) error {
1010+
endpoint := fmt.Sprintf("%s/servicesNS/nobody/system/configs/conf-%s/%s", c.ManagementURI, fileName, property)
1011+
1012+
request, err := http.NewRequest("DELETE", endpoint, nil)
1013+
if err != nil {
1014+
return err
1015+
}
1016+
1017+
expectedStatus := []int{200, 201, 404}
1018+
return c.Do(request, expectedStatus, nil)
1019+
}
1020+
1021+
// Deletes conf files properties' values
1022+
func (c *SplunkClient) DeleteConfFilePropertyValue(fileName, property string, propertyKVList []string) error {
1023+
endpoint := fmt.Sprintf("%s/servicesNS/nobody/system/configs/conf-%s/%s", c.ManagementURI, fileName, property)
1024+
1025+
request, err := http.NewRequest("DELETE", endpoint, strings.NewReader(fmt.Sprintf("%s=%s", propertyKVList[0], propertyKVList[1])))
1026+
if err != nil {
1027+
return err
1028+
}
1029+
1030+
expectedStatus := []int{200, 201, 404}
1031+
return c.Do(request, expectedStatus, nil)
1032+
}

pkg/splunk/enterprise/indexercluster.go

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1277,7 +1277,25 @@ func (mgr *indexerClusterPodManager) handlePullBusOrPipelineConfigChange(ctx con
12771277
}
12781278
splunkClient := newSplunkClientForPullBusPipeline(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd))
12791279

1280-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR)
1280+
afterDelete := false
1281+
if (newCR.Spec.PullBus.SQS.QueueName != "" && newCR.Status.PullBus.SQS.QueueName != "" && newCR.Spec.PullBus.SQS.QueueName != newCR.Status.PullBus.SQS.QueueName) || (newCR.Spec.PullBus.Type != "" && newCR.Status.PullBus.Type != "" && newCR.Spec.PullBus.Type != newCR.Status.PullBus.Type) {
1282+
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.PullBus.SQS.QueueName)); err != nil {
1283+
updateErr = err
1284+
}
1285+
afterDelete = true
1286+
}
1287+
1288+
if (newCR.Spec.PullBus.SQS.RetryPolicy != "" && newCR.Status.PullBus.SQS.RetryPolicy != "" && newCR.Spec.PullBus.SQS.RetryPolicy != newCR.Status.PullBus.SQS.RetryPolicy) && !afterDelete {
1289+
if err := splunkClient.DeleteConfFilePropertyValue("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PullBus.SQS.QueueName), []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newCR.Spec.PullBus.SQS.RetryPolicy, newCR.Spec.PullBus.Type), fmt.Sprintf("%d", newCR.Spec.PullBus.SQS.MaxRetriesPerPart)}); err != nil {
1290+
updateErr = err
1291+
}
1292+
1293+
if err := splunkClient.DeleteConfFilePropertyValue("inputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PullBus.SQS.QueueName), []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newCR.Spec.PullBus.SQS.RetryPolicy, newCR.Spec.PullBus.Type), fmt.Sprintf("%d", newCR.Spec.PullBus.SQS.MaxRetriesPerPart)}); err != nil {
1294+
updateErr = err
1295+
}
1296+
}
1297+
1298+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR, afterDelete)
12811299

12821300
for _, pbVal := range pullBusChangedFieldsOutputs {
12831301
if err := splunkClient.UpdateConfFile("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PullBus.SQS.QueueName), [][]string{pbVal}); err != nil {
@@ -1302,15 +1320,15 @@ func (mgr *indexerClusterPodManager) handlePullBusOrPipelineConfigChange(ctx con
13021320
return updateErr
13031321
}
13041322

1305-
func getChangedPullBusAndPipelineFieldsIndexer(oldCrStatus *enterpriseApi.IndexerClusterStatus, newCR *enterpriseApi.IndexerCluster) (pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields [][]string) {
1323+
func getChangedPullBusAndPipelineFieldsIndexer(oldCrStatus *enterpriseApi.IndexerClusterStatus, newCR *enterpriseApi.IndexerCluster, afterDelete bool) (pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields [][]string) {
13061324
// Compare PullBus fields
13071325
oldPB := oldCrStatus.PullBus
13081326
newPB := newCR.Spec.PullBus
13091327
oldPC := oldCrStatus.PipelineConfig
13101328
newPC := newCR.Spec.PipelineConfig
13111329

13121330
// Push all PullBus fields
1313-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs = pullBusChanged(oldPB, newPB)
1331+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs = pullBusChanged(oldPB, newPB, afterDelete)
13141332

13151333
// Always set all pipeline fields, not just changed ones
13161334
pipelineChangedFields = pipelineConfigChanged(oldPC, newPC, oldCrStatus.PullBus.SQS.QueueName != "", true)
@@ -1329,37 +1347,37 @@ func imageUpdatedTo9(previousImage string, currentImage string) bool {
13291347
return strings.HasPrefix(previousVersion, "8") && strings.HasPrefix(currentVersion, "9")
13301348
}
13311349

1332-
func pullBusChanged(oldPullBus, newPullBus enterpriseApi.PushBusSpec) (inputs, outputs [][]string) {
1333-
if oldPullBus.Type != newPullBus.Type {
1350+
func pullBusChanged(oldPullBus, newPullBus enterpriseApi.PushBusSpec, afterDelete bool) (inputs, outputs [][]string) {
1351+
if oldPullBus.Type != newPullBus.Type || afterDelete {
13341352
inputs = append(inputs, []string{"remote_queue.type", newPullBus.Type})
13351353
}
1336-
if oldPullBus.SQS.AuthRegion != newPullBus.SQS.AuthRegion {
1354+
if oldPullBus.SQS.AuthRegion != newPullBus.SQS.AuthRegion || afterDelete {
13371355
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.auth_region", newPullBus.Type), newPullBus.SQS.AuthRegion})
13381356
}
1339-
if oldPullBus.SQS.Endpoint != newPullBus.SQS.Endpoint {
1357+
if oldPullBus.SQS.Endpoint != newPullBus.SQS.Endpoint || afterDelete {
13401358
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.endpoint", newPullBus.Type), newPullBus.SQS.Endpoint})
13411359
}
1342-
if oldPullBus.SQS.LargeMessageStoreEndpoint != newPullBus.SQS.LargeMessageStoreEndpoint {
1360+
if oldPullBus.SQS.LargeMessageStoreEndpoint != newPullBus.SQS.LargeMessageStoreEndpoint || afterDelete {
13431361
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", newPullBus.Type), newPullBus.SQS.LargeMessageStoreEndpoint})
13441362
}
1345-
if oldPullBus.SQS.LargeMessageStorePath != newPullBus.SQS.LargeMessageStorePath {
1363+
if oldPullBus.SQS.LargeMessageStorePath != newPullBus.SQS.LargeMessageStorePath || afterDelete {
13461364
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.large_message_store.path", newPullBus.Type), newPullBus.SQS.LargeMessageStorePath})
13471365
}
1348-
if oldPullBus.SQS.DeadLetterQueueName != newPullBus.SQS.DeadLetterQueueName {
1366+
if oldPullBus.SQS.DeadLetterQueueName != newPullBus.SQS.DeadLetterQueueName || afterDelete {
13491367
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.dead_letter_queue.name", newPullBus.Type), newPullBus.SQS.DeadLetterQueueName})
13501368
}
1351-
if oldPullBus.SQS.MaxRetriesPerPart != newPullBus.SQS.MaxRetriesPerPart || oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy {
1369+
if oldPullBus.SQS.MaxRetriesPerPart != newPullBus.SQS.MaxRetriesPerPart || oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy || afterDelete {
13521370
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newPullBus.SQS.RetryPolicy, newPullBus.Type), fmt.Sprintf("%d", newPullBus.SQS.MaxRetriesPerPart)})
13531371
}
1354-
if oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy {
1372+
if oldPullBus.SQS.RetryPolicy != newPullBus.SQS.RetryPolicy || afterDelete {
13551373
inputs = append(inputs, []string{fmt.Sprintf("remote_queue.%s.retry_policy", newPullBus.Type), newPullBus.SQS.RetryPolicy})
13561374
}
13571375

13581376
outputs = inputs
1359-
if oldPullBus.SQS.SendInterval != newPullBus.SQS.SendInterval {
1377+
if oldPullBus.SQS.SendInterval != newPullBus.SQS.SendInterval || afterDelete {
13601378
outputs = append(outputs, []string{fmt.Sprintf("remote_queue.%s.send_interval", newPullBus.Type), newPullBus.SQS.SendInterval})
13611379
}
1362-
if oldPullBus.SQS.EncodingFormat != newPullBus.SQS.EncodingFormat {
1380+
if oldPullBus.SQS.EncodingFormat != newPullBus.SQS.EncodingFormat || afterDelete {
13631381
outputs = append(outputs, []string{fmt.Sprintf("remote_queue.%s.encoding_format", newPullBus.Type), newPullBus.SQS.EncodingFormat})
13641382
}
13651383

pkg/splunk/enterprise/indexercluster_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2047,7 +2047,7 @@ func TestGetChangedPullBusAndPipelineFieldsIndexer(t *testing.T) {
20472047
},
20482048
}
20492049

2050-
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR)
2050+
pullBusChangedFieldsInputs, pullBusChangedFieldsOutputs, pipelineChangedFields := getChangedPullBusAndPipelineFieldsIndexer(&newCR.Status, newCR, false)
20512051
assert.Equal(t, 8, len(pullBusChangedFieldsInputs))
20522052
assert.Equal(t, [][]string{
20532053
{"remote_queue.type", newCR.Spec.PullBus.Type},

pkg/splunk/enterprise/ingestorcluster.go

Lines changed: 28 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,21 @@ func (mgr *ingestorClusterPodManager) handlePushBusOrPipelineConfigChange(ctx co
393393
}
394394
splunkClient := mgr.newSplunkClient(fmt.Sprintf("https://%s:8089", fqdnName), "admin", string(adminPwd))
395395

396-
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR)
396+
afterDelete := false
397+
if (newCR.Spec.PushBus.SQS.QueueName != "" && newCR.Status.PushBus.SQS.QueueName != "" && newCR.Spec.PushBus.SQS.QueueName != newCR.Status.PushBus.SQS.QueueName) || (newCR.Spec.PushBus.Type != "" && newCR.Status.PushBus.Type != "" && newCR.Spec.PushBus.Type != newCR.Status.PushBus.Type) {
398+
if err := splunkClient.DeleteConfFileProperty("outputs", fmt.Sprintf("remote_queue:%s", newCR.Status.PushBus.SQS.QueueName)); err != nil {
399+
updateErr = err
400+
}
401+
afterDelete = true
402+
}
403+
404+
if (newCR.Spec.PushBus.SQS.RetryPolicy != "" && newCR.Status.PushBus.SQS.RetryPolicy != "" && newCR.Spec.PushBus.SQS.RetryPolicy != newCR.Status.PushBus.SQS.RetryPolicy) && !afterDelete {
405+
if err := splunkClient.DeleteConfFilePropertyValue("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PushBus.SQS.QueueName), []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newCR.Spec.PushBus.SQS.RetryPolicy, newCR.Spec.PushBus.Type), fmt.Sprintf("%d", newCR.Spec.PushBus.SQS.MaxRetriesPerPart)}); err != nil {
406+
updateErr = err
407+
}
408+
}
409+
410+
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR, afterDelete)
397411

398412
for _, pbVal := range pushBusChangedFields {
399413
if err := splunkClient.UpdateConfFile("outputs", fmt.Sprintf("remote_queue:%s", newCR.Spec.PushBus.SQS.QueueName), [][]string{pbVal}); err != nil {
@@ -413,14 +427,14 @@ func (mgr *ingestorClusterPodManager) handlePushBusOrPipelineConfigChange(ctx co
413427
}
414428

415429
// Returns the names of PushBus and PipelineConfig fields that changed between oldCR and newCR.
416-
func getChangedPushBusAndPipelineFields(oldCrStatus *enterpriseApi.IngestorClusterStatus, newCR *enterpriseApi.IngestorCluster) (pushBusChangedFields, pipelineChangedFields [][]string) {
430+
func getChangedPushBusAndPipelineFields(oldCrStatus *enterpriseApi.IngestorClusterStatus, newCR *enterpriseApi.IngestorCluster, afterDelete bool) (pushBusChangedFields, pipelineChangedFields [][]string) {
417431
oldPB := oldCrStatus.PushBus
418432
newPB := newCR.Spec.PushBus
419433
oldPC := oldCrStatus.PipelineConfig
420434
newPC := newCR.Spec.PipelineConfig
421435

422436
// Push changed PushBus fields
423-
pushBusChangedFields = pushBusChanged(oldPB, newPB)
437+
pushBusChangedFields = pushBusChanged(oldPB, newPB, afterDelete)
424438

425439
// Always changed pipeline fields
426440
pipelineChangedFields = pipelineConfigChanged(oldPC, newPC, oldCrStatus.PushBus.SQS.QueueName != "", false)
@@ -468,35 +482,35 @@ func pipelineConfigChanged(oldPipelineConfig, newPipelineConfig enterpriseApi.Pi
468482
return output
469483
}
470484

471-
func pushBusChanged(oldPushBus, newPushBus enterpriseApi.PushBusSpec) (output [][]string) {
472-
if oldPushBus.Type != newPushBus.Type {
485+
func pushBusChanged(oldPushBus, newPushBus enterpriseApi.PushBusSpec, afterDelete bool) (output [][]string) {
486+
if oldPushBus.Type != newPushBus.Type || afterDelete {
473487
output = append(output, []string{"remote_queue.type", newPushBus.Type})
474488
}
475-
if oldPushBus.SQS.EncodingFormat != newPushBus.SQS.EncodingFormat {
489+
if oldPushBus.SQS.EncodingFormat != newPushBus.SQS.EncodingFormat || afterDelete {
476490
output = append(output, []string{fmt.Sprintf("remote_queue.%s.encoding_format", newPushBus.Type), newPushBus.SQS.EncodingFormat})
477491
}
478-
if oldPushBus.SQS.AuthRegion != newPushBus.SQS.AuthRegion {
492+
if oldPushBus.SQS.AuthRegion != newPushBus.SQS.AuthRegion || afterDelete {
479493
output = append(output, []string{fmt.Sprintf("remote_queue.%s.auth_region", newPushBus.Type), newPushBus.SQS.AuthRegion})
480494
}
481-
if oldPushBus.SQS.Endpoint != newPushBus.SQS.Endpoint {
495+
if oldPushBus.SQS.Endpoint != newPushBus.SQS.Endpoint || afterDelete {
482496
output = append(output, []string{fmt.Sprintf("remote_queue.%s.endpoint", newPushBus.Type), newPushBus.SQS.Endpoint})
483497
}
484-
if oldPushBus.SQS.LargeMessageStoreEndpoint != newPushBus.SQS.LargeMessageStoreEndpoint {
498+
if oldPushBus.SQS.LargeMessageStoreEndpoint != newPushBus.SQS.LargeMessageStoreEndpoint || afterDelete {
485499
output = append(output, []string{fmt.Sprintf("remote_queue.%s.large_message_store.endpoint", newPushBus.Type), newPushBus.SQS.LargeMessageStoreEndpoint})
486500
}
487-
if oldPushBus.SQS.LargeMessageStorePath != newPushBus.SQS.LargeMessageStorePath {
501+
if oldPushBus.SQS.LargeMessageStorePath != newPushBus.SQS.LargeMessageStorePath || afterDelete {
488502
output = append(output, []string{fmt.Sprintf("remote_queue.%s.large_message_store.path", newPushBus.Type), newPushBus.SQS.LargeMessageStorePath})
489503
}
490-
if oldPushBus.SQS.DeadLetterQueueName != newPushBus.SQS.DeadLetterQueueName {
504+
if oldPushBus.SQS.DeadLetterQueueName != newPushBus.SQS.DeadLetterQueueName || afterDelete {
491505
output = append(output, []string{fmt.Sprintf("remote_queue.%s.dead_letter_queue.name", newPushBus.Type), newPushBus.SQS.DeadLetterQueueName})
492506
}
493-
if oldPushBus.SQS.MaxRetriesPerPart != newPushBus.SQS.MaxRetriesPerPart || oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy {
507+
if oldPushBus.SQS.MaxRetriesPerPart != newPushBus.SQS.MaxRetriesPerPart || oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy || afterDelete {
494508
output = append(output, []string{fmt.Sprintf("remote_queue.%s.%s.max_retries_per_part", newPushBus.SQS.RetryPolicy, newPushBus.Type), fmt.Sprintf("%d", newPushBus.SQS.MaxRetriesPerPart)})
495509
}
496-
if oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy {
510+
if oldPushBus.SQS.RetryPolicy != newPushBus.SQS.RetryPolicy || afterDelete {
497511
output = append(output, []string{fmt.Sprintf("remote_queue.%s.retry_policy", newPushBus.Type), newPushBus.SQS.RetryPolicy})
498512
}
499-
if oldPushBus.SQS.SendInterval != newPushBus.SQS.SendInterval {
513+
if oldPushBus.SQS.SendInterval != newPushBus.SQS.SendInterval || afterDelete {
500514
output = append(output, []string{fmt.Sprintf("remote_queue.%s.send_interval", newPushBus.Type), newPushBus.SQS.SendInterval})
501515
}
502516
return output

pkg/splunk/enterprise/ingestorcluster_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func TestApplyIngestorCluster(t *testing.T) {
273273
for i := 0; i < int(cr.Status.ReadyReplicas); i++ {
274274
podName := fmt.Sprintf("splunk-test-ingestor-%d", i)
275275
baseURL := fmt.Sprintf("https://%s.splunk-%s-ingestor-headless.%s.svc.cluster.local:8089/servicesNS/nobody/system/configs/conf-default-mode", podName, cr.GetName(), cr.GetNamespace())
276-
276+
277277
for _, field := range propertyKVList {
278278
req, _ := http.NewRequest("POST", baseURL, strings.NewReader(fmt.Sprintf("name=%s", field[0])))
279279
mockHTTPClient.AddHandler(req, 200, "", nil)
@@ -417,7 +417,7 @@ func TestGetChangedPushBusAndPipelineFieldsIngestor(t *testing.T) {
417417
},
418418
}
419419

420-
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR)
420+
pushBusChangedFields, pipelineChangedFields := getChangedPushBusAndPipelineFields(&newCR.Status, newCR, false)
421421

422422
assert.Equal(t, 10, len(pushBusChangedFields))
423423
assert.Equal(t, [][]string{

0 commit comments

Comments
 (0)