[YARN] 一个JDK的bug导致RM无法分配Container

最近集群因为NM的OOM,然后决定把ContainerMetric给关闭了,然后采取了批量重启NM的方式,采取的步骤是先批量下线,然后在上线,后来发现集群任务越来越慢,集群的利用率越来越低

以下是集群可用内存,一直在增加

其中出现的现象是:

  • 1.事件队列队列堆积非常严重,最高500W+
  • 2.Container每秒的分配速率从原来的k级别到变成几十每秒
  • 3.发现NM注册之后有些节点从NEW->RUNNING状态需要花费20+s的是状态转化时间,从而导致FINISHED_CONTAINERS_PULLED_BY_AM事件不在状态机的处理范围当中抛出了异常

然后发现社区也有类似的错误问题YARN-4741,但是看他们讨论的结果,并不能解释为什么调度事件下来的时候,NM还是分配不到container

节点每次进行addNode就不进行分配了

[2017-01-17T18:29:31.769+08:00] [INFO] resourcemanager.rmnode.RMNodeImpl.handle(RMNodeImpl.java 424) [AsyncDispatcher event handler] : xxx:50086 Node Transitioned from NEW to RUNNING
[2017-01-17T18:29:44.191+08:00] [INFO] scheduler.fair.FairScheduler.addNode(FairScheduler.java 899) [ResourceManager Event Processor] : Added node xxx:50086 cluster capacity: <memory:226007040, vCores:67872>
[2017-01-17T18:31:43.538+08:00] [INFO] hadoop.util.HostsFileReader.readFileToSetWithFileInputStream(HostsFileReader.java 88) [IPC Server handler 0 on 8033] : Adding xxx to the list of included hosts from xxx/hosts/mapred_hosts

而且NodeUpdate的平均时间很低

然后做出假设,假设NM没分配上

  • 1.NM因为状态错误,RM认为他满了,资源不满足,所以跳过分配
  • 2.NM因为被别的应用预留,状态错误,导致没分配
  • 3.队列状态错误,导致RM认为队列资源满了,所以没分配

经过反复排查之后,都不能解释上面的问题,后来通过不断上线和下线节点在测试集群重现了改问题

后来晨宇同学发现了连续调度的操作次数一直没有增加

然后追踪该节点的日志,FairSchedulerContinuousScheduling线程因为timsort的bug导致线程退出了

[2017-01-17T15:52:12.791+08:00] [ERROR] hadoop.yarn.YarnUncaughtExceptionHandler.uncaughtException(YarnUncaughtExceptionHandler.java 68) [FairSchedulerContinuousScheduling] : Thread Thread[FairSchedulerContinuousScheduling,5,main] threw an Exception.
java.lang.IllegalArgumentException: Comparison method violates its general contract!
        at java.util.TimSort.mergeLo(TimSort.java:747)
        at java.util.TimSort.mergeAt(TimSort.java:483)
        at java.util.TimSort.mergeCollapse(TimSort.java:410)
        at java.util.TimSort.sort(TimSort.java:214)
        at java.util.TimSort.sort(TimSort.java:173)
        at java.util.Arrays.sort(Arrays.java:659)
        at java.util.Collections.sort(Collections.java:217)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler.continuousSchedulingAttempt(FairScheduler.java:1058)
        at org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler$ContinuousSchedulingThread.run(FairScheduler.java:292)

原因是我们开启了continuousSchedulingEnabled,当下线的节点重新注册时,completedContainers.isEmpty()肯定是true,新加入的节点无法加入调度,导致集群使用率越来越低

if (continuousSchedulingEnabled) {
      if (!completedContainers.isEmpty()) {
        attemptScheduling(node);
      }
    } else {
      attemptScheduling(node);
    }

因为TimSort的compare函数不够严谨,然后引发这个bug,所以为了解决这个问题,可以在启动时加上一下参数

-Djava.util.Arrays.useLegacyMergeSort=true

以下是瑜标同学找到的,提供的参考资料TimSort in Java 7

总 结

  1. 以后遇到集群慢的时候优先检查队列资源的使用率,如果对比两天的使用有明显差异应该出现了问题
  2. 关注每秒分配container的数据对比是否处于正常值
  3. 平均nodeupdate时间和连续调度的次数

[Spark] 简单了解Spark2.0的内存管理

spark在1.6的时候就已经默认把内存管理变为UnifiedMemoryManager,如果需要用回StaticMemoryManager,可以通过设置spark.memory.useLegacyMode为true,就可以用回原来的模式,下图为默认时,不同区域的内存使用的划分

  • 1.ReservedMemory:预留内存,默认为300M,如果系统内存SystemMemory小于1.5倍的ReservedMemory就会报错
  • 2.UsableMemory:可用内存,计算方式为SystemMemory-ReservedMemory
  • 3.MaxMemory:最大可以使用的内存,等于usableMemory x memoryFraction(spark.memory.fraction=0.6)
  • 4.HeapStorageMemory:存储内存等于maxMemory x storageFraction(spark.memory.storageFraction默认0.5)
  • 5 .HeapExecutionMemory:等于maxMemory – HeapStorageMemory

