more code

This commit is contained in:
Hongze Cheng 2022-09-21 15:00:32 +08:00
parent f51c86430a
commit e12ac1dd6f
1 changed files with 155 additions and 154 deletions

View File

@ -16,10 +16,10 @@
#include "tsdb.h" #include "tsdb.h"
typedef struct SDiskDataBuilder SDiskDataBuilder; typedef struct SDiskDataBuilder SDiskDataBuilder;
typedef struct SDiskCol SDiskCol; typedef struct SDiskColBuilder SDiskColBuilder;
typedef struct SDiskData SDiskData; typedef struct SDiskData SDiskData;
struct SDiskCol { struct SDiskColBuilder {
int16_t cid; int16_t cid;
int8_t type; int8_t type;
int8_t flag; int8_t flag;
@ -31,332 +31,332 @@ struct SDiskCol {
SCompressor *pValC; SCompressor *pValC;
}; };
// SDiskCol ================================================ // SDiskColBuilder ================================================
static int32_t tDiskColInit(SDiskCol *pDiskCol, int16_t cid, int8_t type, uint8_t cmprAlg) { static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
int32_t code = 0; int32_t code = 0;
pDiskCol->cid = cid; pBuilder->cid = cid;
pDiskCol->type = type; pBuilder->type = type;
pDiskCol->flag = 0; pBuilder->flag = 0;
pDiskCol->cmprAlg = cmprAlg; pBuilder->cmprAlg = cmprAlg;
pDiskCol->nVal = 0; pBuilder->nVal = 0;
pDiskCol->offset = 0; pBuilder->offset = 0;
if (IS_VAR_DATA_TYPE(type)) { if (IS_VAR_DATA_TYPE(type)) {
if (pDiskCol->pOffC == NULL) { if (pBuilder->pOffC == NULL) {
code = tCompressorCreate(&pDiskCol->pOffC); code = tCompressorCreate(&pBuilder->pOffC);
if (code) return code; if (code) return code;
} }
code = tCompressorReset(pDiskCol->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); code = tCompressorReset(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
if (code) return code; if (code) return code;
} }
if (pDiskCol->pValC == NULL) { if (pBuilder->pValC == NULL) {
code = tCompressorCreate(&pDiskCol->pValC); code = tCompressorCreate(&pBuilder->pValC);
if (code) return code; if (code) return code;
} }
code = tCompressorReset(pDiskCol->pValC, type, cmprAlg); code = tCompressorReset(pBuilder->pValC, type, cmprAlg);
if (code) return code; if (code) return code;
return code; return code;
} }
static int32_t tDiskColClear(SDiskCol *pDiskCol) { static int32_t tDiskColClear(SDiskColBuilder *pBuilder) {
int32_t code = 0; int32_t code = 0;
tFree(pDiskCol->pBitMap); tFree(pBuilder->pBitMap);
if (pDiskCol->pOffC) tCompressorDestroy(pDiskCol->pOffC); if (pBuilder->pOffC) tCompressorDestroy(pBuilder->pOffC);
if (pDiskCol->pValC) tCompressorDestroy(pDiskCol->pValC); if (pBuilder->pValC) tCompressorDestroy(pBuilder->pValC);
return code; return code;
} }
static int32_t tDiskColToBinary(SDiskCol *pDiskCol, const uint8_t **ppData, int32_t *nData) { static int32_t tGnrtDiskCol(SDiskColBuilder *pBuilder, SBlockCol *pBlockCol, const uint8_t **ppData) {
int32_t code = 0; int32_t code = 0;
ASSERT(pDiskCol->flag && pDiskCol->flag != HAS_NONE); ASSERT(pBuilder->flag && pBuilder->flag != HAS_NONE);
if (pDiskCol->flag == HAS_NULL) { if (pBuilder->flag == HAS_NULL) {
return code; return code;
} }
// bitmap (todo) // bitmap (todo)
if (pDiskCol->flag != HAS_VALUE) { if (pBuilder->flag != HAS_VALUE) {
} }
// offset (todo) // offset (todo)
if (IS_VAR_DATA_TYPE(pDiskCol->type)) { if (IS_VAR_DATA_TYPE(pBuilder->type)) {
code = tCompGen(pDiskCol->pOffC, NULL /* todo */, NULL /* todo */); code = tCompGen(pBuilder->pOffC, NULL /* todo */, NULL /* todo */);
if (code) return code; if (code) return code;
} }
// value (todo) // value (todo)
if (pDiskCol->flag != (HAS_NULL | HAS_NONE)) { if (pBuilder->flag != (HAS_NULL | HAS_NONE)) {
code = tCompGen(pDiskCol->pValC, NULL /* todo */, NULL /* todo */); code = tCompGen(pBuilder->pValC, NULL /* todo */, NULL /* todo */);
if (code) return code; if (code) return code;
} }
return code; return code;
} }
static int32_t tDiskColAddValue(SDiskCol *pDiskCol, SColVal *pColVal) { static int32_t tDiskColAddValue(SDiskColBuilder *pBuilder, SColVal *pColVal) {
int32_t code = 0; int32_t code = 0;
if (IS_VAR_DATA_TYPE(pColVal->type)) { if (IS_VAR_DATA_TYPE(pColVal->type)) {
code = tCompress(pDiskCol->pOffC, &pDiskCol->offset, sizeof(int32_t)); code = tCompress(pBuilder->pOffC, &pBuilder->offset, sizeof(int32_t));
if (code) goto _exit; if (code) goto _exit;
pDiskCol->offset += pColVal->value.nData; pBuilder->offset += pColVal->value.nData;
} }
code = tCompress(pDiskCol->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/); code = tCompress(pBuilder->pValC, pColVal->value.pData, pColVal->value.nData /*TODO*/);
if (code) goto _exit; if (code) goto _exit;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal0(SDiskCol *pDiskCol, SColVal *pColVal) { // 0 static int32_t tDiskColAddVal0(SDiskColBuilder *pBuilder, SColVal *pColVal) { // 0
int32_t code = 0; int32_t code = 0;
if (pColVal->isNone) { if (pColVal->isNone) {
pDiskCol->flag = HAS_NONE; pBuilder->flag = HAS_NONE;
} else if (pColVal->isNull) { } else if (pColVal->isNull) {
pDiskCol->flag = HAS_NULL; pBuilder->flag = HAS_NULL;
} else { } else {
pDiskCol->flag = HAS_VALUE; pBuilder->flag = HAS_VALUE;
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} }
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal1(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NONE static int32_t tDiskColAddVal1(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_NONE
int32_t code = 0; int32_t code = 0;
if (!pColVal->isNone) { if (!pColVal->isNone) {
// bit map // bit map
int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1);
code = tRealloc(&pDiskCol->pBitMap, nBit); code = tRealloc(&pBuilder->pBitMap, nBit);
if (code) goto _exit; if (code) goto _exit;
memset(pDiskCol->pBitMap, 0, nBit); memset(pBuilder->pBitMap, 0, nBit);
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
// value // value
if (pColVal->isNull) { if (pColVal->isNull) {
pDiskCol->flag |= HAS_NULL; pBuilder->flag |= HAS_NULL;
} else { } else {
pDiskCol->flag |= HAS_VALUE; pBuilder->flag |= HAS_VALUE;
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pDiskCol, &cv); code = tDiskColAddValue(pBuilder, &cv);
if (code) goto _exit; if (code) goto _exit;
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} }
} }
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal2(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NULL static int32_t tDiskColAddVal2(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_NULL
int32_t code = 0; int32_t code = 0;
if (!pColVal->isNull) { if (!pColVal->isNull) {
int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1);
code = tRealloc(&pDiskCol->pBitMap, nBit); code = tRealloc(&pBuilder->pBitMap, nBit);
if (code) goto _exit; if (code) goto _exit;
if (pColVal->isNone) { if (pColVal->isNone) {
pDiskCol->flag |= HAS_NONE; pBuilder->flag |= HAS_NONE;
memset(pDiskCol->pBitMap, 255, nBit); memset(pBuilder->pBitMap, 255, nBit);
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
} else { } else {
pDiskCol->flag |= HAS_VALUE; pBuilder->flag |= HAS_VALUE;
memset(pDiskCol->pBitMap, 0, nBit); memset(pBuilder->pBitMap, 0, nBit);
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pDiskCol, &cv); code = tDiskColAddValue(pBuilder, &cv);
if (code) goto _exit; if (code) goto _exit;
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} }
} }
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal3(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_NULL|HAS_NONE static int32_t tDiskColAddVal3(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_NULL|HAS_NONE
int32_t code = 0; int32_t code = 0;
if (pColVal->isNone) { if (pColVal->isNone) {
code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
} else if (pColVal->isNull) { } else if (pColVal->isNull) {
code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
} else { } else {
pDiskCol->flag |= HAS_VALUE; pBuilder->flag |= HAS_VALUE;
uint8_t *pBitMap = NULL; uint8_t *pBitMap = NULL;
code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal)); SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal));
} }
SET_BIT2(pBitMap, pDiskCol->nVal, 2); SET_BIT2(pBitMap, pBuilder->nVal, 2);
tFree(pDiskCol->pBitMap); tFree(pBuilder->pBitMap);
pDiskCol->pBitMap = pBitMap; pBuilder->pBitMap = pBitMap;
SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0}); SColVal cv = COL_VAL_VALUE(pColVal->cid, pColVal->type, (SValue){0});
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
code = tDiskColAddValue(pDiskCol, &cv); code = tDiskColAddValue(pBuilder, &cv);
if (code) goto _exit; if (code) goto _exit;
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} }
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal4(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE static int32_t tDiskColAddVal4(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_VALUE
int32_t code = 0; int32_t code = 0;
if (pColVal->isNone || pColVal->isNull) { if (pColVal->isNone || pColVal->isNull) {
if (pColVal->isNone) { if (pColVal->isNone) {
pDiskCol->flag |= HAS_NONE; pBuilder->flag |= HAS_NONE;
} else { } else {
pDiskCol->flag |= HAS_NULL; pBuilder->flag |= HAS_NULL;
} }
int32_t nBit = BIT1_SIZE(pDiskCol->nVal + 1); int32_t nBit = BIT1_SIZE(pBuilder->nVal + 1);
code = tRealloc(&pDiskCol->pBitMap, nBit); code = tRealloc(&pBuilder->pBitMap, nBit);
if (code) goto _exit; if (code) goto _exit;
memset(pDiskCol->pBitMap, 255, nBit); memset(pBuilder->pBitMap, 255, nBit);
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} else { } else {
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
} }
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal5(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NONE static int32_t tDiskColAddVal5(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_VALUE|HAS_NONE
int32_t code = 0; int32_t code = 0;
if (pColVal->isNull) { if (pColVal->isNull) {
pDiskCol->flag |= HAS_NULL; pBuilder->flag |= HAS_NULL;
uint8_t *pBitMap = NULL; uint8_t *pBitMap = NULL;
code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 0); SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal) ? 2 : 0);
} }
SET_BIT2(pBitMap, pDiskCol->nVal, 1); SET_BIT2(pBitMap, pBuilder->nVal, 1);
tFree(pDiskCol->pBitMap); tFree(pBuilder->pBitMap);
pDiskCol->pBitMap = pBitMap; pBuilder->pBitMap = pBitMap;
} else { } else {
code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
if (pColVal->isNone) { if (pColVal->isNone) {
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
} else { } else {
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
} }
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal6(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NULL static int32_t tDiskColAddVal6(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_VALUE|HAS_NULL
int32_t code = 0; int32_t code = 0;
if (pColVal->isNone) { if (pColVal->isNone) {
pDiskCol->flag |= HAS_NONE; pBuilder->flag |= HAS_NONE;
uint8_t *pBitMap = NULL; uint8_t *pBitMap = NULL;
code = tRealloc(&pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBitMap, BIT2_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
for (int32_t iVal = 0; iVal < pDiskCol->nVal; iVal++) { for (int32_t iVal = 0; iVal < pBuilder->nVal; iVal++) {
SET_BIT2(pBitMap, iVal, GET_BIT1(pDiskCol->pBitMap, iVal) ? 2 : 1); SET_BIT2(pBitMap, iVal, GET_BIT1(pBuilder->pBitMap, iVal) ? 2 : 1);
} }
SET_BIT2(pBitMap, pDiskCol->nVal, 0); SET_BIT2(pBitMap, pBuilder->nVal, 0);
tFree(pDiskCol->pBitMap); tFree(pBuilder->pBitMap);
pDiskCol->pBitMap = pBitMap; pBuilder->pBitMap = pBitMap;
} else { } else {
code = tRealloc(&pDiskCol->pBitMap, BIT1_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBuilder->pBitMap, BIT1_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
if (pColVal->isNull) { if (pColVal->isNull) {
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 0);
} else { } else {
SET_BIT1(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT1(pBuilder->pBitMap, pBuilder->nVal, 1);
} }
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t tDiskColAddVal7(SDiskCol *pDiskCol, SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE static int32_t tDiskColAddVal7(SDiskColBuilder *pBuilder, SColVal *pColVal) { // HAS_VALUE|HAS_NULL|HAS_NONE
int32_t code = 0; int32_t code = 0;
code = tRealloc(&pDiskCol->pBitMap, BIT2_SIZE(pDiskCol->nVal + 1)); code = tRealloc(&pBuilder->pBitMap, BIT2_SIZE(pBuilder->nVal + 1));
if (code) goto _exit; if (code) goto _exit;
if (pColVal->isNone) { if (pColVal->isNone) {
SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 0); SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 0);
} else if (pColVal->isNull) { } else if (pColVal->isNull) {
SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 1); SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 1);
} else { } else {
SET_BIT2(pDiskCol->pBitMap, pDiskCol->nVal, 2); SET_BIT2(pBuilder->pBitMap, pBuilder->nVal, 2);
} }
code = tDiskColAddValue(pDiskCol, pColVal); code = tDiskColAddValue(pBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
pDiskCol->nVal++; pBuilder->nVal++;
_exit: _exit:
return code; return code;
} }
static int32_t (*tDiskColAddValImpl[])(SDiskCol *pDiskCol, SColVal *pColVal) = { static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColVal) = {
tDiskColAddVal0, // 0 tDiskColAddVal0, // 0
tDiskColAddVal1, // HAS_NONE tDiskColAddVal1, // HAS_NONE
tDiskColAddVal2, // HAS_NULL tDiskColAddVal2, // HAS_NULL
@ -377,7 +377,7 @@ struct SDiskDataBuilder {
SCompressor *pVerC; SCompressor *pVerC;
SCompressor *pKeyC; SCompressor *pKeyC;
int32_t nDiskCol; int32_t nDiskCol;
SArray *aDiskCol; SArray *aDiskColBuilder;
uint8_t *aBuf[2]; uint8_t *aBuf[2];
}; };
@ -410,9 +410,9 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
if (code) return code; if (code) return code;
if (pBuilder->aDiskCol == NULL) { if (pBuilder->aDiskColBuilder == NULL) {
pBuilder->aDiskCol = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskCol)); pBuilder->aDiskColBuilder = taosArrayInit(pTSchema->numOfCols - 1, sizeof(SDiskColBuilder));
if (pBuilder->aDiskCol == NULL) { if (pBuilder->aDiskColBuilder == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
@ -422,17 +422,17 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) { for (int32_t iCol = 1; iCol < pTSchema->numOfCols; iCol++) {
STColumn *pTColumn = &pTSchema->columns[iCol]; STColumn *pTColumn = &pTSchema->columns[iCol];
if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskCol)) { if (pBuilder->nDiskCol >= taosArrayGetSize(pBuilder->aDiskColBuilder)) {
SDiskCol dc = (SDiskCol){0}; SDiskColBuilder dc = (SDiskColBuilder){0};
if (taosArrayPush(pBuilder->aDiskCol, &dc) == NULL) { if (taosArrayPush(pBuilder->aDiskColBuilder, &dc) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
return code; return code;
} }
} }
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, pBuilder->nDiskCol); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, pBuilder->nDiskCol);
code = tDiskColInit(pDiskCol, pTColumn->colId, pTColumn->type, cmprAlg); code = tDiskColInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg);
if (code) return code; if (code) return code;
pBuilder->nDiskCol++; pBuilder->nDiskCol++;
@ -448,12 +448,13 @@ int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
if (pBuilder->aDiskCol) { if (pBuilder->aDiskColBuilder) {
for (int32_t iDiskCol = 0; iDiskCol < taosArrayGetSize(pBuilder->aDiskCol); iDiskCol++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < taosArrayGetSize(pBuilder->aDiskColBuilder);
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); iDiskColBuilder++) {
tDiskColClear(pDiskCol); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
tDiskColClear(pDiskColBuilder);
} }
taosArrayDestroy(pBuilder->aDiskCol); taosArrayDestroy(pBuilder->aDiskColBuilder);
} }
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
tFree(pBuilder->aBuf[iBuf]); tFree(pBuilder->aBuf[iBuf]);
@ -494,19 +495,19 @@ int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSche
tRowIterInit(&iter, pRow, pTSchema); tRowIterInit(&iter, pRow, pTSchema);
SColVal *pColVal = tRowIterNext(&iter); SColVal *pColVal = tRowIterNext(&iter);
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
while (pColVal && pColVal->cid < pDiskCol->cid) { while (pColVal && pColVal->cid < pDiskColBuilder->cid) {
pColVal = tRowIterNext(&iter); pColVal = tRowIterNext(&iter);
} }
if (pColVal == NULL || pColVal->cid > pDiskCol->cid) { if (pColVal == NULL || pColVal->cid > pDiskColBuilder->cid) {
SColVal cv = COL_VAL_NONE(pDiskCol->cid, pDiskCol->type); SColVal cv = COL_VAL_NONE(pDiskColBuilder->cid, pDiskColBuilder->type);
code = tDiskColAddValImpl[pDiskCol->flag](pDiskCol, &cv); code = tDiskColAddValImpl[pDiskColBuilder->flag](pDiskColBuilder, &cv);
if (code) goto _exit; if (code) goto _exit;
} else { } else {
code = tDiskColAddValImpl[pDiskCol->flag](pDiskCol, pColVal); code = tDiskColAddValImpl[pDiskColBuilder->flag](pDiskColBuilder, pColVal);
if (code) goto _exit; if (code) goto _exit;
pColVal = tRowIterNext(&iter); pColVal = tRowIterNext(&iter);
} }
@ -551,18 +552,18 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
if (code) return code; if (code) return code;
int32_t offset = 0; int32_t offset = 0;
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
if (pDiskCol->flag == HAS_NONE) continue; if (pDiskColBuilder->flag == HAS_NONE) continue;
code = tDiskColToBinary(pDiskCol, NULL, NULL); code = tGnrtDiskCol(pDiskColBuilder, NULL, NULL);
if (code) return code; if (code) return code;
SBlockCol bCol = {.cid = pDiskCol->cid, SBlockCol bCol = {.cid = pDiskColBuilder->cid,
.type = pDiskCol->type, .type = pDiskColBuilder->type,
// .smaOn = , // .smaOn = ,
.flag = pDiskCol->flag, .flag = pDiskColBuilder->flag,
// .szOrigin = // .szOrigin =
// .szBitmap = // .szBitmap =
// .szOffset = // .szOffset =
@ -589,12 +590,12 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
n += hdr.szVer; n += hdr.szVer;
memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey); memcpy(pBuilder->aBuf[0] + n, pKey, hdr.szKey);
n += hdr.szKey; n += hdr.szKey;
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */); n += tPutBlockCol(pBuilder->aBuf[0] + n, NULL /*pDiskCol->bCol (todo) */);
} }
for (int32_t iDiskCol = 0; iDiskCol < pBuilder->nDiskCol; iDiskCol++) { for (int32_t iDiskColBuilder = 0; iDiskColBuilder < pBuilder->nDiskCol; iDiskColBuilder++) {
SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskCol, iDiskCol); SDiskCol *pDiskCol = (SDiskCol *)taosArrayGet(pBuilder->aDiskColBuilder, iDiskColBuilder);
// memcpy(pDiskData->aBuf[0] + n, NULL, ); // memcpy(pDiskData->aBuf[0] + n, NULL, );
// n += 0; // n += 0;
} }