下载
为了运行Flink,需提前安装好 Java 8 或者 Java 11.
下载flink1.18.1安装包并解压。(1.17.2版本)
tar -zxf flink-1.18.1-bin-scala_2.12.tgz
cd flink-1.18.1
修改配置
修改flink-conf.yaml
vim conf/flink-conf.yaml
修改监听端口
# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
rest.port: 8111
修改绑定ip,使webUi可以外部访问。
# To enable this, set the bind address to one that has access to outside-facing
# network interface, such as 0.0.0.0.
rest.bind-address: 0.0.0.0
修改checkPoint间隔时间
因为flinkCDC任务和一些flinkSql任务默认是根据checkpointing获取的,所以需要在配置文件中就先开启checkpointing
execution.checkpointing.interval: 3s
修改state配置
#修改任务槽数,建议和cpu核数一致
taskmanager.numberOfTaskSlots: 16
#默认的快捷指定方式是:'hashmap' 和 'rocksdb'.
state.backend.type: hashmap
#Flink 支持的文件系统中用于存储检查点的数据文件和元数据的默认目录。存储路径必须可从所有参与的进程/节点(即所有 TaskManager 和 JobManager)访问。
state.checkpoints.dir: file:///data/software/flink/flink-checkpoints
#保存点的默认目录。由将保存点写入文件系统的状态后端使用(HashMapStateBackend、EmbeddedRocksDBStateBackend)
state.savepoints.dir: file:///data/software/flink/flink-savepoints
#可识别的快捷方式名称是“jobmanager”和“filesystem”。
state.checkpoint-storage: filesystem
#flink默认只保留最近成功生成的1个Checkpoint,此配置可以指定要保留的已完成检查点的最大数量
state.checkpoints.num-retained: 10
#此配置定义了在job取消时应如何清理checkpoint
#execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
state配置
externalized-checkpoint-retention配置
内存配置
参考文档:内存调优指南 配置 Flink 进程的内存
配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:
不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。
独立部署模式下,我们通常更关注 Flink 应用本身使用的内存大小。 建议配置 Flink 总内存(taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其组成部分。 此外,如果出现 Metaspace 不足的问题,可以调整 JVM Metaspace 的大小。
这种情况下通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销 进行限制,它只与机器的物理资源相关。
#根据服务器实际情况选择
#配置总内存
taskmanager.memory.flink.size: 8192m
jobmanager.memory.flink.size: 4096m
#配置元空间内存
#taskmanager.memory.jvm-metaspace.size: 1gb
taskmanager.memory.jvm-metaspace.size: 512mb
jobmanager.memory.jvm-metaspace.size: 512mb
修改pekko
#由于机器速度慢或网络拥塞肯呢个会造成超时,需要配置timeout
pekko.ask.timeout: 120s
修改bin/config.sh文件
Flink启动时会把启动的进程的ID(pid)默认存到系统的/tmp目录下的一个文件中。由于是临时目录,会被系统清理,所以会导致pid找不到,导致使用./bin/stop-cluster.sh
无法关闭集群。因此我们需要修改config.sh
文件中默认的DEFAULT_ENV_PID_DIR
目录。
vim bin/config.sh
DEFAULT_ENV_PID_DIR="/home/software/flink/flink-1.18.0/tmp"
kill掉flink的进程之后重新使用./bin/start-cluster.sh
启动集群,查看新建的tmp目录,如下:
启动集群
Flink 附带了一个 bash 脚本,可以用于启动本地集群。
./bin/start-cluster.sh
测试提交一个作业
Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上
停止集群
./bin/stop-cluster.sh
评论区