UnifiedMemoryManager

通过上面的简图内存划分为不同的区域,而MemoryManager主要用来管理execution和storage他们之间是如何共享内存,他有两个实现类UnifiedMemoryManager和StaticMemoryManager,接下来会重点查看UnifiedMemoryManager是如何进行内存的分配的。

MemoryManager作为抽象类,它在初始化的时候出事会初始化ON_HEAP,OFF_HEAP的两个StorageMemoryPool和ExecutionMemoryPool对象,用来管理不同模式下的内存区域。

StorageMemoryPool主要是用来记录storage的一个可调整内存池的大小,而ExecutionMemoryPool相对它复杂,因为它需要保证每个task能够合理的共享内存,如果这里有N个task,它会确保每个task在spill之前会有至少1/2N的内存,且最多不能超过1/N,因为N是动态变化的,当task的数量改变的时候,我们会需要重新计算1/2N和1/N。

从ExecutionMemoryPool获取分配需要调用acquireMemory方法,首先第一步会先判断这个task是否属于Active task,如果不属于,则把他放到memoryForTask这个map的数据结构当中,value则记录该task当前使用了多少内存。

val numActiveTasks = memoryForTask.keys.size
val curMem = memoryForTask(taskAttemptId)

并在每次循环获取足够acquireMemory的numBytes之前,都会尝试的去回收storage从execution借去的内存

当获取到的内存不能够满足required大小的需求时,它就会阻塞等待,直到内存满足

    // We want to let each task get at least 1 / (2 * numActiveTasks) before blocking;
      // if we can't give it this much now, wait for other tasks to free up memory
      // (this happens if older tasks allocated lots of memory before N grew)
      if (toGrant < numBytes && curMem + toGrant < minMemoryPerTask) {
        logInfo(s"TID $taskAttemptId waiting for at least 1/2N of $poolName pool to be free")
        lock.wait()
      } else {
        memoryForTask(taskAttemptId) += toGrant
        return toGrant
      }

TaskMemoryManager

UnifiedMemoryManager是对内存的统一管理,而TaskMemoryManager则是管理每个独立Task的内存分配,TaskMemoryManager通过MemoryManager的acquireExecutionMemory接口进行内存申请,如果不能满足,则从consumers中挑选可以spill,进行内存释放,什么是consumer,其实我们可以用过日志打印直观的看出改Task使用了哪些consumer.

log4j.logger.org.apache.spark.memory.TaskMemoryManager=DEBUG

然后就可以清楚的在日志里面看到了

17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 5.2 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 10.3 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 21.4 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9
17/01/13 16:07:24 DEBUG TaskMemoryManager: Task 32 acquired 47.7 MB for org.apache.spark.util.collection.ExternalAppendOnlyMap@3bb49e9

对于consumer来说,他有不同的实现类,ShuffleExternalSorter就是其中一个consumer的实现类,当我们在ShuffleExternalSorter插入一条record时他就会调用acquireNewPageIfNecessary,尝试的从TaskMemoryManager获取一个MemoryBlock(即一个page)同时,会将这个page记录到自己的pageTable中,并得到对应的pageCursor偏移量。

Spark用MemoryLocation记录和追踪在off-heap或on-heap内存地址,在off-heap模式,内存可以直接通过64-bit长度的地址进行寻址,在In-heap模式中,内存由该对象的引用和64-bit的offset去寻址,MemoryBlock在MemoryLocation的基础上增加了pageNumber和对应数据的length。

简单的函数调用关系为:consumer.allocatePage -> TaskMemoryManager.allocatePage -> MemoryManager的MemoryAllocator.allocate

在HeapMemoryAllocator中,我们可以看到,我们会通过内存对齐产生一个long类型的数组,并通过这个数组构成一个MemoryBlock

long[] array = new long[(int) ((size + 7) / 8)];
MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size);

在得到对应的MemoryBlock之后,通过pageCursor偏移量(通过unsafe的arrayBaseOffset获取对象头的长度,我的是16)将数据写入到内存当中

/**
 * Created by tangshangwen on 17-1-13.
 */
public class TestUnsafe {
    public static void main(String[] args) {
        sun.misc.Unsafe unsafe;
        try {
            Field unsafeField = Unsafe.class.getDeclaredField("theUnsafe");
            unsafeField.setAccessible(true);
            unsafe = (sun.misc.Unsafe) unsafeField.get(null);
        } catch (Throwable cause) {
            unsafe = null;
        }
        System.out.println(unsafe.arrayBaseOffset(long[].class));
    }
}

