Allxuio之UFS数据读取

Client读取UFS的数据

首先在《Alluxio的FileInStream记录》一文中将Local模式的介绍了一遍。但是有个疑问:如果文件是写在UFS(底层文件存储)上的话,那么文件改如何读取呢?开始我以为会像Local模式一样,通过NIO的MappedByteBuffer读取UFS上的文件,并且同时cache到Mem中。再仔细看完代码后,发现事实并非如此,在FileInStream中,只要是非Local模式的,都会通过Netty走网络,即使是在本机的UFS上面也是如此。主要是Local模式外加设置USER_SHORT_CIRCUIT_ENABLED参数来使用直接读取文件,这样做是绕过Alluxio,是属于特殊情况。此外对于Local来说,文件结构是在Alluxio的控制之下的,而对于UFS的则无法控制而且也负责很多,统一的办法就是利用Netty和数据Buffer来读取数据。这部分内容的代码如下

public static BlockInStream create(...) {
    if (Configuration.getBoolean(PropertyKey.USER_SHORT_CIRCUIT_ENABLED)
        && !NettyUtils.isDomainSocketSupported(address)
        && blockSource == BlockInStreamSource.LOCAL) {
        return createLocalBlockInStream(context, address, blockId, blockSize, options);
      }
    }
    return createNettyBlockInStream(context, address, blockSource,blockSize,options);
  }
  这是BlockInStream类中的create方法,修改了无关内容,从if的条件判断可以看出上面所说的内容。

到这里client端的事情基本结束,因为再往下是《Alluxio的FileInStream记录》文中介绍的内容,当然这里在packet读取部分,是通过Netty走的是网络传输。既然如此,就不再赘述了,希望了解细节的朋友,可以结合《Alluxio的FileInStream记录》一文和代码来深入了解。

Worker读取UFS数据

那么client端既然已经结束了,实际读取的工作肯定是由worker来做了。这里有放出一个问题:worker是以什么样的方式来读取数据,以及如何将数据载入内容的?

再往下继续之前,需要介绍一下worker的结构,以及netty的简单内容。否则下面会有些小难。首先说下netty,这是一个以事件为基础的java网络框架,简单,高效,灵活,统一。用过原生nio的小伙伴,看到netty一定会有种相见恨晚的感觉。这里有一个ChannelPipeline对象,这是一个拦截流经 Channel 的入站和出站事件的 ChannelHandler 实例链

netty struct 如上图可见,一个I/O事件在pipline上进行传播。那么,在worker中也有这么一个对象,负责来处理client的请求。

    // Block Handlers
    pipeline.addLast("blockReadHandler",new BlockReadHandler(NettyExecutors.BLOCK_READER_EXECUTOR,
            mWorkerProcess.getWorker(BlockWorker.class), mFileTransferType));

这是在worker项目下的PipelineHandler类中的代码。这里的BlockReadHandler就是主要处理client的读取Block的请求。实际做事的是BlockPacketReader的内部类。由于pipline你不能阻塞在这里的,否则后面的处理器会迟迟得不到调用的。这里在BlockReadHandler收到读取请求,被调用channelRead方法时候。会创建一个BlockPacketReader以一个独立线程的方式,进行数据读取操作。

  /**
    *  Invoked when the current {@link Channel} has read a message from the peer.
    */
  @Override
  public void channelRead(ChannelHandlerContext ctx, Object object) throws Exception {
    Protocol.ReadRequest msg =((RPCProtoMessage)object).getMessage().asReadRequest();
    validateReadRequest(msg);
    //创建一个BlockPacketReader并执行。
    mPacketReaderExecutor.submit(createPacketReader(mContext, ctx.channel()));
    mContext.setPacketReaderActive(true);
    
  }
  依然只留主要部分代码。这个channelRead会在收到消息时候被调用。

那么下面继续回到worker的UFS文件读取。

BlockPacketReader的父类PacketReader有一个run方法,无外乎就是从文件所在位置,读取并且传输给client端。注意这里是完全异步的,没有阻塞存在。client端读取好数据交给netty,netty传输给client,client收到数据后是放到队列中,client读取数据是从队列中取数。这个过程中每个人都是做自己的时间,不会等待网络传输。

