[Hadoop] Hadoop集群一般需要关注的几个重要指标

通用监控指标

对于每个RPC服务应该监控

RpcProcessingTimeAvgTime(PRC处理的平均时间)

通常hdfs在异常任务突发大量访问时,这个参数会突然变得很大,导致其他用户访问hdfs时,会感觉到卡顿,从而影响任务的执行时间

CallQueueLength(RPC Call队列的长度)

如果callqueue队列数值一直处于较高的水平,例如对于NN来说CallQueue的长度等于handler*100,也就是说NN可能收到了大量的请求或者server在处理rpc请求时耗时很长,导致call堆积等

进程JVM监控

MemHeapUsedM(堆内存使用监控)

通过监控改参数可以查看进程的gc时间和gc发生之后释放多少内存和进程的内存使用情况

ThreadsBlocked(线程阻塞数量)

分析当问题发生时进程的线程的阻塞状况

ThreadsWaiting(线程等待数量)

分析当问题发生时进程的线程的等待状况

NameNode监控指标

TotalFiles(总的文件数量)

监控和预警文件数的总量,可以通过其看出是否有任务突然大量写文件和删除大量文件

TotalBlocks(总的block数量)

表示集群的block数量,作用同上

PercentUsed(集群hdfs使用百分比)

监控集群的hdfs的使用情况,使用率不宜太高,因为需要预留磁盘空间给任务计算使用

BlockPoolUsedSpace(集群该namespace的hdfs使用容量大小)

可以监控不同namespace的hdfs的使用情况

Total(集群hdfs总容量大小)

显示集群整体容量情况

Used(集群hdfs已使用的容量大小)

集群hdfs使用情况,可以预警是否需要增加机器和删除无用数据

NumLiveDataNodes(存活的DN数量)

NumDeadDataNodes(丢失的DN数量)

丢失节点,如果过多可能会引起丢块

VolumeFailuresTotal(坏盘的数量)

应该设定阀值,达到一定数量时处理

MissingBlocks(丢失的block数量)

丢失重要的块会引起任务报错

DataNode监控指标

ReadBlockOpAvgTime(读取block的平均时间)

可选的监控选项,如果该机器在某个时段平均时间突然升高,可能网络有打满或磁盘读取速度存在问题

WriteBlockOpAvgTime(写数据块的平均时间)

可选的监控选项

ResouceManager监控指标

NumActiveNMs(NM存活节点数量监控)

NumLostNMs(NM丢失节点数量监控)

有时节点会因为磁盘空间不足等原因导致进程退出,虽然集群具有容错机制,但当丢失节点达到一定数量之后,集群计算资源相当于减少了,所以应当设置合理的阀值报警处理

NumUnhealthyNMs(NM不健康节点数量监控)

通常会因为磁盘问题导致节点不健康

集群应用数量监控

AppsSubmitted(app提交数量)

之前集群有出现过app的id号,生成很慢的情况,可以通过改数值和其他参数去判断提交减少的问题

AppsRunning(app的运行数量)

可以通过改值去对比历史同一时刻的app的运行数量是否差异很大,去判断集群到底是否可能出现问题

AppsPending(app等待数量)

如果该数值很高,或则在某个queue的该数值很高,有可能是因为app所在的队列资源满了,导致app无法获取资源,启动master,如果资源没满,可能的一个原因是app的所在队列无法在rm中有先获取资源,或资源被预留所导致等

AppsCompleted(app完成数量)

应用完成的数量监控

AppsKilled(app被kill的数量)

应用被kill的数量监控

AppsFailed(app失败数量)

如果AppsFailed数量升高,说明集群的存在导致app批量失败的操作

集群资源使用量情况监控

AllocatedMB(已分配的内存大小)

如果集群用户反应任务运行缓慢,应该及时检查队列资源的使用情况和hdfs的响应速度

AllocatedVCores(已分配的核数量)

有时任务分配不上去,有可能是核数已经用完

AllocatedContainers(已分配的Container数量)

已分配的Container数量

AvailableMB(可用的内存大小)

有遇到过在集群反复重启NM后,导致集群计算可用资源错误的bug

AvailableVCores(可能的核数量)

PendingMB(等待分配的内存大小)

PendingVCores(等待分配的核数量)

