当前位置: 首页 > news >正文

Spark内核解析-数据存储5(六)

1、Spark的数据存储

Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk,本文尝试分析Spark中存储子系统的构成,并以数据写入和数据读取为例,讲述清楚存储子系统中各部件的交互关系。

1.1存储子系统概览

Storage模块主要分为两层:
1)通信层:storage模块采用的是master-slave结构来实现通信层,master和slave之间传输控制信息、状态信息,这些都是通过通信层来实现的。
2)存储层:storage模块需要把数据存储到disk或是memory上面,有可能还需replicate到远端,这都是由存储层来实现和提供相应接口。
而其他模块若要和storage模块进行交互,storage模块提供了统一的操作类BlockManager,外部类与storage模块打交道都需要通过调用BlockManager相应接口来实现。

在这里插入图片描述
上图是Spark存储子系统中几个主要模块的关系示意图,现简要说明如下
1)CacheManager RDD在进行计算的时候,通过CacheManager来获取数据,并通过CacheManager来存储计算结果
2)BlockManager CacheManager在进行数据读取和存取的时候主要是依赖BlockManager接口来操作,BlockManager决定数据是从内存(MemoryStore)还是从磁盘(DiskStore)中获取
3)MemoryStore 负责将数据保存在内存或从内存读取
4)DiskStore 负责将数据写入磁盘或从磁盘读入
5)BlockManagerWorker 数据写入本地的MemoryStore或DiskStore是一个同步操作,为了容错还需要将数据复制到别的计算结点,以防止数据丢失的时候还能够恢复,数据复制的操作是异步完成,由BlockManagerWorker来处理这一部分事情
6)ConnectionManager 负责与其它计算结点建立连接,并负责数据的发送和接收
7)BlockManagerMaster 注意该模块只运行在Driver Application所在的Executor,功能是负责记录下所有BlockIds存储在哪个SlaveWorker上,比如RDD Task运行在机器A,所需要的BlockId为3,但在机器A上没有BlockId为3的数值,这个时候Slave worker需要通过BlockManager向BlockManagerMaster询问数据存储的位置,然后再通过ConnectionManager去获取。

1.2启动过程分析

上述的各个模块由SparkEnv来创建,创建过程在SparkEnv.create中完成

val blockManagerMaster = new BlockManagerMaster(registerOrLookup("BlockManagerMaster",new BlockManagerMasterActor(isLocal, conf)), conf)
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer, conf)val connectionManager = blockManager.connectionManager
val broadcastManager = new BroadcastManager(isDriver, conf)
val cacheManager = new CacheManager(blockManager)

这段代码容易让人疑惑,看起来像是在所有的cluster node上都创建了BlockManagerMasterActor,其实不然,仔细看registerOrLookup函数的实现。如果当前节点是driver则创建这个actor,否则建立到driver的连接。

def registerOrLookup(name: String, newActor: => Actor): ActorRef = {if (isDriver) {logInfo("Registering " + name)actorSystem.actorOf(Props(newActor), name = name)} else {val driverHost: String = conf.get("spark.driver.host", "localhost")val driverPort: Int = conf.getInt("spark.driver.port", 7077)Utils.checkHost(driverHost, "Expected hostname")val url = s"akka.tcp://spark@$driverHost:$driverPort/user/$name"val timeout = AkkaUtils.lookupTimeout(conf)logInfo(s"Connecting to $name: $url")Await.result(actorSystem.actorSelection(url).resolveOne(timeout), timeout)}
}

初始化过程中一个主要的动作就是BlockManager需要向BlockManagerMaster发起注册

1.3通信层

在这里插入图片描述
BlockManager包装了BlockManagerMaster,发送信息包装成BlockManagerInfo。Spark在Driver和Worker端都创建各自的BlockManager,并通过BlockManagerMaster进行通信,通过BlockManager对Storage模块进行操作。
BlockManager对象在SparkEnv.create函数中进行创建:

def registerOrLookupEndpoint(name: String, endpointCreator: => RpcEndpoint):
RpcEndpointRef = {if (isDriver) {logInfo("Registering " + name)rpcEnv.setupEndpoint(name, endpointCreator)} else {RpcUtils.makeDriverRef(name, conf, rpcEnv)}
}
…………
val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(BlockManagerMaster.DRIVER_ENDPOINT_NAME,new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus)),conf, isDriver)// NB: blockManager is not valid until initialize() is called later.
val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,serializer, conf, mapOutputTracker, shuffleManager, blockTransferService,     securityManager,numUsableCores)

并且在创建之前对当前节点是否是Driver进行了判断。如果是,则创建这个Endpoint;否则,创建Driver的连接。
在创建BlockManager之后,BlockManager会调用initialize方法初始化自己。并且初始化的时候,会调用BlockManagerMaster向Driver注册自己,同时,在注册时也启动了Slave Endpoint。另外,向本地shuffle服务器注册Executor配置,如果存在的话。

def initialize(appId: String): Unit = {
…………master.registerBlockManager(blockManagerId, maxMemory, slaveEndpoint)// Register Executors' configuration with the local shuffle service, if one should exist.if (externalShuffleServiceEnabled && !blockManagerId.isDriver) {registerWithExternalShuffleServer()}
}

而BlockManagerMaster将注册请求包装成RegisterBlockManager注册到Driver。Driver的BlockManagerMasterEndpoint会调用register方法,通过对消息BlockManagerInfo检查,向Driver注册。

