I. Environment Preparation
1. Installing Java
bash
# Install OpenJDK 11
apt update
apt install -y openjdk-11-jdk
# Set JAVA_HOME and add its bin directory to PATH
echo "export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64" >> ~/.bashrc
echo "export PATH=\$PATH:\$JAVA_HOME/bin" >> ~/.bashrc
source ~/.bashrc
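Flink 1.15 targets Java 11, and Java 8 support is deprecated as of that release. Before moving on, it can help to confirm which JVM the shell actually picks up. The sketch below is a hypothetical helper; the class name `JavaVersionCheck` is ours and is not part of Flink or the JDK. It reads the standard `java.version` system property and the `JAVA_HOME` variable exported above. It extracts the major version from both the legacy `1.8.0_x` numbering and the newer `11.0.x` numbering.

```java
// Hypothetical helper: report the JVM version and JAVA_HOME seen by this shell.
final class JavaVersionCheck {

    // Extract the major version from a java.version string.
    // Handles the legacy "1.8.0_382" scheme and the modern "11.0.20" / "17" scheme.
    static int majorVersion(String version) {
        String[] parts = version.split("[._+-]");
        int first = Integer.parseInt(parts[0]);
        if (first == 1 && parts.length > 1) {
            return Integer.parseInt(parts[1]); // "1.8.0_382" -> 8
        }
        return first; // "11.0.20" -> 11
    }

    public static void main(String[] args) {
        String version = System.getProperty("java.version");
        String javaHome = System.getenv("JAVA_HOME"); // null if ~/.bashrc was not sourced
        int major = majorVersion(version);
        System.out.println("java.version = " + version + " (major " + major + ")");
        System.out.println("JAVA_HOME    = " + javaHome);
        if (major != 11) {
            System.out.println("Warning: this guide assumes Java 11");
        }
    }
}
```

On Java 11 the file can be run directly with `java JavaVersionCheck.java`, using single-file source launch.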
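2. Installing Flink
bash
# Download Flink 1.15.4 (releases that are no longer current are served from the Apache archive)
wget https://archive.apache.org/dist/flink/flink-1.15.4/flink-1.15.4-bin-scala_2.12.tgz
tar xzf flink-1.15.4-bin-scala_2.12.tgz
mv flink-1.15.4 /usr/local/flink
# The distribution already ships conf/; this only makes sure it exists
mkdir -p /usr/local/flink/conf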
II. Cluster Configuration
1. Basic Configuration
yaml
# flink-conf.yaml (Flink requires a space after each colon)
jobmanager.rpc.address: node1.example.com
jobmanager.rpc.port: 6123
jobmanager.memory.process.size: 4096m
taskmanager.memory.process.size: 8192m
taskmanager.numberOfTaskSlots: 8
parallelism.default: 4
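The space after each colon matters. The sketch below is a simplified model of how Flink 1.x reads `flink-conf.yaml`. It assumes the model follows the line-based loader in Flink's `GlobalConfiguration`, which strips `#` comments and then splits each line on the first `": "`. The model shows why a line such as `taskmanager.numberOfTaskSlots:8` is skipped instead of applied. `FlinkConfLineParser` is a hypothetical illustration, not Flink's own code.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of Flink 1.x's line-based flink-conf.yaml loader.
final class FlinkConfLineParser {

    static Map<String, String> parse(List<String> lines) {
        Map<String, String> conf = new LinkedHashMap<>();
        for (String line : lines) {
            // Drop everything after '#' (comments) and surrounding whitespace.
            String content = line.split("#", 2)[0].trim();
            if (content.isEmpty()) {
                continue;
            }
            // Key and value must be separated by ": " (a colon followed by a space).
            String[] kv = content.split(": ", 2);
            if (kv.length == 1) {
                System.err.println("Skipping line (missing space after ':'?): " + line);
                continue;
            }
            String key = kv[0].trim();
            String value = kv[1].trim();
            if (key.isEmpty() || value.isEmpty()) {
                System.err.println("Skipping line with empty key or value: " + line);
                continue;
            }
            conf.put(key, value);
        }
        return conf;
    }

    public static void main(String[] args) {
        Map<String, String> conf = parse(List.of(
                "jobmanager.rpc.port: 6123",
                "taskmanager.numberOfTaskSlots:8")); // missing space: skipped
        System.out.println(conf); // {jobmanager.rpc.port=6123}
    }
}
```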
2. High Availability Configuration
yaml
# flink-conf.yaml
high-availability: zookeeper
# JobManager metadata is persisted here; ZooKeeper only stores pointers to it
high-availability.storageDir: hdfs:///flink/ha/
high-availability.zookeeper.quorum: zk1:2181,zk2:2181,zk3:2181
high-availability.zookeeper.path.root: /flink
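The ZooKeeper quorum listed above stays available only while a strict majority of its servers is up. Three servers therefore tolerate one failure, and five tolerate two. The hypothetical helper below parses a `high-availability.zookeeper.quorum` string and works out that number. It only does the arithmetic and does not contact ZooKeeper.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper: how many ZooKeeper server failures a quorum string can survive.
final class ZkQuorumCheck {

    // Split "host:port,host:port,..." into individual server entries.
    static List<String> servers(String quorum) {
        return Arrays.stream(quorum.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // An ensemble of n servers needs a majority (n / 2 + 1) alive,
    // so it tolerates (n - 1) / 2 failures (integer division).
    static int tolerableFailures(int ensembleSize) {
        return (ensembleSize - 1) / 2;
    }

    public static void main(String[] args) {
        String quorum = "zk1:2181,zk2:2181,zk3:2181";
        int n = servers(quorum).size();
        System.out.println(n + " servers, tolerates " + tolerableFailures(n) + " failure(s)");
    }
}
```

A fourth server would still tolerate only one failure. This is why ensembles are usually sized at an odd number.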
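III. Performance Tuning
1. Memory Configuration
yaml
# TaskManager memory components
# Heap reserved for the Flink framework itself (128m is also the default)
taskmanager.memory.framework.heap.size: 128m
# Off-heap memory managed by Flink (e.g. RocksDB state, batch sorting and hashing)
taskmanager.memory.managed.size: 4096m
# Bounds for the network buffer memory
taskmanager.memory.network.max: 2048m
taskmanager.memory.network.min: 1024m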
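A little arithmetic shows how these settings fit together. The sketch below reproduces the TaskManager memory split for the `taskmanager.memory.process.size: 8192m` value from the basic configuration. For every option this guide does not set, it assumes the Flink 1.15 default:

- JVM metaspace 256m
- JVM overhead fraction 0.1, bounded to 192m to 1g
- framework off-heap 128m
- task off-heap 0
- network fraction 0.1

The class and method names are illustrative only.

```java
import java.util.Locale;

// Illustrative model of the Flink 1.15 TaskManager memory split; all sizes in MiB.
// The constants are assumed defaults for options this guide does not set.
final class TaskManagerMemoryModel {

    static final double JVM_METASPACE = 256;
    static final double JVM_OVERHEAD_FRACTION = 0.1;
    static final double JVM_OVERHEAD_MIN = 192;
    static final double JVM_OVERHEAD_MAX = 1024;
    static final double FRAMEWORK_OFF_HEAP = 128;
    static final double TASK_OFF_HEAP = 0;
    static final double NETWORK_FRACTION = 0.1;

    static double clamp(double value, double min, double max) {
        return Math.max(min, Math.min(max, value));
    }

    // Total Flink memory = process size - JVM metaspace - JVM overhead.
    static double totalFlinkMemory(double processSize) {
        double overhead = clamp(processSize * JVM_OVERHEAD_FRACTION, JVM_OVERHEAD_MIN, JVM_OVERHEAD_MAX);
        return processSize - JVM_METASPACE - overhead;
    }

    // Network memory = fraction of total Flink memory, bounded by network.min/network.max.
    static double networkMemory(double totalFlink, double networkMin, double networkMax) {
        return clamp(totalFlink * NETWORK_FRACTION, networkMin, networkMax);
    }

    // Task heap = whatever remains after every other component is carved out.
    static double taskHeap(double processSize, double frameworkHeap, double managed,
                           double networkMin, double networkMax) {
        double totalFlink = totalFlinkMemory(processSize);
        double network = networkMemory(totalFlink, networkMin, networkMax);
        return totalFlink - frameworkHeap - FRAMEWORK_OFF_HEAP - TASK_OFF_HEAP - managed - network;
    }

    public static void main(String[] args) {
        double process = 8192, frameworkHeap = 128, managed = 4096, netMin = 1024, netMax = 2048;
        double totalFlink = totalFlinkMemory(process);
        System.out.printf(Locale.ROOT, "total Flink memory: %.1f MiB%n", totalFlink);
        System.out.printf(Locale.ROOT, "network memory:     %.1f MiB%n",
                networkMemory(totalFlink, netMin, netMax));
        System.out.printf(Locale.ROOT, "task heap:          %.1f MiB%n",
                taskHeap(process, frameworkHeap, managed, netMin, netMax));
    }
}
```

Under these assumptions the model gives 7116.8 MiB of total Flink memory and 1740.8 MiB of task heap. The fractional network share (about 712 MiB) falls below `network.min`, so the 1024m floor applies. A negative task heap would signal that the configured components do not fit. The breakdown Flink reports in the TaskManager log at startup is the authoritative figure.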
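2. Network Configuration
yaml
# Network tuning
# Fraction of total Flink memory used for network buffers (0.1 is the default),
# bounded by network.min/network.max above. Since Flink 1.10 this key replaces
# the deprecated taskmanager.network.memory.fraction.
taskmanager.memory.network.fraction: 0.1
# Initial and maximum backoff for partition requests, in milliseconds
taskmanager.network.request-backoff.initial: 100
taskmanager.network.request-backoff.max: 10000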
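The two `request-backoff` values are in milliseconds. They bound the delay before an input channel re-requests a partition that is not yet available. The hypothetical class below sketches a capped doubling schedule, not Flink's implementation. It assumes each retry doubles the previous delay until the maximum is reached, after which the request gives up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a capped doubling backoff schedule (values in ms).
final class PartitionRequestBackoff {

    // Every delay that would be tried: start at initialMs, double each time,
    // clamp the last step to maxMs, and stop once maxMs has been used.
    static List<Integer> schedule(int initialMs, int maxMs) {
        if (initialMs <= 0) {
            throw new IllegalArgumentException("initialMs must be positive");
        }
        List<Integer> delays = new ArrayList<>();
        int current = initialMs;
        while (true) {
            delays.add(current);
            if (current >= maxMs) {
                break; // maximum reached: give up after this attempt
            }
            current = Math.min(current * 2, maxMs);
        }
        return delays;
    }

    public static void main(String[] args) {
        List<Integer> delays = schedule(100, 10000);
        int total = delays.stream().mapToInt(Integer::intValue).sum();
        System.out.println(delays);                          // [100, 200, 400, 800, 1600, 3200, 6400, 10000]
        System.out.println("total wait: " + total + " ms");  // total wait: 22700 ms
    }
}
```

Under this model, a partition that never becomes available is retried for roughly 22.7 seconds in total before the request fails.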
IV. Monitoring Setup
1. Prometheus Configuration
yaml
# prometheus.yml
scrape_configs:
  - job_name: 'flink'
    static_configs:
      # Port exposed by Flink's Prometheus reporter (9249 here)
      - targets: ['localhost:9249']
    metrics_path: /metrics
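The scrape target assumes Flink's Prometheus reporter is enabled and listening on port 9249. Each sample on `/metrics` is a line in the Prometheus text format, `name{labels} value`, and the metric used by the Grafana panel appears there under the same name. The hypothetical helper below extracts the values of one metric from such a response body, for example to spot-check the endpoint before building dashboards. It skips `#` comment lines and ignores any trailing timestamp. The sample body in `main` is made up.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: extract the sample values of one metric from a Prometheus
// text-format body, such as the response of http://localhost:9249/metrics.
final class PromTextParser {

    // Prometheus writes infinities as "+Inf"/"-Inf", which Double.parseDouble rejects.
    static double parseValue(String token) {
        switch (token) {
            case "+Inf": return Double.POSITIVE_INFINITY;
            case "-Inf": return Double.NEGATIVE_INFINITY;
            default: return Double.parseDouble(token); // also accepts "NaN"
        }
    }

    static List<Double> values(String body, String metric) {
        List<Double> result = new ArrayList<>();
        for (String raw : body.split("\n")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#")) {
                continue; // blank lines and # HELP / # TYPE comments
            }
            // The metric name ends at the label block '{' or at the first space.
            int brace = line.indexOf('{');
            int space = line.indexOf(' ');
            int nameEnd = (brace >= 0 && (space < 0 || brace < space)) ? brace : space;
            if (nameEnd < 0 || !line.substring(0, nameEnd).equals(metric)) {
                continue;
            }
            // Skip the label block if present; the first remaining token is the value,
            // and an optional trailing timestamp is ignored.
            String rest = (nameEnd == brace) ? line.substring(line.lastIndexOf('}') + 1) : line.substring(nameEnd);
            result.add(parseValue(rest.trim().split("\\s+")[0]));
        }
        return result;
    }

    public static void main(String[] args) {
        String body = String.join("\n",
                "# TYPE flink_jobmanager_Status_JVM_Memory_Heap_Used gauge",
                "flink_jobmanager_Status_JVM_Memory_Heap_Used{host=\"node1\"} 1.5E8");
        System.out.println(values(body, "flink_jobmanager_Status_JVM_Memory_Heap_Used")); // [1.5E8]
    }
}
```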
2. Grafana Dashboard
json
{
  "dashboard": {
    "panels": [
      {
        "title": "Job Manager Memory Usage",
        "type": "graph",
        "targets": [
          {
            "expr": "flink_jobmanager_Status_JVM_Memory_Heap_Used"
          }
        ]
      }
    ]
  }
}
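V. Job Management
1. Submitting Jobs
bash
# Submit a Flink job to YARN in per-job mode
# (-yjm/-ytm set the JobManager/TaskManager container memory)
./bin/flink run \
  -m yarn-cluster \
  -yjm 1024m \
  -ytm 4096m \
  -c com.example.StreamingJob \
  ./examples/streaming/StreamingJob.jar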
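2. Job Monitoring
java
// Enable checkpointing; its progress appears in the Web UI's Checkpoints tab and in checkpoint metrics
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 5000 ms
env.enableCheckpointing(5000);
// Leave at least 500 ms between the end of one checkpoint and the start of the next
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);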
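The two checkpoint settings interact. The interval sets how often checkpoints are attempted, and the minimum pause guarantees a gap after the previous checkpoint finishes. A simplified model assumes at most one checkpoint is in flight. Under that model, the next checkpoint starts at the later of `previousStart + interval` and `previousEnd + minPause`. The hypothetical class below applies that rule to the 5000 ms interval and the 500 ms pause. It is a model, not Flink's checkpoint scheduler.

```java
// Hypothetical model of checkpoint spacing with one checkpoint in flight at a time.
final class CheckpointSpacing {

    // Earliest start of the next checkpoint, in ms since job start.
    static long nextStart(long previousStart, long previousDuration, long intervalMs, long minPauseMs) {
        long previousEnd = previousStart + previousDuration;
        return Math.max(previousStart + intervalMs, previousEnd + minPauseMs);
    }

    public static void main(String[] args) {
        long interval = 5000, minPause = 500;
        // Fast checkpoint (1 s): the interval dominates.
        System.out.println(nextStart(0, 1000, interval, minPause)); // 5000
        // Slow checkpoint (4.8 s): the minimum pause pushes the next one back.
        System.out.println(nextStart(0, 4800, interval, minPause)); // 5300
    }
}
```

Checkpoints may routinely take longer than the interval minus the pause. When that happens, the effective interval stretches, which suggests revisiting the interval or the state size.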
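VI. Operations and Maintenance
1. Logging Configuration
properties
# conf/log4j.properties (Log4j 2 syntax since Flink 1.11; keep the shipped layout settings)
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = MainAppender
appender.main.name = MainAppender
appender.main.type = RollingFile
appender.main.fileName = ${sys:log.file}
appender.main.filePattern = ${sys:log.file}.%i
appender.main.policies.type = Policies
appender.main.policies.size.type = SizeBasedTriggeringPolicy
appender.main.policies.size.size = 100MB
appender.main.strategy.type = DefaultRolloverStrategy
appender.main.strategy.max = 10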
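A size-based rolling policy bounds the disk used per log file. With a 100 MB limit and 10 rolled backups kept, each Flink process holds roughly one active file plus ten backups. The arithmetic below is worth writing down when sizing disks for hosts that run several Flink processes. The `.out` files that capture stdout are not covered by this appender. The helper names are hypothetical.

```java
// Hypothetical helper: rough upper bound on disk used by size-rolled log files.
final class LogDiskBudget {

    // One active file plus maxBackups rolled files, each up to maxFileSizeMb.
    static long perProcessMb(long maxFileSizeMb, int maxBackups) {
        return maxFileSizeMb * (maxBackups + 1L);
    }

    // Multiply by the number of Flink processes (JobManager/TaskManagers) on the host.
    static long perHostMb(long maxFileSizeMb, int maxBackups, int processes) {
        return perProcessMb(maxFileSizeMb, maxBackups) * processes;
    }

    public static void main(String[] args) {
        System.out.println(perProcessMb(100, 10) + " MB per process");        // 1100 MB per process
        System.out.println(perHostMb(100, 10, 2) + " MB for two processes");  // 2200 MB for two processes
    }
}
```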
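2. Backup and Recovery
bash
# Back up the configuration files
tar -czf flink-conf-backup.tar.gz /usr/local/flink/conf/
# Back up checkpoints (assumes state.checkpoints.dir points to hdfs:///flink/checkpoints);
# quote the glob so HDFS expands it rather than the local shell
hdfs dfs -mkdir -p /backup/flink/checkpoints
hdfs dfs -cp '/flink/checkpoints/*' /backup/flink/checkpoints/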
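Restoring from a copied checkpoint means pointing the job at one specific checkpoint directory. Flink writes checkpoints under `<checkpoint dir>/<job id>/chk-<n>`, so the newest one is the directory with the highest `n`. The hypothetical helper below picks it from a directory listing, such as the paths printed by `hdfs dfs -ls`. The resulting path can then be passed to `flink run -s <path>` to resume the job. Checkpoints survive job cancellation only if they were configured to be retained. The job ID `a1b2c3` in the example is a placeholder.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: pick the newest "chk-<n>" directory from a listing.
final class LatestCheckpoint {

    private static final Pattern CHK = Pattern.compile("(?:^|/)chk-(\\d+)/?$");

    static Optional<String> latest(List<String> paths) {
        String best = null;
        long bestId = -1;
        for (String path : paths) {
            Matcher m = CHK.matcher(path.trim());
            if (m.find()) {
                // Compare numerically so that chk-12 beats chk-9.
                long id = Long.parseLong(m.group(1));
                if (id > bestId) {
                    bestId = id;
                    best = path.trim();
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        List<String> listing = List.of(
                "/backup/flink/checkpoints/a1b2c3/chk-9",   // a1b2c3: placeholder job ID
                "/backup/flink/checkpoints/a1b2c3/chk-12",
                "/backup/flink/checkpoints/a1b2c3/shared");
        System.out.println(latest(listing).orElse("no checkpoint found"));
    }
}
```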