PendingContainers(等待分配的Container数量)

如果等待分配的Container比日常出现多出很多,应该检查集群是否有问题

ReservedMB(预留的内存大小)

之前遇到因为spark任务申请很大的资源,导致把整个集群的资源都预留的情况,这时应该适当的调整最大的分配Container的内存大小

ReservedVCores(预留的核数量)

同上

ReservedContainers(预留的Container数量)

Container因为资源不足,优先预留节点

集群分配数据监控

AssignContainerCallNumOps(分配Container的次数)

我们可以通过该监控可以知道RM每秒能够分配多少的Container,在高峰期是否可能存在瓶颈,经过社区的patch优化之后,RM的分配Container最大值可以达到4k+

AssignContainerCallAvgTime(分配Container的平均时间)

如果时间突然变大,说明可能遇到分配瓶颈等其他问题

ContinuousScheduleCallNumOps(连续调度次数)

如果该数值没有增加,说明连续调度线程出现问题

ContinuousScheduleCallAvgTime(连续调度平均时间)

连续调度的平均时间

NodeUpdateCallNumOps(NM心跳汇报次数)

NodeUpdateCallAvgTime(心跳汇报处理时间)

rm资源分配是通过每一次NM的心跳进行分配和领取Container的,如果该时间变长,则分配速度可能会存在下降

Linux机器监控

网络带宽情况

通过监控DN的网络情况可以查找,该节点是否在当时是热节点,一般情况下如果在该机器的网络情况已经满了,会影响任务的正常运行速度

机器负载情况

网络丢包情况

机器内存使用情况

总 结

集群监控指标属于逐渐完善的一个过程,以上分享的是一般的重要常用指标,详细的问题分析可能需要通过工具和不同指标配合才能找到问题,希望这篇文章能够给大家带来用处

[TensorFlow] Ubuntu12.04安装TensorFlow记录

因为我的12.04出现包依赖的问题,所以记录下,官网写的很清楚了,先直接照官网安装即可

因为出现包依赖的问题,所以需要先安装aptitude, 参考Python-pip 安装失败问题解决

sudo apt-get install aptitude

然后执行命令

sudo  aptitude install python-pip
sudo aptitude install python-dev

之后又遇到了pip error: No such file or directory: ‘/tmp/pip-…-build/setup.py’这个错误,用了社区的解决方案,升级pip,详细看Issues-56

pip install --upgrade pip

接下来就是社区的安装方案了

export TF_BINARY_URL=https://storage.googleapis.com/tensorflow/linux/cpu/tensorflow-0.12.1-cp27-none-linux_x86_64.whl
sudo pip install --upgrade $TF_BINARY_URL

验证是否安装成功

参考文档

Tensorflow123

[YARN] 一个JDK的bug导致RM无法分配Container

最近集群因为NM的OOM,然后决定把ContainerMetric给关闭了,然后采取了批量重启NM的方式,采取的步骤是先批量下线,然后在上线,后来发现集群任务越来越慢,集群的利用率越来越低

以下是集群可用内存,一直在增加

其中出现的现象是:

  • 1.事件队列队列堆积非常严重,最高500W+
  • 2.Container每秒的分配速率从原来的k级别到变成几十每秒
  • 3.发现NM注册之后有些节点从NEW->RUNNING状态需要花费20+s的是状态转化时间,从而导致FINISHED_CONTAINERS_PULLED_BY_AM事件不在状态机的处理范围当中抛出了异常

然后发现社区也有类似的错误问题YARN-4741,但是看他们讨论的结果,并不能解释为什么调度事件下来的时候,NM还是分配不到container

节点每次进行addNode就不进行分配了

[2017-01-17T18:29:31.769+08:00] [INFO] resourcemanager.rmnode.RMNodeImpl.handle(RMNodeImpl.java 424) [AsyncDispatcher event handler] : xxx:50086 Node Transitioned from NEW to RUNNING
[2017-01-17T18:29:44.191+08:00] [INFO] scheduler.fair.FairScheduler.addNode(FairScheduler.java 899) [ResourceManager Event Processor] : Added node xxx:50086 cluster capacity: <memory:226007040, vCores:67872>
[2017-01-17T18:31:43.538+08:00] [INFO] hadoop.util.HostsFileReader.readFileToSetWithFileInputStream(HostsFileReader.java 88) [IPC Server handler 0 on 8033] : Adding xxx to the list of included hosts from xxx/hosts/mapred_hosts