private def register(id: BlockManagerId, maxMemSize: Long, slaveEndpoint: RpcEndpointRef) {val time = System.currentTimeMillis()if (!blockManagerInfo.contains(id)) {blockManagerIdByExecutor.get(id.executorId) match {case Some(oldId) =>// A block manager of the same executor already exists, so remove it (assumed dead)logError("Got two different block manager registrations on same executor - "+ s" will replace old one $oldId with new one $id")removeExecutor(id.executorId)case None =>}logInfo("Registering block manager %s with %s RAM, %s".format(id.hostPort, Utils.bytesToString(maxMemSize), id))blockManagerIdByExecutor(id.executorId) = idblockManagerInfo(id) = new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveEndpoint)}listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}

不难发现BlockManagerInfo对象被保存到Map映射中。
在通信层中BlockManagerMaster控制着消息的流向,这里采用了模式匹配,所有的消息模式都在BlockManagerMessage中。

1.4存储层

在这里插入图片描述
Spark Storage的最小存储单位是block,所有的操作都是以block为单位进行的。
在BlockManager被创建的时候MemoryStore和DiskStore对象就被创建出来了

val diskBlockManager = new DiskBlockManager(this, conf)
private[spark] val memoryStore = new MemoryStore(this, maxMemory)
private[spark] val diskStore = new DiskStore(this, diskBlockManager)

1.4.1Disk Store

由于当前的Spark版本对Disk Store进行了更细粒度的分工,把对文件的操作提取出来放到了DiskBlockManager中,DiskStore仅仅负责数据的存储和读取。
Disk Store会配置多个文件目录,Spark会在不同的文件目录下创建文件夹,其中文件夹的命名方式是:spark-UUID(随机UUID码)。Disk Store在存储的时候创建文件夹。并且根据“高内聚,低耦合”原则,这种服务型的工具代码就放到了Utils中(调用路径:DiskStore.putBytes—>DiskBlockManager.createLocalDirs—>Utils.createDirectory):

def createDirectory(root: String, namePrefix: String = "spark"): File = {var attempts = 0val maxAttempts = MAX_DIR_CREATION_ATTEMPTSvar dir: File = nullwhile (dir == null) {attempts += 1if (attempts > maxAttempts) {throw new IOException("Failed to create a temp directory (under " + root + ") after " +maxAttempts + " attempts!")}try {dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)if (dir.exists() || !dir.mkdirs()) {dir = null}} catch { case e: SecurityException => dir = null; }}dir.getCanonicalFile
}

在DiskBlockManager里,每个block都被存储为一个file,通过计算blockId的hash值,将block映射到文件中。

def getFile(filename: String): File = {// Figure out which local directory it hashes to, and which subdirectory in thatval hash = Utils.nonNegativeHash(filename)val dirId = hash % localDirs.lengthval subDirId = (hash / localDirs.length) % subDirsPerLocalDir// Create the subdirectory if it doesn't already existval subDir = subDirs(dirId).synchronized {val old = subDirs(dirId)(subDirId)if (old != null) {old} else {val newDir = new File(localDirs(dirId), "%02x".format(subDirId))if (!newDir.exists() && !newDir.mkdir()) {throw new IOException(s"Failed to create local dir in $newDir.")}subDirs(dirId)(subDirId) = newDirnewDir}}new File(subDir, filename)
}def getFile(blockId: BlockId): File = getFile(blockId.name)

通过hash值的取模运算,求出dirId和subDirId。然后,在从subDirs中找到subDir,如果subDir不存在,则创建一个新subDir。最后,以subDir为路径,blockId的name属性为文件名,新建该文件。
文件创建完之后,那么Spark就会在DiskStore中向文件写与之映射的block:

override def putBytes(blockId: BlockId, _bytes: ByteBuffer, level: StorageLevel): PutResult = {val bytes = _bytes.duplicate()logDebug(s"Attempting to put block $blockId")val startTime = System.currentTimeMillisval file = diskManager.getFile(blockId)val channel = new FileOutputStream(file).getChannelUtils.tryWithSafeFinally {while (bytes.remaining > 0) {channel.write(bytes)}} {channel.close()}val finishTime = System.currentTimeMillislogDebug("Block %s stored as %s file on disk in %d ms".format(file.getName, Utils.bytesToString(bytes.limit), finishTime - startTime))PutResult(bytes.limit(), Right(bytes.duplicate()))
}

读取过程就简单了,DiskStore根据blockId读取与之映射的file内容,当然,这中间需要从DiskBlockManager中得到文件信息。

private def getBytes(file: File, offset: Long, length: Long): Option[ByteBuffer] = {val channel = new RandomAccessFile(file, "r").getChannelUtils.tryWithSafeFinally {// For small files, directly read rather than memory mapif (length < minMemoryMapBytes) {val buf = ByteBuffer.allocate(length.toInt)channel.position(offset)while (buf.remaining() != 0) {if (channel.read(buf) == -1) {throw new IOException("Reached EOF before filling buffer\n" +s"offset=$offset\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")}}buf.flip()Some(buf)} else {Some(channel.map(MapMode.READ_ONLY, offset, length))}} {channel.close()}
}override def getBytes(blockId: BlockId): Option[ByteBuffer] = {val file = diskManager.getFile(blockId.name)getBytes(file, 0, file.length)
}

1.4.2Memory Store

相对Disk Store,Memory Store就显得容易很多。Memory Store用一个LinkedHashMap来管理,其中Key是blockId,Value是MemoryEntry样例类,MemoryEntry存储着数据信息。

private case class MemoryEntry(value: Any, size: Long, deserialized: Boolean)
private val entries = new LinkedHashMap[BlockId, MemoryEntry](32, 0.75f, true)

在MemoryStore中存储block的前提是当前内存有足够的空间存放。通过对tryToPut函数的调用对内存空间进行判断。

def putBytes(blockId: BlockId, size: Long, _bytes: () => ByteBuffer): PutResult = {// Work on a duplicate - since the original input might be used elsewhere.lazy val bytes = _bytes().duplicate().rewind().asInstanceOf[ByteBuffer]val putAttempt = tryToPut(blockId, () => bytes, size, deserialized = false)val data =if (putAttempt.success) {assert(bytes.limit == size)Right(bytes.duplicate())} else {null}PutResult(size, data, putAttempt.droppedBlocks)
}

在tryToPut函数中,通过调用enoughFreeSpace函数判断内存空间。如果内存空间足够,那么就把block放到LinkedHashMap中;如果内存不足,那么就告诉BlockManager内存不足,如果允许Disk Store,那么就把该block放到disk上。

private def tryToPut(blockId: BlockId, value: () => Any, size: Long, deserialized: Boolean): ResultWithDroppedBlocks = {var putSuccess = falseval droppedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]accountingLock.synchronized {val freeSpaceResult = ensureFreeSpace(blockId, size)val enoughFreeSpace = freeSpaceResult.successdroppedBlocks ++= freeSpaceResult.droppedBlocksif (enoughFreeSpace) {val entry = new MemoryEntry(value(), size, deserialized)entries.synchronized {entries.put(blockId, entry)currentMemory += size}val valuesOrBytes = if (deserialized) "values" else "bytes"logInfo("Block %s stored as %s in memory (estimated size %s, free %s)".format(blockId, valuesOrBytes, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))putSuccess = true} else {lazy val data = if (deserialized) {Left(value().asInstanceOf[Array[Any]])} else {Right(value().asInstanceOf[ByteBuffer].duplicate())}val droppedBlockStatus = blockManager.dropFromMemory(blockId, () => data)droppedBlockStatus.foreach { status => droppedBlocks += ((blockId, status)) }}releasePendingUnrollMemoryForThisTask()}ResultWithDroppedBlocks(putSuccess, droppedBlocks)
}

Memory Store读取block也很简单,只需要从LinkedHashMap中取出blockId的Value即可。

override def getValues(blockId: BlockId): Option[Iterator[Any]] = {val entry = entries.synchronized {entries.get(blockId)}if (entry == null) {None} else if (entry.deserialized) {Some(entry.value.asInstanceOf[Array[Any]].iterator)} else {val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy dataSome(blockManager.dataDeserialize(blockId, buffer))}
}

1.5数据写入过程分析

在这里插入图片描述
数据写入的简要流程
1)RDD.iterator是与storage子系统交互的入口
2)CacheManager.getOrCompute调用BlockManager的put接口来写入数据
3)数据优先写入到MemoryStore即内存,如果MemoryStore中的数据已满则将最近使用次数不频繁的数据写入到磁盘
4)通知BlockManagerMaster有新的数据写入,在BlockManagerMaster中保存元数据
5)将写入的数据与其它slave worker进行同步,一般来说在本机写入的数据,都会另先一台机器来进行数据的备份,即replicanumber=1
其实,我们在put和get block的时候并没有那么复杂,前面的细节BlockManager都包装好了,我们只需要调用BlockManager中的put和get函数即可。

