Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions flink-end-to-end-tests/run-nightly-tests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ if [[ ${PROFILE} != *"jdk11"* && ${PROFILE} != *"enable-adaptive-scheduler"* ]];
run_test "Run kubernetes session test (default input)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh"
run_test "Run kubernetes session test (custom fs plugin)" "$END_TO_END_DIR/test-scripts/test_kubernetes_session.sh dummy-fs"
run_test "Run kubernetes application test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application.sh"
run_test "Run kubernetes application HA test" "$END_TO_END_DIR/test-scripts/test_kubernetes_application_ha.sh"
run_test "Run Kubernetes IT test" "$END_TO_END_DIR/test-scripts/test_kubernetes_itcases.sh"

run_test "Running Flink over NAT end-to-end test" "$END_TO_END_DIR/test-scripts/test_nat.sh" "skip_check_exceptions"
Expand Down
41 changes: 35 additions & 6 deletions flink-end-to-end-tests/test-scripts/common_kubernetes.sh
Original file line number Diff line number Diff line change
Expand Up @@ -139,24 +139,53 @@ function debug_and_show_logs {
}

# Waits until the log of the given pod reports that the REST endpoint is
# listening. All polling is delegated to wait_for_logs.
# Arguments:
#   $1 - name of the jobmanager pod
function wait_rest_endpoint_up_k8s {
    local jm_pod_name=${1-}

    # Quoted on purpose: an empty or whitespace-containing name must still
    # arrive as exactly one argument, otherwise the regex would slide into the
    # pod-name position of wait_for_logs.
    wait_for_logs "${jm_pod_name}" "Rest endpoint listening at"
}