而且NodeUpdate的平均时间很低

然后做出假设,假设NM没分配上

  • 1.NM因为状态错误,RM认为他满了,资源不满足,所以跳过分配
  • 2.NM因为被别的应用预留,状态错误,导致没分配
  • 3.队列状态错误,导致RM认为队列资源满了,所以没分配

经过反复排查之后,都不能解释上面的问题,后来通过不断上线和下线节点在测试集群重现了改问题

后来发现了连续调度的操作次数一直没有增加

然后追踪该节点的日志,FairSchedulerContinuousScheduling线程因为timsort的bug导致线程退出了

[2017-01-17T15:52:12.791+08:00] [ERROR] hadoop.yarn.YarnUncaughtExceptionHandler.uncaughtException(YarnUncaughtExceptionHandler.java 68) [FairSchedulerContinuousScheduling] : Thread Thread[FairSchedulerContinuousScheduling,5,main] threw an Exception.
java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:747)
        at java.util.TimSort.mergeAt(TimSort.java:483)
        at java.util.TimSort.mergeCollapse(TimSort.java:410)
        at java.util.TimSort.sort(TimSort.java:214)
        at java.util.TimSort.sort(TimSort.java:173)
        at java.util.Arrays.sort(Arrays.java:659)
        at java.util.Collections.sort(Collections.java:217)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.continuousSchedulingAttempt(FairScheduler.java:1058)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler$ContinuousSchedulingThread.run(FairScheduler.java:292)

原因是我们开启了continuousSchedulingEnabled,当下线的节点重新注册时,completedContainers.isEmpty()肯定是true,新加入的节点无法加入调度,导致集群使用率越来越低

if (continuousSchedulingEnabled) {
      if (!completedContainers.isEmpty()) {
        attemptScheduling(node);
      }
    } else {
      attemptScheduling(node);
    }

因为TimSort的compare函数不够严谨,然后引发这个bug,所以为了解决这个问题,可以在启动时加上一下参数

-Djava.util.Arrays.useLegacyMergeSort=true

以下是提供的参考资料TimSort in Java 7

总 结

  1. 以后遇到集群慢的时候优先检查队列资源的使用率,如果对比两天的使用有明显差异应该出现了问题
  2. 关注每秒分配container的数据对比是否处于正常值
  3. 平均nodeupdate时间和连续调度的次数

[Spark] 简单了解Spark2.0的内存管理

spark在1.6的时候就已经默认把内存管理变为UnifiedMemoryManager,如果需要用回StaticMemoryManager,可以通过设置spark.memory.useLegacyMode为true,就可以用回原来的模式,下图为默认时,不同区域的内存使用的划分

  • 1.ReservedMemory:预留内存,默认为300M,如果系统内存SystemMemory小于1.5倍的ReservedMemory就会报错
  • 2.UsableMemory:可用内存,计算方式为SystemMemory-ReservedMemory
  • 3.MaxMemory:最大可以使用的内存,等于usableMemory x memoryFraction(spark.memory.fraction=0.6)
  • 4.HeapStorageMemory:存储内存等于maxMemory x storageFraction(spark.memory.storageFraction默认0.5)
  • 5 .HeapExecutionMemory:等于maxMemory – HeapStorageMemory

UnifiedMemoryManager

通过上面的简图内存划分为不同的区域,而MemoryManager主要用来管理execution和storage他们之间是如何共享内存,他有两个实现类UnifiedMemoryManager和StaticMemoryManager,接下来会重点查看UnifiedMemoryManager是如何进行内存的分配的。

MemoryManager作为抽象类,它在初始化的时候出事会初始化ON_HEAP,OFF_HEAP的两个StorageMemoryPool和ExecutionMemoryPool对象,用来管理不同模式下的内存区域。

StorageMemoryPool主要是用来记录storage的一个可调整内存池的大小,而ExecutionMemoryPool相对它复杂,因为它需要保证每个task能够合理的共享内存,如果这里有N个task,它会确保每个task在spill之前会有至少1/2N的内存,且最多不能超过1/N,因为N是动态变化的,当task的数量改变的时候,我们会需要重新计算1/2N和1/N。