def putBytes(blockId: BlockId,bytes: ByteBuffer,level: StorageLevel,tellMaster: Boolean = true,effectiveStorageLevel: Option[StorageLevel] = None): Seq[(BlockId, BlockStatus)] = {require(bytes != null, "Bytes is null")doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)}private def doPut(blockId: BlockId,data: BlockValues,level: StorageLevel,tellMaster: Boolean = true,effectiveStorageLevel: Option[StorageLevel] = None)
: Seq[(BlockId, BlockStatus)] = {require(blockId != null, "BlockId is null")require(level != null && level.isValid, "StorageLevel is null or invalid")effectiveStorageLevel.foreach { level =>require(level != null && level.isValid, "Effective StorageLevel is null or invalid")}val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]val putBlockInfo = {val tinfo = new BlockInfo(level, tellMaster)val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)if (oldBlockOpt.isDefined) {if (oldBlockOpt.get.waitForReady()) {logWarning(s"Block $blockId already exists on this machine; not re-adding it")return updatedBlocks}oldBlockOpt.get} else {tinfo}
}val startTimeMs = System.currentTimeMillisvar valuesAfterPut: Iterator[Any] = nullvar bytesAfterPut: ByteBuffer = nullvar size = 0Lval putLevel = effectiveStorageLevel.getOrElse(level)val replicationFuture = data match {case b: ByteBufferValues if putLevel.replication > 1 =>// Duplicate doesn't copy the bytes, but just creates a wrapperval bufferView = b.buffer.duplicate()Future {replicate(blockId, bufferView, putLevel)}(futureExecutionContext)case _ => null}putBlockInfo.synchronized {logTrace("Put for block %s took %s to get into synchronized block".format(blockId, Utils.getUsedTimeMs(startTimeMs)))var marked = falsetry {val (returnValues, blockStore: BlockStore) = {if (putLevel.useMemory) {(true, memoryStore)} else if (putLevel.useOffHeap) {(false, externalBlockStore)} else if (putLevel.useDisk) {(putLevel.replication > 1, diskStore)} else {assert(putLevel == StorageLevel.NONE)throw new BlockException(blockId, s"Attempted to put block $blockId without specifying storage level!")}}val result = data match {case IteratorValues(iterator) =>blockStore.putIterator(blockId, iterator, putLevel, returnValues)case ArrayValues(array) =>blockStore.putArray(blockId, array, putLevel, returnValues)case ByteBufferValues(bytes) =>bytes.rewind()blockStore.putBytes(blockId, bytes, putLevel)}size = result.sizeresult.data match {case Left (newIterator) if putLevel.useMemory => valuesAfterPut = newIteratorcase Right (newBytes) => bytesAfterPut = newBytescase _ =>}if (putLevel.useMemory) {result.droppedBlocks.foreach { updatedBlocks += _ }}val putBlockStatus = getCurrentBlockStatus(blockId, putBlockInfo)if (putBlockStatus.storageLevel != StorageLevel.NONE) {marked = trueputBlockInfo.markReady(size)if (tellMaster) {reportBlockStatus(blockId, putBlockInfo, putBlockStatus)}updatedBlocks += ((blockId, putBlockStatus))}} finally {if (!marked) {blockInfo.remove(blockId)putBlockInfo.markFailure()logWarning(s"Putting block $blockId failed")}}}logDebug("Put block %s locally took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))if (putLevel.replication > 1) {data match {case ByteBufferValues(bytes) =>if (replicationFuture != null) {Await.ready(replicationFuture, Duration.Inf)}case _ =>val remoteStartTime = System.currentTimeMillisif (bytesAfterPut == null) {if (valuesAfterPut == null) {throw new SparkException("Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")}bytesAfterPut = dataSerialize(blockId, valuesAfterPut)}replicate(blockId, bytesAfterPut, putLevel)logDebug("Put block %s remotely took %s".format(blockId, Utils.getUsedTimeMs(remoteStartTime)))}}BlockManager.dispose(bytesAfterPut)if (putLevel.replication > 1) {logDebug("Putting block %s with replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))} else {logDebug("Putting block %s without replication took %s".format(blockId, Utils.getUsedTimeMs(startTimeMs)))}updatedBlocks}

对于doPut函数,主要做了以下几个操作
创建BlockInfo对象存储block信息;
将BlockInfo加锁,然后根据Storage Level判断存储到Memory还是Disk。同时,对于已经准备好读的BlockInfo要进行解锁。
根据block的副本数量决定是否向远程发送副本。

1.5.1序列化与否

写入的具体内容可以是序列化之后的bytes也可以是没有序列化的value. 此处有一个对scala的语法中Either, Left, Right关键字的理解。

1.6数据读取过程分析

def get(blockId: BlockId): Option[Iterator[Any]] = {val local = getLocal(blockId)if (local.isDefined) {logInfo("Found block %s locally".format(blockId))return local}val remote = getRemote(blockId)if (remote.isDefined) {logInfo("Found block %s remotely".format(blockId))return remote}None
}

1.6.1本地读取

首先在查询本机的MemoryStore和DiskStore中是否有所需要的block数据存在,如果没有则发起远程数据获取。

1.6.2远程读取

远程获取调用路径, getRemote->doGetRemote, 在doGetRemote中最主要的就是调用BlockManagerWorker.syncGetBlock来从远程获得数据

def syncGetBlock(msg: GetBlock, toConnManagerId: ConnectionManagerId): ByteBuffer = {val blockManager = blockManagerWorker.blockManagerval connectionManager = blockManager.connectionManagerval blockMessage = BlockMessage.fromGetBlock(msg)val blockMessageArray = new BlockMessageArray(blockMessage)val responseMessage = connectionManager.sendMessageReliablySync(toConnManagerId, blockMessageArray.toBufferMessage)responseMessage match {case Some(message) => {val bufferMessage = message.asInstanceOf[BufferMessage]logDebug("Response message received " + bufferMessage)BlockMessageArray.fromBufferMessage(bufferMessage).foreach(blockMessage => {logDebug("Found " + blockMessage)return blockMessage.getData})}case None => logDebug("No response message received")}null
}

上述这段代码中最有意思的莫过于sendMessageReliablySync,远程数据读取毫无疑问是一个异步i/o操作,这里的代码怎么写起来就像是在进行同步的操作一样呢。也就是说如何知道对方发送回来响应的呢?
别急,继续去看看sendMessageReliablySync的定义

def sendMessageReliably(connectionManagerId: ConnectionManagerId, message: Message): Future[Option[Message]] = {val promise = Promise[Option[Message]]val status = new MessageStatus(message, connectionManagerId, s => promise.success(s.ackMessage))messageStatuses.synchronized {messageStatuses += ((message.id, status))}sendMessage(connectionManagerId, message)promise.future
}

要是我说秘密在这里,你肯定会说我在扯淡,但确实在此处。注意到关键字Promise和Future没。
如果这个future执行完毕,返回s.ackMessage。我们再看看这个ackMessage是在什么地方被写入的呢。看一看ConnectionManager.handleMessage中的代码片段

case bufferMessage: BufferMessage =>{if (authEnabled) {val res = handleAuthentication(connection, bufferMessage)if (res == true) {// message was security negotiation so skip the restlogDebug("After handleAuth result was true, returning")return}}if (bufferMessage.hasAckId) {val sentMessageStatus = messageStatuses. synchronized {messageStatuses.get(bufferMessage.ackId) match {case Some(status) =>{messageStatuses -= bufferMessage.ackIdstatus}case None =>{throw new Exception("Could not find reference for received ack message " +message.id)null}}}sentMessageStatus. synchronized {sentMessageStatus.ackMessage = Some(message)sentMessageStatus.attempted = truesentMessageStatus.acked = truesentMessageStaus.markDone()}}
}

注意,此处的所调用的sentMessageStatus.markDone就会调用在sendMessageReliablySync中定义的promise.Success. 不妨看看MessageStatus的定义。

class MessageStatus(
val message: Message,
val connectionManagerId: ConnectionManagerId,
completionHandler: MessageStatus => Unit) {var ackMessage: Option[Message] = Nonevar attempted = falsevar acked = falsedef markDone() { completionHandler(this) }
}

1.7Partition如何转化为Block

在storage模块里面所有的操作都是和block相关的,但是在RDD里面所有的运算都是基于partition的,那么partition是如何与block对应上的呢?
RDD计算的核心函数是iterator()函数:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)} else {computeOrReadCheckpoint(split, context)}
}

