diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index c688a64580..1266efcf8e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -23,8 +23,7 @@ struct SDiskCol { int8_t type; int8_t flag; int32_t nVal; - uint8_t bit; - SCompressor *pBitC; + uint8_t *pBitMap; int32_t offset; SCompressor *pOffC; SCompressor *pValC; @@ -39,7 +38,6 @@ static int32_t tDiskColReset(SDiskCol *pDiskCol, int16_t cid, int8_t type, uint8 pDiskCol->flag = 0; pDiskCol->nVal = 0; - tCompressorReset(pDiskCol->pBitC, TSDB_DATA_TYPE_TINYINT, cmprAlg, 1); tCompressorReset(pDiskCol->pOffC, TSDB_DATA_TYPE_INT, cmprAlg, 1); tCompressorReset(pDiskCol->pValC, type, cmprAlg, 1); @@ -82,17 +80,13 @@ static int32_t tDiskColAddVal1(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_N if (!pColVal->isNone) { // bit map - pDiskCol->bit = 0; - for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - } - SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 1); - if ((pDiskCol->nVal & 0x3) == 7) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - pDiskCol->bit = 0; - } + int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); + + code = tRealloc(&pDiskCol->pBitMap, nBit); + if (code) goto _exit; + + memset(pDiskCol->pBitMap, 0, nBit); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); // value if (pColVal->isNull) { @@ -119,34 +113,20 @@ static int32_t tDiskColAddVal2(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_N int32_t code = 0; if (!pColVal->isNull) { + int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); + code = tRealloc(&pDiskCol->pBitMap, nBit); + if (code) goto _exit; + if (pColVal->isNone) { pDiskCol->flag |= HAS_NONE; - pDiskCol->bit = 1; - for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - } - SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 0); - if ((pDiskCol->nVal & 0x3) == 7) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - pDiskCol->bit = 0; - } + memset(pDiskCol->pBitMap, 255, nBit); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); } else { pDiskCol->flag |= HAS_VALUE; - pDiskCol->bit = 0; - for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - } - SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 1); - if ((pDiskCol->nVal & 0x3) == 7) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - pDiskCol->bit = 0; - } + memset(pDiskCol->pBitMap, 0, nBit); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { @@ -166,23 +146,30 @@ _exit: static int32_t tDiskColAddVal3(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NULL|HAS_NONE int32_t code = 0; - uint8_t mod8 = (pDiskCol->nVal & 0x3); if (pColVal->isNone) { - SET_BIT1(&pDiskCol->bit, mod8, 0); - if (mod8 == 7) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - } + code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); } else if (pColVal->isNull) { - SET_BIT1(&pDiskCol->bit, mod8, 1); - if (mod8 == 7) { - code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); - if (code) goto _exit; - } + code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); } else { pDiskCol->flag |= HAS_VALUE; - // convert from bit1 to bit2 and add a 2 (todo) + uint8_t *pBitMap = NULL; + code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal)); + } + SET_BIT2(pBitMap, pDiskCol->nVal, 2); + + tFree(pDiskCol->pBitMap); + pDiskCol->pBitMap = pBitMap; SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { @@ -208,7 +195,12 @@ static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_V pDiskCol->flag |= HAS_NULL; } - // set bitmap (todo) + int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); + code = tRealloc(&pDiskCol->pBitMap, nBit); + if (code) goto _exit; + + memset(pDiskCol->pBitMap, 255, nBit); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); code = tDiskColAddValue(pDiskCol, pColVal); if (code) goto _exit; @@ -216,7 +208,6 @@ static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_V code = tDiskColAddValue(pDiskCol, pColVal); if (code) goto _exit; } - pDiskCol->nVal++; _exit: @@ -228,12 +219,25 @@ static int32_t tDiskColAddVal5(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_V if (pColVal->isNull) { pDiskCol->flag |= HAS_NULL; - // convert bit1 to bit2 + uint8_t *pBitMap = NULL; + code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 0); + } + SET_BIT2(pBitMap, pDiskCol->nVal, 1); + + tFree(pDiskCol->pBitMap); + pDiskCol->pBitMap = pBitMap; } else { + code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + if (pColVal->isNone) { - // SET_BIT1(0); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); } else { - // SET_BIT1(1); + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); } } code = tDiskColAddValue(pDiskCol, pColVal); @@ -248,12 +252,26 @@ static int32_t tDiskColAddVal6(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_V if (pColVal->isNone) { pDiskCol->flag |= HAS_NONE; - // bit1 to bit2 + + uint8_t *pBitMap = NULL; + code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 1); + } + SET_BIT2(pBitMap, pDiskCol->nVal, 0); + + tFree(pDiskCol->pBitMap); + pDiskCol->pBitMap = pBitMap; } else { + code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + if (pColVal->isNull) { - // SET_BIT1(0) + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); } else { - // SET_BIT1(1) + SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); } } code = tDiskColAddValue(pDiskCol, pColVal); @@ -266,12 +284,15 @@ _exit: static int32_t tDiskColAddVal7(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE int32_t code = 0; + code = tRealloc(&pDiskCol->pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); + if (code) goto _exit; + if (pColVal->isNone) { - // set_bit2(0); + SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 0); } else if (pColVal->isNull) { - // set_bit2(1); + SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 1); } else { - // set_bit2(2); + SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 2); } code = tDiskColAddValue(pDiskCol, pColVal); if (code) goto _exit;