From f15d2770c27683c57f3e85c5e2cd3cf96c209157 Mon Sep 17 00:00:00 2001 From: wangyang0918 Date: Sat, 8 May 2021 11:54:13 +0800 Subject: [PATCH] [FLINK-19545][e2e] Add e2e test for native Kubernetes HA The HA e2e test will start a Flink application first and wait for three successful checkpoints. Then kill the JobManager. A new JobManager should be launched and recover the job from latest successful checkpoint. Finally, cancel the job and all the K8s resources should be cleaned up automatically. This closes #14172. --- flink-end-to-end-tests/run-nightly-tests.sh | 1 + .../test-scripts/common_kubernetes.sh | 41 +++++++-- .../kubernetes-pod-template.yaml | 47 +++++++++++ .../test_kubernetes_application_ha.sh | 83 +++++++++++++++++++ 4 files changed, 166 insertions(+), 6 deletions(-) create mode 100644 flink-end-to-end-tests/test-scripts/container-scripts/kubernetes-pod-template.yaml create mode 100755 flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh diff --git a/flink-end-to-end-tests/run-nightly-tests.sh b/flink-end-to-end-tests/run-nightly-tests.sh index 83b95baf5438a..c0e185a245534 100755 --- a/flink-end-to-end-tests/run-nightly-tests.sh +++ b/flink-end-to-end-tests/run-nightly-tests.sh @@ -133,6 +133,7 @@ if [[ ${PROFILE} != *"jdk11"* && ${PROFILE} != *"enable-adaptive-scheduler"* ]]; run_test "Run kubernetes session test (default input)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh" run_test "Run kubernetes session test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs" run_test "Run kubernetes application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh" + run_test "Run kubernetes application HA test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh" run_test "Run Kubernetes IT test" "$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh" run_test "Running Flink over NAT end-to-end test" "$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions" diff --git a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh index 57e8a1bad71ed..7a393d910825d 100755 --- a/flink-end-to-end-tests/test-scripts/common_kubernetes.sh +++ b/flink-end-to-end-tests/test-scripts/common_kubernetes.sh @@ -139,24 +139,53 @@ function debug_and_show_logs { } function wait_rest_endpoint_up_k8s { + wait_for_logs $1 "Rest endpoint listening at" +} + +function wait_num_checkpoints { + POD_NAME=$1 + NUM_CHECKPOINTS=$2 + + echo "Waiting for job ($POD_NAME) to have at least $NUM_CHECKPOINTS completed checkpoints ..." + + # wait at most 120 seconds + local TIMEOUT=120 + for i in $(seq 1 ${TIMEOUT}); do + N=$(kubectl logs $POD_NAME 2> /dev/null | grep -o "Completed checkpoint [1-9]* for job" | awk '{print $3}' | tail -1) + + if [ -z $N ]; then + N=0 + fi + + if (( N < NUM_CHECKPOINTS )); then + sleep 1 + else + return + fi + done + echo "Could not get $NUM_CHECKPOINTS completed checkpoints in $TIMEOUT sec" + exit 1 +} + +function wait_for_logs { local jm_pod_name=$1 - local successful_response_regex="Rest endpoint listening at" + local successful_response_regex=$2 echo "Waiting for jobmanager pod ${jm_pod_name} ready." kubectl wait --for=condition=Ready --timeout=30s pod/$jm_pod_name || exit 1 - # wait at most 30 seconds until the endpoint is up + # wait at most 30 seconds until the log shows up local TIMEOUT=30 + echo "Waiting for log \"$2\"..." for i in $(seq 1 ${TIMEOUT}); do if check_logs_output $jm_pod_name $successful_response_regex; then - echo "REST endpoint is up." + echo "Log \"$2\" shows up." return fi - echo "Waiting for REST endpoint to come up..." sleep 1 done - echo "REST endpoint has not started within a timeout of ${TIMEOUT} sec" + echo "Log $2 does not show up within a timeout of ${TIMEOUT} sec" exit 1 } @@ -185,7 +214,7 @@ function get_host_machine_address { if [[ "${OS_TYPE}" != "linux" ]]; then echo $(minikube ssh "route -n | grep ^0.0.0.0 | awk '{ print \$2 }' | tr -d '[:space:]'") else - echo "localhost" + echo $(hostname --ip-address) fi } diff --git a/flink-end-to-end-tests/test-scripts/container-scripts/kubernetes-pod-template.yaml b/flink-end-to-end-tests/test-scripts/container-scripts/kubernetes-pod-template.yaml new file mode 100644 index 0000000000000..b7ffaeb85b21f --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/container-scripts/kubernetes-pod-template.yaml @@ -0,0 +1,47 @@ +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +apiVersion: v1 +kind: Pod +metadata: + name: pod-template + annotations: + annotation-key-of-pod-template: annotation-value-of-pod-template + labels: + label-key-of-pod-template: label-value-of-pod-template +spec: + containers: + # Do not change the main container name + - name: flink-main-container + imagePullPolicy: Never + resources: + requests: + ephemeral-storage: 256Mi + limits: + ephemeral-storage: 256Mi + env: + - name: ENV_OF_POD_TEMPLATE + value: env-value-of-pod-template + volumeMounts: + - mountPath: /flink-ha + name: flink-volume-hostpath + volumes: + - name: flink-volume-hostpath + hostPath: + path: /tmp + type: Directory diff --git a/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh b/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh new file mode 100755 index 0000000000000..0d8541ef173a5 --- /dev/null +++ b/flink-end-to-end-tests/test-scripts/test_kubernetes_application_ha.sh @@ -0,0 +1,83 @@ +#!/usr/bin/env bash +################################################################################ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +source "$(dirname "$0")"/common_kubernetes.sh + +CLUSTER_ROLE_BINDING="flink-role-binding-default" +CLUSTER_ID="flink-native-k8s-application-ha-1" +FLINK_IMAGE_NAME="test_kubernetes_application_ha" +LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log" +IMAGE_BUILD_RETRIES=3 +IMAGE_BUILD_BACKOFF=2 + +function internal_cleanup { + if [[ "${OS_TYPE}" != "linux" ]]; then + minikube ssh "sudo rm -rf /tmp/${CLUSTER_ID}" + else + sudo rm -rf /tmp/${CLUSTER_ID} + fi + kubectl delete deployment ${CLUSTER_ID} + kubectl delete cm --selector="app=${CLUSTER_ID}" + kubectl delete clusterrolebinding ${CLUSTER_ROLE_BINDING} +} + +start_kubernetes + +if ! retry_times $IMAGE_BUILD_RETRIES $IMAGE_BUILD_BACKOFF "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then + echo "ERROR: Could not build image. Aborting..." + exit 1 +fi + +kubectl create clusterrolebinding ${CLUSTER_ROLE_BINDING} --clusterrole=edit --serviceaccount=default:default --namespace=default + +mkdir -p "$LOCAL_LOGS_PATH" + +# Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube. +"$FLINK_DIR"/bin/flink run-application -t kubernetes-application \ + -Dkubernetes.cluster-id=${CLUSTER_ID} \ + -Dkubernetes.container.image=${FLINK_IMAGE_NAME} \ + -Djobmanager.memory.process.size=1088m \ + -Dkubernetes.jobmanager.cpu=0.5 \ + -Dkubernetes.taskmanager.cpu=0.5 \ + -Dkubernetes.rest-service.exposed.type=NodePort \ + -Dkubernetes.pod-template-file=${CONTAINER_SCRIPTS}/kubernetes-pod-template.yaml \ + -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \ + -Dhigh-availability.storageDir=file:///flink-ha \ + -Drestart-strategy=fixed-delay \ + -Drestart-strategy.fixed-delay.attempts=10 \ + local:///opt/flink/examples/streaming/StateMachineExample.jar + +kubectl wait --for=condition=Available --timeout=30s deploy/${CLUSTER_ID} || exit 1 +jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}') + +wait_num_checkpoints $jm_pod_name 3 + +job_id=$(kubectl logs $jm_pod_name | grep -E -o 'Job [a-z0-9]+ is submitted' | awk '{print $2}') + +# Kill the JobManager +kubectl exec $jm_pod_name -- /bin/sh -c "kill 1" + +# Check the new JobManager recovering from latest successful checkpoint +wait_for_logs $jm_pod_name "Restoring job $job_id from Checkpoint" +wait_num_checkpoints $jm_pod_name 1 + +"$FLINK_DIR"/bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id=${CLUSTER_ID} $job_id + +kubectl wait --for=delete configmap --timeout=60s --selector="app=${CLUSTER_ID}" +kubectl wait --for=delete pod --timeout=60s --selector="app=${CLUSTER_ID}"