如果当前RDD的storage level不是NONE的话,表示该RDD在BlockManager中有存储,那么调用CacheManager中的getOrCompute()函数计算RDD,在这个函数中partition和block发生了关系:
首先根据RDD id和partition index构造出block id (rdd_xx_xx),接着从BlockManager中取出相应的block。
如果该block存在,表示此RDD在之前已经被计算过和存储在BlockManager中,因此取出即可,无需再重新计算。
如果该block不存在则需要调用RDD的computeOrReadCheckpoint()函数计算出新的block,并将其存储到BlockManager中。
需要注意的是block的计算和存储是阻塞的,若另一线程也需要用到此block则需等到该线程block的loading结束。

def getOrCompute[T](rdd:RDD[T],split:Partition,context:TaskContext,storageLevel:StorageLevel):Iterator[T]=
{val key = "rdd_%d_%d".format(rdd.id, split.index)logDebug("Looking for partition " + key)blockManager.get(key) match {case Some(values) =>// Partition is already materialized, so just return its valuesreturn values.asInstanceOf[Iterator[T]]case None =>// Mark the split as loading (unless someone else marks it first)loading. synchronized {if (loading.contains(key)) {logInfo("Another thread is loading %s, waiting for it to finish...".format(key))while (loading.contains(key)) {try {loading.wait()} catch {case _:Throwable =>}}logInfo("Finished waiting for %s".format(key))// See whether someone else has successfully loaded it. The main way this would fail// is for the RDD-level cache eviction policy if someone else has loaded the same RDD// partition but we didn't want to make space for it. However, that case is unlikely// because it's unlikely that two threads would work on the same RDD partition. One// downside of the current code is that threads wait serially if this does happen.blockManager.get(key) match {case Some(values) =>return values.asInstanceOf[Iterator[T]]case None =>logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))loading.add(key)}} else {loading.add(key)}}try {// If we got here, we have to load the splitlogInfo("Partition %s not found, computing it".format(key))val computedValues = rdd.computeOrReadCheckpoint(split, context)// Persist the result, so long as the task is not running locallyif (context.runningLocally) {return computedValues}val elements = new ArrayBuffer[Any]elements++ = computedValuesblockManager.put(key, elements, storageLevel, true)return elements.iterator.asInstanceOf[Iterator[T]]} finally {loading. synchronized {loading.remove(key)loading.notifyAll()}}
}

这样RDD的transformation、action就和block数据建立了联系,虽然抽象上我们的操作是在partition层面上进行的,但是partition最终还是被映射成为block,因此实际上我们的所有操作都是对block的处理和存取。

1.8partition和block的对应关系

在RDD中,核心的函数是iterator:

final def iterator(split: Partition, context: TaskContext): Iterator[T] = {if (storageLevel != StorageLevel.NONE) {SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)} else {computeOrReadCheckpoint(split, context)}
}

