From 9f7771cde4a892407a3d8802488addaea79a9f19 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 22 Sep 2022 11:52:47 +0800 Subject: [PATCH] refact more code --- include/util/tcompression.h | 2 +- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/tsdb/tsdbDiskData.c | 97 ++++++++++--------- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 2 +- source/util/src/tcompression.c | 2 +- 5 files changed, 56 insertions(+), 48 deletions(-) diff --git a/include/util/tcompression.h b/include/util/tcompression.h index 8d7c607e6b..f22205d144 100644 --- a/include/util/tcompression.h +++ b/include/util/tcompression.h @@ -130,7 +130,7 @@ typedef struct SCompressor SCompressor; int32_t tCompressorCreate(SCompressor **ppCmprsor); int32_t tCompressorDestroy(SCompressor *pCmprsor); -int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); +int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg); int32_t tCompGen(SCompressor *pCmprsor, const uint8_t **ppData, int32_t *nData); int32_t tCompress(SCompressor *pCmprsor, const void *pData, int64_t nData); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e67fa2ee2d..2121d779ab 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -665,6 +665,7 @@ struct SDiskCol { const uint8_t *pBit; const uint8_t *pOff; const uint8_t *pVal; + SColumnDataAgg agg; }; struct SDiskData { diff --git a/source/dnode/vnode/src/tsdb/tsdbDiskData.c b/source/dnode/vnode/src/tsdb/tsdbDiskData.c index 9b5934a04f..9e2f8eec7c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDiskData.c +++ b/source/dnode/vnode/src/tsdb/tsdbDiskData.c @@ -35,16 +35,17 @@ struct SDiskDataBuilder { int64_t uid; int32_t nRow; uint8_t cmprAlg; + uint8_t calcSma; SCompressor *pUidC; SCompressor *pVerC; SCompressor *pKeyC; int32_t nBuilder; - SArray *aBuilder; + SArray *aBuilder; // SArray uint8_t *aBuf[2]; }; // SDiskColBuilder ================================================ -static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) { +static int32_t tDiskColBuilderInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, uint8_t cmprAlg) { int32_t code = 0; pBuilder->cid = cid; @@ -59,7 +60,7 @@ static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, code = tCompressorCreate(&pBuilder->pOffC); if (code) return code; } - code = tCompressorReset(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); + code = tCompressorInit(pBuilder->pOffC, TSDB_DATA_TYPE_INT, cmprAlg); if (code) return code; } @@ -67,13 +68,13 @@ static int32_t tDiskColInit(SDiskColBuilder *pBuilder, int16_t cid, int8_t type, code = tCompressorCreate(&pBuilder->pValC); if (code) return code; } - code = tCompressorReset(pBuilder->pValC, type, cmprAlg); + code = tCompressorInit(pBuilder->pValC, type, cmprAlg); if (code) return code; return code; } -static int32_t tDiskColClear(SDiskColBuilder *pBuilder) { +static int32_t tDiskColBuilderClear(SDiskColBuilder *pBuilder) { int32_t code = 0; tFree(pBuilder->pBitMap); @@ -396,33 +397,60 @@ static int32_t (*tDiskColAddValImpl[])(SDiskColBuilder *pBuilder, SColVal *pColV }; // SDiskDataBuilder ================================================ -int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg) { +int32_t tDiskDataBuilderCreate(SDiskDataBuilder **ppBuilder) { + int32_t code = 0; + + *ppBuilder = (SDiskDataBuilder *)taosMemoryCalloc(1, sizeof(SDiskDataBuilder)); + if (*ppBuilder == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + return code; +} + +void *tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { + if (pBuilder == NULL) return NULL; + + if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC); + if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); + if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); + + if (pBuilder->aBuilder) { + for (int32_t iBuilder = 0; iBuilder < taosArrayGetSize(pBuilder->aBuilder); iBuilder++) { + SDiskColBuilder *pDCBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iBuilder); + tDiskColBuilderClear(pDCBuilder); + } + taosArrayDestroy(pBuilder->aBuilder); + } + for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { + tFree(pBuilder->aBuf[iBuf]); + } + taosMemoryFree(pBuilder); + + return NULL; +} + +int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TABLEID *pId, uint8_t cmprAlg, + uint8_t calcSma) { int32_t code = 0; pBuilder->suid = pId->suid; pBuilder->uid = pId->uid; pBuilder->nRow = 0; pBuilder->cmprAlg = cmprAlg; + pBuilder->calcSma = calcSma; - if (pBuilder->pUidC == NULL) { - code = tCompressorCreate(&pBuilder->pUidC); - if (code) return code; - } - code = tCompressorReset(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + if (pBuilder->pUidC == NULL && (code = tCompressorCreate(&pBuilder->pUidC))) return code; + code = tCompressorInit(pBuilder->pUidC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; - if (pBuilder->pVerC == NULL) { - code = tCompressorCreate(&pBuilder->pVerC); - if (code) return code; - } - code = tCompressorReset(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); + if (pBuilder->pVerC == NULL && (code = tCompressorCreate(&pBuilder->pVerC))) return code; + code = tCompressorInit(pBuilder->pVerC, TSDB_DATA_TYPE_BIGINT, cmprAlg); if (code) return code; - if (pBuilder->pKeyC == NULL) { - code = tCompressorCreate(&pBuilder->pKeyC); - if (code) return code; - } - code = tCompressorReset(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); + if (pBuilder->pKeyC == NULL && (code = tCompressorCreate(&pBuilder->pKeyC))) return code; + code = tCompressorInit(pBuilder->pKeyC, TSDB_DATA_TYPE_TIMESTAMP, cmprAlg); if (code) return code; if (pBuilder->aBuilder == NULL) { @@ -438,7 +466,7 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB STColumn *pTColumn = &pTSchema->columns[iCol]; if (pBuilder->nBuilder >= taosArrayGetSize(pBuilder->aBuilder)) { - SDiskColBuilder dc = (SDiskColBuilder){0}; + SDiskColBuilder dc = {0}; if (taosArrayPush(pBuilder->aBuilder, &dc) == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; return code; @@ -447,7 +475,7 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, pBuilder->nBuilder); - code = tDiskColInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg); + code = tDiskColBuilderInit(pDiskColBuilder, pTColumn->colId, pTColumn->type, cmprAlg); if (code) return code; pBuilder->nBuilder++; @@ -456,27 +484,6 @@ int32_t tDiskDataBuilderInit(SDiskDataBuilder *pBuilder, STSchema *pTSchema, TAB return code; } -int32_t tDiskDataBuilderDestroy(SDiskDataBuilder *pBuilder) { - int32_t code = 0; - - if (pBuilder->pUidC) tCompressorDestroy(pBuilder->pUidC); - if (pBuilder->pVerC) tCompressorDestroy(pBuilder->pVerC); - if (pBuilder->pKeyC) tCompressorDestroy(pBuilder->pKeyC); - - if (pBuilder->aBuilder) { - for (int32_t iDiskColBuilder = 0; iDiskColBuilder < taosArrayGetSize(pBuilder->aBuilder); iDiskColBuilder++) { - SDiskColBuilder *pDiskColBuilder = (SDiskColBuilder *)taosArrayGet(pBuilder->aBuilder, iDiskColBuilder); - tDiskColClear(pDiskColBuilder); - } - taosArrayDestroy(pBuilder->aBuilder); - } - for (int32_t iBuf = 0; iBuf < sizeof(pBuilder->aBuf) / sizeof(pBuilder->aBuf[0]); iBuf++) { - tFree(pBuilder->aBuf[iBuf]); - } - - return code; -} - int32_t tDiskDataBuilderAddRow(SDiskDataBuilder *pBuilder, TSDBROW *pRow, STSchema *pTSchema, TABLEID *pId) { int32_t code = 0; @@ -605,6 +612,6 @@ int32_t tGnrtDiskData(SDiskDataBuilder *pBuilder, SDiskData *pDiskData) { // SDiskData ================================================ int32_t tDiskDataDestroy(SDiskData *pDiskData) { int32_t code = 0; - if (pDiskData->aDiskCol) taosArrayDestroy(pDiskData->aDiskCol); + pDiskData->aDiskCol = taosArrayDestroy(pDiskData->aDiskCol); return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 8674ae571e..679f7c3457 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -128,7 +128,7 @@ _exit: return code; } -static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size) { +static int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size) { int32_t code = 0; int64_t fOffset = LOGIC_TO_FILE_OFFSET(offset, pFD->szPage); int64_t pgno = OFFSET_PGNO(fOffset, pFD->szPage); diff --git a/source/util/src/tcompression.c b/source/util/src/tcompression.c index cd9ff5e325..619ae8360d 100644 --- a/source/util/src/tcompression.c +++ b/source/util/src/tcompression.c @@ -1518,7 +1518,7 @@ int32_t tCompressorDestroy(SCompressor *pCmprsor) { return code; } -int32_t tCompressorReset(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { +int32_t tCompressorInit(SCompressor *pCmprsor, int8_t type, int8_t cmprAlg) { int32_t code = 0; pCmprsor->type = type;