在run方法中,通过UnderFileSystemBlockReader类将文件中的数据读取到buffer中,再交由netty传输给client端。从文件中读取数据的同时,会将数据写入Mem中。当数据传输完之后,worker会将这个Mem的Block提交,成为正式的block。下面我们来详细看一下。 这里需要说一下Block如何可用。首先调用BlockStore的createBlock方法,创建一个临时Block。然后调用getBlockWriter方法,获得改临时Block的写对象。一直到写入数据完成,这时改Block对外是不见的,只有这个worker自身知道存在。最终调用commitBlock方法将这个Block提交,这样这个Block就称为可见可用状态了。

下面三段代码,分别对应是初始化,读数据并写入Alluxio中,关闭提交Block。我已经将代码所在的类标明了,为了容易阅读,我把无关紧要的代码都去除了,只留下主要的代码。需要看具体代码的话,按照所在类查找实际代码来阅读。

第一步.临时Block的初始化

  
  //这里是UnderFileSystemBlockReader中更新BlockWriter的方法。无关代码依然去除了
  private void updateBlockWriter(long offset) throws IOException {
    //首先创建了一个临时Block
    mLocalBlockStore.createBlock(mBlockMeta.getSessionId(), mBlockMeta.getBlockId(), loc,mInitialBlockSize);
    //获得了改临时Block的写方法。
    mBlockWriter = mLocalBlockStore.getBlockWriter(mBlockMeta.getSessionId(), mBlockMeta.getBlockId());
      
  }

第二步.读取UFS文件数据,同时写入临时Block

  //这是UnderFileSystemBlockReader中读取数据的方法。这里只是将数据读出,并且写入到到BlockerWriter中。
  public ByteBuffer read(long offset, long length) throws IOException {
 
    byte[] data = new byte[(int) bytesToRead];
    read = mUnderFileSystemInputStream.read(data, bytesRead, (int) 
    ByteBuffer buffer = ByteBuffer.wrap(data, (int) (mBlockWriter.getPosition() - offset),(int) (mInStreamPos - mBlockWriter.getPosition()));
    //同时写入Alluxio的Blocker中
    mBlockWriter.append(buffer.duplicate());
    return ByteBuffer.wrap(data, 0, bytesRead);
  }

第三步.数据读取完毕后,那么将临时Block提交,使其对外可见。

  //这是在DefaultBlockerWorker类中的代码
  public void closeUfsBlock(long sessionId, long blockId) {
      mUnderFileSystemBlockStore.closeReaderOrWriter(sessionId, blockId);
      try {
      //数据写完后,BlockerWorker关闭UFS的block,同时进行了Alluxio的Block提交工作
        commitBlock(sessionId, blockId);
      } catch (BlockDoesNotExistException e) {
    }
    mUnderFileSystemBlockStore.releaseAccess(sessionId, blockId);
  }

至此,对于UFS上的文件读取就完成了,同时这个Block文件也成为了Alluxio的文件了。下次倘若再读取这个文件,就不会进入UFS的文件读取了。

这里有一个小问题:我们都知道Alluxio会Mem的文件按照规则进行eviction操作。那么如果一个Block被evict到了Disk上面后,倘若此时再进行读取数据,是按照什么逻辑呢?是当成Local通过short circuit的直接读取方式,还是通过netty来呢?

最近的文章

Memory-Mapped Files(MMAP)的深入探讨

关于Memory-Mapped文件的问题,问题多集中在在于unmap。官方有个05年至今还是Open状态的bug(参考1),说的就是unmap方法,其中有个回复说在JDK10中会解决,所有非常期待JDK10。对于mmap的使用至今没有找到一个来源可靠的资料,能给个大概的使用。网上对于java的MMAP文件使用资料不多,而且有大量今天看来是错误使用的。官方的API中对于mmap文件的使用,并未提供任何的unmap方法。对此给出的解释(参考1)和猜测的是一样的,还没有办法给出一个安全高效的un...…

继续阅读
更早的文章

alluxio的Worker之文件读取

这个系列,从一个场景作为突破口,最后再从整体来看一遍。首先在《Alluxio的FileInStream记录》一文中将Local模式的介绍了一遍。但是有个疑问:如果文件是写在UFS(底层文件存储)上的话,那么文件改如何读取呢?开始我以为会像Local模式一样,通过NIO的MappedByteBuffer读取UFS上的文件,并且同时cache到Mem中。再仔细看完代码后,发现事实并非如此,在FileInStream中,只要是非Local模式的,都会通过Netty走网络,即使是在本机的UFS上面...…

继续阅读