如果当前RDD的storage level不是NONE的话,表示该RDD在BlockManager中有存储,那么调用CacheManager中的getOrCompute函数计算RDD,在这个函数中partition和block就对应起来了:
getOrCompute函数会先构造RDDBlockId,其中RDDBlockId就把block和partition联系起来了,RDDBlockId产生的name就是BlockId的name属性,形式是:rdd_rdd.id_partition.index。

def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {val key = RDDBlockId(rdd.id, partition.index)logDebug(s"Looking for partition $key")blockManager.get(key) match {case Some(blockResult) =>val existingMetrics = context.taskMetrics.getInputMetricsForReadMethod(blockResult.readMethod)existingMetrics.incBytesRead(blockResult.bytes)val iter = blockResult.data.asInstanceOf[Iterator[T]]new InterruptibleIterator[T](context, iter) {override def next(): T = {existingMetrics.incRecordsRead(1)delegate.next()}}case None =>val storedValues = acquireLockForPartition[T](key)if (storedValues.isDefined) {return new InterruptibleIterator[T](context, storedValues.get)}try {logInfo(s"Partition $key not found, computing it")val computedValues = rdd.computeOrReadCheckpoint(partition, context)if (context.isRunningLocally) {return computedValues}val updatedBlocks = new ArrayBuffer[(BlockId, BlockStatus)]val cachedValues = putInBlockManager(key, computedValues, storageLevel, updatedBlocks)val metrics = context.taskMetricsval lastUpdatedBlocks = metrics.updatedBlocks.getOrElse(Seq[(BlockId, BlockStatus)]())metrics.updatedBlocks = Some(lastUpdatedBlocks ++ updatedBlocks.toSeq)new InterruptibleIterator(context, cachedValues)} finally {loading.synchronized {loading.remove(key)loading.notifyAll()}}}
}

