
HBase compaction source code analysis

Posted: 2014-06-05   Author: blackproof   Source: repost
Abstract: no internet access at work, so I'm writing these notes up at home. The main steps are in the Store.compact method under HRegion: Store.compact(final List<StoreFile> filesToCompact, final boolean majorCompaction, final long maxId)

No internet access at work, so I'm catching up on these notes at home.

 

The main steps all live in the Store.compact method under HRegion:

Store.compact(final List<StoreFile> filesToCompact,
              final boolean majorCompaction, final long maxId)

1. From filesToCompact, create a StoreFileScanner for each HFile to be compacted.

2. Create a StoreScanner, which wraps the generated StoreFileScanners and traverses all files to be compacted in order; its main entry point is next.

                   2.1 Create a ScanQueryMatcher, which decides whether a KeyValue should be filtered out (e.g. covered by a delete marker).

                   2.2 Create a KeyValueHeap that holds the StoreFileScanners, heap-ordered by each HFile's current (smallest) row key; each HFile is internally sorted.

                   2.3 Maintain a "current" StoreFileScanner.

3. Call StoreScanner.next:

                  3.1 Poll the current StoreFileScanner for its smallest KeyValue.

                  3.2 Use ScanQueryMatcher to decide which branch each emitted KeyValue takes (include, skip, seek, ...).

                  3.3 Call KeyValueHeap.next to update the current StoreFileScanner: if current's next key is greater than heap.peek()'s key, push current back into the heap and poll the new minimum as current, so the next call yields the globally smallest KeyValue. (KeyValueHeap is backed by a PriorityQueue.)
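The heap-based merge in steps 2.2-3.3 can be sketched without any HBase classes. This is a minimal stand-in, not real HBase code: FileScanner here plays the role of StoreFileScanner, and the PriorityQueue plays the role of KeyValueHeap, always yielding the globally smallest key and re-inserting a scanner after it advances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class HeapMergeSketch {
    // Stand-in for StoreFileScanner: a peekable iterator over one sorted file.
    static class FileScanner {
        private final Iterator<String> it;
        private String current;
        FileScanner(List<String> sortedKeys) {
            this.it = sortedKeys.iterator();
            this.current = it.hasNext() ? it.next() : null;
        }
        String peek() { return current; }
        String next() {
            String out = current;
            current = it.hasNext() ? it.next() : null;
            return out;
        }
    }

    static List<String> merge(List<FileScanner> scanners) {
        // Order scanners by the key they currently point at (cf. KeyValueHeap).
        PriorityQueue<FileScanner> heap =
            new PriorityQueue<>((a, b) -> a.peek().compareTo(b.peek()));
        for (FileScanner s : scanners) {
            if (s.peek() != null) heap.add(s);
        }
        List<String> out = new ArrayList<>();
        while (!heap.isEmpty()) {
            FileScanner current = heap.poll();  // scanner holding the smallest key
            out.add(current.next());            // emit that key and advance the scanner
            if (current.peek() != null) {
                heap.add(current);              // re-insert so ordering stays correct
            }
        }
        return out;
    }

    public static void main(String[] args) {
        FileScanner f1 = new FileScanner(Arrays.asList("a", "d", "g"));
        FileScanner f2 = new FileScanner(Arrays.asList("b", "c", "h"));
        System.out.println(merge(Arrays.asList(f1, f2))); // [a, b, c, d, g, h]
    }
}
```

The real KeyValueHeap keeps the current scanner out of the heap and only re-heapifies when current falls behind heap.peek(), which avoids one poll/add pair per KeyValue; the output order is the same.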

 

 

Appended below is the path where StoreScanner.next calls ScanQueryMatcher.match; this is where a KeyValue is either written to the new tmp file or skipped.

 

The StoreScanner class (excerpt):
public boolean next(List<Cell> outResult, int limit) throws IOException {
    lock.lock();
    try {
    if (checkReseek()) {
      return true;
    }

    // if the heap was left null, then the scanners had previously run out anyways, close and
    // return.
    if (this.heap == null) {
      close();
      return false;
    }

    KeyValue peeked = this.heap.peek();
    if (peeked == null) {
      close();
      return false;
    }

    // only call setRow if the row changes; avoids confusing the query matcher
    // if scanning intra-row
    byte[] row = peeked.getBuffer();
    int offset = peeked.getRowOffset();
    short length = peeked.getRowLength();
    if (limit < 0 || matcher.row == null || !Bytes.equals(row, offset, length, matcher.row,
        matcher.rowOffset, matcher.rowLength)) {
      this.countPerRow = 0;
      matcher.setRow(row, offset, length);
    }

    KeyValue kv;

    // Only do a sanity-check if store and comparator are available.
    KeyValue.KVComparator comparator =
        store != null ? store.getComparator() : null;

    int count = 0;
    LOOP: while((kv = this.heap.peek()) != null) {
      if (prevKV != kv) ++kvsScanned; // Do object compare - we set prevKV from the same heap.
      checkScanOrder(prevKV, kv, comparator);
      prevKV = kv;

      ScanQueryMatcher.MatchCode qcode = matcher.match(kv); // let ScanQueryMatcher classify this KeyValue
      switch(qcode) {
        case INCLUDE:
          // ... (excerpt truncated here; the full switch handles the remaining match codes)


The delete-handling logic (inside ScanQueryMatcher.match):
     /*
     * The delete logic is pretty complicated now.
     * This is corroborated by the following:
     * 1. The store might be instructed to keep deleted rows around.
     * 2. A scan can optionally see past a delete marker now.
     * 3. If deleted rows are kept, we have to find out when we can
     *    remove the delete markers.
     * 4. Family delete markers are always first (regardless of their TS)
     * 5. Delete markers should not be counted as version
     * 6. Delete markers affect puts of the *same* TS
     * 7. Delete marker need to be version counted together with puts
     *    they affect
     */
 byte type = bytes[initialOffset + keyLength - 1]; // the last byte of the key encodes the KeyValue type
    if (kv.isDelete()) {
      if (!keepDeletedCells) {
        // first ignore delete markers if the scanner can do so, and the
        // range does not include the marker
        //
        // during flushes and compactions also ignore delete markers newer
        // than the readpoint of any open scanner, this prevents deleted
        // rows that could still be seen by a scanner from being collected
        boolean includeDeleteMarker = seePastDeleteMarkers ?
            tr.withinTimeRange(timestamp) :
            tr.withinOrAfterTimeRange(timestamp);
        if (includeDeleteMarker
            && kv.getMvccVersion() <= maxReadPointToTrackVersions) {
          this.deletes.add(bytes, offset, qualLength, timestamp, type);
        }
        // Can't early out now, because DelFam come before any other keys
      }
      if (retainDeletesInOutput
          || (!isUserScan && (EnvironmentEdgeManager.currentTimeMillis() - timestamp) <= timeToPurgeDeletes)
          || kv.getMvccVersion() > maxReadPointToTrackVersions) { // minor compaction: retain the delete marker
        // always include or it is not time yet to check whether it is OK
        // to purge deletes or not
        if (!isUserScan) {
          // if this is not a user scan (compaction), we can filter this deletemarker right here
          // otherwise (i.e. a "raw" scan) we fall through to normal version and timerange checking
          return MatchCode.INCLUDE;
        }
      } else if (keepDeletedCells) {
        if (timestamp < earliestPutTs) {
          // keeping delete rows, but there are no puts older than
          // this delete in the store files.
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        }
        // else: fall through and do version counting on the
        // delete markers
      } else { // major compaction: the delete marker can finally be dropped
        return MatchCode.SKIP;
      }
      // note the following next else if...
      // delete marker are not subject to other delete markers
    } else if (!this.deletes.isEmpty()) {
      DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
          timestamp);
      switch (deleteResult) {
        case FAMILY_DELETED:
        case COLUMN_DELETED:
          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
        case VERSION_DELETED:
        case FAMILY_VERSION_DELETED:
          return MatchCode.SKIP;
        case NOT_DELETED:
          break;
        default:
          throw new RuntimeException("UNEXPECTED");
        }
    }
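The DeleteResult branch above can be illustrated with a toy tracker. This is a simplification, not HBase's DeleteTracker: it records the delete markers seen so far in a row and classifies each later put, mirroring the FAMILY_DELETED / COLUMN_DELETED / VERSION_DELETED / NOT_DELETED outcomes. (Family delete markers sort first in a row, which is why the real matcher can't early-out before seeing them.)

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class DeleteTrackerSketch {
    enum Result { FAMILY_DELETED, COLUMN_DELETED, VERSION_DELETED, NOT_DELETED }

    // A family delete covers every cell with ts <= the marker's ts.
    private long familyDeleteTs = Long.MIN_VALUE;
    // A column delete covers every version of one qualifier with ts <= marker ts.
    private final Map<String, Long> columnDeletes = new HashMap<>();
    // A version delete covers exactly one (qualifier, ts) pair.
    private final Set<String> versionDeletes = new HashSet<>();

    void addFamilyDelete(long ts) { familyDeleteTs = Math.max(familyDeleteTs, ts); }
    void addColumnDelete(String qual, long ts) { columnDeletes.merge(qual, ts, Math::max); }
    void addVersionDelete(String qual, long ts) { versionDeletes.add(qual + "@" + ts); }

    Result isDeleted(String qual, long ts) {
        if (ts <= familyDeleteTs) return Result.FAMILY_DELETED;
        Long colTs = columnDeletes.get(qual);
        if (colTs != null && ts <= colTs) return Result.COLUMN_DELETED;
        if (versionDeletes.contains(qual + "@" + ts)) return Result.VERSION_DELETED;
        return Result.NOT_DELETED;
    }

    public static void main(String[] args) {
        DeleteTrackerSketch d = new DeleteTrackerSketch();
        d.addColumnDelete("c1", 100); // deletes all versions of c1 at ts <= 100
        d.addVersionDelete("c2", 50); // deletes only c2@50
        System.out.println(d.isDeleted("c1", 90));  // COLUMN_DELETED
        System.out.println(d.isDeleted("c2", 50));  // VERSION_DELETED
        System.out.println(d.isDeleted("c2", 60));  // NOT_DELETED
    }
}
```

As in the source above, FAMILY_DELETED and COLUMN_DELETED let the scanner jump to the next column or row, while VERSION_DELETED only skips the single covered cell.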

 

/**
 * Enum to distinguish general scan types.
 */
@InterfaceAudience.Private
public enum ScanType {
  COMPACT_DROP_DELETES,
  COMPACT_RETAIN_DELETES,
  USER_SCAN
}
  
How deleted data is handled (the three flags on ScanQueryMatcher):
  /*
   * The following three booleans define how we deal with deletes.
   * There are three different aspects:
   * 1. Whether to keep delete markers. This is used in compactions.
   *    Minor compactions always keep delete markers.
   * 2. Whether to keep deleted rows. This is also used in compactions,
   *    if the store is set to keep deleted rows. This implies keeping
   *    the delete markers as well.
   *    In this case deleted rows are subject to the normal max version
   *    and TTL/min version rules just like "normal" rows.
   * 3. Whether a scan can do time travel queries even before deleted
   *    marker to reach deleted rows.
   */
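Tying the two excerpts together: the ScanType chosen for a scan determines whether delete markers survive into the output. The sketch below encodes the rule stated in the comments above (minor compactions retain markers, major compactions may drop them, user scans never emit them); the exact constructor wiring in HBase is more involved, so treat this as an assumption-level summary rather than verbatim source.

```java
public class ScanTypeFlagsSketch {
    // Mirrors the ScanType enum shown earlier.
    enum ScanType { COMPACT_DROP_DELETES, COMPACT_RETAIN_DELETES, USER_SCAN }

    static boolean retainDeletesInOutput(ScanType type) {
        // Minor compactions can't drop markers: other HFiles not in this
        // compaction may still hold puts the markers cover. Only a major
        // compaction sees every file and can safely purge them.
        return type == ScanType.COMPACT_RETAIN_DELETES;
    }

    public static void main(String[] args) {
        for (ScanType t : ScanType.values()) {
            System.out.println(t + " -> retainDeletesInOutput=" + retainDeletesInOutput(t));
        }
    }
}
```

This is why the match() excerpt has both a "minor compact" branch returning INCLUDE for delete markers and a "major compact" branch returning SKIP.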
 