和copyMemory将数据复制到对应的内存位置当中,并每次对pageCursor进行和数据长度length相加,找到下次数据写入的位置

总结

通过阅读和理解代码,加深了spark对内存管理方面知识的理解

参考资料

Spark Tungsten in-heap / off-heap 内存管理机制

[Hadoop] NameNode sshFence的一个小bug

前几天集群在发生异常切换的时候,除了了以下警告日志

[2017-01-10T01:42:37.234+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: nc: invalid option -- 'z'
[2017-01-10T01:42:37.235+08:00] [WARN] hadoop.ha.SshFenceByTcpPort.pump(StreamPumper.java 88) [nc -z xxx-xxx-17224.hadoop.xxx.com 8021 via ssh: StreamPumper for STDERR] : nc -z xxx-xxx-17224.hadoop.xxx.com8021 via ssh: Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.

我们知道tryGracefulFence不成功之后会去fence的那个进程NN,我们看下代码

private boolean doFence(Session session, InetSocketAddress serviceAddr)
      throws JSchException {
    int port = serviceAddr.getPort();
    try {
      //这段日志已经出现,所以忽略
      LOG.info("Looking for process running on port " + port);
      int rc = execCommand(session,
          "PATH=$PATH:/sbin:/usr/sbin fuser -v -k -n tcp " + port);
      //这段日志没出现,所以代表执行该命令返回值为rc == 1
      if (rc == 0) {
        LOG.info("Successfully killed process that was " +
            "listening on port " + port);
        // exit code 0 indicates the process was successfully killed.
        return true;
      } else if (rc == 1) {
        // exit code 1 indicates either that the process was not running
        // or that fuser didn't have root privileges in order to find it
        // (eg running as a different user)
        LOG.info(
            "Indeterminate response from trying to kill service. " +
            "Verifying whether it is running using nc...");
     //然后通过这个命令去检查端口是否还在的时候,报错了,返回2,并不是执行返回1,而是执行方法有误
        rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());
        if (rc == 0) {
          // the service is still listening - we are unable to fence
          LOG.warn("Unable to fence - it is running but we cannot kill it");
          return false;
        } else {
          LOG.info("Verified that the service is down.");
          return true;          
        }
      } else {
        // other 
      }
      LOG.info("rc: " + rc);
      return rc == 0;
    } catch (InterruptedException e) {
      LOG.warn("Interrupted while trying to fence via ssh", e);
      return false;
    } catch (IOException e) {
      LOG.warn("Unknown failure while trying to fence via ssh", e);
      return false;
    }
  }

然后在CentOS7执行这个命令时