同时getOrCompute函数会对block进行判断:
如果该block存在,表示此RDD在之前已经被计算过和存储在BlockManager中,因此取出即可,无需再重新计算。
如果该block不存在则需要调用RDD的computeOrReadCheckpoint()函数计算出新的block,并将其存储到BlockManager中。
需要注意的是block的计算和存储是阻塞的,若另一线程也需要用到此block则需等到该线程block的loading结束。

相关文章:

Spark内核解析-数据存储5(六)

1、Spark的数据存储 Spark计算速度远胜于Hadoop的原因之一就在于中间结果是缓存在内存而不是直接写入到disk&#xff0c;本文尝试分析Spark中存储子系统的构成&#xff0c;并以数据写入和数据读取为例&#xff0c;讲述清楚存储子系统中各部件的交互关系。 1.1存储子系统概览 …...

ASP.NET Core高级之认证与授权(一)--JWT入门-颁发、验证令牌

阅读本文你的收获 了解认证和授权的作用了解在ASP.NET Core中实现身份认证的技术都有哪些学习基于JWT认证并学会颁发和验证JWT令牌 一、重要的前置概念 在一个系统中&#xff0c;不是所有的功能和资源都能够被自由地访问&#xff0c;比如你存在银行系统里面的资金&#xff0c…...

实例:NodeJS 操作 Kafka

本人是C#出身的程序员&#xff0c;c#很简单就能实现&#xff0c;有需要的可以加我私聊。但是就目前流行的开发语言&#xff0c;尤其是面向web方向应用的&#xff0c;我感觉就是Nodejs最简单了。下面介绍&#xff1a; 本文将会介绍在windows环境下启动Kafka&#xff0c;并通过n…...

AI实景无人直播创业项目:开启自动直播新时代,一部手机即可实现增长

在当今社会&#xff0c;直播已经成为了人们日常生活中不可或缺的一部分。无论是商家推广产品、明星互动粉丝还是普通人分享生活&#xff0c;直播已经渗透到了各行各业。然而&#xff0c;传统直播方式存在着一些不足之处&#xff0c;如需现场主持人操作、高昂的费用等。近年来&a…...

YOLOv5改进 | 损失函数篇 | InnerIoU、InnerSIoU、InnerWIoU、FocusIoU等损失函数

一、本文介绍 本文给大家带来的是YOLOv5最新改进,为大家带来最近新提出的InnerIoU的内容同时用Inner的思想结合SIoU、WIoU、GIoU、DIoU、EIOU、CIoU等损失函数,形成 InnerIoU、InnerSIoU、InnerWIoU等新版本损失函数,同时还结合了Focus和AIpha思想,形成的新的损失函数,其…...

构建高效PythonWeb:GraphQL+Sanic

1.1 简介&#xff1a;在当今快速发展的技术时代&#xff0c;Web应用的性能和灵活性变得越来越重要。在众多技术中&#xff0c;GraphQL和Sanic以其独特的优势脱颖而出。GraphQL&#xff0c;作为一个强大的数据查询语言&#xff0c;为前端和后端之间的通信提供了极大的灵活性。而…...

【通义千问】大模型Qwen GitHub开源工程学习笔记(5)-- 模型的微调【全参数微调】【LoRA方法】【Q-LoRA方法】

