From 931749dca6e1f902be972b26958dca4ada3ee123 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 17 Jul 2024 16:15:19 +0800 Subject: [PATCH] refactor tsdb code --- include/util/tbuffer.inc | 17 +- include/util/tutil.h | 2 + source/dnode/vnode/src/inc/tsdb.h | 153 ++-- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 7 +- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 37 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 835 ++++-------------- source/dnode/vnode/src/tsdb/tsdbUtil2.c | 258 +++--- 7 files changed, 387 insertions(+), 922 deletions(-) diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc index 2c1405d6c5..595c1e0827 100644 --- a/include/util/tbuffer.inc +++ b/include/util/tbuffer.inc @@ -15,6 +15,7 @@ #include "taoserror.h" #include "tcoding.h" +#include "tutil.h" struct SBuffer { uint32_t size; @@ -67,8 +68,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa } static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size) { - int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size); - if (code) return code; + TAOS_CHECK_RETURN(tBufferEnsureCapacity(buffer, buffer->size + size)); memcpy((char *)buffer->data + buffer->size, data, size); buffer->size += size; return 0; @@ -119,10 +119,8 @@ static FORCE_INLINE int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value) { re static FORCE_INLINE int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value) { return tBufferPutU64v(buffer, value); } static FORCE_INLINE int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value) { - int32_t code; while (value >= 0x80) { - code = tBufferPutU8(buffer, (value & 0x7F) | 0x80); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutU8(buffer, (value & 0x7F) | 0x80)); value >>= 7; } return tBufferPutU8(buffer, value); @@ -141,8 +139,7 @@ static FORCE_INLINE int32_t tBufferPutI64v(SBuffer *buffer, int64_t value) { } static FORCE_INLINE int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size) { - int32_t code = tBufferPutU32v(buffer, size); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutU32v(buffer, size)); return tBufferPut(buffer, data, size); } @@ -324,8 +321,7 @@ static int32_t tBufferGetF32(SBufferReader *reader, float *value) { float f; uint32_t u; } u; - int32_t code = tBufferGetU32(reader, &u.u); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetU32(reader, &u.u)); if (value) { *value = u.f; } @@ -337,8 +333,7 @@ static int32_t tBufferGetF64(SBufferReader *reader, double *value) { double f; uint64_t u; } u; - int32_t code = tBufferGetU64(reader, &u.u); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetU64(reader, &u.u)); if (value) { *value = u.f; } diff --git a/include/util/tutil.h b/include/util/tutil.h index 31ce34f667..2aa28ac1df 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -172,6 +172,8 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen, } \ } while (0) +#define TAOS_UNUSED(expr) (void)(expr) + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 943ba099f6..21dd48919e 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -147,7 +147,6 @@ int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2); void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key); void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key); - // STSDBRowIter int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema); void tsdbRowClose(STSDBRowIter *pIter); @@ -223,11 +222,11 @@ void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo); -void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); +int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t tPutMapData(uint8_t *p, SMapData *pMapData); -int32_t tGetMapData(uint8_t *p, SMapData *pMapData); +int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int64_t *decodeSize); int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), SArray **ppArray); // other @@ -245,10 +244,10 @@ void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); -SArray * tsdbMemTableGetTbDataArray(SMemTable *pMemTable); +SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter); -void * tsdbTbDataIterDestroy(STbDataIter *pIter); +void *tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); @@ -301,7 +300,7 @@ int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_ // tsdbMerge.c ============================================================================================== typedef struct { - STsdb * tsdb; + STsdb *tsdb; int32_t fid; } SMergeArg; @@ -332,22 +331,22 @@ int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo); // structs ======================= struct STsdbFS { SDelFile *pDelFile; - SArray * aDFileSet; // SArray + SArray *aDFileSet; // SArray }; typedef struct { - rocksdb_t * db; - rocksdb_comparator_t * my_comparator; - rocksdb_cache_t * blockcache; + rocksdb_t *db; + rocksdb_comparator_t *my_comparator; + rocksdb_cache_t *blockcache; rocksdb_block_based_table_options_t *tableoptions; - rocksdb_options_t * options; - rocksdb_flushoptions_t * flushoptions; - rocksdb_writeoptions_t * writeoptions; - rocksdb_readoptions_t * readoptions; - rocksdb_writebatch_t * writebatch; - rocksdb_writebatch_t * rwritebatch; + rocksdb_options_t *options; + rocksdb_flushoptions_t *flushoptions; + rocksdb_writeoptions_t *writeoptions; + rocksdb_readoptions_t *readoptions; + rocksdb_writebatch_t *writebatch; + rocksdb_writebatch_t *rwritebatch; TdThreadMutex rMutex; - STSchema * pTSchema; + STSchema *pTSchema; } SRocksCache; typedef struct { @@ -358,26 +357,26 @@ typedef struct { typedef struct SCompMonitor SCompMonitor; struct STsdb { - char * path; - SVnode * pVnode; + char *path; + SVnode *pVnode; STsdbKeepCfg keepCfg; TdThreadMutex mutex; bool bgTaskDisabled; - SMemTable * mem; - SMemTable * imem; + SMemTable *mem; + SMemTable *imem; STsdbFS fs; // old - SLRUCache * lruCache; + SLRUCache *lruCache; SCacheFlushState flushState; TdThreadMutex lruMutex; - SLRUCache * biCache; + SLRUCache *biCache; TdThreadMutex biMutex; - SLRUCache * bCache; + SLRUCache *bCache; TdThreadMutex bMutex; - SLRUCache * pgCache; + SLRUCache *pgCache; TdThreadMutex pgMutex; struct STFileSystem *pFS; // new SRocksCache rCache; - SCompMonitor *pCompMonitor; + SCompMonitor *pCompMonitor; struct { SVHashTable *ht; SArray *arr; @@ -405,17 +404,17 @@ struct STbData { TSKEY minKey; TSKEY maxKey; SRWLatch lock; - SDelData * pHead; - SDelData * pTail; + SDelData *pHead; + SDelData *pTail; SMemSkipList sl; - STbData * next; + STbData *next; SRBTreeNode rbtn[1]; }; struct SMemTable { SRWLatch latch; - STsdb * pTsdb; - SVBufPool * pPool; + STsdb *pTsdb; + SVBufPool *pPool; volatile int32_t nRef; int64_t minVer; int64_t maxVer; @@ -425,7 +424,7 @@ struct SMemTable { int64_t nDel; int32_t nTbData; int32_t nBucket; - STbData ** aBucket; + STbData **aBucket; SRBTree tbDataTree[1]; }; @@ -434,7 +433,7 @@ struct TSDBROW { union { struct { int64_t version; - SRow * pTSRow; + SRow *pTSRow; }; struct { SBlockData *pBlockData; @@ -535,9 +534,9 @@ struct SBlockData { int64_t suid; // 0 means normal table block data, otherwise child table block data int64_t uid; // 0 means block data in .last file, otherwise in .data file int32_t nRow; // number of rows - int64_t * aUid; // uids of each row, only exist in block data in .last file (uid == 0) - int64_t * aVersion; // versions of each row - TSKEY * aTSKEY; // timestamp of each row + int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0) + int64_t *aVersion; // versions of each row + TSKEY *aTSKEY; // timestamp of each row int32_t nColData; SColData *aColData; }; @@ -548,10 +547,10 @@ struct TABLEID { }; struct STbDataIter { - STbData * pTbData; + STbData *pTbData; int8_t backward; SMemSkipListNode *pNode; - TSDBROW * pRow; + TSDBROW *pRow; TSDBROW row; }; @@ -629,9 +628,9 @@ struct SDFileSet { int32_t fid; SHeadFile *pHeadF; SDataFile *pDataF; - SSmaFile * pSmaF; + SSmaFile *pSmaF; uint8_t nSttF; - SSttFile * aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; + SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; }; struct STSDBRowIter { @@ -647,18 +646,18 @@ struct STSDBRowIter { struct SRowMerger { STSchema *pTSchema; int64_t version; - SArray * pArray; // SArray + SArray *pArray; // SArray }; typedef struct { - char * path; + char *path; int32_t szPage; int32_t flag; TdFilePtr pFD; int64_t pgno; - uint8_t * pBuf; + uint8_t *pBuf; int64_t szFile; - STsdb * pTsdb; + STsdb *pTsdb; const char *objName; uint8_t s3File; int32_t lcn; @@ -668,7 +667,7 @@ typedef struct { } STsdbFD; struct SDelFWriter { - STsdb * pTsdb; + STsdb *pTsdb; SDelFile fDel; STsdbFD *pWriteH; uint8_t *aBuf[1]; @@ -728,15 +727,15 @@ int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); // snap read struct STsdbReadSnap { - SMemTable * pMem; - SQueryNode * pNode; - SMemTable * pIMem; - SQueryNode * pINode; + SMemTable *pMem; + SQueryNode *pNode; + SMemTable *pIMem; + SQueryNode *pINode; TFileSetArray *pfSetArray; }; struct SDataFWriter { - STsdb * pTsdb; + STsdb *pTsdb; SDFileSet wSet; STsdbFD *pHeadFD; @@ -753,13 +752,13 @@ struct SDataFWriter { }; struct SDataFReader { - STsdb * pTsdb; + STsdb *pTsdb; SDFileSet *pSet; - STsdbFD * pHeadFD; - STsdbFD * pDataFD; - STsdbFD * pSmaFD; - STsdbFD * aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; - uint8_t * aBuf[3]; + STsdbFD *pHeadFD; + STsdbFD *pDataFD; + STsdbFD *pSmaFD; + STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; + uint8_t *aBuf[3]; }; // NOTE: do NOT change the order of the fields @@ -794,10 +793,10 @@ typedef struct { typedef struct SSttBlockLoadInfo { SBlockDataInfo blockData[2]; // buffered block data - SArray * aSttBlk; + SArray *aSttBlk; int32_t currentLoadBlockIndex; - STSchema * pSchema; - int16_t * colIds; + STSchema *pSchema; + int16_t *colIds; int32_t numOfCols; bool checkRemainingRow; // todo: no assign value? bool isLast; @@ -834,7 +833,7 @@ struct SDiskData { const uint8_t *pUid; const uint8_t *pVer; const uint8_t *pKey; - SArray * aDiskCol; // SArray + SArray *aDiskCol; // SArray }; struct SDiskDataBuilder { @@ -847,15 +846,15 @@ struct SDiskDataBuilder { SCompressor *pVerC; SCompressor *pKeyC; int32_t nBuilder; - SArray * aBuilder; // SArray - uint8_t * aBuf[2]; + SArray *aBuilder; // SArray + uint8_t *aBuf[2]; SDiskData dd; SBlkInfo bi; }; struct SLDataIter { SRBTreeNode node; - SSttBlk * pSttBlk; + SSttBlk *pSttBlk; int64_t cid; // for debug purpose int8_t backward; int32_t iSttBlk; @@ -864,8 +863,8 @@ struct SLDataIter { uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; - SSttBlockLoadInfo * pBlockLoadInfo; - SRowKey * pStartRowKey; // current row key + SSttBlockLoadInfo *pBlockLoadInfo; + SRowKey *pStartRowKey; // current row key bool ignoreEarlierTs; struct SSttFileReader *pReader; }; @@ -878,21 +877,21 @@ typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pS typedef struct SMergeTreeConf { int8_t backward; - STsdb * pTsdb; + STsdb *pTsdb; uint64_t suid; uint64_t uid; STimeWindow timewindow; SVersionRange verRange; bool strictTimeRange; - SArray * pSttFileBlockIterArray; - void * pCurrentFileset; - STSchema * pSchema; - int16_t * pCols; + SArray *pSttFileBlockIterArray; + void *pCurrentFileset; + STSchema *pSchema; + int16_t *pCols; int32_t numOfCols; - SRowKey * pCurRowKey; + SRowKey *pCurRowKey; _load_tomb_fn loadTombFn; - void * pReader; - void * idstr; + void *pReader; + void *idstr; bool rspRows; // response the rows in stt-file, if possible } SMergeTreeConf; @@ -1023,7 +1022,7 @@ struct STsdbDataIter2 { // TSDB_DATA_FILE_DATA_ITER struct { SDataFReader *pReader; - SArray * aBlockIdx; // SArray + SArray *aBlockIdx; // SArray SMapData mDataBlk; SBlockData bData; int32_t iBlockIdx; @@ -1035,7 +1034,7 @@ struct STsdbDataIter2 { struct { SDataFReader *pReader; int32_t iStt; - SArray * aSttBlk; + SArray *aSttBlk; SBlockData bData; int32_t iSttBlk; int32_t iRow; @@ -1043,8 +1042,8 @@ struct STsdbDataIter2 { // TSDB_TOMB_FILE_DATA_ITER struct { SDelFReader *pReader; - SArray * aDelIdx; - SArray * aDelData; + SArray *aDelIdx; + SArray *aDelData; int32_t iDelIdx; int32_t iDelData; } tIter; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 932bf2d92c..8cbcdbc34f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -803,11 +803,8 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m if (code) goto _err; // decode - int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk); - if (n < 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } + int64_t n; + TAOS_CHECK_GOTO(tGetMapData(pReader->aBuf[0], mDataBlk, &n), NULL, _err); ASSERT(n == size); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 09ab2243a0..1052d1c1a1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -37,8 +37,8 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * int8_t cmprAlg; int32_t szPage; SBuffer buffers[10]; - int32_t encryptAlgorithm; - char* encryptKey; + int32_t encryptAlgorithm; + char *encryptKey; // reader SArray *aBlockIdx; SMapData mDataBlk[1]; @@ -96,7 +96,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) { SDataBlk dataBlk[1]; - tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk); + TAOS_CHECK_GOTO(tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk), &lino, _exit); SBrinRecord record = { .suid = pBlockIdx->suid, @@ -139,8 +139,9 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * if (ctx->brinBlock->numOfRecords >= ctx->maxRow) { SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; - code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, - ctx->brinBlkArray, ctx->buffers, &range, ctx->encryptAlgorithm, ctx->encryptKey); + code = + tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, + ctx->brinBlkArray, ctx->buffers, &range, ctx->encryptAlgorithm, ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -157,8 +158,8 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->encryptAlgorithm, ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer, ctx->encryptAlgorithm, - ctx->encryptKey); + code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer, ctx->encryptAlgorithm, + ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); code = tsdbFsyncFile(ctx->fd, ctx->encryptAlgorithm, ctx->encryptKey); @@ -258,7 +259,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade struct { int32_t szPage; int32_t encryptAlgorithm; - char* encryptKey; + char *encryptKey; // writer STsdbFD *fd; TSttBlkArray sttBlkArray[1]; @@ -290,7 +291,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade } code = tsdbFileWriteSttBlk(ctx->fd, ctx->sttBlkArray, ctx->footer->sttBlkPtr, &fobj->f->size, ctx->encryptAlgorithm, - ctx->encryptKey); + ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit1); code = tsdbFileWriteSttFooter(ctx->fd, ctx->footer, &fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey); @@ -442,7 +443,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f uint8_t hdr[TSDB_FHDR_SIZE] = {0}; int32_t encryptAlgorithm = tsdb->pVnode->config.tsdbCfg.encryptAlgorithm; - char* encryptKey = tsdb->pVnode->config.tsdbCfg.encryptKey; + char *encryptKey = tsdb->pVnode->config.tsdbCfg.encryptKey; code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey); TSDB_CHECK_CODE(code, lino, _exit); @@ -467,8 +468,8 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * int64_t minKey; int64_t maxKey; SBuffer buffers[10]; - int32_t encryptAlgorithm; - char* encryptKey; + int32_t encryptAlgorithm; + char *encryptKey; // reader SArray *aDelData; // writer @@ -538,20 +539,20 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * if (ctx->fd != NULL) { if (ctx->toStt) { - code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size, + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm, - ctx->encryptKey); + code = + tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); } else { - code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size, + code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm, - ctx->encryptKey); + code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm, + ctx->encryptKey); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 58075cf0ac..025fbf3eb7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -36,183 +36,47 @@ void tMapDataClear(SMapData *pMapData) { pMapData->aOffset = NULL; } -#ifdef BUILD_NO_CALL -int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { - int32_t code = 0; - int32_t offset = pMapData->nData; - int32_t nItem = pMapData->nItem; - - pMapData->nItem++; - pMapData->nData += tPutItemFn(NULL, pItem); - - // alloc - code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); - if (code) goto _exit; - code = tRealloc(&pMapData->pData, pMapData->nData); - if (code) goto _exit; - - // put - pMapData->aOffset[nItem] = offset; - tPutItemFn(pMapData->pData + offset, pItem); - -_exit: - return code; -} - -int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) { - int32_t code = 0; - - pTo->nItem = pFrom->nItem; - pTo->nData = pFrom->nData; - code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem); - if (code) goto _exit; - code = tRealloc(&pTo->pData, pFrom->nData); - if (code) goto _exit; - memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem); - memcpy(pTo->pData, pFrom->pData, pFrom->nData); - -_exit: - return code; -} - -int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), - int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { - int32_t code = 0; - int32_t lidx = 0; - int32_t ridx = pMapData->nItem - 1; - int32_t midx; - int32_t c; - - while (lidx <= ridx) { - midx = (lidx + ridx) / 2; - - tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn); - - c = tItemCmprFn(pSearchItem, pItem); - if (c == 0) { - goto _exit; - } else if (c < 0) { - ridx = midx - 1; - } else { - lidx = midx + 1; - } +int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { + if (idx < 0 || idx >= pMapData->nItem) { + return TSDB_CODE_OUT_OF_RANGE; } - code = TSDB_CODE_NOT_FOUND; - -_exit: - return code; -} -#endif - -void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { - ASSERT(idx >= 0 && idx < pMapData->nItem); tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem); + return 0; } -#ifdef BUILD_NO_CALL -int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), - SArray **ppArray) { - int32_t code = 0; - - SArray *pArray = taosArrayInit(pMapData->nItem, itemSize); - if (pArray == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - - for (int32_t i = 0; i < pMapData->nItem; i++) { - tMapDataGetItemByIdx(pMapData, i, taosArrayReserve(pArray, 1), tGetItemFn); - } - -_exit: - *ppArray = pArray; - return code; -} - -int32_t tPutMapData(uint8_t *p, SMapData *pMapData) { - int32_t n = 0; - - n += tPutI32v(p ? p + n : p, pMapData->nItem); - if (pMapData->nItem) { - int32_t lOffset = 0; - for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset); - lOffset = pMapData->aOffset[iItem]; - } - - n += tPutI32v(p ? p + n : p, pMapData->nData); - if (p) { - memcpy(p + n, pMapData->pData, pMapData->nData); - } - n += pMapData->nData; - } - - return n; -} -#endif - -int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { - int32_t n = 0; - int32_t offset; +int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int64_t *decodeSize) { + int64_t size = 0; tMapDataReset(pMapData); - n += tGetI32v(p + n, &pMapData->nItem); + size += tGetI32v(p + size, &pMapData->nItem); if (pMapData->nItem) { - if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1; + if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) { + return TSDB_CODE_OUT_OF_MEMORY; + } int32_t lOffset = 0; for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tGetI32v(p + n, &pMapData->aOffset[iItem]); + size += tGetI32v(p + size, &pMapData->aOffset[iItem]); pMapData->aOffset[iItem] += lOffset; lOffset = pMapData->aOffset[iItem]; } - n += tGetI32v(p + n, &pMapData->nData); - if (tRealloc(&pMapData->pData, pMapData->nData)) return -1; - memcpy(pMapData->pData, p + n, pMapData->nData); - n += pMapData->nData; + size += tGetI32v(p + size, &pMapData->nData); + if (tRealloc(&pMapData->pData, pMapData->nData)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + memcpy(pMapData->pData, p + size, pMapData->nData); + size += pMapData->nData; } - return n; -} - -#ifdef BUILD_NO_CALL -// TABLEID ======================================================================= -int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { - TABLEID *pId1 = (TABLEID *)p1; - TABLEID *pId2 = (TABLEID *)p2; - - if (pId1->suid < pId2->suid) { - return -1; - } else if (pId1->suid > pId2->suid) { - return 1; + if (decodeSize) { + *decodeSize = size; } - - if (pId1->uid < pId2->uid) { - return -1; - } else if (pId1->uid > pId2->uid) { - return 1; - } - return 0; } -// SBlockIdx ====================================================== -int32_t tPutBlockIdx(uint8_t *p, void *ph) { - int32_t n = 0; - SBlockIdx *pBlockIdx = (SBlockIdx *)ph; - - n += tPutI64(p ? p + n : p, pBlockIdx->suid); - n += tPutI64(p ? p + n : p, pBlockIdx->uid); - n += tPutI64v(p ? p + n : p, pBlockIdx->offset); - n += tPutI64v(p ? p + n : p, pBlockIdx->size); - - return n; -} -#endif - int32_t tGetBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; SBlockIdx *pBlockIdx = (SBlockIdx *)ph; @@ -225,77 +89,6 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { return n; } -#ifdef BUILD_NO_CALL -int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { - SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; - SBlockIdx *rBlockIdx = (SBlockIdx *)rhs; - - if (lBlockIdx->suid < rBlockIdx->suid) { - return -1; - } else if (lBlockIdx->suid > rBlockIdx->suid) { - return 1; - } - - if (lBlockIdx->uid < rBlockIdx->uid) { - return -1; - } else if (lBlockIdx->uid > rBlockIdx->uid) { - return 1; - } - - return 0; -} - -int32_t tCmprBlockL(void const *lhs, void const *rhs) { - SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; - SSttBlk *rBlockL = (SSttBlk *)rhs; - - if (lBlockIdx->suid < rBlockL->suid) { - return -1; - } else if (lBlockIdx->suid > rBlockL->suid) { - return 1; - } - - if (lBlockIdx->uid < rBlockL->minUid) { - return -1; - } else if (lBlockIdx->uid > rBlockL->maxUid) { - return 1; - } - - return 0; -} - -// SDataBlk ====================================================== -void tDataBlkReset(SDataBlk *pDataBlk) { - *pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; -} - -int32_t tPutDataBlk(uint8_t *p, void *ph) { - int32_t n = 0; - SDataBlk *pDataBlk = (SDataBlk *)ph; - - n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version); - n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts); - n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version); - n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts); - n += tPutI64v(p ? p + n : p, pDataBlk->minVer); - n += tPutI64v(p ? p + n : p, pDataBlk->maxVer); - n += tPutI32v(p ? p + n : p, pDataBlk->nRow); - n += tPutI8(p ? p + n : p, pDataBlk->hasDup); - n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock); - for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { - n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset); - n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock); - n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey); - } - if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { - n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset); - n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size); - } - - return n; -} -#endif - int32_t tGetDataBlk(uint8_t *p, void *ph) { int32_t n = 0; SDataBlk *pDataBlk = (SDataBlk *)ph; @@ -325,48 +118,6 @@ int32_t tGetDataBlk(uint8_t *p, void *ph) { return n; } -#ifdef BUILD_NO_CALL -int32_t tDataBlkCmprFn(const void *p1, const void *p2) { - SDataBlk *pBlock1 = (SDataBlk *)p1; - SDataBlk *pBlock2 = (SDataBlk *)p2; - - if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { - return -1; - } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) { - return 1; - } - - return 0; -} - -bool tDataBlkHasSma(SDataBlk *pDataBlk) { - if (pDataBlk->nSubBlock > 1) return false; - if (pDataBlk->hasDup) return false; - - return pDataBlk->smaInfo.size > 0; -} - -// SSttBlk ====================================================== -int32_t tPutSttBlk(uint8_t *p, void *ph) { - int32_t n = 0; - SSttBlk *pSttBlk = (SSttBlk *)ph; - - n += tPutI64(p ? p + n : p, pSttBlk->suid); - n += tPutI64(p ? p + n : p, pSttBlk->minUid); - n += tPutI64(p ? p + n : p, pSttBlk->maxUid); - n += tPutI64v(p ? p + n : p, pSttBlk->minKey); - n += tPutI64v(p ? p + n : p, pSttBlk->maxKey); - n += tPutI64v(p ? p + n : p, pSttBlk->minVer); - n += tPutI64v(p ? p + n : p, pSttBlk->maxVer); - n += tPutI32v(p ? p + n : p, pSttBlk->nRow); - n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset); - n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock); - n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey); - - return n; -} -#endif - int32_t tGetSttBlk(uint8_t *p, void *ph) { int32_t n = 0; SSttBlk *pSttBlk = (SSttBlk *)ph; @@ -391,47 +142,42 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) { static const int32_t BLOCK_WITH_ALG_VER = 2; int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { - int32_t code; - ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); - if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code; - if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code; - if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code; - if ((code = tBufferPutI8(buffer, pBlockCol->flag))) return code; - if ((code = tBufferPutI32v(buffer, pBlockCol->szOrigin))) return code; + TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pBlockCol->cid)); + TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->type)); + TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->cflag)); + TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->flag)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szOrigin)); if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - if ((code = tBufferPutI32v(buffer, pBlockCol->szBitmap))) return code; + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szBitmap)); } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - if ((code = tBufferPutI32v(buffer, pBlockCol->szOffset))) return code; + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szOffset)); } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - if ((code = tBufferPutI32v(buffer, pBlockCol->szValue))) return code; + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szValue)); } - - if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code; + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->offset)); } if (ver >= BLOCK_WITH_ALG_VER) { - if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code; + TAOS_CHECK_RETURN(tBufferPutU32(buffer, pBlockCol->alg)); } else { - if ((code = tBufferPutU32(buffer, defaultCmprAlg))) return code; + TAOS_CHECK_RETURN(tBufferPutU32(buffer, defaultCmprAlg)); } return 0; } int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { - int32_t code; - - if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code; - if ((code = tBufferGetI8(br, &pBlockCol->type))) return code; - if ((code = tBufferGetI8(br, &pBlockCol->cflag))) return code; - if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code; - if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code; + TAOS_CHECK_RETURN(tBufferGetI16v(br, &pBlockCol->cid)); + TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->type)); + TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->cflag)); + TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->flag)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szOrigin)); ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); @@ -442,22 +188,22 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint3 if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - if ((code = tBufferGetI32v(br, &pBlockCol->szBitmap))) return code; + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szBitmap)); } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - if ((code = tBufferGetI32v(br, &pBlockCol->szOffset))) return code; + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szOffset)); } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - if ((code = tBufferGetI32v(br, &pBlockCol->szValue))) return code; + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szValue)); } - if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code; + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->offset)); } if (ver >= BLOCK_WITH_ALG_VER) { - if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code; + TAOS_CHECK_RETURN(tBufferGetU32(br, &pBlockCol->alg)); } else { pBlockCol->alg = defaultCmprAlg; } @@ -465,50 +211,6 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint3 return 0; } -#ifdef BUILD_NO_CALL -int32_t tBlockColCmprFn(const void *p1, const void *p2) { - if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) { - return -1; - } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) { - return 1; - } - - return 0; -} - -// SDelIdx ====================================================== -int32_t tCmprDelIdx(void const *lhs, void const *rhs) { - SDelIdx *lDelIdx = (SDelIdx *)lhs; - SDelIdx *rDelIdx = (SDelIdx *)rhs; - - if (lDelIdx->suid < rDelIdx->suid) { - return -1; - } else if (lDelIdx->suid > rDelIdx->suid) { - return 1; - } - - if (lDelIdx->uid < rDelIdx->uid) { - return -1; - } else if (lDelIdx->uid > rDelIdx->uid) { - return 1; - } - - return 0; -} - -int32_t tPutDelIdx(uint8_t *p, void *ph) { - SDelIdx *pDelIdx = (SDelIdx *)ph; - int32_t n = 0; - - n += tPutI64(p ? p + n : p, pDelIdx->suid); - n += tPutI64(p ? p + n : p, pDelIdx->uid); - n += tPutI64v(p ? p + n : p, pDelIdx->offset); - n += tPutI64v(p ? p + n : p, pDelIdx->size); - - return n; -} -#endif - int32_t tGetDelIdx(uint8_t *p, void *ph) { SDelIdx *pDelIdx = (SDelIdx *)ph; int32_t n = 0; @@ -521,20 +223,6 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) { return n; } -#ifdef BUILD_NO_CALL -// SDelData ====================================================== -int32_t tPutDelData(uint8_t *p, void *ph) { - SDelData *pDelData = (SDelData *)ph; - int32_t n = 0; - - n += tPutI64v(p ? p + n : p, pDelData->version); - n += tPutI64(p ? p + n : p, pDelData->sKey); - n += tPutI64(p ? p + n : p, pDelData->eKey); - - return n; -} -#endif - int32_t tGetDelData(uint8_t *p, void *ph) { SDelData *pDelData = (SDelData *)ph; int32_t n = 0; @@ -680,20 +368,16 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) { // STSDBRowIter ====================================================== int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; - pIter->pRow = pRow; if (pRow->type == TSDBROW_ROW_FMT) { - code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter); - if (code) goto _exit; + TAOS_CHECK_RETURN(tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter)); } else if (pRow->type == TSDBROW_COL_FMT) { pIter->iColData = 0; } else { ASSERT(0); } -_exit: - return code; + return 0; } void tsdbRowClose(STSDBRowIter *pIter) { @@ -748,8 +432,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = key.ts})); if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; + return TSDB_CODE_OUT_OF_MEMORY; } // other @@ -760,7 +443,9 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) --iCol; continue; } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { - taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } continue; } @@ -769,10 +454,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) uint8_t *pVal = pColVal->value.pData; pColVal->value.pData = NULL; - code = tRealloc(&pColVal->value.pData, pColVal->value.nData); - if (code) { - return TSDB_CODE_OUT_OF_MEMORY; - } + TAOS_CHECK_RETURN(tRealloc(&pColVal->value.pData, pColVal->value.nData)); if (pColVal->value.nData) { memcpy(pColVal->value.pData, pVal, pColVal->value.nData); @@ -780,14 +462,15 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) } if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - return code; + return TSDB_CODE_OUT_OF_MEMORY; } } for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) { pTColumn = &pMerger->pTSchema->columns[iCol]; - taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); + if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } pMerger->version = key.version; @@ -812,8 +495,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) if (IS_VAR_DATA_TYPE(pColVal->value.type)) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); if (!COL_VAL_IS_NULL(pColVal)) { - code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); - if (code) return code; + TAOS_CHECK_RETURN(tRealloc(&pTColVal->value.pData, pColVal->value.nData)); pTColVal->value.nData = pColVal->value.nData; if (pTColVal->value.nData) { @@ -832,8 +514,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) { - code = tRealloc(&tColVal->value.pData, pColVal->value.nData); - if (code) return code; + TAOS_CHECK_RETURN(tRealloc(&tColVal->value.pData, pColVal->value.nData)); tColVal->value.nData = pColVal->value.nData; if (pColVal->value.nData) { @@ -891,79 +572,8 @@ int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); } -/* -// delete skyline ====================================================== -static int32_t tsdbMergeSkyline2(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { - int32_t code = 0; - int32_t i1 = 0; - int32_t n1 = taosArrayGetSize(aSkyline1); - int32_t i2 = 0; - int32_t n2 = taosArrayGetSize(aSkyline2); - TSDBKEY *pSkyline1; - TSDBKEY *pSkyline2; - TSDBKEY item; - int64_t version1 = 0; - int64_t version2 = 0; - - ASSERT(n1 > 0 && n2 > 0); - - taosArrayClear(aSkyline); - - while (i1 < n1 && i2 < n2) { - pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1); - pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2); - - if (pSkyline1->ts < pSkyline2->ts) { - version1 = pSkyline1->version; - i1++; - } else if (pSkyline1->ts > pSkyline2->ts) { - version2 = pSkyline2->version; - i2++; - } else { - version1 = pSkyline1->version; - version2 = pSkyline2->version; - i1++; - i2++; - } - - item.ts = TMIN(pSkyline1->ts, pSkyline2->ts); - item.version = TMAX(version1, version2); - if (taosArrayPush(aSkyline, &item) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - } - - while (i1 < n1) { - pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1); - item.ts = pSkyline1->ts; - item.version = pSkyline1->version; - if (taosArrayPush(aSkyline, &item) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - i1++; - } - - while (i2 < n2) { - pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2); - item.ts = pSkyline2->ts; - item.version = pSkyline2->version; - if (taosArrayPush(aSkyline, &item) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; - } - i2++; - } - -_exit: - return code; -} -*/ - // delete skyline ====================================================== static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) { - int32_t code = 0; int32_t i1 = 0; int32_t n1 = taosArrayGetSize(pSkyline1); int32_t i2 = 0; @@ -1017,7 +627,7 @@ static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pS } pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem); - return code; + return 0; } int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) { @@ -1029,8 +639,12 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, if (sidx == eidx) { TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2); TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1); - taosArrayPush(pSkyline, &pItem1); - taosArrayPush(pSkyline, &pItem2); + if (taosArrayPush(pSkyline, &pItem1) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + if (taosArrayPush(pSkyline, &pItem2) == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { SArray *pSkyline1 = NULL; SArray *pSkyline2 = NULL; @@ -1043,11 +657,8 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, goto _clear; } - code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1); - if (code) goto _clear; - - code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2); - if (code) goto _clear; + TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1), NULL, _clear); + TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2), NULL, _clear); code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline); @@ -1063,72 +674,48 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr SDelData *pDelData; int32_t code = 0; int32_t dataNum = eidx - sidx + 1; - SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); - SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); + SArray *aTmpSkyline; + SArray *pSkyline; + + aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); + if (aTmpSkyline == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); + if (pSkyline) { + taosArrayDestroy(aTmpSkyline); + return TSDB_CODE_OUT_OF_MEMORY; + } taosArrayClear(aSkyline); for (int32_t i = sidx; i <= eidx; ++i) { pDelData = (SDelData *)taosArrayGet(aDelData, i); - taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); - taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); + if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); + } + + if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); + } } - code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline); - if (code) goto _clear; + TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline), NULL, _clear); int32_t skylineNum = taosArrayGetSize(pSkyline); for (int32_t i = 0; i < skylineNum; ++i) { TSDBKEY *p = taosArrayGetP(pSkyline, i); - taosArrayPush(aSkyline, p); + if (taosArrayPush(aSkyline, p) == NULL) { + TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); + } } _clear: taosArrayDestroy(aTmpSkyline); taosArrayDestroy(pSkyline); - return code; } -/* -int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) { - int32_t code = 0; - SDelData *pDelData; - int32_t midx; - - taosArrayClear(aSkyline); - if (sidx == eidx) { - pDelData = (SDelData *)taosArrayGet(aDelData, sidx); - taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); - taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); - } else { - SArray *aSkyline1 = NULL; - SArray *aSkyline2 = NULL; - - aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY)); - aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY)); - if (aSkyline1 == NULL || aSkyline2 == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _clear; - } - midx = (sidx + eidx) / 2; - - code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1); - if (code) goto _clear; - - code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2); - if (code) goto _clear; - - code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline); - - _clear: - taosArrayDestroy(aSkyline1); - taosArrayDestroy(aSkyline2); - } - - return code; -} -*/ - // SBlockData ====================================================== int32_t tBlockDataCreate(SBlockData *pBlockData) { pBlockData->suid = 0; @@ -1158,8 +745,6 @@ void tBlockDataDestroy(SBlockData *pBlockData) { } static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) { - int32_t code = 0; - if (pBlockData->nColData > nColData) { for (int32_t i = nColData; i < pBlockData->nColData; i++) { tColDataDestroy(&pBlockData->aColData[i]); @@ -1167,8 +752,7 @@ static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) } else if (pBlockData->nColData < nColData) { SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SBlockData) * nColData); if (aColData == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _exit; + return TSDB_CODE_OUT_OF_MEMORY; } pBlockData->aColData = aColData; @@ -1176,12 +760,10 @@ static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) } pBlockData->nColData = nColData; -_exit: - return code; + return 0; } -int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { - int32_t code = 0; +int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { ASSERT(pId->suid || pId->uid); pBlockData->suid = pId->suid; @@ -1189,8 +771,7 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, pBlockData->nRow = 0; if (aCid) { - code = tBlockDataAdjustColData(pBlockData, nCid); - if (code) goto _exit; + TAOS_CHECK_RETURN(tBlockDataAdjustColData(pBlockData, nCid)); int32_t iColumn = 1; STColumn *pTColumn = &pTSchema->columns[iColumn]; @@ -1217,17 +798,14 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; } } else { - code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1); - if (code) goto _exit; + TAOS_CHECK_RETURN(tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1)); for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { STColumn *pTColumn = &pTSchema->columns[iColData + 1]; tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type, pTColumn->flags); } } - -_exit: - return code; + return 0; } void tBlockDataReset(SBlockData *pBlockData) { @@ -1309,43 +887,35 @@ _exit: } int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { - int32_t code = 0; - ASSERT(pBlockData->suid || pBlockData->uid); // uid if (pBlockData->uid == 0) { ASSERT(uid); - code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); - if (code) goto _exit; + TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1))); pBlockData->aUid[pBlockData->nRow] = uid; } // version - code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); - if (code) goto _exit; + TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1))); pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); // timestamp - code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); - if (code) goto _exit; + TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1))); pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); if (pRow->type == TSDBROW_ROW_FMT) { - code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */); - if (code) goto _exit; + TAOS_CHECK_RETURN( + tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */)); } else if (pRow->type == TSDBROW_COL_FMT) { - code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */); - if (code) goto _exit; + TAOS_CHECK_RETURN(tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */)); } else { ASSERT(0); } pBlockData->nRow++; -_exit: - return code; + return 0; } -int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { - int32_t code = 0; +int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { // version int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1]; int64_t rversion = TSDBROW_VERSION(pRow); @@ -1356,40 +926,18 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS // update other rows if (pRow->type == TSDBROW_ROW_FMT) { - code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, - (rversion > lversion) ? 1 : -1 /* update */); - if (code) goto _exit; + TAOS_CHECK_RETURN(tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, + (rversion > lversion) ? 1 : -1 /* update */)); } else if (pRow->type == TSDBROW_COL_FMT) { - code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1); - if (code) goto _exit; + TAOS_CHECK_RETURN( + tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1)); } else { ASSERT(0); } -_exit: - return code; + return 0; } -#ifdef BUILD_NO_CALL -int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) { - if (pBlockData->nRow == 0) { - return 1; - } else if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { - return pBlockData->nRow; - } else { - return pBlockData->nRow + 1; - } -} - -int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { - if (pBlockData->nRow > 0 && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { - return tBlockDataUpdateRow(pBlockData, pRow, pTSchema); - } else { - return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); - } -} -#endif - SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); int32_t lidx = 0; @@ -1422,7 +970,8 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB int32_t lino = 0; SColCompressInfo *pInfo = pCompr; - code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg); + + TAOS_CHECK_GOTO(tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg), &lino, _exit); SDiskDataHdr hdr = { .delimiter = TSDB_FILE_DLMT, @@ -1440,8 +989,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB // Key part tBufferClear(&buffers[1]); - code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo), &lino, _exit); // Regulart column part tBufferClear(&buffers[2]); @@ -1459,14 +1007,10 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB SColDataCompressInfo cinfo = { .cmprAlg = pInfo->defaultCmprAlg, }; - code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg); - if (code < 0) { - // - } + TAOS_UNUSED(tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg)); int32_t offset = buffers[3].size; - code = tColDataCompress(colData, &cinfo, &buffers[3], assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tColDataCompress(colData, &cinfo, &buffers[3], assist), &lino, _exit); SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId, .type = cinfo.dataType, @@ -1479,15 +1023,13 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB .offset = offset, .alg = cinfo.cmprAlg}; - code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit); } hdr.szBlkCol = buffers[2].size; // SDiskDataHdr part tBufferClear(&buffers[0]); - code = tPutDiskDataHdr(&buffers[0], &hdr); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tPutDiskDataHdr(&buffers[0], &hdr), &lino, _exit); _exit: return code; @@ -1500,8 +1042,7 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * SCompressInfo cinfo; // SDiskDataHdr - code = tGetDiskDataHdr(br, &hdr); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tGetDiskDataHdr(br, &hdr), &lino, _exit); tBlockDataReset(blockData); blockData->suid = hdr.suid; @@ -1509,8 +1050,7 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * blockData->nRow = hdr.nRow; // Key part - code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, br, blockData, assist), &lino, _exit); // Column part SBufferReader br2 = *br; @@ -1518,11 +1058,11 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) { SBlockCol blockCol; - code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg); + TAOS_CHECK_GOTO(tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit); + if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg; - TSDB_CHECK_CODE(code, lino, _exit); - code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); - TSDB_CHECK_CODE(code, lino, _exit); + + TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist), &lino, _exit); } _exit: @@ -1531,28 +1071,26 @@ _exit: // SDiskDataHdr ============================== int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { - int32_t code; - - if ((code = tBufferPutU32(buffer, pHdr->delimiter))) return code; - if ((code = tBufferPutU32v(buffer, pHdr->fmtVer))) return code; - if ((code = tBufferPutI64(buffer, pHdr->suid))) return code; - if ((code = tBufferPutI64(buffer, pHdr->uid))) return code; - if ((code = tBufferPutI32v(buffer, pHdr->szUid))) return code; - if ((code = tBufferPutI32v(buffer, pHdr->szVer))) return code; - if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; - if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; - if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; + TAOS_CHECK_RETURN(tBufferPutU32(buffer, pHdr->delimiter)); + TAOS_CHECK_RETURN(tBufferPutU32v(buffer, pHdr->fmtVer)); + TAOS_CHECK_RETURN(tBufferPutI64(buffer, pHdr->suid)); + TAOS_CHECK_RETURN(tBufferPutI64(buffer, pHdr->uid)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szUid)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szVer)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szKey)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szBlkCol)); + TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->nRow)); if (pHdr->fmtVer < 2) { - if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; + TAOS_CHECK_RETURN(tBufferPutI8(buffer, pHdr->cmprAlg)); } else if (pHdr->fmtVer == 2) { - if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code; + TAOS_CHECK_RETURN(tBufferPutU32(buffer, pHdr->cmprAlg)); } else { // more data fmt ver } if (pHdr->fmtVer >= 1) { - if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; + TAOS_CHECK_RETURN(tBufferPutI8(buffer, pHdr->numOfPKs)); for (int i = 0; i < pHdr->numOfPKs; i++) { - if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) return code; + TAOS_CHECK_RETURN(tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg)); } } @@ -1560,32 +1098,28 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { } int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { - int32_t code; - - if ((code = tBufferGetU32(br, &pHdr->delimiter))) return code; - if ((code = tBufferGetU32v(br, &pHdr->fmtVer))) return code; - if ((code = tBufferGetI64(br, &pHdr->suid))) return code; - if ((code = tBufferGetI64(br, &pHdr->uid))) return code; - if ((code = tBufferGetI32v(br, &pHdr->szUid))) return code; - if ((code = tBufferGetI32v(br, &pHdr->szVer))) return code; - if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; - if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; - if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; + TAOS_CHECK_RETURN(tBufferGetU32(br, &pHdr->delimiter)); + TAOS_CHECK_RETURN(tBufferGetU32v(br, &pHdr->fmtVer)); + TAOS_CHECK_RETURN(tBufferGetI64(br, &pHdr->suid)); + TAOS_CHECK_RETURN(tBufferGetI64(br, &pHdr->uid)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szUid)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szVer)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szKey)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szBlkCol)); + TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->nRow)); if (pHdr->fmtVer < 2) { int8_t cmprAlg = 0; - if ((code = tBufferGetI8(br, &cmprAlg))) return code; + TAOS_CHECK_RETURN(tBufferGetI8(br, &cmprAlg)); pHdr->cmprAlg = cmprAlg; } else if (pHdr->fmtVer == 2) { - if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code; + TAOS_CHECK_RETURN(tBufferGetU32(br, &pHdr->cmprAlg)); } else { // more data fmt ver } if (pHdr->fmtVer >= 1) { - if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; + TAOS_CHECK_RETURN(tBufferGetI8(br, &pHdr->numOfPKs)); for (int i = 0; i < pHdr->numOfPKs; i++) { - if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) { - return code; - } + TAOS_CHECK_RETURN(tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg)); } } else { pHdr->numOfPKs = 0; @@ -1596,26 +1130,20 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { // ALGORITHM ============================== int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) { - int32_t code; - - if ((code = tBufferPutI16v(buffer, pColAgg->colId))) return code; - if ((code = tBufferPutI16v(buffer, pColAgg->numOfNull))) return code; - if ((code = tBufferPutI64(buffer, pColAgg->sum))) return code; - if ((code = tBufferPutI64(buffer, pColAgg->max))) return code; - if ((code = tBufferPutI64(buffer, pColAgg->min))) return code; - + TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pColAgg->colId)); + TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pColAgg->numOfNull)); + TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->sum)); + TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->max)); + TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->min)); return 0; } int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) { - int32_t code; - - if ((code = tBufferGetI16v(br, &pColAgg->colId))) return code; - if ((code = tBufferGetI16v(br, &pColAgg->numOfNull))) return code; - if ((code = tBufferGetI64(br, &pColAgg->sum))) return code; - if ((code = tBufferGetI64(br, &pColAgg->max))) return code; - if ((code = tBufferGetI64(br, &pColAgg->min))) return code; - + TAOS_CHECK_RETURN(tBufferGetI16v(br, &pColAgg->colId)); + TAOS_CHECK_RETURN(tBufferGetI16v(br, &pColAgg->numOfNull)); + TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->sum)); + TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->max)); + TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->min)); return 0; } @@ -1632,8 +1160,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist), &lino, _exit); hdr->szUid = cinfo.compressedSize; } @@ -1643,8 +1170,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist), &lino, _exit); hdr->szVer = cinfo.compressedSize; // ts @@ -1654,8 +1180,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .originalSize = sizeof(TSKEY) * bData->nRow, }; - code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist), &lino, _exit); hdr->szKey = cinfo.compressedSize; // primary keys @@ -1672,14 +1197,9 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S SColDataCompressInfo info = { .cmprAlg = hdr->cmprAlg, }; - code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg); - if (code < 0) { - // do nothing - } else { - } + TAOS_UNUSED(tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg)); - code = tColDataCompress(colData, &info, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tColDataCompress(colData, &info, buffer, assist), &lino, _exit); *blockCol = (SBlockCol){ .cid = info.columnId, @@ -1706,8 +1226,8 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl SColData *colData; - code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData), &lino, + _exit); // ASSERT(blockCol->flag != HAS_NONE); @@ -1737,8 +1257,7 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl break; } - code = tColDataDecompress(BR_PTR(br), &info, colData, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tColDataDecompress(BR_PTR(br), &info, colData, assist), &lino, _exit); br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; _exit: @@ -1760,10 +1279,8 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .originalSize = sizeof(int64_t) * hdr->nRow, }; - code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize), &lino, _exit); + TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist), &lino, _exit); br->offset += cinfo.compressedSize; } @@ -1774,10 +1291,8 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .compressedSize = hdr->szVer, .originalSize = sizeof(int64_t) * hdr->nRow, }; - code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize), &lino, _exit); + TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist), &lino, _exit); br->offset += cinfo.compressedSize; // ts @@ -1787,10 +1302,8 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .compressedSize = hdr->szKey, .originalSize = sizeof(TSKEY) * hdr->nRow, }; - code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize), &lino, _exit); + TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist), &lino, _exit); br->offset += cinfo.compressedSize; // primary keys @@ -1800,8 +1313,7 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, ASSERT(blockCol->flag == HAS_VALUE); ASSERT(blockCol->cflag & COL_IS_KEY); - code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); - TSDB_CHECK_CODE(code, lino, _exit); + TAOS_CHECK_GOTO(tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist), &lino, _exit); } _exit: @@ -1809,14 +1321,19 @@ _exit: } int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) { - if (set == NULL) return -1; + if (set == NULL) { + return TSDB_CODE_INVALID_PARA; + } uint32_t *ret = taosHashGet(set, &colId, sizeof(colId)); - if (ret == NULL) return -1; + if (ret == NULL) { + return TSDB_CODE_NOT_FOUND; + } *alg = *ret; return 0; } + uint32_t tsdbCvtTimestampAlg(uint32_t alg) { DEFINE_VAR(alg) diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 9b88500ad1..97fea598cd 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -17,17 +17,24 @@ // SDelBlock ---------- int32_t tTombBlockInit(STombBlock *tombBlock) { + int32_t code; + tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - tBufferInit(&tombBlock->buffers[i]); + TAOS_CHECK_GOTO(tBufferInit(&tombBlock->buffers[i]), NULL, _exit); } - return 0; + +_exit: + if (code) { + TAOS_UNUSED(tTombBlockDestroy(tombBlock)); + } + return code; } int32_t tTombBlockDestroy(STombBlock *tombBlock) { tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - tBufferDestroy(&tombBlock->buffers[i]); + TAOS_UNUSED(tBufferDestroy(&tombBlock->buffers[i])); } return 0; } @@ -35,15 +42,14 @@ int32_t tTombBlockDestroy(STombBlock *tombBlock) { int32_t tTombBlockClear(STombBlock *tombBlock) { tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - tBufferClear(&tombBlock->buffers[i]); + TAOS_UNUSED(tBufferClear(&tombBlock->buffers[i])); } return 0; } int32_t tTombBlockPut(STombBlock *tombBlock, const STombRecord *record) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - int32_t code = tBufferPutI64(&tombBlock->buffers[i], record->data[i]); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutI64(&tombBlock->buffers[i], record->data[i])); } tombBlock->numOfRecords++; return 0; @@ -56,8 +62,7 @@ int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * idx, &tombBlock->buffers[i]); - int32_t code = tBufferGetI64(&br, &record->data[i]); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&br, &record->data[i])); } return 0; } @@ -74,27 +79,34 @@ int32_t tTombRecordCompare(const STombRecord *r1, const STombRecord *r2) { // STbStatisBlock ---------- int32_t tStatisBlockInit(STbStatisBlock *statisBlock) { + int32_t code; + statisBlock->numOfPKs = 0; statisBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { - tBufferInit(&statisBlock->buffers[i]); + TAOS_CHECK_GOTO(tBufferInit(&statisBlock->buffers[i]), NULL, _exit); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnInit(&statisBlock->firstKeyPKs[i]); - tValueColumnInit(&statisBlock->lastKeyPKs[i]); + TAOS_CHECK_GOTO(tValueColumnInit(&statisBlock->firstKeyPKs[i]), NULL, _exit); + TAOS_CHECK_GOTO(tValueColumnInit(&statisBlock->lastKeyPKs[i]), NULL, _exit); } - return 0; + +_exit: + if (code) { + TAOS_UNUSED(tStatisBlockDestroy(statisBlock)); + } + return code; } int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) { statisBlock->numOfPKs = 0; statisBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { - tBufferDestroy(&statisBlock->buffers[i]); + TAOS_UNUSED(tBufferDestroy(&statisBlock->buffers[i])); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnDestroy(&statisBlock->firstKeyPKs[i]); - tValueColumnDestroy(&statisBlock->lastKeyPKs[i]); + TAOS_UNUSED(tValueColumnDestroy(&statisBlock->firstKeyPKs[i])); + TAOS_UNUSED(tValueColumnDestroy(&statisBlock->lastKeyPKs[i])); } return 0; } @@ -103,17 +115,16 @@ int32_t tStatisBlockClear(STbStatisBlock *statisBlock) { statisBlock->numOfPKs = 0; statisBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { - tBufferClear(&statisBlock->buffers[i]); + TAOS_UNUSED(tBufferClear(&statisBlock->buffers[i])); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnClear(&statisBlock->firstKeyPKs[i]); - tValueColumnClear(&statisBlock->lastKeyPKs[i]); + TAOS_UNUSED(tValueColumnClear(&statisBlock->firstKeyPKs[i])); + TAOS_UNUSED(tValueColumnClear(&statisBlock->lastKeyPKs[i])); } return 0; } static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) { - int32_t code; STsdbRowKey key; tsdbRowGetKey(&row->row, &key); @@ -129,14 +140,14 @@ static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) { } } - if ((code = tBufferPutI64(&block->suids, row->suid))) return code; - if ((code = tBufferPutI64(&block->uids, row->uid))) return code; - if ((code = tBufferPutI64(&block->firstKeyTimestamps, key.key.ts))) return code; - if ((code = tBufferPutI64(&block->lastKeyTimestamps, key.key.ts))) return code; - if ((code = tBufferPutI64(&block->counts, 1))) return code; + TAOS_CHECK_RETURN(tBufferPutI64(&block->suids, row->suid)); + TAOS_CHECK_RETURN(tBufferPutI64(&block->uids, row->uid)); + TAOS_CHECK_RETURN(tBufferPutI64(&block->firstKeyTimestamps, key.key.ts)); + TAOS_CHECK_RETURN(tBufferPutI64(&block->lastKeyTimestamps, key.key.ts)); + TAOS_CHECK_RETURN(tBufferPutI64(&block->counts, 1)); for (int32_t i = 0; i < block->numOfPKs; ++i) { - if ((code = tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i))) return code; - if ((code = tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i))) return code; + TAOS_CHECK_RETURN(tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i)); + TAOS_CHECK_RETURN(tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i)); } block->numOfRecords++; @@ -147,9 +158,8 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) { STbStatisRecord record; STsdbRowKey key; int32_t c; - int32_t code; - tStatisBlockGet(block, block->numOfRecords - 1, &record); + TAOS_CHECK_RETURN(tStatisBlockGet(block, block->numOfRecords - 1, &record)); tsdbRowGetKey(&row->row, &key); c = tRowKeyCompare(&record.lastKey, &key.key); @@ -157,21 +167,18 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) { return 0; } else if (c < 0) { // last ts - code = tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts), &key.key.ts, - sizeof(key.key.ts)); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts), + &key.key.ts, sizeof(key.key.ts))); // last primary keys for (int i = 0; i < block->numOfPKs; i++) { - code = tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i])); } // count record.count++; - code = tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count, - sizeof(record.count)); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count, + sizeof(record.count))); } else { ASSERT(0); } @@ -183,8 +190,7 @@ int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords if (block->numOfRecords > 0) { int64_t lastUid; SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * (block->numOfRecords - 1), &block->uids); - int32_t code = tBufferGetI64(&br, &lastUid); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&br, &lastUid)); if (lastUid == row->uid) { return tStatisBlockUpdate(block, row); @@ -196,7 +202,6 @@ int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords } int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) { - int32_t code; SBufferReader reader; if (idx < 0 || idx >= statisBlock->numOfRecords) { @@ -204,36 +209,29 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor } reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->suid), &statisBlock->suids); - code = tBufferGetI64(&reader, &record->suid); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->suid)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->uid), &statisBlock->uids); - code = tBufferGetI64(&reader, &record->uid); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->uid)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->firstKey.ts), &statisBlock->firstKeyTimestamps); - code = tBufferGetI64(&reader, &record->firstKey.ts); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.ts)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->lastKey.ts), &statisBlock->lastKeyTimestamps); - code = tBufferGetI64(&reader, &record->lastKey.ts); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.ts)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->count), &statisBlock->counts); - code = tBufferGetI64(&reader, &record->count); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->count)); // primary keys for (record->firstKey.numOfPKs = 0; record->firstKey.numOfPKs < statisBlock->numOfPKs; record->firstKey.numOfPKs++) { - code = tValueColumnGet(&statisBlock->firstKeyPKs[record->firstKey.numOfPKs], idx, - &record->firstKey.pks[record->firstKey.numOfPKs]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnGet(&statisBlock->firstKeyPKs[record->firstKey.numOfPKs], idx, + &record->firstKey.pks[record->firstKey.numOfPKs])); } for (record->lastKey.numOfPKs = 0; record->lastKey.numOfPKs < statisBlock->numOfPKs; record->lastKey.numOfPKs++) { - code = tValueColumnGet(&statisBlock->lastKeyPKs[record->lastKey.numOfPKs], idx, - &record->lastKey.pks[record->lastKey.numOfPKs]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnGet(&statisBlock->lastKeyPKs[record->lastKey.numOfPKs], idx, + &record->lastKey.pks[record->lastKey.numOfPKs])); } return 0; @@ -241,27 +239,34 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor // SBrinRecord ---------- int32_t tBrinBlockInit(SBrinBlock *brinBlock) { + int32_t code; + brinBlock->numOfPKs = 0; brinBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { - tBufferInit(&brinBlock->buffers[i]); + TAOS_CHECK_GOTO(tBufferInit(&brinBlock->buffers[i]), NULL, _exit); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnInit(&brinBlock->firstKeyPKs[i]); - tValueColumnInit(&brinBlock->lastKeyPKs[i]); + TAOS_CHECK_GOTO(tValueColumnInit(&brinBlock->firstKeyPKs[i]), NULL, _exit); + TAOS_CHECK_GOTO(tValueColumnInit(&brinBlock->lastKeyPKs[i]), NULL, _exit); } - return 0; + +_exit: + if (code) { + TAOS_UNUSED(tBrinBlockDestroy(brinBlock)); + } + return code; } int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) { brinBlock->numOfPKs = 0; brinBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { - tBufferDestroy(&brinBlock->buffers[i]); + TAOS_UNUSED(tBufferDestroy(&brinBlock->buffers[i])); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnDestroy(&brinBlock->firstKeyPKs[i]); - tValueColumnDestroy(&brinBlock->lastKeyPKs[i]); + TAOS_UNUSED(tValueColumnDestroy(&brinBlock->firstKeyPKs[i])); + TAOS_UNUSED(tValueColumnDestroy(&brinBlock->lastKeyPKs[i])); } return 0; } @@ -270,18 +275,16 @@ int32_t tBrinBlockClear(SBrinBlock *brinBlock) { brinBlock->numOfPKs = 0; brinBlock->numOfRecords = 0; for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { - tBufferClear(&brinBlock->buffers[i]); + TAOS_UNUSED(tBufferClear(&brinBlock->buffers[i])); } for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) { - tValueColumnClear(&brinBlock->firstKeyPKs[i]); - tValueColumnClear(&brinBlock->lastKeyPKs[i]); + TAOS_UNUSED(tValueColumnClear(&brinBlock->firstKeyPKs[i])); + TAOS_UNUSED(tValueColumnClear(&brinBlock->lastKeyPKs[i])); } return 0; } int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { - int32_t code; - ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs); if (brinBlock->numOfRecords == 0) { // the first row @@ -298,60 +301,29 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { } } - code = tBufferPutI64(&brinBlock->suids, record->suid); - if (code) return code; - - code = tBufferPutI64(&brinBlock->uids, record->uid); - if (code) return code; - - code = tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts); - if (code) return code; - - code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version); - if (code) return code; - - code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts); - if (code) return code; - - code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version); - if (code) return code; - - code = tBufferPutI64(&brinBlock->minVers, record->minVer); - if (code) return code; - - code = tBufferPutI64(&brinBlock->maxVers, record->maxVer); - if (code) return code; - - code = tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset); - if (code) return code; - - code = tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset); - if (code) return code; - - code = tBufferPutI32(&brinBlock->blockSizes, record->blockSize); - if (code) return code; - - code = tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize); - if (code) return code; - - code = tBufferPutI32(&brinBlock->smaSizes, record->smaSize); - if (code) return code; - - code = tBufferPutI32(&brinBlock->numRows, record->numRow); - if (code) return code; - - code = tBufferPutI32(&brinBlock->counts, record->count); - if (code) return code; + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->suids, record->suid)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->uids, record->uid)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->minVers, record->minVer)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->maxVers, record->maxVer)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset)); + TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset)); + TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->blockSizes, record->blockSize)); + TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize)); + TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->smaSizes, record->smaSize)); + TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->numRows, record->numRow)); + TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->counts, record->count)); if (brinBlock->numOfPKs > 0) { for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) { - code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i])); } for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) { - code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i])); } } @@ -361,7 +333,6 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { } int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) { - int32_t code; SBufferReader reader; if (idx < 0 || idx >= brinBlock->numOfRecords) { @@ -369,78 +340,61 @@ int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) { } reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->suids); - code = tBufferGetI64(&reader, &record->suid); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->suid)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->uids); - code = tBufferGetI64(&reader, &record->uid); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->uid)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyTimestamps); - code = tBufferGetI64(&reader, &record->firstKey.key.ts); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.key.ts)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyVersions); - code = tBufferGetI64(&reader, &record->firstKey.version); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.version)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyTimestamps); - code = tBufferGetI64(&reader, &record->lastKey.key.ts); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.key.ts)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyVersions); - code = tBufferGetI64(&reader, &record->lastKey.version); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.version)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->minVers); - code = tBufferGetI64(&reader, &record->minVer); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->minVer)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->maxVers); - code = tBufferGetI64(&reader, &record->maxVer); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->maxVer)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->blockOffsets); - code = tBufferGetI64(&reader, &record->blockOffset); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->blockOffset)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->smaOffsets); - code = tBufferGetI64(&reader, &record->smaOffset); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->smaOffset)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockSizes); - code = tBufferGetI32(&reader, &record->blockSize); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->blockSize)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockKeySizes); - code = tBufferGetI32(&reader, &record->blockKeySize); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->blockKeySize)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->smaSizes); - code = tBufferGetI32(&reader, &record->smaSize); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->smaSize)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->numRows); - code = tBufferGetI32(&reader, &record->numRow); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->numRow)); reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->counts); - code = tBufferGetI32(&reader, &record->count); - if (code) return code; + TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->count)); // primary keys for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs; record->firstKey.key.numOfPKs++) { - code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx, - &record->firstKey.key.pks[record->firstKey.key.numOfPKs]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx, + &record->firstKey.key.pks[record->firstKey.key.numOfPKs])); } for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs; record->lastKey.key.numOfPKs++) { - code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx, - &record->lastKey.key.pks[record->lastKey.key.numOfPKs]); - if (code) return code; + TAOS_CHECK_RETURN(tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx, + &record->lastKey.key.pks[record->lastKey.key.numOfPKs])); } return 0;