从ExecutionMemoryPool获取分配需要调用acquireMemory方法,首先第一步会先判断这个task是否属于Active task,如果不属于,则把他放到memoryForTask这个map的数据结构当中,value则记录该task当前使用了多少内存。

val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

并在每次循环获取足够acquireMemory的numBytes之前,都会尝试的去回收storage从execution借去的内存

当获取到的内存不能够满足required大小的需求时,它就会阻塞等待,直到内存满足

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }

TaskMemoryManager

UnifiedMemoryManager是对内存的统一管理,而TaskMemoryManager则是管理每个独立Task的内存分配,TaskMemoryManager通过MemoryManager的acquireExecutionMemory接口进行内存申请,如果不能满足,则从consumers中挑选可以spill,进行内存释放,什么是consumer,其实我们可以用过日志打印直观的看出改Task使用了哪些consumer.

log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG

然后就可以清楚的在日志里面看到了

17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 10.3 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 21.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 47.7 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9

对于consumer来说,他有不同的实现类,ShuffleExternalSorter就是其中一个consumer的实现类,当我们在ShuffleExternalSorter插入一条record时他就会调用acquireNewPageIfNecessary,尝试的从TaskMemoryManager获取一个MemoryBlock(即一个page)同时,会将这个page记录到自己的pageTable中,并得到对应的pageCursor偏移量。

Spark用MemoryLocation记录和追踪在off-heap或on-heap内存地址,在off-heap模式,内存可以直接通过64-bit长度的地址进行寻址,在In-heap模式中,内存由该对象的引用和64-bit的offset去寻址,MemoryBlock在MemoryLocation的基础上增加了pageNumber和对应数据的length。

简单的函数调用关系为:consumer.allocatePage -> TaskMemoryManager.allocatePage -> MemoryManager的MemoryAllocator.allocate

在HeapMemoryAllocator中,我们可以看到,我们会通过内存对齐产生一个long类型的数组,并通过这个数组构成一个MemoryBlock

long[] array = new long[(int) ((size + 7) / 8)];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

在得到对应的MemoryBlock之后,通过pageCursor偏移量(通过unsafe的arrayBaseOffset获取对象头的长度,我的是16)将数据写入到内存当中

/**
 * Created by tangshangwen on 17-1-13.
 */
public class TestUnsafe {
    public static void main(String[] args) {
        sun.misc.Unsafe unsafe;
        try {
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (sun.misc.Unsafe) unsafeField.get(null);
        } catch (Throwable cause) {
            unsafe = null;
        }
        System.out.println(unsafe.arrayBaseOffset(long[].class));
    }
}

和copyMemory将数据复制到对应的内存位置当中,并每次对pageCursor进行和数据长度length相加,找到下次数据写入的位置

总结

通过阅读和理解代码,加深了spark对内存管理方面知识的理解

参考资料

Spark Tungsten in-heap / off-heap 内存管理机制

[Hadoop] NameNode sshFence的一个小bug

前几天集群在发生异常切换的时候,除了了以下警告日志

