refact more code
This commit is contained in:
parent
46c7c0c8ce
commit
9f7771cde4
|
@ -130,7 +130,7 @@ typedef struct SCompressor SCompressor;
|
|||
|
||||
int32_t tCompressorCreate(SCompressor **ppCmprsor);
|
||||
int32_t tCompressorDestroy(SCompressor *pCmprsor);
|
||||
int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
|
||||
int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg);
|
||||
int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData);
|
||||
int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData);
|
||||
|
||||
|
|
|
@ -665,6 +665,7 @@ struct SDiskCol {
|
|||
const uint8_t *pBit;
|
||||
const uint8_t *pOff;
|
||||
const uint8_t *pVal;
|
||||
SColumnDataAgg agg;
|
||||
};
|
||||
|
||||
struct SDiskData {
|
||||
|
|
|
@ -35,16 +35,17 @@ struct SDiskDataBuilder {
|
|||
int64_t uid;
|
||||
int32_t nRow;
|
||||
uint8_t cmprAlg;
|
||||
uint8_t calcSma;
|
||||
SCompressor *pUidC;
|
||||
SCompressor *pVerC;
|
||||
SCompressor *pKeyC;
|
||||
int32_t nBuilder;
|
||||
SArray *aBuilder;
|
||||
SArray *aBuilder; // SArray<SDiskColBuilder>
|
||||
uint8_t *aBuf[2];
|
||||
};
|
||||
|
||||
// SDiskColBuilder ================================================
|
||||
static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
|
||||
static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) {
|
||||
int32_t code = 0;
|
||||
|
||||
pBuilder->cid = cid;
|
||||
|
@ -59,7 +60,7 @@ static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type,
|
|||
code = tCompressorCreate(&pBuilder->pOffC);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tCompressorReset(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
|
||||
code = tCompressorInit(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg);
|
||||
if (code) return code;
|
||||
}
|
||||
|
||||
|
@ -67,13 +68,13 @@ static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type,
|
|||
code = tCompressorCreate(&pBuilder->pValC);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tCompressorReset(pBuilder->pValC, type, cmprAlg);
|
||||
code = tCompressorInit(pBuilder->pValC, type, cmprAlg);
|
||||
if (code) return code;
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t tDiskColClear(SDiskColBuilder *pBuilder) {
|
||||
static int32_t tDiskColBuilderClear(SDiskColBuilder *pBuilder) {
|
||||
int32_t code = 0;
|
||||
|
||||
tFree(pBuilder->pBitMap);
|
||||
|
@ -396,33 +397,60 @@ static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColV
|
|||
};
|
||||
|
||||
// SDiskDataBuilder ================================================
|
||||
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) {
|
||||
int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder) {
|
||||
int32_t code = 0;
|
||||
|
||||
*ppBuilder = (SDiskDataBuilder *)taosMemoryCalloc(1, sizeof(SDiskDataBuilder));
|
||||
if (*ppBuilder == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
|
||||
if (pBuilder == NULL) return NULL;
|
||||
|
||||
if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC);
|
||||
if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
|
||||
if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
|
||||
|
||||
if (pBuilder->aBuilder) {
|
||||
for (int32_t iBuilder = 0; iBuilder < taosArrayGetSize(pBuilder->aBuilder); iBuilder++) {
|
||||
SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder);
|
||||
tDiskColBuilderClear(pDCBuilder);
|
||||
}
|
||||
taosArrayDestroy(pBuilder->aBuilder);
|
||||
}
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
|
||||
tFree(pBuilder->aBuf[iBuf]);
|
||||
}
|
||||
taosMemoryFree(pBuilder);
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg,
|
||||
uint8_t calcSma) {
|
||||
int32_t code = 0;
|
||||
|
||||
pBuilder->suid = pId->suid;
|
||||
pBuilder->uid = pId->uid;
|
||||
pBuilder->nRow = 0;
|
||||
pBuilder->cmprAlg = cmprAlg;
|
||||
pBuilder->calcSma = calcSma;
|
||||
|
||||
if (pBuilder->pUidC == NULL) {
|
||||
code = tCompressorCreate(&pBuilder->pUidC);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tCompressorReset(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
||||
if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code;
|
||||
code = tCompressorInit(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
||||
if (code) return code;
|
||||
|
||||
if (pBuilder->pVerC == NULL) {
|
||||
code = tCompressorCreate(&pBuilder->pVerC);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tCompressorReset(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
||||
if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code;
|
||||
code = tCompressorInit(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg);
|
||||
if (code) return code;
|
||||
|
||||
if (pBuilder->pKeyC == NULL) {
|
||||
code = tCompressorCreate(&pBuilder->pKeyC);
|
||||
if (code) return code;
|
||||
}
|
||||
code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
|
||||
if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code;
|
||||
code = tCompressorInit(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg);
|
||||
if (code) return code;
|
||||
|
||||
if (pBuilder->aBuilder == NULL) {
|
||||
|
@ -438,7 +466,7 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
|
|||
STColumn *pTColumn = &pTSchema->columns[iCol];
|
||||
|
||||
if (pBuilder->nBuilder >= taosArrayGetSize(pBuilder->aBuilder)) {
|
||||
SDiskColBuilder dc = (SDiskColBuilder){0};
|
||||
SDiskColBuilder dc = {0};
|
||||
if (taosArrayPush(pBuilder->aBuilder, &dc) == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return code;
|
||||
|
@ -447,7 +475,7 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
|
|||
|
||||
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder);
|
||||
|
||||
code = tDiskColInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg);
|
||||
code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg);
|
||||
if (code) return code;
|
||||
|
||||
pBuilder->nBuilder++;
|
||||
|
@ -456,27 +484,6 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC);
|
||||
if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC);
|
||||
if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC);
|
||||
|
||||
if (pBuilder->aBuilder) {
|
||||
for (int32_t iDiskColBuilder = 0; iDiskColBuilder < taosArrayGetSize(pBuilder->aBuilder); iDiskColBuilder++) {
|
||||
SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iDiskColBuilder);
|
||||
tDiskColClear(pDiskColBuilder);
|
||||
}
|
||||
taosArrayDestroy(pBuilder->aBuilder);
|
||||
}
|
||||
for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) {
|
||||
tFree(pBuilder->aBuf[iBuf]);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) {
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -605,6 +612,6 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) {
|
|||
// SDiskData ================================================
|
||||
int32_t tDiskDataDestroy(SDiskData *pDiskData) {
|
||||
int32_t code = 0;
|
||||
if (pDiskData->aDiskCol) taosArrayDestroy(pDiskData->aDiskCol);
|
||||
pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol);
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -128,7 +128,7 @@ _exit:
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) {
|
||||
static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) {
|
||||
int32_t code = 0;
|
||||
int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage);
|
||||
int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage);
|
||||
|
|
|
@ -1518,7 +1518,7 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
|
||||
int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) {
|
||||
int32_t code = 0;
|
||||
|
||||
pCmprsor->type = type;
|
||||
|
|
Loading…
Reference in New Issue