摘要: 训练数据的准备 你需要将所有样本放到一个列表中并存入json文件中。每个样本对应一个字典,包含id和conversation,其中后者为一个列表。示例如下所示: [{"id": "identity_0","conversations": [{"from": "user",…...

PCL 大地坐标转空间直角坐标(C++详细过程版)

目录 一、算法原理二、代码实现三、结果展示四、测试数据本文由CSDN点云侠原创,原文链接。如果你不是在点云侠的博客中看到该文章,那么此处便是不要脸的爬虫与GPT生成的文章。 一、算法原理 二、代码实现 头文件及读取保存函数见:PCL 空间直角坐标转大地坐标(直接求解法C…...

Linux之Shell编程

shell是什么 shell是一个命令行解释器&#xff0c;他为用户提供一个向linux内核发送请求以便运行程序的界面系统级程序&#xff0c;用户可以用shell来启动&#xff0c;挂起&#xff0c;停止甚至编写一些程序。 shell脚本的执行方式 脚本格式要求 脚本以#!/bin/bash开头脚本需…...

Unity组件开发--传送点

本组件仅实现A传送点到B传送的功能&#xff0c;是可以双向传送的&#xff0c;如果只要单向传送&#xff0c;可以另外改脚本实现&#xff1b; 先看效果&#xff1a; unity组件传送点演示 1.传送组件shader是怎么写的&#xff1a;这种效果的实现方案 shader编辑器是这样的&#…...

vue结合Cesium加载gltf模型

Cesium支持什么格式&#xff1f; Cesium支持的格式包括&#xff1a;3D模型格式&#xff08;如COLLADA、gITF、OBJ&#xff09;、影像格式&#xff08;如JPEG、PNG、GeoTIFF&#xff09;、地形格式&#xff08;如STL、Heightmap&#xff09;、矢量数据格式&#xff08;如GeoJSON…...

逆置算法和数组循环移动算法

元素逆置 概述&#xff1a;其实就是将 第一个元素和最后一个元素交换&#xff0c;第二个元素和倒数第二个元素交换&#xff0c;依次到中间位置。用途&#xff1a;可用于数组的移动&#xff0c;字符串反转&#xff0c;链表反转操作&#xff0c;栈和队列反转等操作。 逆置图解 …...

【MATLAB】数豆子

Matlab数豆子 创建一个变量来表示豆子的数量。例如&#xff0c;可以使用豆子数量 100;来表示有100颗豆子。 使用disp函数打印出豆子的数量。例如&#xff0c;可以使用disp([目前有 num2str(豆子数量) 颗豆子])来打印出当前豆子的数量。 进行豆子的计数操作。例如&#xff0c…...

QT C++中调用python脚本时,import第三方库失败问题解决

QT C中调用python脚本时&#xff0c;import第三方库失败问题解决 文章目录 QT C中调用python脚本时&#xff0c;import第三方库失败问题解决前言一、问题复现二、调试过程三、问题解决1 numpy问题解决2 matplotlib问题解决 四、补充说明五、参考资料 前言 项目需要&#xff0c…...

【AI视野·今日Robot 机器人论文速览 第七十期】Thu, 4 Jan 2024

AI视野今日CS.Robotics 机器人学论文速览 Thu, 4 Jan 2024 Totally 17 papers &#x1f449;上期速览✈更多精彩请移步主页 Daily Robotics Papers Many-Objective-Optimized Semi-Automated Robotic Disassembly Sequences Authors Takuya Kiyokawa, Kensuke Harada, Weiwei …...

Flutter中的布局组件介绍及使用

1. 引言 Flutter 是一款由 Google 开发的开源 UI 软件开发工具&#xff0c;可用于在单个代码库中构建漂亮、本机编译的应用程序。在 Flutter 中&#xff0c;布局是构建用户界面的核心部分之一。本文将介绍 Flutter 中的全部布局组件&#xff0c;以及它们的使用方式。 2. 基础…...

【面试高频算法解析】算法练习2 回溯(Backtracking)

前言 本专栏旨在通过分类学习算法&#xff0c;使您能够牢固掌握不同算法的理论要点。通过策略性地练习精选的经典题目&#xff0c;帮助您深度理解每种算法&#xff0c;避免出现刷了很多算法题&#xff0c;还是一知半解的状态 专栏导航 二分查找回溯&#xff08;Backtracking&…...

认识Git

&#x1f30e;初识Git 初识Git 什么是Git Git的安装       Centos平台安装Git       Ubuntu平台安装Git Git的基本操作       创建远程仓库       配置Git 认识工作区、暂存区与版本库       添加文件到暂存区       将暂存区文件提交至本…...

@RequestParam,@RequestBody和@PathVariable 区别

RequestParam&#xff0c;RequestBody和PathVariable 这三者是spring常见的接受前端数据的注解&#xff0c;那么他们分别是接受什么的前端数据呢&#xff1f; RequestParam&#xff1a;这个注解主要用于处理请求参数&#xff0c;尤其是GET请求中的查询参数和表单参数。它可以用…...

vue3组件传参

1、props: 2、自定义事件子传父 3、mitt任意组件通讯 4、v-model通讯(v-model绑定在组件上) (1)V2中父子组件的v-model通信&#xff0c;限制了popos接收的属性名必须为value和emit触发的事件名必须为input,所以有时会有冲突; 父组件: 子组件: (2)V3中:限制了popos接收的属性名…...

React16源码: React中创建更新的方式及ReactDOM.render的源码实现

React当中创建更新的主要方式 ReactDOM.render || hydrate 这两个API都是我们要把整个应用第一次进行渲染到我们的页面上面能够展现出来我们整个应用的样子的一个过程这是初次渲染 setState 后续更新应用 forceUpdate 后续更新应用 replaceState 在后续被舍弃 关于 ReactDOM…...

CentOS 7 系列默认的网卡接口名称

CentOS 7 系列默认的网卡接口是随机的&#xff0c;如果要修改网卡名称以 eth 开头&#xff0c;有两种方式。 方法一&#xff1a;安装系统时 在安装界面移动光标到 Install Centos 7.按 TAB 键 在出现的代码的末尾添加&#xff1a;net.ifnames0 biosdevname0.按下回车开始安装即…...

多文件上传

HTML中实现多文件上传是通过用<input type"file">元素的multiple属性&#xff0c;以下简单描述多文件上传的步骤 HTML表单准备&#xff0c;使用<input type"file">元素&#xff0c;并为其添加multiple属性&#xff0c;以允许用户选择多个文件…...

2024.1.7力扣每日一题——赎金信

2024.1.7 题目来源我的题解方法一 哈希表方法二 数组 题目来源 力扣每日一题&#xff1b;题序&#xff1a;383 我的题解 方法一 哈希表 使用哈希表记录ransomNote中所需字符的数量&#xff0c;然后遍历magazine并将哈希表中存在的对应的数量减一 时间复杂度&#xff1a;O(nm…...

C#中List<T>底层原理剖析

C#中List底层原理剖析 1. 基础用法2. List的Capacity与Count&#xff1a;3.List的底层原理3.1. 构造3.2 Add()接口3.3 Remove()接口3.4 Inster()接口3.5 Clear()接口3.6 Contains()接口3.7 ToArray()接口3.8 Find()接口3.8 Sort()接口 4. 总结5. 参考 1. 基础用法 list.Max() …...

Leetcode 3003. Maximize the Number of Partitions After Operations

Leetcode 3003. Maximize the Number of Partitions After Operations 1. 解题思路2. 代码实现 题目链接&#xff1a;10038. Maximize the Number of Partitions After Operations 1. 解题思路 这一题我看实际比赛当中只有72个人做出来&#xff0c;把我吓得够呛&#xff0c;…...

MySQL第一讲:MySQL知识体系详解(P6精通)

MySQL知识体系详解(P6精通) MySQL不论在实践还是面试中,都是频率最高的。本系列主要对MySQL知识体系梳理,将给大家构建JVM核心知识点全局知识体系,本文是MySQL第一讲,MySQL知识体系详解。 文章目录 MySQL知识体系详解(P6精通)1、MySQL学习建议1.1、为什么学习 MySQL?1.2、…...

逻辑回归简单案例分析--鸢尾花数据集

文章目录 1. IRIS数据集介绍2. 具体步骤2.1 手动将数据转化为numpy矩阵2.1.1 从csv文件数据构建Numpy数据2.1.2 模型的搭建与训练2.1.3 分类器评估2.1.4 分类器的分类报告总结2.1.5 用交叉验证&#xff08;Cross Validation&#xff09;来验证分类器性能2.1.6 完整代码&#xf…...

Python print 高阶玩法

Python print 高阶玩法 当涉及到在Python中使用print函数时&#xff0c;有许多方式可以玩转文本样式、字体和颜色。在此将深入探讨这些主题&#xff0c;并介绍一些print函数的高级用法。 1. 基本的文本样式与颜色设置 使用ANSI转义码 ANSI转义码是一种用于在终端&#xff0…...

Wpf 使用 Prism 实战开发Day09

设置模块设计 1.效果图 一.系统设置模块&#xff0c;主要有个性化(用于更改主题颜色)&#xff0c;系统设置&#xff0c;关于更多&#xff0c;3个功能点。 个性化的颜色内容样式&#xff0c;主要是从 Material Design Themes UI简称md、提供的demo里复制代码过来使用的。 1.设置…...

赚钱网站入口/百度指数官网入口登录

污水中含磷量过高&#xff0c;是不允许排放的&#xff0c;过高的含磷量会导致水体富营养化&#xff0c;从而引发“绿潮”“水华”等水体污染&#xff0c;破坏水生态的自我净化能力&#xff0c;导致水底生物因为缺氧而死亡。所以不管是需要处理才能排放的污水&#xff0c;还是已…...

网站开发z亿玛酷1流量订制/国际新闻直播

天津科技大学计算机科学与信息工程学院简介(College of Computer Science and Information Engineering)计算机科学与信息工程学院成立于2003年&#xff0c;其前身为成立于2000年的计算机系。学院以本科教育为主&#xff0c;稳步发展研究生教育&#xff0c;积极开展中外合作办学…...

长沙公司网站开发/世界营销大师排名

提出问题&#xff1a; 回调函数是基于C编程的Windows SDK的技术&#xff0c;不是针对C的&#xff0c;程序员可以将一个C函数直接作为回调函数&#xff0c;但是如果试图直接使用C的成员函数作为回调函数将发生错误&#xff0c;甚至编译就不能通过。分析原因&#xff1a;普通的C成…...

设计公司企业介绍/广州seo网络优化公司

当我们的电脑出现一些问题的时候&#xff0c;很多都可以从bios设置里面进行修改解决问题。这里我们整理了BIOS设置问题详解&#xff0c;如果你遇到问题可以查看相应的进行解决。1.电脑的系统时间不准【故障现象】一台使用了较长时间的兼容机,每次启动后系统的时间都是从1998年1…...

广州地铁2号线/seo短视频网页入口

详见&#xff1a;http://blog.yemou.net/article/query/info/tytfjhfascvhzxcyt205 利用Java反射机制你可以在运行期动态的创建接口的实现。java.lang.reflect.Proxy类就可以实现这一功能。这个类的名字&#xff08;译者注&#xff1a;Proxy意思为代理&#xff09;就是为什么把…...

合肥网站建设求职简历/新媒体运营是做什么

参考https://blog.csdn.net/weixin_44678261/article/details/102503018 下载mysql rpm安装包 MySQL :: Download MySQL Community Server # /usr/local/目录下 创建mysql目录 cd /usr/local mkdir mysql cd /usr/local/mysql #解压缩指定目录下的mysql压缩文件到mysql目录下…...