[2017-01-10T01:42:37.234+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: nc: invalid option -- 'z'
[2017-01-10T01:42:37.235+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com8021 via ssh: Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.

我们知道tryGracefulFence不成功之后会去fence的那个进程NN,我们看下代码

private boolean doFence(Session session, InetSocketAddress serviceAddr)
      throws JSchException {
    int port = serviceAddr.getPort();
    try {
      //这段日志已经出现,所以忽略
      LOG.info("Looking for process running on port " + port);
      int rc = execCommand(session,
          "PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + port);
      //这段日志没出现,所以代表执行该命令返回值为rc == 1
      if (rc == 0) {
        LOG.info("Successfully killed process that was " +
            "listening on port " + port);
        // exit code 0 indicates the process was successfully killed.
        return true;
      } else if (rc == 1) {
        // exit code 1 indicates either that the process was not running
        // or that fuser didn't have root privileges in order to find it
        // (eg running as a different user)
        LOG.info(
            "Indeterminate response from trying to kill service. " +
            "Verifying whether it is running using nc...");
     //然后通过这个命令去检查端口是否还在的时候,报错了,返回2,并不是执行返回1,而是执行方法有误
        rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());
        if (rc == 0) {
          // the service is still listening - we are unable to fence
          LOG.warn("Unable to fence - it is running but we cannot kill it");
          return false;
        } else {
          LOG.info("Verified that the service is down.");
          return true;          
        }
      } else {
        // other 
      }
      LOG.info("rc: " + rc);
      return rc == 0;
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while trying to fence via ssh", e);
      return false;
    } catch (IOException e) {
      LOG.warn("Unknown failure while trying to fence via ssh", e);
      return false;
    }
  }

然后在CentOS7执行这个命令时

[XXX@XXX-XXX-17223 ~]$ nc -z
nc: invalid option -- 'z'
Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.
[XX@XXX-XXX-17223 ~]$ echo $?       
2

意味着这句话返回值不为0,不代表fence成功,因为操作系统的nc版本问题,也有可能不为0,而为2

rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());

更严格的判断为不为1,也不能判断fence成功

➜  ~ nc -z 127.0.0.1 3200
➜  ~ echo $?             
0
➜  ~ nc -z 127.0.0.3 3200
➜  ~ echo $?             
1

改代码已经在社区被提出了,也有了解决方案,但是没被合并,详情请看HDFS-3618HDFS-11308

[Spark] 简单分析Spark的RPC通信框架

在Spark中,已经采用了Netty作为RPC的通信框架,其通信的RpcEndpointRef和RpcEndpoint都是通过RpcEnv进行创建,在创建RpcEnv时,最终会调用NettyRpcEnvFactory中的create方法,并通过传入的clientMode去决定是否启动TransportServer。

以下是NettyRpcEnv的简图

除了Dispatcher之外,还有Inbox,Outbox,RpcEndpoint,RpcEndpointRef等几个重要的类

RpcEndpointRef的作用

进行远程通信时,一般都需要一个client一个server,而RpcEndpointRef就相当于一个client的角色,并且通过RpcEnv的实现类(NettyRpcEnv)的asyncSetupEndpointRefByURI进行创建在NettyRpcEndpointRef中我们可以看到,其实他只是需要RpcEndpoint对应的ip,port和RpcEndpoint name,然后程序在调用ask或者send方法发送信息时,NettyRpcEnv会根据他所提供的地址信息封装成RequestMessage进行处理,这里这个this指的是一个NettyRpcEndpointRef

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
  }

程序在发送RequestMessage之前,会先判断改发送的地址是否是本地的地址,如果不是,则将message封装为OutboxMessage,并放到Outbox当中,如果是本地,则通过Dispatcher把message放到对应EndpointData的inbox里面

  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
    } else {
      // Message to a remote RPC endpoint.
      postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
    }
  }

Inbox,Outbox的作用

简单来说,Inbox主要的作用是存储发送给RpcEndpoint的消息,Outbox就是存放发送到remote host的message的地方

当程序在调用targetOutbox.send(message)时,该message会先放到OutBox内部的messages的list当中,然后通过传入的TransportClient发送到对应的RpcEndpoint

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

RpcEndpoint的作用

像刚才说的,客户端有了,服务端也就是RpcEndpoint,就像CoarseGrainedExecutorBackend一样,它也是一个RpcEndpoint,并实现了对应的接口(receive,onStart)等等。 在RpcEndpoint启动时,需要RpcEnv中setupEndpoint,也就是向Dispatcher注册RpcEndpoint,这样dispatcher才能把message分发对应的RpcEndpoint当中

    env.rpcEnv.setupEndpoint("Executor", 
new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

下面的就是Dispatcher中的注册逻辑,里面维护着endpoints,endpointRefs和receivers等几个重要的数据结构

      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          val data = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
          receivers.offer(data)  // for the OnStart message
        }
    endpointRef
}

就想刚才所说的如果我传进去的clientMode为false,就会启动相应的TransportServer监听该主机对应的端口,NettyRpcHandler通过反序列化得到对应的RequestMessage,并通过message的message.receiver.name找到对应EndpointData,并把message放到对应的inbox中

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

然后通过把对应的EndpointData放到receivers中,通过设置好的线程池的线程去消费receivers里面的EndpointData,从而调用Endpoint里面的的receive等实现方法进行不同的逻辑处理

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