[XXX@XXX-XXX-17223 ~]$ nc -z
nc: invalid option -- 'z'
Ncat: Try `--help' or man(1) ncat for more information, usage options and help. QUITTING.
[XX@XXX-XXX-17223 ~]$ echo $?       
2

意味着这句话返回值不为0,不代表fence成功,因为操作系统的nc版本问题,也有可能不为0,而为2

rc = execCommand(session, "nc -z " + serviceAddr.getHostName() +
            " " + serviceAddr.getPort());

更严格的判断为不为1,也不能判断fence成功

➜  ~ nc -z 127.0.0.1 3200
➜  ~ echo $?             
0
➜  ~ nc -z 127.0.0.3 3200
➜  ~ echo $?             
1

改代码已经在社区被提出了,也有了解决方案,但是没被合并,详情请看HDFS-3618HDFS-11308

[Spark] 简单分析Spark的RPC通信框架

在Spark中,已经采用了Netty作为RPC的通信框架,其通信的RpcEndpointRef和RpcEndpoint都是通过RpcEnv进行创建,在创建RpcEnv时,最终会调用NettyRpcEnvFactory中的create方法,并通过传入的clientMode去决定是否启动TransportServer。

以下是NettyRpcEnv的简图

除了Dispatcher之外,还有Inbox,Outbox,RpcEndpoint,RpcEndpointRef等几个重要的类

RpcEndpointRef的作用

进行远程通信时,一般都需要一个client一个server,而RpcEndpointRef就相当于一个client的角色,并且通过RpcEnv的实现类(NettyRpcEnv)的asyncSetupEndpointRefByURI进行创建在NettyRpcEndpointRef中我们可以看到,其实他只是需要RpcEndpoint对应的ip,port和RpcEndpoint name,然后程序在调用ask或者send方法发送信息时,NettyRpcEnv会根据他所提供的地址信息封装成RequestMessage进行处理,这里这个this指的是一个NettyRpcEndpointRef

  override def send(message: Any): Unit = {
    require(message != null, "Message is null")
    nettyEnv.send(RequestMessage(nettyEnv.address, this, message))
  }

程序在发送RequestMessage之前,会先判断改发送的地址是否是本地的地址,如果不是,则将message封装为OutboxMessage,并放到Outbox当中,如果是本地,则通过Dispatcher把message放到对应EndpointData的inbox里面

  private[netty] def send(message: RequestMessage): Unit = {
    val remoteAddr = message.receiver.address
    if (remoteAddr == address) {
      // Message to a local RPC endpoint.
      try {
        dispatcher.postOneWayMessage(message)
      } catch {
        case e: RpcEnvStoppedException => logWarning(e.getMessage)
      }
    } else {
      // Message to a remote RPC endpoint.
      postToOutbox(message.receiver, OneWayOutboxMessage(serialize(message)))
    }
  }

Inbox,Outbox的作用

简单来说,Inbox主要的作用是存储发送给RpcEndpoint的消息,Outbox就是存放发送到remote host的message的地方

当程序在调用targetOutbox.send(message)时,该message会先放到OutBox内部的messages的list当中,然后通过传入的TransportClient发送到对应的RpcEndpoint

  def send(message: OutboxMessage): Unit = {
    val dropped = synchronized {
      if (stopped) {
        true
      } else {
        messages.add(message)
        false
      }
    }
    if (dropped) {
      message.onFailure(new SparkException("Message is dropped because Outbox is stopped"))
    } else {
      drainOutbox()
    }
  }

RpcEndpoint的作用

像刚才说的,客户端有了,服务端也就是RpcEndpoint,就像CoarseGrainedExecutorBackend一样,它也是一个RpcEndpoint,并实现了对应的接口(receive,onStart)等等。 在RpcEndpoint启动时,需要RpcEnv中setupEndpoint,也就是向Dispatcher注册RpcEndpoint,这样dispatcher才能把message分发对应的RpcEndpoint当中

    env.rpcEnv.setupEndpoint("Executor", 
new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))

下面的就是Dispatcher中的注册逻辑,里面维护着endpoints,endpointRefs和receivers等几个重要的数据结构

      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
        val addr = RpcEndpointAddress(nettyEnv.address, name)
        val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
        synchronized {
          if (stopped) {
            throw new IllegalStateException("RpcEnv has been stopped")
          }
          if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
            throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
          }
          val data = endpoints.get(name)
          endpointRefs.put(data.endpoint, data.ref)
          receivers.offer(data)  // for the OnStart message
        }
    endpointRef
}

就想刚才所说的如果我传进去的clientMode为false,就会启动相应的TransportServer监听该主机对应的端口,NettyRpcHandler通过反序列化得到对应的RequestMessage,并通过message的message.receiver.name找到对应EndpointData,并把message放到对应的inbox中

  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    // We don't need to call `onStop` in the `synchronized` block
    error.foreach(callbackIfStopped)
  }

然后通过把对应的EndpointData放到receivers中,通过设置好的线程池的线程去消费receivers里面的EndpointData,从而调用Endpoint里面的的receive等实现方法进行不同的逻辑处理

/** Message loop used for dispatching messages. */
  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      try {
        while (true) {
          try {
            val data = receivers.take()
            if (data == PoisonPill) {
              // Put PoisonPill back so that other MessageLoops can see it.
              receivers.offer(PoisonPill)
              return
            }
            data.inbox.process(Dispatcher.this)
          } catch {
            case NonFatal(e) => logError(e.getMessage, e)
          }
        }
      } catch {
        case ie: InterruptedException => // exit
      }
    }
  }

一个流程就大约这样了,通过分析能够加深对框架设计和spark的认识。

参考资料

Spark RPC通信层设计原理分析

[基础操作] 上线节点和下线节点步骤参考

一、上线节点

1、将新增节点追加到/etc/hosts中,同步到集群所有节点

2、将新节点生成新的机架信息更新到rack.data中,并同步到集群所有节点

3、将新的节点加入到$HADOOP_CONF_DIR/slaves,$HADOOP_CONF_DIR/hosts/datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts,$HADOOP_CONF_DIR/hosts/mapred_hosts中,同步到集群所有节点。

4、刷新NN,RM

刷新NN
hdfs dfsadmin -refreshNodes
刷新RM
yarn rmadmin -refreshNodes

5、启动DN,并继续保持balance,直到达到一定(建议40%)存储左右的容量

Tips1: 因为在机器刚加进集群时,如果该节点启动计算,将会耗费大量的网络带宽,影响在上面跑的Task,从而影响任务,而任务在写数据时,如果来自DN节点的请求,会先在本地写一份数据,再写远程节点,因此不会影响原有任务,当新节点数据达到一定量时,可以启动NM,换句话说,相当于提高了任务的本地化率,降低影响任务的风险.

Tips2: 在Balance同时,我们可以适当的增加某些目录的副本数一定时间后,恢复副本数量,这样可以加快Balance的效率,让节点能够较快的达到平衡状态。

hadoop-daemon.sh start datanode

6、将节点从$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts去掉,刷新yarn rmadmin -refreshNodes,在启动NM

yarn-daemon.sh start nodemanager

二、下线节点

1、将下线节点加入到$HADOOP_CONF_DIR/hosts/exclude_datanode_hosts,$HADOOP_CONF_DIR/hosts/exclude_mapred_hosts中,同步到所有节点,刷新

刷新NN

hdfs dfsadmin -refreshNodes

刷新RM

yarn rmadmin -refreshNodes

2、直到页面Decommissioned: NUM完成,或者 Number of Under-Replicated Blocks变得较小,几乎没变化时,即可完成下线节点

3、节点下线完成后,把节点从slaves,datanode_hosts,exclude_datanode_hosts,exclude_mapred_hosts,mapred_hosts,机架信息中删除对应的机器信息。

其实也就是选择适合的方式把集群的变动变到最小,这里只做参考

[YARN] 基于ZKRMStateStore的Yarn的HA机制分析

前面已经说过HDFS的HA的相关机制简单了解NameNode的ZKFC机制,所以我们接着上面的说,YARN的HA切换由EmbeddedElectorService类控制,和ZKFailoverController的ElectorCallbacks一样,实现了ActiveStandbyElectorCallback接口,他们的区别是fenceOldActive方法的实现

  private Stat fenceOldActive() throws InterruptedException, KeeperException {
    .......
    if (Arrays.equals(data, appData)) {
      LOG.info("But old node has our own data, so don't need to fence it.");
    } else {
      appClient.fenceOldActive(data);
    }
    return stat;
  }

ZKFailoverController为,先GracefulFence,不行则进行真正的fence

  private synchronized void fenceOldActive(byte[] data) {
    HAServiceTarget target = dataToTarget(data);

    try {
      doFence(target);
    } catch (Throwable t) {
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active: " + StringUtils.stringifyException(t)));
      Throwables.propagate(t);
    }
  }

而Yarn的HA则为

 @Override
  public void fenceOldActive(byte[] oldActiveData) {
    if (LOG.isDebugEnabled()) {
      LOG.debug("Request to fence old active being ignored, " +
          "as embedded leader election doesn't support fencing");
    }
  }

NameNode通过rpc或ssh kill的防止脑裂,而ZKRMStateStore是怎么在防止脑裂的呢?

在ZKRMStateStore中,大部分的操作都会在实际操作之前创建RM_ZK_FENCING_LOCK的文件,操作完成之后则删除对应的文件,这些操作是事务性的,这样意味着同时只有一个client去写rmstore目录,当有两个rm同时写,创建RM_ZK_FENCING_LOCK时则会抛出异常,同时rm则会捕获异常,并将自己的状态转化为standby的状态。

private synchronized void doDeleteMultiWithRetries( final List<Op> opList) throws Exception { 
  final List<Op> execOpList = new ArrayList<Op>(opList.size() + 2); 
  execOpList.add(createFencingNodePathOp); execOpList.addAll(opList); 
  execOpList.add(deleteFencingNodePathOp); 
  new ZKAction<Void>() { 
    @Override 
    public Void run() throws KeeperException, InterruptedException { 
      setHasDeleteNodeOp(true); 
      zkClient.multi(execOpList);
       return null;
   } }.runWithRetries(); }

举一个例子,异常会被store.notifyStoreOperationFailed(e)处理

public void transition(RMStateStore store, RMStateStoreEvent event) {
    ......
      try {
        LOG.info("Storing RMDelegationToken and SequenceNumber");
        store.storeRMDelegationTokenState(
            dtEvent.getRmDTIdentifier(), dtEvent.getRenewDate());
      } catch (Exception e) {
        LOG.error("Error While Storing RMDelegationToken and SequenceNumber ",
            e);
        store.notifyStoreOperationFailed(e);
      }
    }

这里就进行context相关的关闭,转化为standby的状态

  /**
   * This method is called to notify the ResourceManager that the store
   * operation has failed.
   * @param failureCause the exception due to which the operation failed
   */
  protected void notifyStoreOperationFailed(Exception failureCause) {
    if (failureCause instanceof StoreFencedException) {
      updateFencedState();
      Thread standByTransitionThread =
          new Thread(new StandByTransitionThread());
      standByTransitionThread.setName("StandByTransitionThread Handler");
      standByTransitionThread.start();
    } else {
      rmDispatcher.getEventHandler().handle(
        new RMFatalEvent(RMFatalEventType.STATE_STORE_OP_FAILED, failureCause));
    }
  }

除了防止同时写的情况发生,ZKRMStateStore还在切换的时候对ZKRMStateStore的存储目录进行权限的设置,只允许自己读写,其他用户只有读的权限,我们可以通过zk命令去看到这样的权限设置

[zk: localhost:2181(CONNECTED) 27] ls /yarn-test/rmstore/ZKRMStateRoot    
[AMRMTokenSecretManagerRoot, RMAppRoot, EpochNode, RMVersionNode, RMDTSecretManagerRoot]

[zk: localhost:2181(CONNECTED) 28] getAcl /yarn-test/rmstore/ZKRMStateRoot
'world,'anyone
: rwa
'digest,'xxx-xxxx-xxx15.hadoop.xxx.com:0vfG9l2cyt85oF5/H01oip5KEGU=
: cd

参考资料

[RM HA3] Zookeeper在RM HA的应用

[Spark] Spark的History关联对应的executor的日志

场景在于,因为很多用户想在Spark的history上看到对应的executor日志,其实这个并不是Spark的什么新功能,他们不需要用yarn application -logs下载所有的日志,因为我们之前一直忽略这个,没配置,后来配置上就可以了。

除了要开启yarn的日志收集功能之外

<property>
     <name>yarn.log-aggregation-enable</name>
     <value>true</value>
</property>

还要在yarn-site.xml增加参数,并且重启NodeManager即可,当我们访问时,就会帮我们重定向到日志服务器上,找到对应的日志

<property>
      <name>yarn.log.server.url</name>
      <value>http://HistoryServerAddress:19888/jobhistory/logs</value>
</property>

然后点击对应的日志链接就可以链接过去了

记录一下以免忘记。

[YARN] FairScheduler的资源分配机制分析(一)

以下是队列资源分配的简单架构

调度器的container资源分配

通过简图可以看出,container是分配是随着每一次nodeupdate而进行资源分配的,在每一次尝试调度container之前,首先会检查改节点是否曾经有预留的app(预留指的是该node曾经因为资源不足,为了提高本地性原因,当节点有资源更新时,优先的把这个节点的资源分配给这个app的策略)

// 类名FairScheduler.java
//  Assign new containers...
// 1. Check for reserved applications
// 2. Schedule if there are no reservations

FSAppAttempt reservedAppSchedulable = node.getReservedAppSchedulable();
if (reservedAppSchedulable != null) {
  Priority reservedPriority = node.getReservedContainer().getReservedPriority();
  FSQueue queue = reservedAppSchedulable.getQueue();
// 之前有预留,不满足条件则释放
  if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
      || !fitsInMaxShare(queue,
      node.getReservedContainer().getReservedResource())) {
    // Don't hold the reservation if app can no longer use it
    LOG.info("Releasing reservation that cannot be satisfied for application "
        + reservedAppSchedulable.getApplicationAttemptId()
        + " on node " + node);
    reservedAppSchedulable.unreserve(reservedPriority, node);
    reservedAppSchedulable = null;
  } else {
    // Reservation exists; try to fulfill the reservation
    if (LOG.isDebugEnabled()) {
      LOG.debug("Trying to fulfill reservation for application "
          + reservedAppSchedulable.getApplicationAttemptId()
          + " on node: " + node);
    }
   // 满足条件,则分配
    node.getReservedAppSchedulable().assignReservedContainer(node);
  }
}

总的来说:

  • 1、如果该app在该节点没有container的需求,则释放预留container
  • 2、如果该app在该节点还有container的需求,但队列的MaxShare无法满足,则释放该节点预留的container
  • 3、如果该app这个优先级的资源的请求已经为0,释放预留的container

否则检查这个node的可用资源和需求的资源是否满足,如果不满足,则预留,满足则分配

如果该节点没有app预留,则把该node从RootQueue进行分配下去,其中还有分配参数assignMultiple和maxAssign限制该node分配的container数量。

因为我们通过图可以知道,其实资源分配可以看成三层结构,ParetnQueue到LeafQueue,再到FSAppAttempt

那我们提出一个问题,当资源分配,如何选取最适合的队列?如何选取最适合的App?

在每次进行资源分配,都会采取排序的操作

Collections.sort(childQueues, policy.getComparator());

app进行排序

Collections.sort(runnableApps, comparator);

找出最适合的子队列和最适合的app, 对于采取fair算法策略的队列来说,其比较实现在FairShareComparator的compare方法中,对于每个可排序的Schedulable,包括叶子队列,FSAppAttempt,第一步都会计算两个Schedulable的minShare,即通过配置的最小资源要求和每个Schedulable的demand进行比较的最小值

  Resource minShare1 = Resources.min(RESOURCE_CALCULATOR, null,
      s1.getMinShare(), s1.getDemand());
  Resource minShare2 = Resources.min(RESOURCE_CALCULATOR, null,
      s2.getMinShare(), s2.getDemand());

然后通过当前的资源使用和minshare进行比较等等,详细可以自行查看源码

  boolean s1Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s1.getResourceUsage(), minShare1);
  boolean s2Needy = Resources.lessThan(RESOURCE_CALCULATOR, null,
      s2.getResourceUsage(), minShare2);

以上可以总结为,如果Schedulable都低于他们的minShare,则通过他们低于minShare的比例的多少来比较,例如,如果一个job A拥有8个task(minShare 10即比例为80%),job B拥有50个task(minShare 100即比例为50%),则job B将会有更高的优先级在下次资源分配时获取资源。

如果Schedulable在他们的minShare之上,则通过比较他们的(runningTasks/weight),如果所有的权重都相等,则slot资源需求少的job优先获得资源,另外,如果一个job拥有更高的权重则拥有机会获得多slot。

解决完选取子队列和app之后,接下来FSAppAttempt自身处理资源的问题了,也就是主要考虑这个FSAppAttempt的计算时的本地性问题

为了提升本地性,对于每个优先级,都会尝试的优先分配本地节点,然后再是机架,off-switch的请求通常会被延迟调度。

分配的策略简要为

从appSchedulingInfo中获取该节点或机架,该优先级对应的资源请求

ResourceRequest rackLocalRequest = getResourceRequest(priority,
    node.getRackName());
ResourceRequest localRequest = getResourceRequest(priority,
    node.getNodeName());

然后满足条件的情况下尝试分配node local的节点,如果满足分配条件则分配成功,如果container的请求满足队列最大的MaxShare,则预留该节点给这个FSAppAttempt

if (rackLocalRequest != null && rackLocalRequest.getNumContainers() != 0
    && localRequest != null && localRequest.getNumContainers() != 0) {
  return assignContainer(node, localRequest,
      NodeType.NODE_LOCAL, reserved);
} 

因为我们经常想优先的调度node-local的container, 其次再为rack-local和off-switch的container去保证最大可能的本地性,为了达到这个目标,我们首先对给定的优先级分配node-local,如果我们在很长的一段时间内没有成功调度,则放松本地性的阀值。

如上所述,通过判断上一次的NodeType类型去判断使用nodeLocalityThreshold( yarn.scheduler.fair.locality.threshold.node默认-1.0f ) 或 rackLocalityThreshold( yarn.scheduler.fair.locality.threshold.rack默认-1.0f ),从而去决定是否改变本地性级别。

if (getSchedulingOpportunities(priority) > (numNodes * threshold)) {
  // 满足调度机会条件,则将NODE_LOCAL级别降为RACK_LOCAL级别
  if (allowed.equals(NodeType.NODE_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.RACK_LOCAL);
    resetSchedulingOpportunities(priority);
  }
  // 满足调度机会条件,则将RACK_LOCAL级别降为OFF_SWITCH级别
  else if (allowed.equals(NodeType.RACK_LOCAL)) {
    allowedLocalityLevel.put(priority, NodeType.OFF_SWITCH);
    resetSchedulingOpportunities(priority);
  }
}
// 否则本地性等级不变
return allowedLocalityLevel.get(priority);

Application所需要的demand如何计算?

在FSAppAttempt中,其维护着每一个Application在Fair Scheduler的相关调度信息,包括Application所需要的demand,它的fairShare和allowedLocalityLevel等其他相关信息

然而在上文也有提到,每个FSAppAttempt或queue都会用它的minshare和demand比较,而demand是怎么来的呢?

在FSAppAttempt中,每一次AppMaster申请资源都会更新appSchedulingInfo里面的requests对象

在FairScheduler中则有线程定时的去调度UpdateThread线程,去重新更新每个队列,FSAppAttempt所需demand和重新计算FairShare等

每一层的demand等于min(下一层demand,最大资源分配),FSAppAttempt的demand则为当前使用的资源大小+未分配的资源大小

  @Override
  public void updateDemand() {
    demand = Resources.createResource(0);
    // Demand is current consumption plus outstanding requests
    Resources.addTo(demand, getCurrentConsumption());

    // Add up outstanding resource requests
    synchronized (this) {
      for (Priority p : getPriorities()) {
        for (ResourceRequest r : getResourceRequests(p).values()) {
          Resource total = Resources.multiply(r.getCapability(), r.getNumContainers());
          Resources.addTo(demand, total);
        }
      }
    }
  }

[YARN] FairScheduler设置AM的vcore无法生效

在集群上了Linux Container之后,如果我们想增加AM在NM上获取时间片的能力,我们可以在配置文件中找到以下参数

<property> 
<name>yarn.app.mapreduce.am.resource.cpu-vcores</name> 
<value>15</value> 
</property>

该参数描述的是给AM多少个虚拟核,但是查看RM日志确发现,不管怎么设置AM的核数量都无法生效

[2016-10-27T16:36:37.280 08:00] [INFO] resourcemanager.scheduler.SchedulerNode.allocateContainer(SchedulerNode.java 153) [ResourceManager Event Processor] : Assigned container container_1477059529836_336635_01_000001 of capacity <memory:2048, vCores:1>

原因是对于FairScheduler默认的策略说,每次都会对AM的提交请求进行规整,而且只会考虑mem的情况

SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(), scheduler.getClusterResource(), scheduler.getMinimumResourceCapability(), scheduler.getMaximumResourceCapability(), scheduler.getMinimumResourceCapability());

在DefaultResourceCalculator类中,只对内存进行规整,createResource(memory, (memory > 0) ? 1 : 0),如果内存大于0,则默认变为1

  @Override
  public Resource normalize(Resource r, Resource minimumResource,
      Resource maximumResource, Resource stepFactor) {
    int normalizedMemory = Math.min(
        roundUp(
            Math.max(r.getMemory(), minimumResource.getMemory()),
            stepFactor.getMemory()),
            maximumResource.getMemory());
    return Resources.createResource(normalizedMemory);
  }

所以在默认的DefaultResourceCalculator当中,当你不管怎么设置AM的核数都无法生效

[基础操作] 新增加NS操作参考

0.前期环境准备

  • 硬件检查确认准备的机器的物理内存是和之前的NameNode一样
  • 打通hadp用户nn1到nn5,nn6的ssh
  • 打通nn5,nn6的hadp用户的双方的ssh,两台机器互通,不需要输入密码
  • 在nn5,nn6新建mapred,yarn用户,并将hadp,yarn,mapred用户加入hadoop组
  • 调整linux网络参数和其余NS的NameNode保持一致
  • 把新增节点的host加入到集群的/etc/hosts

1.找到奇数台机器,并安装配置ZooKeeper(如果共享Zookeeper,跳过此步骤)

zkHost1-zkHost5

sh zkServer.sh start

2.找到奇数台机器,安装配置hadoop,并启动JournalNode

jnHost1-jnHost5

hadoop-daemon.sh start journalnode

3.增加配置(假设添加NS3)

修改core-site.xml(大体需要注意)

    <property>
      <name>fs.defaultFS</name>
      <value>hdfs://ns3</value>
    </property>

如果和原来的NS不共用ZK,修改填写ZK地址

    <property>
       <name>ha.zookeeper.quorum</name>
       <value>zkHost1:2181,zkHost2:2181,zkHost3:2181,zkHost4:2181,zkHost5:2181</value>
    </property>

修改hdfs-site.xml,增加NS3

    <property>
      <name>dfs.nameservices</name>
      <value>ns1,ns2,ns3</value>
    </property>

增加配置

<property>
  <name>dfs.ha.namenodes.ns3</name>
  <value>nn5,nn6</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn5</name>
  <value>nn5Host:8020</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.ns3.nn6</name>
  <value>nn6Host:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn5</name>
  <value>nn5Host:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.ns3.nn6</name>
  <value>nn6Host:50070</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn5</name>
    <value>nn5Host:8021</value>
</property>
<property>
    <name>dfs.namenode.servicerpc-address.ns3.nn6</name>
    <value>nn6Host:8021</value>
</property>
<property>
    <name>dfs.client.failover.proxy.provider.ns3</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

修改JournalNode的上传地址和目录

 <property> 
        <name>dfs.namenode.shared.edits.dir</name> 
        <value>qjournal://jnHost1:8485;jnHost2:8485;jnHost3:8485;jnHost4:8485;jnHost5:8485/ns3</value> 
</property>

4.执行操作

找到ns1的集群cid(例如:CID-338ca6d3-15bb-4941-bb1a-8faa3c3ba79d)

在nn5上指定clusterId执行

hdfs namenode -format -clusterId cid

hadoop-daemon.sh start namenode

在nn6上执行

hdfs namenode -bootstrapStandby

hadoop-daemon.sh start namenode

刷新datanode

cat $HADOOP_CONF_DIR/slaves | xargs -t -i hdfs dfsadmin -refreshNamenodes {}:50020

初始化zkfc目录

hdfs zkfc -formatZK

在两个节点上启动ZKFC

hadoop-daemon.sh start zkfc

滚动重启datanode