diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index b7b03e8288..c688a64580 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -23,7 +23,9 @@ struct SDiskCol { int8_t type; int8_t flag; int32_t nVal; + uint8_t bit; SCompressor *pBitC; + int32_t offset; SCompressor *pOffC; SCompressor *pValC; }; @@ -44,7 +46,21 @@ static int32_t tDiskColReset(SDiskCol *pDiskCol, int16_t cid, int8_t type, uint8 return code; } -static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddValue(SDiskCol *pDiskCol, SColVal *pColVal) { + int32_t code = 0; + + if (IS_VAR_DATA_TYPE(pColVal->type)) { + code = tCompress(pDiskCol->pOffC, &pDiskCol->offset, sizeof(int32_t)); + if (code) goto _exit; + pDiskCol->offset += pColVal->value.nData; + } + code = tCompress(pDiskCol->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/); + if (code) goto _exit; + +_exit: + return code; +} +static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) { // 0 int32_t code = 0; if (pColVal->isNone) { @@ -53,7 +69,7 @@ static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) { pDiskCol->flag = HAS_NULL; } else { pDiskCol->flag = HAS_VALUE; - code = tCompress(pDiskCol->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/); + code = tDiskColAddValue(pDiskCol, pColVal); if (code) goto _exit; } pDiskCol->nVal++; @@ -61,42 +77,144 @@ static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) { _exit: return code; } -static int32_t tDiskColAddVal1(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal1(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NONE int32_t code = 0; if (!pColVal->isNone) { + // bit map + pDiskCol->bit = 0; + for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + } + SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 1); + if ((pDiskCol->nVal & 0x3) == 7) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + pDiskCol->bit = 0; + } + + // value + if (pColVal->isNull) { + pDiskCol->flag |= HAS_NULL; + } else { + pDiskCol->flag |= HAS_VALUE; + + SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + code = tDiskColAddValue(pDiskCol, &cv); + if (code) goto _exit; + } + + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; + } } pDiskCol->nVal++; _exit: return code; } -static int32_t tDiskColAddVal2(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal2(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NULL int32_t code = 0; if (!pColVal->isNull) { + if (pColVal->isNone) { + pDiskCol->flag |= HAS_NONE; + + pDiskCol->bit = 1; + for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + } + SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 0); + if ((pDiskCol->nVal & 0x3) == 7) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + pDiskCol->bit = 0; + } + } else { + pDiskCol->flag |= HAS_VALUE; + + pDiskCol->bit = 0; + for (int32_t i = 0; i < (pDiskCol->nVal >> 3); i++) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + } + SET_BIT1(&pDiskCol->bit, (pDiskCol->nVal & 0x3), 1); + if ((pDiskCol->nVal & 0x3) == 7) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + pDiskCol->bit = 0; + } + + SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + code = tDiskColAddValue(pDiskCol, &cv); + if (code) goto _exit; + } + + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; + } } pDiskCol->nVal++; _exit: return code; } -static int32_t tDiskColAddVal3(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal3(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NULL|HAS_NONE int32_t code = 0; + uint8_t mod8 = (pDiskCol->nVal & 0x3); if (pColVal->isNone) { + SET_BIT1(&pDiskCol->bit, mod8, 0); + if (mod8 == 7) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + } } else if (pColVal->isNull) { + SET_BIT1(&pDiskCol->bit, mod8, 1); + if (mod8 == 7) { + code = tCompress(pDiskCol->pBitC, &pDiskCol->bit, 1); + if (code) goto _exit; + } } else { + pDiskCol->flag |= HAS_VALUE; + + // convert from bit1 to bit2 and add a 2 (todo) + + SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); + for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { + code = tDiskColAddValue(pDiskCol, &cv); + if (code) goto _exit; + } + + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; } pDiskCol->nVal++; +_exit: return code; } -static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE int32_t code = 0; if (pColVal->isNone || pColVal->isNull) { + if (pColVal->isNone) { + pDiskCol->flag |= HAS_NONE; + } else { + pDiskCol->flag |= HAS_NULL; + } + + // set bitmap (todo) + + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; } else { + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; } pDiskCol->nVal++; @@ -104,35 +222,62 @@ static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { _exit: return code; } -static int32_t tDiskColAddVal5(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal5(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NONE int32_t code = 0; if (pColVal->isNull) { + pDiskCol->flag |= HAS_NULL; + + // convert bit1 to bit2 } else { + if (pColVal->isNone) { + // SET_BIT1(0); + } else { + // SET_BIT1(1); + } } + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; pDiskCol->nVal++; +_exit: return code; } -static int32_t tDiskColAddVal6(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal6(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NULL int32_t code = 0; if (pColVal->isNone) { + pDiskCol->flag |= HAS_NONE; + // bit1 to bit2 } else { + if (pColVal->isNull) { + // SET_BIT1(0) + } else { + // SET_BIT1(1) + } } + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; pDiskCol->nVal++; +_exit: return code; } -static int32_t tDiskColAddVal7(SDiskCol *pDiskCol, SColVal *pColVal) { +static int32_t tDiskColAddVal7(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE int32_t code = 0; if (pColVal->isNone) { + // set_bit2(0); } else if (pColVal->isNull) { + // set_bit2(1); } else { + // set_bit2(2); } + code = tDiskColAddValue(pDiskCol, pColVal); + if (code) goto _exit; pDiskCol->nVal++; +_exit: return code; } static int32_t (*tDiskColAddValImpl[])(SDiskCol *pDiskCol, SColVal *pColVal) = {