一个流程就大约这样了,通过分析能够加深对框架设计和spark的认识。

参考资料

Spark RPC通信层设计原理分析

[基础操作] 上线节点和下线节点步骤参考

一、上线节点

1、将新增节点追加到/etc/hosts中,同步到集群所有节点

2、将新节点生成新的机架信息更新到rack.data中,并同步到集群所有节点

3、将新的节点加入到$HADOOP_CONF_DIR/slaves,$HADOOP_CONF_DIR/hosts/datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts,$HADOOP_CONF_DIR/hosts/mapred_hosts中,同步到集群所有节点。

4、刷新NN,RM

刷新NN
hdfs dfsadmin -refreshNodes
刷新RM
yarn rmadmin -refreshNodes

5、启动DN,并继续保持balance,直到达到一定(建议40%)存储左右的容量

Tips1: 因为在机器刚加进集群时,如果该节点启动计算,将会耗费大量的网络带宽,影响在上面跑的Task,从而影响任务,而任务在写数据时,如果来自DN节点的请求,会先在本地写一份数据,再写远程节点,因此不会影响原有任务,当新节点数据达到一定量时,可以启动NM,换句话说,相当于提高了任务的本地化率,降低影响任务的风险.

Tips2: 在Balance同时,我们可以适当的增加某些目录的副本数一定时间后,恢复副本数量,这样可以加快Balance的效率,让节点能够较快的达到平衡状态。

hadoop-daemon.sh start datanode

6、将节点从$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts去掉,刷新yarn rmadmin -refreshNodes,在启动NM

yarn-daemon.sh start nodemanager

二、下线节点

1、将下线节点加入到$HADOOP_CONF_DIR/hosts/exclude_datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts中,同步到所有节点,刷新

刷新NN

hdfs dfsadmin -refreshNodes

刷新RM

yarn rmadmin -refreshNodes

2、直到页面Decommissioned: NUM完成,或者 Number of Under-Replicated Blocks变得较小,几乎没变化时,即可完成下线节点

3、节点下线完成后,把节点从slaves,datanode_hosts,exclude_datanode_hosts,exclude_mapred_hosts,mapred_hosts,机架信息中删除对应的机器信息。

其实也就是选择适合的方式把集群的变动变到最小,这里只做参考

[YARN] 基于ZKRMStateStore的Yarn的HA机制分析

前面已经说过HDFS的HA的相关机制简单了解NameNode的ZKFC机制,所以我们接着上面的说,YARN的HA切换由EmbeddedElectorService类控制,和ZKFailoverController的ElectorCallbacks一样,实现了ActiveStandbyElectorCallback接口,他们的区别是fenceOldActive方法的实现

  private Stat fenceOldActive() throws InterruptedException, KeeperException {
    .......
    if (Arrays.equals(data, appData)) {
      LOG.info("But old node has our own data, so don't need to fence it.");
    } else {
      appClient.fenceOldActive(data);
    }
    return stat;
  }

ZKFailoverController为,先GracefulFence,不行则进行真正的fence

  private synchronized void fenceOldActive(byte[] data) {
    HAServiceTarget target = dataToTarget(data);

    try {
      doFence(target);
    } catch (Throwable t) {
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
      Throwables.propagate(t);
    }
  }

而Yarn的HA则为

 @Override
  public void fenceOldActive(byte[] oldActiveData) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Request to fence old active being ignored, " +
          "as embedded leader election doesn't support fencing");
    }
  }

NameNode通过rpc或ssh kill的防止脑裂,而ZKRMStateStore是怎么在防止脑裂的呢?

在ZKRMStateStore中,大部分的操作都会在实际操作之前创建RM_ZK_FENCING_LOCK的文件,操作完成之后则删除对应的文件,这些操作是事务性的,这样意味着同时只有一个client去写rmstore目录,当有两个rm同时写,创建RM_ZK_FENCING_LOCK时则会抛出异常,同时rm则会捕获异常,并将自己的状态转化为standby的状态。

