From 3bada98670cbe57f1908746e38e3cfe39b488b56 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 17 Jul 2024 21:58:23 +0800 Subject: [PATCH] revert some change to make CI pass --- source/dnode/vnode/src/inc/tsdb.h | 4 +- .../dnode/vnode/src/tsdb/tsdbReaderWriter.c | 7 +- source/dnode/vnode/src/tsdb/tsdbUpgrade.c | 2 +- source/dnode/vnode/src/tsdb/tsdbUtil.c | 839 ++++++++++++++---- 4 files changed, 669 insertions(+), 183 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 21dd48919e..75dfd03402 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -222,11 +222,11 @@ void tMapDataReset(SMapData *pMapData); void tMapDataClear(SMapData *pMapData); int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)); int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo); -int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); +void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)); int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), int32_t (*tItemCmprFn)(const void *, const void *), void *pItem); int32_t tPutMapData(uint8_t *p, SMapData *pMapData); -int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int64_t *decodeSize); +int32_t tGetMapData(uint8_t *p, SMapData *pMapData); int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), SArray **ppArray); // other diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 8cbcdbc34f..932bf2d92c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -803,8 +803,11 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m if (code) goto _err; // decode - int64_t n; - TAOS_CHECK_GOTO(tGetMapData(pReader->aBuf[0], mDataBlk, &n), NULL, _err); + int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk); + if (n < 0) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } ASSERT(n == size); return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 1052d1c1a1..5d0500998e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -96,7 +96,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) { SDataBlk dataBlk[1]; - TAOS_CHECK_GOTO(tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk), &lino, _exit); + tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk); SBrinRecord record = { .suid = pBlockIdx->suid, diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 025fbf3eb7..58075cf0ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -36,47 +36,183 @@ void tMapDataClear(SMapData *pMapData) { pMapData->aOffset = NULL; } -int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { - if (idx < 0 || idx >= pMapData->nItem) { - return TSDB_CODE_OUT_OF_RANGE; - } +#ifdef BUILD_NO_CALL +int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *)) { + int32_t code = 0; + int32_t offset = pMapData->nData; + int32_t nItem = pMapData->nItem; - tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem); - return 0; + pMapData->nItem++; + pMapData->nData += tPutItemFn(NULL, pItem); + + // alloc + code = tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem); + if (code) goto _exit; + code = tRealloc(&pMapData->pData, pMapData->nData); + if (code) goto _exit; + + // put + pMapData->aOffset[nItem] = offset; + tPutItemFn(pMapData->pData + offset, pItem); + +_exit: + return code; } -int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int64_t *decodeSize) { - int64_t size = 0; +int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo) { + int32_t code = 0; + + pTo->nItem = pFrom->nItem; + pTo->nData = pFrom->nData; + code = tRealloc((uint8_t **)&pTo->aOffset, sizeof(int32_t) * pFrom->nItem); + if (code) goto _exit; + code = tRealloc(&pTo->pData, pFrom->nData); + if (code) goto _exit; + memcpy(pTo->aOffset, pFrom->aOffset, sizeof(int32_t) * pFrom->nItem); + memcpy(pTo->pData, pFrom->pData, pFrom->nData); + +_exit: + return code; +} + +int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *), + int32_t (*tItemCmprFn)(const void *, const void *), void *pItem) { + int32_t code = 0; + int32_t lidx = 0; + int32_t ridx = pMapData->nItem - 1; + int32_t midx; + int32_t c; + + while (lidx <= ridx) { + midx = (lidx + ridx) / 2; + + tMapDataGetItemByIdx(pMapData, midx, pItem, tGetItemFn); + + c = tItemCmprFn(pSearchItem, pItem); + if (c == 0) { + goto _exit; + } else if (c < 0) { + ridx = midx - 1; + } else { + lidx = midx + 1; + } + } + + code = TSDB_CODE_NOT_FOUND; + +_exit: + return code; +} +#endif + +void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *)) { + ASSERT(idx >= 0 && idx < pMapData->nItem); + tGetItemFn(pMapData->pData + pMapData->aOffset[idx], pItem); +} + +#ifdef BUILD_NO_CALL +int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *), + SArray **ppArray) { + int32_t code = 0; + + SArray *pArray = taosArrayInit(pMapData->nItem, itemSize); + if (pArray == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + + for (int32_t i = 0; i < pMapData->nItem; i++) { + tMapDataGetItemByIdx(pMapData, i, taosArrayReserve(pArray, 1), tGetItemFn); + } + +_exit: + *ppArray = pArray; + return code; +} + +int32_t tPutMapData(uint8_t *p, SMapData *pMapData) { + int32_t n = 0; + + n += tPutI32v(p ? p + n : p, pMapData->nItem); + if (pMapData->nItem) { + int32_t lOffset = 0; + for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { + n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset); + lOffset = pMapData->aOffset[iItem]; + } + + n += tPutI32v(p ? p + n : p, pMapData->nData); + if (p) { + memcpy(p + n, pMapData->pData, pMapData->nData); + } + n += pMapData->nData; + } + + return n; +} +#endif + +int32_t tGetMapData(uint8_t *p, SMapData *pMapData) { + int32_t n = 0; + int32_t offset; tMapDataReset(pMapData); - size += tGetI32v(p + size, &pMapData->nItem); + n += tGetI32v(p + n, &pMapData->nItem); if (pMapData->nItem) { - if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) { - return TSDB_CODE_OUT_OF_MEMORY; - } + if (tRealloc((uint8_t **)&pMapData->aOffset, sizeof(int32_t) * pMapData->nItem)) return -1; int32_t lOffset = 0; for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - size += tGetI32v(p + size, &pMapData->aOffset[iItem]); + n += tGetI32v(p + n, &pMapData->aOffset[iItem]); pMapData->aOffset[iItem] += lOffset; lOffset = pMapData->aOffset[iItem]; } - size += tGetI32v(p + size, &pMapData->nData); - if (tRealloc(&pMapData->pData, pMapData->nData)) { - return TSDB_CODE_OUT_OF_MEMORY; - } - memcpy(pMapData->pData, p + size, pMapData->nData); - size += pMapData->nData; + n += tGetI32v(p + n, &pMapData->nData); + if (tRealloc(&pMapData->pData, pMapData->nData)) return -1; + memcpy(pMapData->pData, p + n, pMapData->nData); + n += pMapData->nData; } - if (decodeSize) { - *decodeSize = size; + return n; +} + +#ifdef BUILD_NO_CALL +// TABLEID ======================================================================= +int32_t tTABLEIDCmprFn(const void *p1, const void *p2) { + TABLEID *pId1 = (TABLEID *)p1; + TABLEID *pId2 = (TABLEID *)p2; + + if (pId1->suid < pId2->suid) { + return -1; + } else if (pId1->suid > pId2->suid) { + return 1; } + + if (pId1->uid < pId2->uid) { + return -1; + } else if (pId1->uid > pId2->uid) { + return 1; + } + return 0; } +// SBlockIdx ====================================================== +int32_t tPutBlockIdx(uint8_t *p, void *ph) { + int32_t n = 0; + SBlockIdx *pBlockIdx = (SBlockIdx *)ph; + + n += tPutI64(p ? p + n : p, pBlockIdx->suid); + n += tPutI64(p ? p + n : p, pBlockIdx->uid); + n += tPutI64v(p ? p + n : p, pBlockIdx->offset); + n += tPutI64v(p ? p + n : p, pBlockIdx->size); + + return n; +} +#endif + int32_t tGetBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; SBlockIdx *pBlockIdx = (SBlockIdx *)ph; @@ -89,6 +225,77 @@ int32_t tGetBlockIdx(uint8_t *p, void *ph) { return n; } +#ifdef BUILD_NO_CALL +int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { + SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; + SBlockIdx *rBlockIdx = (SBlockIdx *)rhs; + + if (lBlockIdx->suid < rBlockIdx->suid) { + return -1; + } else if (lBlockIdx->suid > rBlockIdx->suid) { + return 1; + } + + if (lBlockIdx->uid < rBlockIdx->uid) { + return -1; + } else if (lBlockIdx->uid > rBlockIdx->uid) { + return 1; + } + + return 0; +} + +int32_t tCmprBlockL(void const *lhs, void const *rhs) { + SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; + SSttBlk *rBlockL = (SSttBlk *)rhs; + + if (lBlockIdx->suid < rBlockL->suid) { + return -1; + } else if (lBlockIdx->suid > rBlockL->suid) { + return 1; + } + + if (lBlockIdx->uid < rBlockL->minUid) { + return -1; + } else if (lBlockIdx->uid > rBlockL->maxUid) { + return 1; + } + + return 0; +} + +// SDataBlk ====================================================== +void tDataBlkReset(SDataBlk *pDataBlk) { + *pDataBlk = (SDataBlk){.minKey = TSDBKEY_MAX, .maxKey = TSDBKEY_MIN, .minVer = VERSION_MAX, .maxVer = VERSION_MIN}; +} + +int32_t tPutDataBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SDataBlk *pDataBlk = (SDataBlk *)ph; + + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->minVer); + n += tPutI64v(p ? p + n : p, pDataBlk->maxVer); + n += tPutI32v(p ? p + n : p, pDataBlk->nRow); + n += tPutI8(p ? p + n : p, pDataBlk->hasDup); + n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock); + for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { + n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey); + } + if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { + n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset); + n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size); + } + + return n; +} +#endif + int32_t tGetDataBlk(uint8_t *p, void *ph) { int32_t n = 0; SDataBlk *pDataBlk = (SDataBlk *)ph; @@ -118,6 +325,48 @@ int32_t tGetDataBlk(uint8_t *p, void *ph) { return n; } +#ifdef BUILD_NO_CALL +int32_t tDataBlkCmprFn(const void *p1, const void *p2) { + SDataBlk *pBlock1 = (SDataBlk *)p1; + SDataBlk *pBlock2 = (SDataBlk *)p2; + + if (tsdbKeyCmprFn(&pBlock1->maxKey, &pBlock2->minKey) < 0) { + return -1; + } else if (tsdbKeyCmprFn(&pBlock1->minKey, &pBlock2->maxKey) > 0) { + return 1; + } + + return 0; +} + +bool tDataBlkHasSma(SDataBlk *pDataBlk) { + if (pDataBlk->nSubBlock > 1) return false; + if (pDataBlk->hasDup) return false; + + return pDataBlk->smaInfo.size > 0; +} + +// SSttBlk ====================================================== +int32_t tPutSttBlk(uint8_t *p, void *ph) { + int32_t n = 0; + SSttBlk *pSttBlk = (SSttBlk *)ph; + + n += tPutI64(p ? p + n : p, pSttBlk->suid); + n += tPutI64(p ? p + n : p, pSttBlk->minUid); + n += tPutI64(p ? p + n : p, pSttBlk->maxUid); + n += tPutI64v(p ? p + n : p, pSttBlk->minKey); + n += tPutI64v(p ? p + n : p, pSttBlk->maxKey); + n += tPutI64v(p ? p + n : p, pSttBlk->minVer); + n += tPutI64v(p ? p + n : p, pSttBlk->maxVer); + n += tPutI32v(p ? p + n : p, pSttBlk->nRow); + n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey); + + return n; +} +#endif + int32_t tGetSttBlk(uint8_t *p, void *ph) { int32_t n = 0; SSttBlk *pSttBlk = (SSttBlk *)ph; @@ -142,42 +391,47 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) { static const int32_t BLOCK_WITH_ALG_VER = 2; int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { + int32_t code; + ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); - TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pBlockCol->cid)); - TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->type)); - TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->cflag)); - TAOS_CHECK_RETURN(tBufferPutI8(buffer, pBlockCol->flag)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szOrigin)); + if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->flag))) return code; + if ((code = tBufferPutI32v(buffer, pBlockCol->szOrigin))) return code; if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szBitmap)); + if ((code = tBufferPutI32v(buffer, pBlockCol->szBitmap))) return code; } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szOffset)); + if ((code = tBufferPutI32v(buffer, pBlockCol->szOffset))) return code; } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->szValue)); + if ((code = tBufferPutI32v(buffer, pBlockCol->szValue))) return code; } - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pBlockCol->offset)); + + if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code; } if (ver >= BLOCK_WITH_ALG_VER) { - TAOS_CHECK_RETURN(tBufferPutU32(buffer, pBlockCol->alg)); + if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code; } else { - TAOS_CHECK_RETURN(tBufferPutU32(buffer, defaultCmprAlg)); + if ((code = tBufferPutU32(buffer, defaultCmprAlg))) return code; } return 0; } int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { - TAOS_CHECK_RETURN(tBufferGetI16v(br, &pBlockCol->cid)); - TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->type)); - TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->cflag)); - TAOS_CHECK_RETURN(tBufferGetI8(br, &pBlockCol->flag)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szOrigin)); + int32_t code; + + if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->type))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->cflag))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code; + if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code; ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); @@ -188,22 +442,22 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint3 if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szBitmap)); + if ((code = tBufferGetI32v(br, &pBlockCol->szBitmap))) return code; } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szOffset)); + if ((code = tBufferGetI32v(br, &pBlockCol->szOffset))) return code; } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->szValue)); + if ((code = tBufferGetI32v(br, &pBlockCol->szValue))) return code; } - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pBlockCol->offset)); + if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code; } if (ver >= BLOCK_WITH_ALG_VER) { - TAOS_CHECK_RETURN(tBufferGetU32(br, &pBlockCol->alg)); + if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code; } else { pBlockCol->alg = defaultCmprAlg; } @@ -211,6 +465,50 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint3 return 0; } +#ifdef BUILD_NO_CALL +int32_t tBlockColCmprFn(const void *p1, const void *p2) { + if (((SBlockCol *)p1)->cid < ((SBlockCol *)p2)->cid) { + return -1; + } else if (((SBlockCol *)p1)->cid > ((SBlockCol *)p2)->cid) { + return 1; + } + + return 0; +} + +// SDelIdx ====================================================== +int32_t tCmprDelIdx(void const *lhs, void const *rhs) { + SDelIdx *lDelIdx = (SDelIdx *)lhs; + SDelIdx *rDelIdx = (SDelIdx *)rhs; + + if (lDelIdx->suid < rDelIdx->suid) { + return -1; + } else if (lDelIdx->suid > rDelIdx->suid) { + return 1; + } + + if (lDelIdx->uid < rDelIdx->uid) { + return -1; + } else if (lDelIdx->uid > rDelIdx->uid) { + return 1; + } + + return 0; +} + +int32_t tPutDelIdx(uint8_t *p, void *ph) { + SDelIdx *pDelIdx = (SDelIdx *)ph; + int32_t n = 0; + + n += tPutI64(p ? p + n : p, pDelIdx->suid); + n += tPutI64(p ? p + n : p, pDelIdx->uid); + n += tPutI64v(p ? p + n : p, pDelIdx->offset); + n += tPutI64v(p ? p + n : p, pDelIdx->size); + + return n; +} +#endif + int32_t tGetDelIdx(uint8_t *p, void *ph) { SDelIdx *pDelIdx = (SDelIdx *)ph; int32_t n = 0; @@ -223,6 +521,20 @@ int32_t tGetDelIdx(uint8_t *p, void *ph) { return n; } +#ifdef BUILD_NO_CALL +// SDelData ====================================================== +int32_t tPutDelData(uint8_t *p, void *ph) { + SDelData *pDelData = (SDelData *)ph; + int32_t n = 0; + + n += tPutI64v(p ? p + n : p, pDelData->version); + n += tPutI64(p ? p + n : p, pDelData->sKey); + n += tPutI64(p ? p + n : p, pDelData->eKey); + + return n; +} +#endif + int32_t tGetDelData(uint8_t *p, void *ph) { SDelData *pDelData = (SDelData *)ph; int32_t n = 0; @@ -368,16 +680,20 @@ int32_t tsdbRowCompareWithoutVersion(const void *p1, const void *p2) { // STSDBRowIter ====================================================== int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + pIter->pRow = pRow; if (pRow->type == TSDBROW_ROW_FMT) { - TAOS_CHECK_RETURN(tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter)); + code = tRowIterOpen(pRow->pTSRow, pTSchema, &pIter->pIter); + if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { pIter->iColData = 0; } else { ASSERT(0); } - return 0; +_exit: + return code; } void tsdbRowClose(STSDBRowIter *pIter) { @@ -432,7 +748,8 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) *pColVal = COL_VAL_VALUE(pTColumn->colId, ((SValue){.type = pTColumn->type, .val = key.ts})); if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } // other @@ -443,9 +760,7 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) --iCol; continue; } else if (pTSchema->columns[jCol].colId > pTColumn->colId) { - if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); continue; } @@ -454,7 +769,10 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) uint8_t *pVal = pColVal->value.pData; pColVal->value.pData = NULL; - TAOS_CHECK_RETURN(tRealloc(&pColVal->value.pData, pColVal->value.nData)); + code = tRealloc(&pColVal->value.pData, pColVal->value.nData); + if (code) { + return TSDB_CODE_OUT_OF_MEMORY; + } if (pColVal->value.nData) { memcpy(pColVal->value.pData, pVal, pColVal->value.nData); @@ -462,15 +780,14 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) } if (taosArrayPush(pMerger->pArray, pColVal) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + return code; } } for (; iCol < pMerger->pTSchema->numOfCols; ++iCol) { pTColumn = &pMerger->pTSchema->columns[iCol]; - if (taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + taosArrayPush(pMerger->pArray, &COL_VAL_NONE(pTColumn->colId, pTColumn->type)); } pMerger->version = key.version; @@ -495,7 +812,8 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) if (IS_VAR_DATA_TYPE(pColVal->value.type)) { SColVal *pTColVal = taosArrayGet(pMerger->pArray, iCol); if (!COL_VAL_IS_NULL(pColVal)) { - TAOS_CHECK_RETURN(tRealloc(&pTColVal->value.pData, pColVal->value.nData)); + code = tRealloc(&pTColVal->value.pData, pColVal->value.nData); + if (code) return code; pTColVal->value.nData = pColVal->value.nData; if (pTColVal->value.nData) { @@ -514,7 +832,8 @@ int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) SColVal *tColVal = (SColVal *)taosArrayGet(pMerger->pArray, iCol); if (COL_VAL_IS_NONE(tColVal) && !COL_VAL_IS_NONE(pColVal)) { if ((!COL_VAL_IS_NULL(pColVal)) && IS_VAR_DATA_TYPE(pColVal->value.type)) { - TAOS_CHECK_RETURN(tRealloc(&tColVal->value.pData, pColVal->value.nData)); + code = tRealloc(&tColVal->value.pData, pColVal->value.nData); + if (code) return code; tColVal->value.nData = pColVal->value.nData; if (pColVal->value.nData) { @@ -572,8 +891,79 @@ int32_t tsdbRowMergerGetRow(SRowMerger *pMerger, SRow **ppRow) { return tRowBuild(pMerger->pArray, pMerger->pTSchema, ppRow); } +/* +// delete skyline ====================================================== +static int32_t tsdbMergeSkyline2(SArray *aSkyline1, SArray *aSkyline2, SArray *aSkyline) { + int32_t code = 0; + int32_t i1 = 0; + int32_t n1 = taosArrayGetSize(aSkyline1); + int32_t i2 = 0; + int32_t n2 = taosArrayGetSize(aSkyline2); + TSDBKEY *pSkyline1; + TSDBKEY *pSkyline2; + TSDBKEY item; + int64_t version1 = 0; + int64_t version2 = 0; + + ASSERT(n1 > 0 && n2 > 0); + + taosArrayClear(aSkyline); + + while (i1 < n1 && i2 < n2) { + pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1); + pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2); + + if (pSkyline1->ts < pSkyline2->ts) { + version1 = pSkyline1->version; + i1++; + } else if (pSkyline1->ts > pSkyline2->ts) { + version2 = pSkyline2->version; + i2++; + } else { + version1 = pSkyline1->version; + version2 = pSkyline2->version; + i1++; + i2++; + } + + item.ts = TMIN(pSkyline1->ts, pSkyline2->ts); + item.version = TMAX(version1, version2); + if (taosArrayPush(aSkyline, &item) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + } + + while (i1 < n1) { + pSkyline1 = (TSDBKEY *)taosArrayGet(aSkyline1, i1); + item.ts = pSkyline1->ts; + item.version = pSkyline1->version; + if (taosArrayPush(aSkyline, &item) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + i1++; + } + + while (i2 < n2) { + pSkyline2 = (TSDBKEY *)taosArrayGet(aSkyline2, i2); + item.ts = pSkyline2->ts; + item.version = pSkyline2->version; + if (taosArrayPush(aSkyline, &item) == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; + } + i2++; + } + +_exit: + return code; +} +*/ + // delete skyline ====================================================== static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pSkyline) { + int32_t code = 0; int32_t i1 = 0; int32_t n1 = taosArrayGetSize(pSkyline1); int32_t i2 = 0; @@ -627,7 +1017,7 @@ static int32_t tsdbMergeSkyline(SArray *pSkyline1, SArray *pSkyline2, SArray *pS } pSkyline->size = TARRAY_ELEM_IDX(pSkyline, pItem); - return 0; + return code; } int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, SArray *pSkyline) { @@ -639,12 +1029,8 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, if (sidx == eidx) { TSDBKEY *pItem1 = taosArrayGet(aSkyline, sidx * 2); TSDBKEY *pItem2 = taosArrayGet(aSkyline, sidx * 2 + 1); - if (taosArrayPush(pSkyline, &pItem1) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - if (taosArrayPush(pSkyline, &pItem2) == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } + taosArrayPush(pSkyline, &pItem1); + taosArrayPush(pSkyline, &pItem2); } else { SArray *pSkyline1 = NULL; SArray *pSkyline2 = NULL; @@ -657,8 +1043,11 @@ int32_t tsdbBuildDeleteSkylineImpl(SArray *aSkyline, int32_t sidx, int32_t eidx, goto _clear; } - TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1), NULL, _clear); - TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2), NULL, _clear); + code = tsdbBuildDeleteSkylineImpl(aSkyline, sidx, midx, pSkyline1); + if (code) goto _clear; + + code = tsdbBuildDeleteSkylineImpl(aSkyline, midx + 1, eidx, pSkyline2); + if (code) goto _clear; code = tsdbMergeSkyline(pSkyline1, pSkyline2, pSkyline); @@ -674,48 +1063,72 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr SDelData *pDelData; int32_t code = 0; int32_t dataNum = eidx - sidx + 1; - SArray *aTmpSkyline; - SArray *pSkyline; - - aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); - if (aTmpSkyline == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); - if (pSkyline) { - taosArrayDestroy(aTmpSkyline); - return TSDB_CODE_OUT_OF_MEMORY; - } + SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); + SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); taosArrayClear(aSkyline); for (int32_t i = sidx; i <= eidx; ++i) { pDelData = (SDelData *)taosArrayGet(aDelData, i); - if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}) == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); - } - - if (taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}) == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); - } + taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); + taosArrayPush(aTmpSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); } - TAOS_CHECK_GOTO(tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline), NULL, _clear); + code = tsdbBuildDeleteSkylineImpl(aTmpSkyline, sidx, eidx, pSkyline); + if (code) goto _clear; int32_t skylineNum = taosArrayGetSize(pSkyline); for (int32_t i = 0; i < skylineNum; ++i) { TSDBKEY *p = taosArrayGetP(pSkyline, i); - if (taosArrayPush(aSkyline, p) == NULL) { - TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, NULL, _clear); - } + taosArrayPush(aSkyline, p); } _clear: taosArrayDestroy(aTmpSkyline); taosArrayDestroy(pSkyline); + return code; } +/* +int32_t tsdbBuildDeleteSkyline2(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline) { + int32_t code = 0; + SDelData *pDelData; + int32_t midx; + + taosArrayClear(aSkyline); + if (sidx == eidx) { + pDelData = (SDelData *)taosArrayGet(aDelData, sidx); + taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->sKey, .version = pDelData->version}); + taosArrayPush(aSkyline, &(TSDBKEY){.ts = pDelData->eKey, .version = 0}); + } else { + SArray *aSkyline1 = NULL; + SArray *aSkyline2 = NULL; + + aSkyline1 = taosArrayInit(0, sizeof(TSDBKEY)); + aSkyline2 = taosArrayInit(0, sizeof(TSDBKEY)); + if (aSkyline1 == NULL || aSkyline2 == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _clear; + } + midx = (sidx + eidx) / 2; + + code = tsdbBuildDeleteSkyline(aDelData, sidx, midx, aSkyline1); + if (code) goto _clear; + + code = tsdbBuildDeleteSkyline(aDelData, midx + 1, eidx, aSkyline2); + if (code) goto _clear; + + code = tsdbMergeSkyline(aSkyline1, aSkyline2, aSkyline); + + _clear: + taosArrayDestroy(aSkyline1); + taosArrayDestroy(aSkyline2); + } + + return code; +} +*/ + // SBlockData ====================================================== int32_t tBlockDataCreate(SBlockData *pBlockData) { pBlockData->suid = 0; @@ -745,6 +1158,8 @@ void tBlockDataDestroy(SBlockData *pBlockData) { } static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) { + int32_t code = 0; + if (pBlockData->nColData > nColData) { for (int32_t i = nColData; i < pBlockData->nColData; i++) { tColDataDestroy(&pBlockData->aColData[i]); @@ -752,7 +1167,8 @@ static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) } else if (pBlockData->nColData < nColData) { SColData *aColData = taosMemoryRealloc(pBlockData->aColData, sizeof(SBlockData) * nColData); if (aColData == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _exit; } pBlockData->aColData = aColData; @@ -760,10 +1176,12 @@ static int32_t tBlockDataAdjustColData(SBlockData *pBlockData, int32_t nColData) } pBlockData->nColData = nColData; - return 0; +_exit: + return code; } - int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, int16_t *aCid, int32_t nCid) { + int32_t code = 0; + ASSERT(pId->suid || pId->uid); pBlockData->suid = pId->suid; @@ -771,7 +1189,8 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, pBlockData->nRow = 0; if (aCid) { - TAOS_CHECK_RETURN(tBlockDataAdjustColData(pBlockData, nCid)); + code = tBlockDataAdjustColData(pBlockData, nCid); + if (code) goto _exit; int32_t iColumn = 1; STColumn *pTColumn = &pTSchema->columns[iColumn]; @@ -798,14 +1217,17 @@ int32_t tBlockDataInit(SBlockData *pBlockData, TABLEID *pId, STSchema *pTSchema, pTColumn = (iColumn < pTSchema->numOfCols) ? &pTSchema->columns[iColumn] : NULL; } } else { - TAOS_CHECK_RETURN(tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1)); + code = tBlockDataAdjustColData(pBlockData, pTSchema->numOfCols - 1); + if (code) goto _exit; for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { STColumn *pTColumn = &pTSchema->columns[iColData + 1]; tColDataInit(&pBlockData->aColData[iColData], pTColumn->colId, pTColumn->type, pTColumn->flags); } } - return 0; + +_exit: + return code; } void tBlockDataReset(SBlockData *pBlockData) { @@ -887,35 +1309,43 @@ _exit: } int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { + int32_t code = 0; + ASSERT(pBlockData->suid || pBlockData->uid); // uid if (pBlockData->uid == 0) { ASSERT(uid); - TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1))); + code = tRealloc((uint8_t **)&pBlockData->aUid, sizeof(int64_t) * (pBlockData->nRow + 1)); + if (code) goto _exit; pBlockData->aUid[pBlockData->nRow] = uid; } // version - TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1))); + code = tRealloc((uint8_t **)&pBlockData->aVersion, sizeof(int64_t) * (pBlockData->nRow + 1)); + if (code) goto _exit; pBlockData->aVersion[pBlockData->nRow] = TSDBROW_VERSION(pRow); // timestamp - TAOS_CHECK_RETURN(tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1))); + code = tRealloc((uint8_t **)&pBlockData->aTSKEY, sizeof(TSKEY) * (pBlockData->nRow + 1)); + if (code) goto _exit; pBlockData->aTSKEY[pBlockData->nRow] = TSDBROW_TS(pRow); if (pRow->type == TSDBROW_ROW_FMT) { - TAOS_CHECK_RETURN( - tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */)); + code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, 0 /* append */); + if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { - TAOS_CHECK_RETURN(tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */)); + code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, 0 /* append */); + if (code) goto _exit; } else { ASSERT(0); } pBlockData->nRow++; - return 0; +_exit: + return code; } - int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema) { + int32_t code = 0; + // version int64_t lversion = pBlockData->aVersion[pBlockData->nRow - 1]; int64_t rversion = TSDBROW_VERSION(pRow); @@ -926,18 +1356,40 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS // update other rows if (pRow->type == TSDBROW_ROW_FMT) { - TAOS_CHECK_RETURN(tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, - (rversion > lversion) ? 1 : -1 /* update */)); + code = tRowUpsertColData(pRow->pTSRow, pTSchema, pBlockData->aColData, pBlockData->nColData, + (rversion > lversion) ? 1 : -1 /* update */); + if (code) goto _exit; } else if (pRow->type == TSDBROW_COL_FMT) { - TAOS_CHECK_RETURN( - tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1)); + code = tBlockDataUpsertBlockRow(pBlockData, pRow->pBlockData, pRow->iRow, (rversion > lversion) ? 1 : -1); + if (code) goto _exit; } else { ASSERT(0); } - return 0; +_exit: + return code; } +#ifdef BUILD_NO_CALL +int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid) { + if (pBlockData->nRow == 0) { + return 1; + } else if (pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + return pBlockData->nRow; + } else { + return pBlockData->nRow + 1; + } +} + +int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid) { + if (pBlockData->nRow > 0 && pBlockData->aTSKEY[pBlockData->nRow - 1] == TSDBROW_TS(pRow)) { + return tBlockDataUpdateRow(pBlockData, pRow, pTSchema); + } else { + return tBlockDataAppendRow(pBlockData, pRow, pTSchema, uid); + } +} +#endif + SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { ASSERT(cid != PRIMARYKEY_TIMESTAMP_COL_ID); int32_t lidx = 0; @@ -970,8 +1422,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB int32_t lino = 0; SColCompressInfo *pInfo = pCompr; - - TAOS_CHECK_GOTO(tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg), &lino, _exit); + code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, 1, &pInfo->defaultCmprAlg); SDiskDataHdr hdr = { .delimiter = TSDB_FILE_DLMT, @@ -989,7 +1440,8 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB // Key part tBufferClear(&buffers[1]); - TAOS_CHECK_GOTO(tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo), &lino, _exit); + code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist, (SColCompressInfo *)pInfo); + TSDB_CHECK_CODE(code, lino, _exit); // Regulart column part tBufferClear(&buffers[2]); @@ -1007,10 +1459,14 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB SColDataCompressInfo cinfo = { .cmprAlg = pInfo->defaultCmprAlg, }; - TAOS_UNUSED(tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg)); + code = tsdbGetColCmprAlgFromSet(pInfo->pColCmpr, colData->cid, &cinfo.cmprAlg); + if (code < 0) { + // + } int32_t offset = buffers[3].size; - TAOS_CHECK_GOTO(tColDataCompress(colData, &cinfo, &buffers[3], assist), &lino, _exit); + code = tColDataCompress(colData, &cinfo, &buffers[3], assist); + TSDB_CHECK_CODE(code, lino, _exit); SBlockCol blockCol = (SBlockCol){.cid = cinfo.columnId, .type = cinfo.dataType, @@ -1023,13 +1479,15 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB .offset = offset, .alg = cinfo.cmprAlg}; - TAOS_CHECK_GOTO(tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit); + code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg); + TSDB_CHECK_CODE(code, lino, _exit); } hdr.szBlkCol = buffers[2].size; // SDiskDataHdr part tBufferClear(&buffers[0]); - TAOS_CHECK_GOTO(tPutDiskDataHdr(&buffers[0], &hdr), &lino, _exit); + code = tPutDiskDataHdr(&buffers[0], &hdr); + TSDB_CHECK_CODE(code, lino, _exit); _exit: return code; @@ -1042,7 +1500,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * SCompressInfo cinfo; // SDiskDataHdr - TAOS_CHECK_GOTO(tGetDiskDataHdr(br, &hdr), &lino, _exit); + code = tGetDiskDataHdr(br, &hdr); + TSDB_CHECK_CODE(code, lino, _exit); tBlockDataReset(blockData); blockData->suid = hdr.suid; @@ -1050,7 +1509,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * blockData->nRow = hdr.nRow; // Key part - TAOS_CHECK_GOTO(tBlockDataDecompressKeyPart(&hdr, br, blockData, assist), &lino, _exit); + code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); // Column part SBufferReader br2 = *br; @@ -1058,11 +1518,11 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) { SBlockCol blockCol; - TAOS_CHECK_GOTO(tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg), &lino, _exit); - + code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg); if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg; - - TAOS_CHECK_GOTO(tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist), &lino, _exit); + TSDB_CHECK_CODE(code, lino, _exit); + code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -1071,26 +1531,28 @@ _exit: // SDiskDataHdr ============================== int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { - TAOS_CHECK_RETURN(tBufferPutU32(buffer, pHdr->delimiter)); - TAOS_CHECK_RETURN(tBufferPutU32v(buffer, pHdr->fmtVer)); - TAOS_CHECK_RETURN(tBufferPutI64(buffer, pHdr->suid)); - TAOS_CHECK_RETURN(tBufferPutI64(buffer, pHdr->uid)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szUid)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szVer)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szKey)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->szBlkCol)); - TAOS_CHECK_RETURN(tBufferPutI32v(buffer, pHdr->nRow)); + int32_t code; + + if ((code = tBufferPutU32(buffer, pHdr->delimiter))) return code; + if ((code = tBufferPutU32v(buffer, pHdr->fmtVer))) return code; + if ((code = tBufferPutI64(buffer, pHdr->suid))) return code; + if ((code = tBufferPutI64(buffer, pHdr->uid))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szUid))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szVer))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; if (pHdr->fmtVer < 2) { - TAOS_CHECK_RETURN(tBufferPutI8(buffer, pHdr->cmprAlg)); + if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; } else if (pHdr->fmtVer == 2) { - TAOS_CHECK_RETURN(tBufferPutU32(buffer, pHdr->cmprAlg)); + if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code; } else { // more data fmt ver } if (pHdr->fmtVer >= 1) { - TAOS_CHECK_RETURN(tBufferPutI8(buffer, pHdr->numOfPKs)); + if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { - TAOS_CHECK_RETURN(tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg)); + if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) return code; } } @@ -1098,28 +1560,32 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { } int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { - TAOS_CHECK_RETURN(tBufferGetU32(br, &pHdr->delimiter)); - TAOS_CHECK_RETURN(tBufferGetU32v(br, &pHdr->fmtVer)); - TAOS_CHECK_RETURN(tBufferGetI64(br, &pHdr->suid)); - TAOS_CHECK_RETURN(tBufferGetI64(br, &pHdr->uid)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szUid)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szVer)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szKey)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->szBlkCol)); - TAOS_CHECK_RETURN(tBufferGetI32v(br, &pHdr->nRow)); + int32_t code; + + if ((code = tBufferGetU32(br, &pHdr->delimiter))) return code; + if ((code = tBufferGetU32v(br, &pHdr->fmtVer))) return code; + if ((code = tBufferGetI64(br, &pHdr->suid))) return code; + if ((code = tBufferGetI64(br, &pHdr->uid))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szUid))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szVer))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; + if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; if (pHdr->fmtVer < 2) { int8_t cmprAlg = 0; - TAOS_CHECK_RETURN(tBufferGetI8(br, &cmprAlg)); + if ((code = tBufferGetI8(br, &cmprAlg))) return code; pHdr->cmprAlg = cmprAlg; } else if (pHdr->fmtVer == 2) { - TAOS_CHECK_RETURN(tBufferGetU32(br, &pHdr->cmprAlg)); + if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code; } else { // more data fmt ver } if (pHdr->fmtVer >= 1) { - TAOS_CHECK_RETURN(tBufferGetI8(br, &pHdr->numOfPKs)); + if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { - TAOS_CHECK_RETURN(tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg)); + if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) { + return code; + } } } else { pHdr->numOfPKs = 0; @@ -1130,20 +1596,26 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { // ALGORITHM ============================== int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) { - TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pColAgg->colId)); - TAOS_CHECK_RETURN(tBufferPutI16v(buffer, pColAgg->numOfNull)); - TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->sum)); - TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->max)); - TAOS_CHECK_RETURN(tBufferPutI64(buffer, pColAgg->min)); + int32_t code; + + if ((code = tBufferPutI16v(buffer, pColAgg->colId))) return code; + if ((code = tBufferPutI16v(buffer, pColAgg->numOfNull))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->sum))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->max))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->min))) return code; + return 0; } int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) { - TAOS_CHECK_RETURN(tBufferGetI16v(br, &pColAgg->colId)); - TAOS_CHECK_RETURN(tBufferGetI16v(br, &pColAgg->numOfNull)); - TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->sum)); - TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->max)); - TAOS_CHECK_RETURN(tBufferGetI64(br, &pColAgg->min)); + int32_t code; + + if ((code = tBufferGetI16v(br, &pColAgg->colId))) return code; + if ((code = tBufferGetI16v(br, &pColAgg->numOfNull))) return code; + if ((code = tBufferGetI64(br, &pColAgg->sum))) return code; + if ((code = tBufferGetI64(br, &pColAgg->max))) return code; + if ((code = tBufferGetI64(br, &pColAgg->min))) return code; + return 0; } @@ -1160,7 +1632,8 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - TAOS_CHECK_GOTO(tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist), &lino, _exit); + code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); hdr->szUid = cinfo.compressedSize; } @@ -1170,7 +1643,8 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = sizeof(int64_t) * bData->nRow, }; - TAOS_CHECK_GOTO(tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist), &lino, _exit); + code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); hdr->szVer = cinfo.compressedSize; // ts @@ -1180,7 +1654,8 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S .originalSize = sizeof(TSKEY) * bData->nRow, }; - TAOS_CHECK_GOTO(tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist), &lino, _exit); + code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); hdr->szKey = cinfo.compressedSize; // primary keys @@ -1197,9 +1672,14 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S SColDataCompressInfo info = { .cmprAlg = hdr->cmprAlg, }; - TAOS_UNUSED(tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg)); + code = tsdbGetColCmprAlgFromSet(compressInfo->pColCmpr, colData->cid, &info.cmprAlg); + if (code < 0) { + // do nothing + } else { + } - TAOS_CHECK_GOTO(tColDataCompress(colData, &info, buffer, assist), &lino, _exit); + code = tColDataCompress(colData, &info, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); *blockCol = (SBlockCol){ .cid = info.columnId, @@ -1226,8 +1706,8 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl SColData *colData; - TAOS_CHECK_GOTO(tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData), &lino, - _exit); + code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData); + TSDB_CHECK_CODE(code, lino, _exit); // ASSERT(blockCol->flag != HAS_NONE); @@ -1257,7 +1737,8 @@ int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *bl break; } - TAOS_CHECK_GOTO(tColDataDecompress(BR_PTR(br), &info, colData, assist), &lino, _exit); + code = tColDataDecompress(BR_PTR(br), &info, colData, assist); + TSDB_CHECK_CODE(code, lino, _exit); br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; _exit: @@ -1279,8 +1760,10 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .originalSize = sizeof(int64_t) * hdr->nRow, }; - TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize), &lino, _exit); - TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist), &lino, _exit); + code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); br->offset += cinfo.compressedSize; } @@ -1291,8 +1774,10 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .compressedSize = hdr->szVer, .originalSize = sizeof(int64_t) * hdr->nRow, }; - TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize), &lino, _exit); - TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist), &lino, _exit); + code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); br->offset += cinfo.compressedSize; // ts @@ -1302,8 +1787,10 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, .compressedSize = hdr->szKey, .originalSize = sizeof(TSKEY) * hdr->nRow, }; - TAOS_CHECK_GOTO(tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize), &lino, _exit); - TAOS_CHECK_GOTO(tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist), &lino, _exit); + code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); br->offset += cinfo.compressedSize; // primary keys @@ -1313,7 +1800,8 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, ASSERT(blockCol->flag == HAS_VALUE); ASSERT(blockCol->cflag & COL_IS_KEY); - TAOS_CHECK_GOTO(tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist), &lino, _exit); + code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -1321,19 +1809,14 @@ _exit: } int32_t tsdbGetColCmprAlgFromSet(SHashObj *set, int16_t colId, uint32_t *alg) { - if (set == NULL) { - return TSDB_CODE_INVALID_PARA; - } + if (set == NULL) return -1; uint32_t *ret = taosHashGet(set, &colId, sizeof(colId)); - if (ret == NULL) { - return TSDB_CODE_NOT_FOUND; - } + if (ret == NULL) return -1; *alg = *ret; return 0; } - uint32_t tsdbCvtTimestampAlg(uint32_t alg) { DEFINE_VAR(alg)