# Polls the log of a pod until the most recent
# "Completed checkpoint <id> for job" line carries an id that is at least the
# requested number.
# Arguments:
#   $1 - name of the pod whose log is read with "kubectl logs"
#   $2 - checkpoint id that has to be reached
# Outputs:
#   progress and timeout messages on stdout
# Returns:
#   0 as soon as the id is reached; terminates the calling shell with exit
#   status 1 when it is not reached within 120 polls, one second apart.
function wait_num_checkpoints {
    local pod_name=$1
    local num_checkpoints=$2
    # wait at most 120 seconds
    local timeout=120
    local latest
    local attempt

    echo "Waiting for job ($pod_name) to have at least $num_checkpoints completed checkpoints ..."

    for (( attempt = 1; attempt <= timeout; attempt++ )); do
        # kubectl's stderr is discarded deliberately: while the pod is not
        # serving logs the command fails and the poll simply sees no checkpoint.
        # [0-9]+ (not [1-9]*) so ids containing a zero, e.g. 10 or 100, match.
        latest=$(kubectl logs "$pod_name" 2> /dev/null \
            | grep -E -o "Completed checkpoint [0-9]+ for job" \
            | awk '{print $3}' \
            | tail -1)

        # 10# forces base 10 so a digit string can never be parsed as octal;
        # no match at all counts as checkpoint 0.
        if (( 10#${latest:-0} >= num_checkpoints )); then
            return 0
        fi
        sleep 1
    done
    echo "Could not get $num_checkpoints completed checkpoints in $timeout sec"
    exit 1
}

# Waits for a jobmanager pod to become Ready and then for a given pattern to
# show up in its log.
# Arguments:
#   $1 - name of the jobmanager pod
#   $2 - regex handed to check_logs_output (defined elsewhere in this file);
#        it may contain spaces
# Outputs:
#   progress and timeout messages on stdout
# Returns:
#   0 once check_logs_output succeeds; terminates the calling shell with exit
#   status 1 if the pod is not Ready within 30s or the check does not succeed
#   within 30 polls, one second apart.
function wait_for_logs {
    local jm_pod_name=$1
    local successful_response_regex=$2
    local timeout=30
    local attempt

    echo "Waiting for jobmanager pod ${jm_pod_name} ready."
    kubectl wait --for=condition=Ready --timeout=30s "pod/${jm_pod_name}" || exit 1

    # wait at most 30 seconds until the log shows up
    echo "Waiting for log \"${successful_response_regex}\"..."
    for (( attempt = 1; attempt <= timeout; attempt++ )); do
        # Both arguments are quoted so a multi-word regex reaches
        # check_logs_output as one argument instead of only its first word.
        if check_logs_output "${jm_pod_name}" "${successful_response_regex}"; then
            echo "Log \"${successful_response_regex}\" shows up."
            return 0
        fi
        sleep 1
    done
    echo "Log ${successful_response_regex} does not show up within a timeout of ${timeout} sec"
    exit 1
}

Expand Down Expand Up @@ -185,7 +214,7 @@ function get_host_machine_address {
if [[ "${OS_TYPE}" != "linux" ]]; then
echo $(minikube ssh "route -n | grep ^0.0.0.0 | awk '{ print \$2 }' | tr -d '[:space:]'")
else
echo "localhost"
echo $(hostname --ip-address)
fi
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Pod template: adds one annotation, one label, an environment variable,
# ephemeral-storage requests/limits and a hostPath volume to the main container.
apiVersion: v1
kind: Pod
metadata:
name: pod-template
annotations:
annotation-key-of-pod-template: annotation-value-of-pod-template
labels:
label-key-of-pod-template: label-value-of-pod-template
spec:
containers:
# Do not change the main container name
- name: flink-main-container
# Never pull from a registry; the image has to be present on the node already
# (presumably built locally by the test - confirm against the calling script).
imagePullPolicy: Never
resources:
# Request and limit are identical, so the container gets exactly 256Mi of
# ephemeral storage.
requests:
ephemeral-storage: 256Mi
limits:
ephemeral-storage: 256Mi
env:
- name: ENV_OF_POD_TEMPLATE
value: env-value-of-pod-template
volumeMounts:
# The host directory declared under "volumes" appears in the container at /flink-ha.
- mountPath: /flink-ha
name: flink-volume-hostpath
volumes:
- name: flink-volume-hostpath
hostPath:
path: /tmp
# "Directory" requires /tmp to already exist on the host.
type: Directory
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Load the shared Kubernetes test helpers that live next to this script.
source "$(dirname "$0")"/common_kubernetes.sh

# Cluster role binding created for the default service account before the job
# is submitted and deleted again in internal_cleanup.
CLUSTER_ROLE_BINDING="flink-role-binding-default"
# Flink cluster id; this script also uses it as the deployment name, as the
# value of the "app" label selector and as the directory name removed from /tmp.
CLUSTER_ID="flink-native-k8s-application-ha-1"
# Image name handed to build_image and to -Dkubernetes.container.image.
FLINK_IMAGE_NAME="test_kubernetes_application_ha"
# Directory created with mkdir -p before the job is submitted.
# NOTE(review): TEST_DATA_DIR is not set in this script - presumably provided
# by the sourced helpers or the test runner; confirm.
LOCAL_LOGS_PATH="${TEST_DATA_DIR}/log"
# Retry count and backoff passed to retry_times for the image build.
IMAGE_BUILD_RETRIES=3
IMAGE_BUILD_BACKOFF=2

# Removes everything this test created: the /tmp/<CLUSTER_ID> directory (inside
# the minikube VM when OS_TYPE is not "linux", otherwise on the local host), the
# deployment, the config maps labelled app=<CLUSTER_ID> and the cluster role
# binding.
# Globals:
#   OS_TYPE, CLUSTER_ID, CLUSTER_ROLE_BINDING (all read)
# Outputs:
#   a warning on stderr when CLUSTER_ID is empty
function internal_cleanup {
    # Never build the rm target from an empty id: the path would collapse to
    # "/tmp/" and the recursive delete would wipe the whole directory.
    if [[ -n "${CLUSTER_ID:-}" ]]; then
        if [[ "${OS_TYPE}" != "linux" ]]; then
            minikube ssh "sudo rm -rf /tmp/${CLUSTER_ID}"
        else
            sudo rm -rf "/tmp/${CLUSTER_ID}"
        fi
    else
        echo "WARNING: CLUSTER_ID is empty, skipping removal of the directory under /tmp" >&2
    fi

    # The Kubernetes resources are cleaned up regardless of the guard above.
    kubectl delete deployment "${CLUSTER_ID:-}"
    kubectl delete cm --selector="app=${CLUSTER_ID:-}"
    kubectl delete clusterrolebinding "${CLUSTER_ROLE_BINDING}"
}

# Test flow: start Kubernetes, build the image, submit StateMachineExample in
# application mode with Kubernetes HA enabled, wait for checkpoints, kill the
# JobManager process, verify that the job is restored from a checkpoint and
# keeps checkpointing, then cancel the job and wait for its resources to go away.
start_kubernetes

if ! retry_times "$IMAGE_BUILD_RETRIES" "$IMAGE_BUILD_BACKOFF" "build_image ${FLINK_IMAGE_NAME} $(get_host_machine_address)"; then
    echo "ERROR: Could not build image. Aborting..."
    exit 1
fi

kubectl create clusterrolebinding "${CLUSTER_ROLE_BINDING}" --clusterrole=edit --serviceaccount=default:default --namespace=default

mkdir -p "$LOCAL_LOGS_PATH"

# Set the memory and cpu smaller than default, so that the jobmanager and taskmanager pods could be allocated in minikube.
"$FLINK_DIR"/bin/flink run-application -t kubernetes-application \
    -Dkubernetes.cluster-id="${CLUSTER_ID}" \
    -Dkubernetes.container.image="${FLINK_IMAGE_NAME}" \
    -Djobmanager.memory.process.size=1088m \
    -Dkubernetes.jobmanager.cpu=0.5 \
    -Dkubernetes.taskmanager.cpu=0.5 \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    -Dkubernetes.pod-template-file="${CONTAINER_SCRIPTS}/kubernetes-pod-template.yaml" \
    -Dhigh-availability=org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory \
    -Dhigh-availability.storageDir=file:///flink-ha \
    -Drestart-strategy=fixed-delay \
    -Drestart-strategy.fixed-delay.attempts=10 \
    local:///opt/flink/examples/streaming/StateMachineExample.jar

kubectl wait --for=condition=Available --timeout=30s "deploy/${CLUSTER_ID}" || exit 1
jm_pod_name=$(kubectl get pods --selector="app=${CLUSTER_ID},component=jobmanager" -o jsonpath='{..metadata.name}')

wait_num_checkpoints "$jm_pod_name" 3

# Only the first "Job <id> is submitted" line is used so the id is always a
# single token, even if the line shows up more than once in the log.
job_id=$(kubectl logs "$jm_pod_name" | grep -E -o 'Job [a-z0-9]+ is submitted' | awk 'NR == 1 {print $2}')
if [[ -z "$job_id" ]]; then
    # Without an id the recovery check below would wait for a meaningless
    # pattern and the cancel command would be issued without a job.
    echo "ERROR: Could not determine the job id from the JobManager log. Aborting..."
    exit 1
fi

# Kill the JobManager
kubectl exec "$jm_pod_name" -- /bin/sh -c "kill 1"

# Check the new JobManager recovering from latest successful checkpoint
wait_for_logs "$jm_pod_name" "Restoring job $job_id from Checkpoint"
wait_num_checkpoints "$jm_pod_name" 1

"$FLINK_DIR"/bin/flink cancel -t kubernetes-application -Dkubernetes.cluster-id="${CLUSTER_ID}" "$job_id"

# NOTE(review): the status of the first wait is not checked; the script ends
# with the status of the last command only.
kubectl wait --for=delete configmap --timeout=60s --selector="app=${CLUSTER_ID}"
kubectl wait --for=delete pod --timeout=60s --selector="app=${CLUSTER_ID}"