private synchronized void doDeleteMultiWithRetries( final List<Op> opList) throws Exception { 
  final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); 
  execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); 
  execOpList.add(deleteFencingNodePathOp); 
  new ZKAction<Void>() { 
    @Override 
    public Void run() throws KeeperException, InterruptedException { 
      setHasDeleteNodeOp(true); 
      zkClient.multi(execOpList);
       return null;
   } }.runWithRetries(); }

举一个例子,异常会被store.notifyStoreOperationFailed(e)处理

public void transition(RMStateStore store, RMStateStoreEvent event) {
    ......
      try {
        LOG.info("Storing RMDelegationToken and SequenceNumber");
        store.storeRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
            e);
        store.notifyStoreOperationFailed(e);
      }
    }

这里就进行context相关的关闭,转化为standby的状态

  /**
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
      updateFencedState();
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else {
      rmDispatcher.getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
    }
  }

除了防止同时写的情况发生,ZKRMStateStore还在切换的时候对ZKRMStateStore的存储目录进行权限的设置,只允许自己读写,其他用户只有读的权限,我们可以通过zk命令去看到这样的权限设置

[zk: localhost:2181(CONNECTED) 27] ls /yarn-test/rmstore/ZKRMStateRoot    
[AMRMTokenSecretManagerRoot, RMAppRoot, EpochNode, RMVersionNode, RMDTSecretManagerRoot]

[zk: localhost:2181(CONNECTED) 28] getAcl /yarn-test/rmstore/ZKRMStateRoot
'world,'anyone
: rwa
'digest,'xxx-xxxx-xxx15.hadoop.xxx.com:0vfG9l2cyt85oF5/H01oip5KEGU=
: cd

参考资料

[RM HA3] Zookeeper在RM HA的应用

[Spark] Spark的History关联对应的executor的日志

场景在于,因为很多用户想在Spark的history上看到对应的executor日志,其实这个并不是Spark的什么新功能,他们不需要用yarn application -logs下载所有的日志,因为我们之前一直忽略这个,没配置,后来配置上就可以了。

除了要开启yarn的日志收集功能之外

<property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
</property>

还要在yarn-site.xml增加参数,并且重启NodeManager即可,当我们访问时,就会帮我们重定向到日志服务器上,找到对应的日志

<property>
      <name>yarn.log.server.url</name>
      <value>http://HistoryServerAddress:19888/jobhistory/logs</value>
</property>

然后点击对应的日志链接就可以链接过去了

记录一下以免忘记。

[YARN] FairScheduler的资源分配机制分析(一)

以下是队列资源分配的简单架构

调度器的container资源分配

通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)

// 类名FairScheduler.java
//  Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations

FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  Priority reservedPriority = node.getReservedContainer().getReservedPriority();
  FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
  if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
      || !fitsInMaxShare(queue,
      node.getReservedContainer().getReservedResource())) {
    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for application "
        + reservedAppSchedulable.getApplicationAttemptId()
        + " on node " + node);
    reservedAppSchedulable.unreserve(reservedPriority, node);
    reservedAppSchedulable = null;
  } else {
    // Reservation exists; try to fulfill the reservation
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node: " + node);
    }
   // 满足条件,则分配
    node.getReservedAppSchedulable().assignReservedContainer(node);
  }
}

总的来说:

  • 1、如果该app在该节点没有container的需求,则释放预留container
  • 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
  • 3、如果该app这个优先级的资源的请求已经为0,释放预留的container

否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配

如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。

因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt

那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?

在每次进行资源分配,都会采取排序的操作

Collections.sort(childQueues, policy.getComparator());

app进行排序

Collections.sort(runnableApps, comparator);

找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值

  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());

然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码

  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);

以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题

为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。

分配的策略简要为

从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求

ResourceRequest rackLocalRequest = getResourceRequest(priority,
    node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
    node.getNodeName());

然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt

if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
    && localRequest != null && localRequest.getNumContainers() != 0) {
  return assignContainer(node, localRequest,
      NodeType.NODE_LOCAL, reserved);
} 

因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。

如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。

if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
  // 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
  if (allowed.equals(NodeType.NODE_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
    resetSchedulingOpportunities(priority);
  }
  // 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
  else if (allowed.equals(NodeType.RACK_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
    resetSchedulingOpportunities(priority);
  }
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);

Application所需要的demand如何计算?

在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息

然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?

在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }