Introduction
Apache Flink is a powerful stream processing framework for processing large volumes of data in real time. Kubernetes, on the other hand, is a container orchestration platform that automates the deployment, scaling, and management of containerized applications. Combining Apache Flink with Kubernetes provides a robust solution for managing and scaling real-time data processing applications. This tutorial provides an in-depth guide to deploying and managing Apache Flink applications on Kubernetes for real-time data processing. We assume that the reader has a basic understanding of both Apache Flink and Kubernetes.
Prerequisites
Before we dive into the details, ensure you have the following prerequisites:
- A working Kubernetes cluster (Minikube, GKE, EKS, or AKS can be used for development and testing).
- kubectl command-line tool configured to interact with your Kubernetes cluster.
- Apache Flink binaries or a Docker image of Apache Flink.
- Basic understanding of Kubernetes concepts such as Pods, Services, Deployments, and ConfigMaps.
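To confirm the tooling is in place before proceeding, a quick sanity check (assuming kubectl and, for local development, minikube are already installed):
kubectl version --client
minikube version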
Setting Up Kubernetes Cluster
If you do not have a Kubernetes cluster set up, you can use Minikube for local development and testing. Here is a quick guide to set up Minikube:
Install Minikube
Follow the installation guide for your operating system from the official Minikube documentation.
Start Minikube
minikube start --cpus 4 --memory 8192
This command starts a Minikube cluster with 4 CPUs and 8GB of memory, which should be sufficient for running Apache Flink.
Verify Minikube
Ensure Minikube is running correctly:
kubectl get nodes
You should see a node named minikube in the Ready state.
Deploying Apache Flink on Kubernetes
Now that we have a Kubernetes cluster up and running, let’s deploy Apache Flink. We will use a Docker image of Apache Flink for this purpose.
Create a Namespace
First, create a namespace for Flink to keep the resources isolated:
kubectl create namespace flink
Flink Docker Image
We will use the official Apache Flink Docker image, which is available on Docker Hub. You can also build your own custom image if needed. For reproducible deployments, pin a specific version tag (for example, flink:1.17) rather than latest.
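If your job needs extra dependencies, for example the S3 filesystem plugin used for checkpoints later in this tutorial, a minimal custom image might look like the following sketch; the flink:1.17 tag and the plugin choice are illustrative:
FROM flink:1.17
# enable the S3 filesystem plugin that ships in the distribution's opt/ directory
RUN mkdir -p /opt/flink/plugins/s3-fs-presto && \
    cp /opt/flink/opt/flink-s3-fs-presto-*.jar /opt/flink/plugins/s3-fs-presto/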
Deploy Flink JobManager
The JobManager is the central coordinator of a Flink cluster, responsible for scheduling tasks, managing checkpoints, and handling job lifecycle events. We will create a Deployment and a Service for the JobManager.
JobManager Deployment
Create a file named jobmanager-deployment.yaml with the following content:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
        - name: jobmanager
          image: flink:latest
          args: ["jobmanager"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 8081
              name: web
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: flink-jobmanager
JobManager Service
Create a file named jobmanager-service.yaml with the following content:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
  namespace: flink
spec:
  ports:
    - port: 6123
      name: rpc
    - port: 8081
      name: web
  selector:
    app: flink
    component: jobmanager
Deploy the JobManager resources:
kubectl apply -f jobmanager-deployment.yaml
kubectl apply -f jobmanager-service.yaml
Deploy Flink TaskManagers
TaskManagers are the worker nodes in a Flink cluster. They execute the tasks assigned by the JobManager.
TaskManager Deployment
Create a file named taskmanager-deployment.yaml with the following content:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
  namespace: flink
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
        - name: taskmanager
          image: flink:latest
          args: ["taskmanager"]
          ports:
            - containerPort: 6121
              name: data
            - containerPort: 6122
              name: rpc
          env:
            - name: JOB_MANAGER_RPC_ADDRESS
              value: flink-jobmanager
Deploy the TaskManager resources:
kubectl apply -f taskmanager-deployment.yaml
Verify the Deployment
Check the status of the deployments:
kubectl get deployments -n flink
You should see both the flink-jobmanager and flink-taskmanager deployments in the Available state.
Check the status of the pods:
kubectl get pods -n flink
You should see one flink-jobmanager pod and two flink-taskmanager pods running.
Accessing Flink Web Dashboard
To access the Flink web dashboard, you need to expose the JobManager service. You can use a port-forward for local development:
kubectl port-forward svc/flink-jobmanager 8081:8081 -n flink
Now, you can access the Flink web dashboard at http://localhost:8081.
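Port-forwarding is convenient for local development. For longer-lived access on a test cluster, you could instead expose the web UI through a NodePort Service; a sketch, where the nodePort value 30081 is arbitrary:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-ui
  namespace: flink
spec:
  type: NodePort
  ports:
    - port: 8081
      targetPort: 8081
      nodePort: 30081
      name: web
  selector:
    app: flink
    component: jobmanager
On Minikube, minikube service flink-jobmanager-ui -n flink prints a reachable URL.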
Running a Flink Job on Kubernetes
With the Flink cluster up and running, let’s run a Flink job. For demonstration purposes, we will use a sample Flink job that performs word count on a stream of text data.
Sample Flink Job
Save the following Flink job code in a file named WordCount.java:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // create a DataStream from a socket text stream; note that "localhost"
        // is resolved inside the TaskManager pod when the job runs on Kubernetes
        DataStream<String> text = env.socketTextStream("localhost", 9999);

        // parse the data, key it by word, and aggregate the counts
        DataStream<Tuple2<String, Integer>> counts = text
                .flatMap(new Tokenizer())
                .keyBy(value -> value.f0)
                .sum(1);

        // print the results to TaskManager stdout
        counts.print();

        // execute program
        env.execute("Streaming WordCount");
    }

    // Tokenizer function to split text into words
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            for (String word : value.split("\\s")) {
                if (word.length() > 0) {
                    out.collect(new Tuple2<>(word, 1));
                }
            }
        }
    }
}
Build and Package the Flink Job
Compile and package the Flink job into a JAR file using Maven or any other build tool. Ensure you have Maven installed, then run the following command in the directory containing the pom.xml file:
mvn clean package
This command will generate a JAR file in the target directory.
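If you do not already have a build file, a minimal pom.xml for this job might look like the following sketch; the group and artifact IDs are placeholders, and the Flink version should match your cluster image:
<project xmlns="http://maven.apache.org/POM/4.0.0">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>flink-wordcount</artifactId>
  <version>1.0</version>
  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
  </properties>
  <dependencies>
    <!-- provided scope: the cluster image already ships Flink's core classes -->
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java</artifactId>
      <version>1.17.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-clients</artifactId>
      <version>1.17.0</version>
      <scope>provided</scope>
    </dependency>
  </dependencies>
</project>
If the resulting JAR has no Main-Class manifest entry, pass the entry class explicitly when submitting, for example flink run -c WordCount your-flink-job.jar.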
Submit the Flink Job
Submit the Flink job to the Flink cluster running on Kubernetes using the Flink web dashboard or the Flink CLI.
Using Flink Web Dashboard
- Open the Flink web dashboard at http://localhost:8081.
- Navigate to the “Submit new job” page.
- Upload the JAR file and submit the job.
Using Flink CLI
You can also use the Flink CLI to submit the job. First, copy the JAR file to the JobManager pod:
kubectl cp target/your-flink-job.jar flink/flink-jobmanager:/your-flink-job.jar
Then, submit the job using the Flink CLI:
# open a shell in the JobManager pod
kubectl exec -it $(kubectl get pods -n flink -l app=flink,component=jobmanager -o jsonpath='{.items[0].metadata.name}') -n flink -- /bin/sh
# then, inside that shell, submit the job
flink run /your-flink-job.jar
Replace /your-flink-job.jar with the path to your JAR file in the JobManager pod.
Verifying the Job
Once the job is submitted, you can verify its execution through the Flink web dashboard. You should see the job listed under “Running Jobs” with details about its execution.
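You can also check from the command line; flink list inside the JobManager pod prints the jobs known to the cluster:
kubectl exec -n flink deploy/flink-jobmanager -- flink list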
Sending Data to the Flink Job
The sample Flink job reads a text stream from a socket. Keep in mind that the job connects to localhost:9999 as resolved inside the TaskManager pod, so for the in-cluster demo you should either run the socket server inside the cluster or change the hostname in WordCount.java to a host the pods can reach. To create a socket server with nc (netcat):
nc -lk 9999
Now, type some text into the terminal. The job's counts.print() output goes to TaskManager stdout, so the word counts appear in the TaskManager logs, while the running job itself is visible in the Flink web dashboard.
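To follow the output of both TaskManagers:
kubectl logs -f -n flink -l app=flink,component=taskmanager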
Scaling Flink Cluster on Kubernetes
One of the key benefits of running Apache Flink on Kubernetes is the ease of scaling the cluster. You can scale the number of TaskManager replicas to increase the processing capacity.
Scaling TaskManagers
To scale the TaskManagers, update the replicas field in the taskmanager-deployment.yaml file and apply the changes:
spec:
  replicas: 4
Apply the changes:
kubectl apply -f taskmanager-deployment.yaml
Verify the scaling operation:
kubectl get pods -n flink
You should see four flink-taskmanager pods running.
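Alternatively, kubectl can scale the Deployment imperatively, without editing the manifest (the change is overwritten the next time you apply the file):
kubectl scale deployment/flink-taskmanager --replicas=4 -n flink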
Auto-scaling with Kubernetes
Kubernetes supports Horizontal Pod Autoscaling (HPA) based on CPU utilization or custom metrics. To enable auto-scaling for Flink TaskManagers, you need to set up metrics and create an HPA resource.
Metrics Server
First, install the Kubernetes Metrics Server:
kubectl apply -f https://github.com/kubernetes-sigs/metrics-server/releases/latest/download/components.yaml
Verify the Metrics Server:
kubectl get deployment metrics-server -n kube-system
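One caveat: CPU-based autoscaling only works if the target containers declare CPU requests, which the TaskManager Deployment above does not yet include. Add a resources section to the taskmanager container before creating the HPA; the request values below are illustrative:
# fragment of taskmanager-deployment.yaml; the sizing is illustrative
containers:
  - name: taskmanager
    image: flink:latest
    args: ["taskmanager"]
    resources:
      requests:
        cpu: "1"
        memory: 2Gi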
Create HPA for TaskManagers
Create a file named taskmanager-hpa.yaml with the following content:
apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  name: flink-taskmanager-hpa
  namespace: flink
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: flink-taskmanager
  minReplicas: 2
  maxReplicas: 10
  targetCPUUtilizationPercentage: 50
Apply the HPA resource:
kubectl apply -f taskmanager-hpa.yaml
Verify the HPA:
kubectl get hpa -n flink
The HPA will automatically scale the number of TaskManager replicas based on CPU utilization. Keep in mind that new TaskManagers only add available task slots: a running job does not rescale onto them automatically unless you use Flink’s Reactive Mode or restart the job with a higher parallelism.
Advanced Configuration and Monitoring
Configuring Flink Properties
You can configure Flink properties using ConfigMaps in Kubernetes. Create a ConfigMap with the desired Flink configuration and mount it as a volume in the JobManager and TaskManager pods.
Create a file named flink-configmap.yaml with the following content:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  namespace: flink
data:
  flink-conf.yaml: |
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    state.backend: filesystem
    state.checkpoints.dir: s3://your-bucket/checkpoints/
    state.savepoints.dir: s3://your-bucket/savepoints/
Note that the s3:// paths are placeholders; writing checkpoints to S3 also requires an S3 filesystem plugin in the Flink image (see the custom image sketch above).
Apply the ConfigMap:
kubectl apply -f flink-configmap.yaml
Update the JobManager and TaskManager deployments to mount the ConfigMap. The fragment below shows the JobManager pod spec; apply the same change to the TaskManager:
spec:
  containers:
    - name: jobmanager
      image: flink:latest
      volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
  volumes:
    - name: flink-config-volume
      configMap:
        name: flink-config
Apply the updated deployments:
kubectl apply -f jobmanager-deployment.yaml
kubectl apply -f taskmanager-deployment.yaml
Monitoring Flink with Prometheus and Grafana
Monitoring is essential for managing a Flink cluster in production. You can use Prometheus and Grafana for monitoring Flink metrics.
Deploy Prometheus and Grafana
You can deploy Prometheus and Grafana using Helm charts:
helm repo add prometheus-community https://prometheus-community.github.io/helm-charts
helm repo add grafana https://grafana.github.io/helm-charts
helm repo update
# Deploy Prometheus
helm install prometheus prometheus-community/prometheus
# Deploy Grafana
helm install grafana grafana/grafana
Configure Flink for Prometheus
Update the Flink configuration to enable the Prometheus metrics reporter. Recent Flink releases configure reporters through a factory class (older releases used metrics.reporter.prom.class):
data:
  flink-conf.yaml: |
    metrics.reporter.prom.factory.class: org.apache.flink.metrics.prometheus.PrometheusReporterFactory
    metrics.reporter.prom.port: 9250
Apply the ConfigMap and restart the JobManager and TaskManager pods.
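For a session cluster like this one, a rolling restart of both Deployments picks up the new configuration, since Flink reads flink-conf.yaml at startup:
kubectl rollout restart deployment/flink-jobmanager deployment/flink-taskmanager -n flink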
Create Prometheus Scrape Config
Add a scrape configuration for Flink in Prometheus:
scrape_configs:
  - job_name: 'flink'
    static_configs:
      - targets: ['<jobmanager-pod-ip>:9250', '<taskmanager-pod-ip>:9250']
Replace <jobmanager-pod-ip> and <taskmanager-pod-ip> with the actual IP addresses of the JobManager and TaskManager pods. Since pod IPs change whenever pods are rescheduled, static targets are brittle; a more durable sketch using Kubernetes service discovery follows.
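Assuming the Flink pods carry the app: flink label used throughout this tutorial and expose metrics on port 9250, a Prometheus Kubernetes service-discovery configuration might look like this sketch:
scrape_configs:
  - job_name: 'flink'
    kubernetes_sd_configs:
      - role: pod
        namespaces:
          names: ['flink']
    relabel_configs:
      # keep only pods labeled app=flink
      - source_labels: [__meta_kubernetes_pod_label_app]
        action: keep
        regex: flink
      # point the scrape address at the metrics port configured above
      - source_labels: [__address__]
        action: replace
        regex: ([^:]+)(?::\d+)?
        replacement: $1:9250
        target_label: __address__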
Import Grafana Dashboard
Import a Flink dashboard into Grafana to visualize the metrics. You can find pre-built dashboards on the Grafana website or create your own.
Conclusion
In this tutorial, we have covered the complete process of deploying Apache Flink on Kubernetes for real-time data processing. We discussed setting up a Kubernetes cluster, deploying Flink components, running a Flink job, scaling the cluster, configuring Flink properties, and monitoring the Flink cluster using Prometheus and Grafana. By following this guide, you should be able to leverage the power of Kubernetes to manage and scale your Apache Flink applications effectively.