下载

了运行Flink,需提前安装好 Java 8 或者 Java 11.

下载flink1.18.1安装包并解压。(1.17.2版本

tar -zxf flink-1.18.1-bin-scala_2.12.tgz
cd flink-1.18.1

修改配置

vim conf/flink-conf.yaml

官方文档-配置参数

修改监听端口

# The port to which the REST client connects to. If rest.bind-port has
# not been specified, then the server will bind to this port as well.
rest.port: 8111

修改绑定ip,使webUi可以外部访问。

# To enable this, set the bind address to one that has access to outside-facing
# network interface, such as 0.0.0.0.
rest.bind-address: 0.0.0.0

修改checkPoint间隔时间

因为flinkCDC任务和一些flinkSql任务默认是根据checkpointing获取的,所以需要在配置文件中就先开启checkpointing

execution.checkpointing.interval: 3s

修改state配置

#修改任务槽数,建议和cpu核数一致
taskmanager.numberOfTaskSlots: 16


#默认的快捷指定方式是:'hashmap' 和 'rocksdb'.
state.backend.type: hashmap
#Flink 支持的文件系统中用于存储检查点的数据文件和元数据的默认目录。存储路径必须可从所有参与的进程/节点(即所有 TaskManager 和 JobManager)访问。
state.checkpoints.dir: file:///data/software/flink/flink-checkpoints
#保存点的默认目录。由将保存点写入文件系统的状态后端使用(HashMapStateBackend、EmbeddedRocksDBStateBackend)
state.savepoints.dir: file:///data/software/flink/flink-savepoints
#可识别的快捷方式名称是“jobmanager”和“filesystem”。
state.checkpoint-storage: filesystem
#flink默认只保留最近成功生成的1个Checkpoint,此配置可以指定要保留的已完成检查点的最大数量
state.checkpoints.num-retained: 10
#此配置定义了在job取消时应如何清理checkpoint
#execution.checkpointing.externalized-checkpoint-retention: RETAIN_ON_CANCELLATION

state配置

externalized-checkpoint-retention配置

内存配置

参考文档:内存调优指南 配置 Flink 进程的内存

配置 Flink 进程内存最简单的方法是指定以下两个配置项中的任意一个:

配置项

TaskManager 配置参数

JobManager 配置参数

Flink 总内存

taskmanager.memory.flink.size

jobmanager.memory.flink.size

进程总内存

taskmanager.memory.process.size

jobmanager.memory.process.size

不建议同时设置进程总内存Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。

独立部署模式下,我们通常更关注 Flink 应用本身使用的内存大小。 建议配置 Flink 总内存taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其组成部分。 此外,如果出现 Metaspace 不足的问题,可以调整 JVM Metaspace 的大小。

这种情况下通常无需配置进程总内存,因为不管是 Flink 还是部署环境都不会对 JVM 开销 进行限制,它只与机器的物理资源相关。

#根据服务器实际情况选择
#配置总内存
taskmanager.memory.flink.size: 8192m
jobmanager.memory.flink.size: 4096m
#配置元空间内存
#taskmanager.memory.jvm-metaspace.size: 1gb
taskmanager.memory.jvm-metaspace.size: 512mb
jobmanager.memory.jvm-metaspace.size: 512mb

修改pekko

#由于机器速度慢或网络拥塞肯呢个会造成超时,需要配置timeout
pekko.ask.timeout: 120s

修改bin/config.sh文件

Flink启动时会把启动的进程的ID(pid)默认存到系统的/tmp目录下的一个文件中。由于是临时目录,会被系统清理,所以会导致pid找不到,导致使用./bin/stop-cluster.sh无法关闭集群。因此我们需要修改config.sh文件中默认的DEFAULT_ENV_PID_DIR目录。

vim bin/config.sh

DEFAULT_ENV_PID_DIR="/home/software/flink/flink-1.18.0/tmp"

kill掉flink的进程之后重新使用./bin/start-cluster.sh启动集群,查看新建的tmp目录,如下:

启动集群

Flink 附带了一个 bash 脚本,可以用于启动本地集群。

./bin/start-cluster.sh

测试提交一个作业

Flink 的 Releases 附带了许多的示例作业。你可以任意选择一个,快速部署到已运行的集群上

停止集群

./bin/stop-cluster.sh