diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 7aec39817a..04e13fbdb3 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -136,7 +136,7 @@ int32_t tRowMerge(SArray *aRowP, STSchema *pTSchema, int8_t flag); int32_t tRowUpsertColData(SRow *pRow, STSchema *pTSchema, SColData *aColData, int32_t nColData, int32_t flag); void tRowGetPrimaryKey(SRow *pRow, SRowKey *key); int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2); -int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); +void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc); // SRowIter ================================ int32_t tRowIterOpen(SRow *pRow, STSchema *pTSchema, SRowIter **ppIter); diff --git a/include/common/tmsgcb.h b/include/common/tmsgcb.h index 03bf8da707..0c61aa5a51 100644 --- a/include/common/tmsgcb.h +++ b/include/common/tmsgcb.h @@ -17,13 +17,13 @@ #define _TD_COMMON_MSG_CB_H_ #include "os.h" +#include "tmsg.h" #ifdef __cplusplus extern "C" { #endif typedef struct SRpcMsg SRpcMsg; -typedef struct SEpSet SEpSet; typedef struct SMgmtWrapper SMgmtWrapper; typedef struct SRpcHandleInfo SRpcHandleInfo; @@ -46,7 +46,7 @@ typedef int32_t (*PutToQueueFp)(void* pMgmt, EQueueType qtype, SRpcMsg* pMsg); typedef int32_t (*GetQueueSizeFp)(void* pMgmt, int32_t vgId, EQueueType qtype); typedef int32_t (*SendReqFp)(const SEpSet* pEpSet, SRpcMsg* pMsg); typedef void (*SendRspFp)(SRpcMsg* pMsg); -typedef void (*RegisterBrokenLinkArgFp)(SRpcMsg* pMsg); +typedef void (*RegisterBrokenLinkArgFp)(struct SRpcMsg* pMsg); typedef void (*ReleaseHandleFp)(SRpcHandleInfo* pHandle, int8_t type); typedef void (*ReportStartup)(const char* name, const char* desc); diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index 45f9f73fb1..edb6ec9cbf 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -105,7 +105,7 @@ typedef struct SMTbCursor { } SMTbCursor; typedef struct SMCtbCursor { - SMeta* pMeta; + struct SMeta* pMeta; void* pCur; tb_uid_t suid; void* pKey; @@ -134,7 +134,7 @@ typedef struct SMetaTableInfo { } SMetaTableInfo; typedef struct SSnapContext { - SMeta* pMeta; + struct SMeta* pMeta; int64_t snapVersion; void* pCur; int64_t suid; @@ -178,7 +178,7 @@ typedef struct TsdReader { int32_t (*tsdNextDataBlock)(); int32_t (*tsdReaderRetrieveBlockSMAInfo)(); - SSDataBlock *(*tsdReaderRetrieveDataBlock)(); + int32_t (*tsdReaderRetrieveDataBlock)(); void (*tsdReaderReleaseDataBlock)(); diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 0be03b82f3..1fb077e3ca 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -70,7 +70,6 @@ typedef int64_t SyncIndex; typedef int64_t SyncTerm; typedef struct SSyncNode SSyncNode; -typedef struct SWal SWal; typedef struct SSyncRaftEntry SSyncRaftEntry; typedef enum { @@ -238,7 +237,7 @@ typedef struct SSyncInfo { int32_t batchSize; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - SWal* pWal; + struct SWal* pWal; SSyncFSM* pFsm; SMsgCb* msgcb; int32_t pingMs; diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 7710dbdb3f..142e75dbcb 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -615,6 +615,10 @@ int32_t blockDataUpdateTsWindow(SSDataBlock* pDataBlock, int32_t tsColumnIndex) int32_t index = (tsColumnIndex == -1) ? 0 : tsColumnIndex; SColumnInfoData* pColInfoData = taosArrayGet(pDataBlock->pDataBlock, index); + if (pColInfoData == NULL) { + return 0; + } + if (pColInfoData->info.type != TSDB_DATA_TYPE_TIMESTAMP) { return 0; } @@ -1514,7 +1518,7 @@ void blockDataFreeRes(SSDataBlock* pBlock) { taosArrayDestroy(pBlock->pDataBlock); pBlock->pDataBlock = NULL; - + taosMemoryFreeClear(pBlock->pBlockAgg); memset(&pBlock->info, 0, sizeof(SDataBlockInfo)); } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 61af2f6aae..27f865353e 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -305,11 +305,11 @@ static int32_t tRowBuildTupleRow(SArray *aColVal, const SRowBuildScanInfo *sinfo *(int32_t *)(fixed + schema->columns[i].offset) = varlen - fixed - sinfo->tupleFixedSize; varlen += tPutU32v(varlen, colValArray[colValIndex].value.nData); if (colValArray[colValIndex].value.nData) { - memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); + (void)memcpy(varlen, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); varlen += colValArray[colValIndex].value.nData; } } else { - memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val, + (void)memcpy(fixed + schema->columns[i].offset, &colValArray[colValIndex].value.val, tDataTypes[schema->columns[i].type].bytes); } } else if (COL_VAL_IS_NULL(&colValArray[colValIndex])) { // NULL @@ -384,12 +384,12 @@ static int32_t tRowBuildKVRow(SArray *aColVal, const SRowBuildScanInfo *sinfo, c payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); payloadSize += tPutU32v(payload + payloadSize, colValArray[colValIndex].value.nData); if (colValArray[colValIndex].value.nData > 0) { - memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); + (void)memcpy(payload + payloadSize, colValArray[colValIndex].value.pData, colValArray[colValIndex].value.nData); } payloadSize += colValArray[colValIndex].value.nData; } else { payloadSize += tPutI16v(payload + payloadSize, colValArray[colValIndex].cid); - memcpy(payload + payloadSize, &colValArray[colValIndex].value.val, + (void)memcpy(payload + payloadSize, &colValArray[colValIndex].value.val, tDataTypes[schema->columns[i].type].bytes); payloadSize += tDataTypes[schema->columns[i].type].bytes; } @@ -475,7 +475,7 @@ int32_t tRowBuildFromBind(SBindInfo *infos, int32_t numOfInfos, bool infoSorted, value.nData = infos[iInfo].bind->length[iRow]; value.pData = (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow; } else { - memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow, + (void)memcpy(&value.val, (uint8_t *)infos[iInfo].bind->buffer + infos[iInfo].bind->buffer_length * iRow, infos[iInfo].bind->buffer_length); } colVal = COL_VAL_VALUE(infos[iInfo].columnId, value); @@ -509,7 +509,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) pColVal->cid = pTColumn->colId; pColVal->value.type = pTColumn->type; pColVal->flag = CV_FLAG_VALUE; - memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY)); + (void)memcpy(&pColVal->value.val, &pRow->ts, sizeof(TSKEY)); return 0; } @@ -573,7 +573,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) pColVal->value.pData = NULL; } } else { - memcpy(&pColVal->value.val, pData, pTColumn->bytes); + (void)memcpy(&pColVal->value.val, pData, pTColumn->bytes); } } return 0; @@ -624,7 +624,7 @@ int32_t tRowGet(SRow *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal) pColVal->value.pData = varlen + *(int32_t *)(fixed + pTColumn->offset); pColVal->value.pData += tGetU32v(pColVal->value.pData, &pColVal->value.nData); } else { - memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]); + (void)memcpy(&pColVal->value.val, fixed + pTColumn->offset, TYPE_BYTES[pTColumn->type]); } } @@ -861,7 +861,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { pIter->cv.cid = pTColumn->colId; pIter->cv.value.type = pTColumn->type; pIter->cv.flag = CV_FLAG_VALUE; - memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY)); + (void)memcpy(&pIter->cv.value.val, &pIter->pRow->ts, sizeof(TSKEY)); goto _exit; } @@ -906,7 +906,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { pIter->cv.value.pData = NULL; } } else { - memcpy(&pIter->cv.value.val, pData, pTColumn->bytes); + (void)memcpy(&pIter->cv.value.val, pData, pTColumn->bytes); } } @@ -965,7 +965,7 @@ SColVal *tRowIterNext(SRowIter *pIter) { pIter->cv.value.pData = NULL; } } else { - memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]); + (void)memcpy(&pIter->cv.value.val, pIter->pf + pTColumn->offset, TYPE_BYTES[pTColumn->type]); } goto _exit; } @@ -1288,7 +1288,7 @@ void tRowGetPrimaryKey(SRow *row, SRowKey *key) { key->pks[i].pData = tdata; key->pks[i].pData += tGetU32v(key->pks[i].pData, &key->pks[i].nData); } else { - memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes); + (void)memcpy(&key->pks[i].val, tdata, tDataTypes[indices[i].type].bytes); } } } @@ -1378,7 +1378,7 @@ FORCE_INLINE int32_t tRowKeyCompare(const SRowKey *key1, const SRowKey *key2) { return 0; } -int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { +void tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { pDst->ts = pSrc->ts; pDst->numOfPKs = pSrc->numOfPKs; @@ -1392,12 +1392,10 @@ int32_t tRowKeyAssign(SRowKey *pDst, SRowKey *pSrc) { } else { pVal->nData = pSrc->pks[i].nData; ASSERT(pSrc->pks[i].pData != NULL); - memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); + (void)memcpy(pVal->pData, pSrc->pks[i].pData, pVal->nData); } } } - - return TSDB_CODE_SUCCESS; } // STag ======================================== @@ -1528,7 +1526,7 @@ static int32_t tPutTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { } else { p = p ? p + n : p; n += tDataTypes[pTagVal->type].bytes; - if (p) memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes); + if (p) (void)memcpy(p, &(pTagVal->i64), tDataTypes[pTagVal->type].bytes); } return n; @@ -1550,7 +1548,7 @@ static int32_t tGetTagVal(uint8_t *p, STagVal *pTagVal, int8_t isJson) { if (IS_VAR_DATA_TYPE(pTagVal->type)) { n += tGetBinary(p + n, &pTagVal->pData, &pTagVal->nData); } else { - memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes); + (void)memcpy(&(pTagVal->i64), p + n, tDataTypes[pTagVal->type].bytes); n += tDataTypes[pTagVal->type].bytes; } @@ -1661,7 +1659,7 @@ char *tTagValToData(const STagVal *value, bool isJson) { } varDataLen(data + typeBytes) = value->nData; - memcpy(varDataVal(data + typeBytes), value->pData, value->nData); + (void)memcpy(varDataVal(data + typeBytes), value->pData, value->nData); } else { data = ((char *)&(value->i64)) - typeBytes; // json with type } @@ -1713,7 +1711,7 @@ bool tTagGet(const STag *pTag, STagVal *pTagVal) { } else if (c > 0) { lidx = midx + 1; } else { - memcpy(pTagVal, &tv, sizeof(tv)); + (void)memcpy(pTagVal, &tv, sizeof(tv)); return true; } } @@ -1892,7 +1890,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData, if (nData) { code = tRealloc(&pColData->pData, pColData->nData + nData); if (code) goto _exit; - memcpy(pColData->pData + pColData->nData, pData, nData); + (void)memcpy(pColData->pData + pColData->nData, pData, nData); pColData->nData += nData; } } else { @@ -1900,7 +1898,7 @@ static FORCE_INLINE int32_t tColDataPutValue(SColData *pColData, uint8_t *pData, code = tRealloc(&pColData->pData, pColData->nData + tDataTypes[pColData->type].bytes); if (code) goto _exit; if (pData) { - memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]); + (void)memcpy(pColData->pData + pColData->nData, pData, TYPE_BYTES[pColData->type]); } else { memset(pColData->pData + pColData->nData, 0, TYPE_BYTES[pColData->type]); } @@ -2594,7 +2592,7 @@ static FORCE_INLINE void tColDataGetValue4(SColData *pColData, int32_t iVal, SCo } value.pData = pColData->pData + pColData->aOffset[iVal]; } else { - memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes); + (void)memcpy(&value.val, pColData->pData + tDataTypes[pColData->type].bytes * iVal, tDataTypes[pColData->type].bytes); } *pColVal = COL_VAL_VALUE(pColData->cid, value); } @@ -2692,7 +2690,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal)); + (void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT1_SIZE(pColData->nVal)); break; case (HAS_VALUE | HAS_NULL | HAS_NONE): pColData->pBitMap = xMalloc(arg, BIT2_SIZE(pColData->nVal)); @@ -2700,7 +2698,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal)); + (void)memcpy(pColData->pBitMap, pColDataFrom->pBitMap, BIT2_SIZE(pColData->nVal)); break; default: pColData->pBitMap = NULL; @@ -2714,7 +2712,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } - memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2); + (void)memcpy(pColData->aOffset, pColDataFrom->aOffset, pColData->nVal << 2); } else { pColData->aOffset = NULL; } @@ -2727,7 +2725,7 @@ int32_t tColDataCopy(SColData *pColDataFrom, SColData *pColData, xMallocFn xMall goto _exit; } - memcpy(pColData->pData, pColDataFrom->pData, pColData->nData); + (void)memcpy(pColData->pData, pColDataFrom->pData, pColData->nData); } else { pColData->pData = NULL; } @@ -3119,10 +3117,10 @@ static int32_t tColDataCopyRowCell(SColData *pFromColData, int32_t iFromRow, SCo pToColData->aOffset[iToRow + 1] = pToColData->aOffset[iToRow] + nData; } - memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow], + (void)memcpy(pToColData->pData + pToColData->aOffset[iToRow], pFromColData->pData + pFromColData->aOffset[iFromRow], nData); } else { - memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow], + (void)memcpy(&pToColData->pData[TYPE_BYTES[pToColData->type] * iToRow], &pFromColData->pData[TYPE_BYTES[pToColData->type] * iFromRow], TYPE_BYTES[pToColData->type]); } return code; @@ -3361,7 +3359,7 @@ static void tColDataMergeImpl(SColData *pColData, int32_t iStart, int32_t iEnd / ASSERT(0); } else { if (iv != iStart) { - memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart], + (void)memcpy(&pColData->pData[TYPE_BYTES[pColData->type] * iStart], &pColData->pData[TYPE_BYTES[pColData->type] * iv], TYPE_BYTES[pColData->type]); } memmove(&pColData->pData[TYPE_BYTES[pColData->type] * (iStart + 1)], @@ -3579,11 +3577,11 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) { case (HAS_NULL | HAS_NONE): case (HAS_VALUE | HAS_NONE): case (HAS_VALUE | HAS_NULL): - if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal)); + if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT1_SIZE(pColData->nVal)); n += BIT1_SIZE(pColData->nVal); break; case (HAS_VALUE | HAS_NULL | HAS_NONE): - if (pBuf) memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal)); + if (pBuf) (void)memcpy(pBuf + n, pColData->pBitMap, BIT2_SIZE(pColData->nVal)); n += BIT2_SIZE(pColData->nVal); break; default: @@ -3593,14 +3591,14 @@ static int32_t tPutColDataVersion0(uint8_t *pBuf, SColData *pColData) { // value if (pColData->flag & HAS_VALUE) { if (IS_VAR_DATA_TYPE(pColData->type)) { - if (pBuf) memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2); + if (pBuf) (void)memcpy(pBuf + n, pColData->aOffset, pColData->nVal << 2); n += (pColData->nVal << 2); n += tPutI32v(pBuf ? pBuf + n : NULL, pColData->nData); - if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData); + if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData); n += pColData->nData; } else { - if (pBuf) memcpy(pBuf + n, pColData->pData, pColData->nData); + if (pBuf) (void)memcpy(pBuf + n, pColData->pData, pColData->nData); n += pColData->nData; } } @@ -4348,7 +4346,7 @@ int32_t tCompressData(void *input, // input ASSERT(outputSize >= extraSizeNeeded); if (info->cmprAlg == NO_COMPRESSION) { - memcpy(output, input, info->originalSize); + (void)memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { SBuffer local; @@ -4385,7 +4383,7 @@ int32_t tCompressData(void *input, // input } else { DEFINE_VAR(info->cmprAlg) if ((l1 == L1_UNKNOWN && l2 == L2_UNKNOWN) || (l1 == L1_DISABLED && l2 == L2_DISABLED)) { - memcpy(output, input, info->originalSize); + (void)memcpy(output, input, info->originalSize); info->compressedSize = info->originalSize; return 0; } @@ -4431,7 +4429,7 @@ int32_t tDecompressData(void *input, // input if (info->cmprAlg == NO_COMPRESSION) { ASSERT(info->compressedSize == info->originalSize); - memcpy(output, input, info->compressedSize); + (void)memcpy(output, input, info->compressedSize); } else if (info->cmprAlg == ONE_STAGE_COMP || info->cmprAlg == TWO_STAGE_COMP) { SBuffer local; @@ -4468,7 +4466,7 @@ int32_t tDecompressData(void *input, // input } else { DEFINE_VAR(info->cmprAlg); if (l1 == L1_DISABLED && l2 == L2_DISABLED) { - memcpy(output, input, info->compressedSize); + (void)memcpy(output, input, info->compressedSize); return 0; } SBuffer local; diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index e35c152e9b..990d03f940 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -167,7 +167,7 @@ void tsdbReaderClose2(STsdbReader *pReader); int32_t tsdbNextDataBlock2(STsdbReader *pReader, bool *hasNext); int32_t tsdbRetrieveDatablockSMA2(STsdbReader *pReader, SSDataBlock *pDataBlock, bool *allHave, bool *hasNullSMA); void tsdbReleaseDataBlock2(STsdbReader *pReader); -SSDataBlock *tsdbRetrieveDataBlock2(STsdbReader *pTsdbReadHandle, SArray *pColumnIdList); +int32_t tsdbRetrieveDataBlock2(STsdbReader *pReader, SSDataBlock **pBlock, SArray *pIdList); int32_t tsdbReaderReset2(STsdbReader *pReader, SQueryTableDataCond *pCond); int32_t tsdbGetFileBlocksDistInfo2(STsdbReader *pReader, STableBlockDistInfo *pTableBlockInfo); int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader *pHandle); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 84e1996f2c..943ba099f6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -909,9 +909,9 @@ void tMergeTreeUnpinSttBlock(SMergeTree *pMTree); bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); -SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); -void * destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); -void * destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); +int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo); +void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); // tsdbCache ============================================================================================== typedef enum { diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4a47e08730..70aa55915a 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -54,7 +54,6 @@ extern "C" { #endif typedef struct SVnodeInfo SVnodeInfo; -typedef struct SMeta SMeta; typedef struct SSma SSma; typedef struct STsdb STsdb; typedef struct STQ STQ; @@ -153,7 +152,6 @@ int32_t vnodeBufPoolRegisterQuery(SVBufPool* pPool, SQueryNode* pQNode); void vnodeBufPoolDeregisterQuery(SVBufPool* pPool, SQueryNode* pQNode, bool proactive); // meta -typedef struct SMCtbCursor SMCtbCursor; typedef struct SMStbCursor SMStbCursor; typedef struct STbUidStore STbUidStore; diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 38dd945b28..ed09ca821b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2579,7 +2579,7 @@ typedef struct { static int32_t lastIterOpen(SFSLastIter *iter, STFileSet *pFileSet, STsdb *pTsdb, STSchema *pTSchema, tb_uid_t suid, tb_uid_t uid, SCacheRowsReader *pr, int64_t lastTs, int16_t *aCols, int nCols) { int32_t code = 0; - pr->pLDataIterArray = destroySttBlockReader(pr->pLDataIterArray, NULL); + destroySttBlockReader(pr->pLDataIterArray, NULL); pr->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); SMergeTreeConf conf = { @@ -3212,7 +3212,10 @@ static int32_t nextRowIterOpen(CacheNextRowIter *pIter, tb_uid_t uid, STsdb *pTs pIter->pMemDelData = NULL; - loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); + code = loadMemTombData(&pIter->pMemDelData, pMem, pIMem, pr->info.verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } pIter->idx = (SBlockIdx){.suid = suid, .uid = uid}; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 817468c6b6..46c3ba4785 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -203,7 +203,7 @@ int32_t tsdbReuseCacherowsReader(void* reader, void* pTableIdList, int32_t numOf pReader->pTableList = pTableIdList; pReader->numOfTables = numOfTables; pReader->lastTs = INT64_MIN; - pReader->pLDataIterArray = destroySttBlockReader(pReader->pLDataIterArray, NULL); + destroySttBlockReader(pReader->pLDataIterArray, NULL); pReader->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); return TSDB_CODE_SUCCESS; diff --git a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c index 57b8a99fb1..2c03603d73 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMergeTree.c +++ b/source/dnode/vnode/src/tsdb/tsdbMergeTree.c @@ -23,11 +23,12 @@ static void tLDataIterClose2(SLDataIter *pIter); // SLDataIter ================================================= -SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols) { +int32_t tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols, SSttBlockLoadInfo **pInfo) { + *pInfo = NULL; + SSttBlockLoadInfo *pLoadInfo = taosMemoryCalloc(1, sizeof(SSttBlockLoadInfo)); if (pLoadInfo == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pLoadInfo->blockData[0].sttBlockIndex = -1; @@ -37,26 +38,29 @@ SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t code = tBlockDataCreate(&pLoadInfo->blockData[0].data); if (code) { - terrno = code; + taosMemoryFreeClear(pLoadInfo); + return code; } code = tBlockDataCreate(&pLoadInfo->blockData[1].data); if (code) { - terrno = code; + taosMemoryFreeClear(pLoadInfo); + return code; } pLoadInfo->aSttBlk = taosArrayInit(4, sizeof(SSttBlk)); if (pLoadInfo->aSttBlk == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; taosMemoryFreeClear(pLoadInfo); - return NULL; + return code; } pLoadInfo->pSchema = pSchema; pLoadInfo->colIds = colList; pLoadInfo->numOfCols = numOfCols; - return pLoadInfo; + *pInfo = pLoadInfo; + return code; } static void freeItem(void* pValue) { @@ -66,9 +70,9 @@ static void freeItem(void* pValue) { } } -void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { +void destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { if (pLoadInfo == NULL) { - return NULL; + return; } pLoadInfo->currentLoadBlockIndex = 1; @@ -94,7 +98,6 @@ void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo) { taosArrayDestroy(pLoadInfo->aSttBlk); taosMemoryFree(pLoadInfo); - return NULL; } void destroyLDataIter(SLDataIter *pIter) { @@ -103,9 +106,9 @@ void destroyLDataIter(SLDataIter *pIter) { taosMemoryFree(pIter); } -void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) { +void destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost) { if (pLDataIterArray == NULL) { - return NULL; + return; } int32_t numOfLevel = taosArrayGetSize(pLDataIterArray); @@ -132,7 +135,6 @@ void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoa } taosArrayDestroy(pLDataIterArray); - return NULL; } // choose the unpinned slot to load next data block @@ -914,9 +916,8 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF } if (pLoadInfo == NULL) { - pLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); - if (pLoadInfo == NULL) { - code = terrno; + code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pLoadInfo); + if (code != TSDB_CODE_SUCCESS) { goto _end; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 272c0762d8..cf2d23cfc8 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -45,7 +45,7 @@ typedef struct { bool moreThanCapcity; } SDataBlockToLoadInfo; -static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter); +static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo); static int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t endKey, int32_t capacity, STsdbReader* pReader); static TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, STsdbReader* pReader); @@ -123,7 +123,7 @@ static void tColRowGetPriamyKeyDeepCopy(SBlockData* pBlock, int32_t irow, int32_ pKey->pks[0].val = cv.value.val; } else { pKey->pks[0].nData = cv.value.nData; - memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData); + (void)(void)memcpy(pKey->pks[0].pData, cv.value.pData, cv.value.nData); } } @@ -155,9 +155,9 @@ static void tRowGetPrimaryKeyDeepCopy(SRow* pRow, SRowKey* pKey) { if (IS_VAR_DATA_TYPE(indices[i].type)) { tdata += tGetU32v(tdata, &pKey->pks[i].nData); - memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); + (void)memcpy(pKey->pks[i].pData, tdata, pKey->pks[i].nData); } else { - memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); + (void)memcpy(&pKey->pks[i].val, data + indices[i].offset, tDataTypes[pKey->pks[i].type].bytes); } } } @@ -185,6 +185,10 @@ static int32_t setColumnIdSlotList(SBlockLoadSuppInfo* pSupInfo, SColumnInfo* pC if (IS_VAR_DATA_TYPE(pCols[i].type)) { pSupInfo->buildBuf[i] = taosMemoryMalloc(pCols[i].bytes); + if (pSupInfo->buildBuf[i] == NULL) { + tsdbError("failed to prepare memory for set columnId slot list, size:%d, code:out of memory", pCols[i].bytes); + return TSDB_CODE_OUT_OF_MEMORY; + } } else { pSupInfo->buildBuf[i] = NULL; } @@ -242,6 +246,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA SBlockLoadSuppInfo* pInfo = &pReader->suppInfo; size_t numOfFileset = TARRAY2_SIZE(pFileSetArray); bool asc = ASCENDING_TRAVERSE(pReader->info.order); + int32_t code = TSDB_CODE_SUCCESS; pIter->index = asc ? -1 : numOfFileset; pIter->order = pReader->info.order; @@ -251,7 +256,7 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA if (pIter->pSttBlockReader == NULL) { pIter->pSttBlockReader = taosMemoryCalloc(1, sizeof(struct SSttBlockReader)); if (pIter->pSttBlockReader == NULL) { - int32_t code = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; tsdbError("failed to prepare the last block iterator, since:%s %s", tstrerror(code), pReader->idStr); return code; } @@ -265,10 +270,14 @@ static int32_t initFilesetIterator(SFilesetIter* pIter, TFileSetArray* pFileSetA pSttReader->uid = 0; tMergeTreeClose(&pSttReader->mergeTree); - initRowKey(&pSttReader->currentKey, INT64_MIN, pInfo->numOfPks, pInfo->pk.type, pInfo->pk.bytes, asc); + code = initRowKey(&pSttReader->currentKey, INT64_MIN, pInfo->numOfPks, pInfo->pk.type, pInfo->pk.bytes, asc); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed init row key, %s", pReader->idStr); + } else { + tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); + } - tsdbDebug("init fileset iterator, total files:%d %s", pIter->numOfFiles, pReader->idStr); - return TSDB_CODE_SUCCESS; + return code; } static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bool* hasNext) { @@ -286,7 +295,8 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo pIter->pSttBlockReader->uid = 0; tMergeTreeClose(&pIter->pSttBlockReader->mergeTree); - pReader->status.pLDataIterArray = destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); + destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); + pReader->status.pLDataIterArray = NULL; pReader->status.pLDataIterArray = taosArrayInit(4, POINTER_BYTES); // check file the time range of coverage @@ -294,7 +304,7 @@ static int32_t filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader, bo while (1) { if (pReader->pFileReader != NULL) { - tsdbDataFileReaderClose(&pReader->pFileReader); + (void) tsdbDataFileReaderClose(&pReader->pFileReader); } pReader->status.pCurrentFileset = pIter->pFilesetList->data[pIter->index]; @@ -367,15 +377,20 @@ _err: bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp) { return (pSupp->numOfPks > 0) && IS_VAR_DATA_TYPE(pSupp->pk.type); } -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { +int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool needFree) { pIter->order = order; pIter->index = -1; pIter->numOfBlocks = 0; if (pIter->blockList == NULL) { pIter->blockList = taosArrayInit(4, sizeof(SFileDataBlockInfo)); + if (pIter->blockList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } else { clearDataBlockIterator(pIter, needFree); } + + return TSDB_CODE_SUCCESS; } static void initReaderStatus(SReaderStatus* pStatus) { @@ -383,75 +398,85 @@ static void initReaderStatus(SReaderStatus* pStatus) { pStatus->loadFromFile = true; } -static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) { - SSDataBlock* pResBlock = createDataBlock(); - if (pResBlock == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; +static int32_t createResBlock(SQueryTableDataCond* pCond, int32_t capacity, SSDataBlock** pResBlock) { + *pResBlock = createDataBlock(); + if (*pResBlock == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; } for (int32_t i = 0; i < pCond->numOfCols; ++i) { SColumnInfoData colInfo = {0}; colInfo.info = pCond->colList[i]; - blockDataAppendColInfo(pResBlock, &colInfo); + int32_t code = blockDataAppendColInfo(*pResBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(*pResBlock); + *pResBlock = NULL; + return code; + } } - int32_t code = blockDataEnsureCapacity(pResBlock, capacity); + int32_t code = blockDataEnsureCapacity(*pResBlock, capacity); if (code != TSDB_CODE_SUCCESS) { - terrno = code; - taosMemoryFree(pResBlock); - return NULL; + taosMemoryFree(*pResBlock); + *pResBlock = NULL; } - return pResBlock; + + return code; } static int32_t tsdbInitReaderLock(STsdbReader* pReader) { int32_t code = taosThreadMutexInit(&pReader->readerMutex, NULL); - qTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-init read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbUninitReaderLock(STsdbReader* pReader) { - int32_t code = -1; - qTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + int32_t code = TSDB_CODE_SUCCESS; + tsdbTrace("tsdb/read: %p, pre-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); code = taosThreadMutexDestroy(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-uninit read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbAcquireReader(STsdbReader* pReader) { int32_t code = -1; - qTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, pre-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); code = taosThreadMutexLock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + tsdbTrace("tsdb/read: %p, post-take read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); return code; } static int32_t tsdbTryAcquireReader(STsdbReader* pReader) { int32_t code = taosThreadMutexTryLock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - + if (code != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/read: %p, post-trytake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); + } else { + tsdbTrace("tsdb/read: %p, post-trytask read mutex: %p", pReader, &pReader->readerMutex); + } return code; } static int32_t tsdbReleaseReader(STsdbReader* pReader) { int32_t code = taosThreadMutexUnlock(&pReader->readerMutex); - qTrace("tsdb/read: %p, post-untake read mutex: %p, code: %d", pReader, &pReader->readerMutex, code); - + if (code != TSDB_CODE_SUCCESS) { + tsdbError("tsdb/read: %p post-untake read mutex:%p failed, code:%d", pReader, &pReader->readerMutex, code); + } else { + tsdbTrace("tsdb/read: %p, post-untake read mutex: %p", pReader, &pReader->readerMutex); + } return code; } void tsdbReleaseDataBlock2(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; if (!pStatus->composedDataBlock) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); } } @@ -459,11 +484,16 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit SQueryTableDataCond* pCond, SBlockLoadSuppInfo* pSup) { pResBlockInfo->capacity = capacity; pResBlockInfo->pResBlock = pResBlock; - terrno = 0; + int32_t code = 0; if (pResBlockInfo->pResBlock == NULL) { pResBlockInfo->freeBlock = true; - pResBlockInfo->pResBlock = createResBlock(pCond, pResBlockInfo->capacity); + pResBlockInfo->pResBlock = NULL; + + code = createResBlock(pCond, pResBlockInfo->capacity, &pResBlockInfo->pResBlock); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pSup->numOfPks > 0) { SSDataBlock* p = pResBlockInfo->pResBlock; @@ -490,7 +520,7 @@ static int32_t initResBlockInfo(SResultBlockInfo* pResBlockInfo, int64_t capacit pResBlockInfo->freeBlock = false; } - return terrno; + return code; } static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void** ppReader, int32_t capacity, @@ -530,7 +560,10 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void // allocate buffer in order to load data blocks from file SBlockLoadSuppInfo* pSup = &pReader->suppInfo; pSup->tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; - setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); + code = setColumnIdSlotList(pSup, pCond->colList, pCond->pSlotList, pCond->numOfCols); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } code = initResBlockInfo(&pReader->resBlockInfo, capacity, pResBlock, pCond, pSup); if (code != TSDB_CODE_SUCCESS) { @@ -558,8 +591,15 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, void goto _end; } - tsdbInitReaderLock(pReader); - tsem_init(&pReader->resumeAfterSuspend, 0, 0); + code = tsdbInitReaderLock(pReader); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } + + code = tsem_init(&pReader->resumeAfterSuspend, 0, 0); + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } *ppReader = pReader; return code; @@ -630,7 +670,11 @@ static int32_t doLoadBlockIndex(STsdbReader* pReader, SDataFileReader* pFileRead break; } - taosArrayPush(pIndexList, pBrinBlk); + void* p1 = taosArrayPush(pIndexList, pBrinBlk); + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + i += 1; } @@ -661,7 +705,12 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S cleanupInfoForNextFileset(pReader->status.pTableMap); initBrinRecordIter(&iter, pReader->pFileReader, pIndexList); - while (((pRecord = getNextBrinRecord(&iter)) != NULL)) { + while (1) { + int32_t code = getNextBrinRecord(&iter, &pRecord); + if (code != TSDB_CODE_SUCCESS) { + break; + } + if (pRecord->suid > pReader->info.suid) { break; } @@ -689,7 +738,11 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S ASSERT(pRecord->suid == pReader->info.suid && uid == pRecord->uid); - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + STableBlockScanInfo* pScanInfo = NULL; + code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } // here we should find the first timestamp that is greater than the lastProcKey // the window is an open interval NOW. @@ -761,13 +814,17 @@ static int32_t loadFileBlockBrinInfo(STsdbReader* pReader, SArray* pIndexList, S pBlockNum->numOfBlocks += 1; if (taosArrayGetSize(pTableScanInfoList) == 0) { - taosArrayPush(pTableScanInfoList, &pScanInfo); + p1 = taosArrayPush(pTableScanInfoList, &pScanInfo); } else { STableBlockScanInfo** p = taosArrayGetLast(pTableScanInfoList); if ((*p)->uid != uid) { - taosArrayPush(pTableScanInfoList, &pScanInfo); + p1 = taosArrayPush(pTableScanInfoList, &pScanInfo); } } + + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } clearBrinBlockIter(&iter); @@ -805,12 +862,14 @@ static void updateLastKeyInfo(SRowKey* pKey, SFileDataBlockInfo* pBlockInfo, SDa } else { uint8_t* p = asc ? pBlockInfo->lastPk.pData : pBlockInfo->firstPk.pData; pKey->pks[0].nData = asc ? varDataLen(pBlockInfo->lastPk.pData) : varDataLen(pBlockInfo->firstPk.pData); - memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); + (void)memcpy(pKey->pks[0].pData, p, pKey->pks[0].nData); } } static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int32_t colIndex, SColVal* pColVal, SBlockLoadSuppInfo* pSup) { + int32_t code = 0; + if (IS_VAR_DATA_TYPE(pColVal->value.type)) { if (!COL_VAL_IS_VALUE(pColVal)) { colDataSetNULL(pColInfoData, rowIndex); @@ -823,31 +882,33 @@ static int32_t doCopyColVal(SColumnInfoData* pColInfoData, int32_t rowIndex, int } if (pColVal->value.nData > 0) { // pData may be null, if nData is 0 - memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); + (void)memcpy(varDataVal(pSup->buildBuf[colIndex]), pColVal->value.pData, pColVal->value.nData); } - colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); + code = colDataSetVal(pColInfoData, rowIndex, pSup->buildBuf[colIndex], false); } } else { - colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal)); + code = colDataSetVal(pColInfoData, rowIndex, (const char*)&pColVal->value.val, !COL_VAL_IS_VALUE(pColVal)); } - return TSDB_CODE_SUCCESS; + return code; } -static SFileDataBlockInfo* getCurrentBlockInfo(SDataBlockIter* pBlockIter) { +static int32_t getCurrentBlockInfo(SDataBlockIter* pBlockIter, SFileDataBlockInfo** pInfo) { + *pInfo = NULL; + if (pBlockIter->blockList == NULL) { - return NULL; + return TSDB_CODE_FAILED; } size_t num = TARRAY_SIZE(pBlockIter->blockList); if (num == 0) { ASSERT(pBlockIter->numOfBlocks == num); - return NULL; + return TSDB_CODE_FAILED; } - SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); - return pBlockInfo; + *pInfo = taosArrayGet(pBlockIter->blockList, pBlockIter->index); + return (*pInfo) != NULL? TSDB_CODE_SUCCESS:TSDB_CODE_FAILED; } static int doBinarySearchKey(const TSKEY* keyList, int num, int pos, TSKEY key, int order) { @@ -961,10 +1022,10 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData static void copyPrimaryTsCol(const SBlockData* pBlockData, SFileBlockDumpInfo* pDumpInfo, SColumnInfoData* pColData, int32_t dumpedRows, bool asc) { if (asc) { - memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t)); + (void)memcpy(pColData->pData, &pBlockData->aTSKEY[pDumpInfo->rowIndex], dumpedRows * sizeof(int64_t)); } else { int32_t startIndex = pDumpInfo->rowIndex - dumpedRows + 1; - memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t)); + (void)memcpy(pColData->pData, &pBlockData->aTSKEY[startIndex], dumpedRows * sizeof(int64_t)); // todo: opt perf by extract the loop // reverse the array list @@ -995,7 +1056,7 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo // ASSERT((((uint64_t)pColData->pData) & (0x8 - 1)) == 0); // 1. copy data in a batch model - memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); + (void)memcpy(pColData->pData, p, dumpedRows * tDataTypes[pData->type].bytes); // 2. reverse the array list in case of descending order scan data block if (!asc) { @@ -1110,7 +1171,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBlockData* pBlockData = &pStatus->fileBlockData; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pBlockInfo = NULL; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; int32_t numOfOutputCols = pSupInfo->numOfCols; int32_t code = TSDB_CODE_SUCCESS; @@ -1118,6 +1179,11 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader, SRowKey* pLastPro bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + SColVal cv = {0}; SBrinRecord tmp; blockInfoToRecord(&tmp, pBlockInfo, pSupInfo); @@ -1301,9 +1367,11 @@ static FORCE_INLINE STSchema* getTableSchemaImpl(STsdbReader* pReader, uint64_t static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockIter, SBlockData* pBlockData, uint64_t uid) { - int32_t code = 0; - STSchema* pSchema = pReader->info.pSchema; - int64_t st = taosGetTimestampUs(); + int32_t code = 0; + STSchema* pSchema = pReader->info.pSchema; + int64_t st = taosGetTimestampUs(); + SFileDataBlockInfo* pBlockInfo = NULL; + SBlockLoadSuppInfo* pSup = &pReader->suppInfo; tBlockDataReset(pBlockData); @@ -1315,8 +1383,11 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI } } - SBlockLoadSuppInfo* pSup = &pReader->suppInfo; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; SBrinRecord tmp; @@ -1379,8 +1450,9 @@ static bool getNeighborBlockOfTable(SDataBlockIter* pBlockIter, SFileDataBlockIn static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t index, int32_t step) { + int32_t code = TSDB_CODE_SUCCESS; if (index < 0 || index >= pBlockIter->numOfBlocks) { - return -1; + return TSDB_CODE_FAILED; } SFileDataBlockInfo fblock = *(SFileDataBlockInfo*)taosArrayGet(pBlockIter->blockList, index); @@ -1391,8 +1463,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte for (int32_t i = index - 1; i >= pBlockIter->index; --i) { SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); - STableBlockScanInfo* pBlockScanInfo = - getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = NULL; + code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); pTableDataBlockIdx->globalIndex = i + 1; @@ -1402,8 +1478,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte for (int32_t i = index + 1; i <= pBlockIter->index; ++i) { SFileDataBlockInfo* pBlockInfo = taosArrayGet(pBlockIter->blockList, i); - STableBlockScanInfo* pBlockScanInfo = - getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = NULL; + code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, pBlockInfo->tbBlockIdx); pTableDataBlockIdx->globalIndex = i - 1; @@ -1412,7 +1492,12 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte } taosArraySet(pBlockIter->blockList, pBlockIter->index, &fblock); - STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, pReader->idStr); + STableBlockScanInfo* pBlockScanInfo = NULL; + code = getTableBlockScanInfo(pReader->status.pTableMap, fblock.uid, &pBlockScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + STableDataBlockIdx* pTableDataBlockIdx = taosArrayGet(pBlockScanInfo->pBlockIdxList, fblock.tbBlockIdx); pTableDataBlockIdx->globalIndex = pBlockIter->index; } @@ -1759,7 +1844,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + int32_t code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SRowKey minKey = k; @@ -1792,7 +1880,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, pSttKey) == 0) { @@ -1801,7 +1892,10 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* if (code != TSDB_CODE_SUCCESS) { return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, &k) == 0) { @@ -1843,7 +1937,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); @@ -1890,7 +1987,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); code = tsdbRowMergerAdd(pMerger, pRow1, NULL); @@ -1899,7 +1999,10 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } // pSttKey will be changed when sttBlockReader iterates to the next row, so use pKey instead. - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { @@ -1965,7 +2068,11 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } SRowKey minKey = k; @@ -2005,7 +2112,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pfKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, pSttKey) == 0) { @@ -2015,7 +2125,10 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* return code; } - doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = doMergeRowsInSttBlock(pSttBlockReader, pBlockScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } if (pkCompEx(&minKey, &ik) == 0) { @@ -2106,10 +2219,9 @@ static void doForwardDataIter(SRowKey* pKey, SIterInfo* pIter, STableBlockScanIn } // handle the open interval issue. Find the first row key that is greater than the given one. -static int32_t forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { +static void forwardDataIter(SRowKey* pKey, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { doForwardDataIter(pKey, &pBlockScanInfo->iter, pBlockScanInfo, pReader); doForwardDataIter(pKey, &pBlockScanInfo->iiter, pBlockScanInfo, pReader); - return TSDB_CODE_SUCCESS; } static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader) { @@ -2142,7 +2254,10 @@ static int32_t initMemDataIterator(STableBlockScanInfo* pBlockScanInfo, STsdbRea return code; } - loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + code = loadMemTombData(&pBlockScanInfo->pMemDelData, d, di, pReader->info.verRange.maxVer); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (forward) { forwardDataIter(&startKey.key, pBlockScanInfo, pReader); @@ -2255,8 +2370,15 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return false; } - initMemDataIterator(pScanInfo, pReader); - initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + code = initMemDataIterator(pScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return false; + } + + code = initDelSkylineIterator(pScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (conf.rspRows) { pScanInfo->cleanSttBlocks = isCleanSttBlock(info.pKeyRangeList, &pReader->info.window, pScanInfo, order); @@ -2329,7 +2451,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc if (ps == NULL) { return terrno; } - tsdbRowMergerInit(pMerger, ps); + + code = tsdbRowMergerInit(pMerger, ps); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } tRowKeyAssign(&pBlockScanInfo->lastProcKey, pKey); @@ -2345,7 +2471,11 @@ int32_t mergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pBloc return code; } - doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + code = doMergeRowsInFileBlocks(pBlockData, pBlockScanInfo, pKey, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2392,8 +2522,16 @@ int32_t mergeRowsInSttBlocks(SSttBlockReader* pSttBlockReader, STableBlockScanIn } TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); - doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = doMergeRowsInSttBlock(pSttBlockReader, pScanInfo, pMerger, pkSrcSlot, &pReader->info.verRange, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + code = tsdbRowMergerGetRow(pMerger, &pTSRow); if (code != TSDB_CODE_SUCCESS) { return code; @@ -2473,7 +2611,10 @@ static int32_t loadNeighborIfOverlap(SFileDataBlockInfo* pBlockInfo, STableBlock // 2. remove it from the scan block list int32_t neighborIndex = tableDataBlockIdx->globalIndex; - setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); + code = setFileBlockActiveInBlockIter(pReader, pBlockIter, neighborIndex, step); + if (code != TSDB_CODE_SUCCESS) { + return code; + } // 3. load the neighbor block, and set it to be the currently accessed file data block code = doLoadFileBlockData(pReader, pBlockIter, &pStatus->fileBlockData, pBlockInfo->uid); @@ -2496,8 +2637,8 @@ void updateComposedBlockInfo(STsdbReader* pReader, double el, STableBlockScanInf pResBlock->info.dataLoad = 1; pResBlock->info.version = pReader->info.verRange.maxVer; - blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); - blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order)); + int32_t code = blockDataUpdateTsWindow(pResBlock, pReader->suppInfo.slotId[0]); + code = blockDataUpdatePkRange(pResBlock, pReader->suppInfo.pkDstSlot, ASCENDING_TRAVERSE(pReader->info.order)); setComposedBlockFlag(pReader, true); pReader->cost.composedBlocks += 1; @@ -2511,11 +2652,12 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { int32_t step = asc ? 1 : -1; double el = 0; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pBlockInfo = NULL; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; - if (pBlockInfo == NULL) { + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { return 0; } @@ -2524,9 +2666,9 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { return code; } - STableBlockScanInfo* pBlockScanInfo = - getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); - if (pBlockScanInfo == NULL) { + STableBlockScanInfo* pBlockScanInfo = NULL; + code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { goto _end; } @@ -2544,7 +2686,7 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { } SBlockData* pBlockData = &pReader->status.fileBlockData; - initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pBlockScanInfo, pReader); while (1) { bool hasBlockData = false; @@ -2559,7 +2701,10 @@ static int32_t buildComposedDataBlock(STsdbReader* pReader) { pDumpInfo->rowIndex += step; if (pDumpInfo->rowIndex >= pBlockData->nRow || pDumpInfo->rowIndex < 0) { - pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); // NOTE: get the new block info + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); // NOTE: get the new block info + if (code != TSDB_CODE_SUCCESS) { + goto _end; + } // continue check for the next file block if the last ts in the current block // is overlapped with the next neighbor block @@ -2638,7 +2783,10 @@ int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, int32_t orde if (pSource == NULL) { pSource = pBlockScanInfo->pMemDelData; } else { - taosArrayAddAll(pSource, pBlockScanInfo->pMemDelData); + void* p1 = taosArrayAddAll(pSource, pBlockScanInfo->pMemDelData); + if (p1 == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } code = tsdbBuildDeleteSkyline(pSource, 0, taosArrayGetSize(pSource) - 1, pBlockScanInfo->delSkyline); @@ -2736,6 +2884,9 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr size_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); SArray* pIndexList = taosArrayInit(numOfTables, sizeof(SBrinBlk)); + if (pIndexList == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } while (1) { // only check here, since the iterate data in memory is very fast. @@ -2784,6 +2935,7 @@ static int32_t moveToNextFile(STsdbReader* pReader, SBlockNumber* pBlockNum, SAr return loadDataFileTombDataForAll(pReader); } +// pTableIter can be NULL, no need to handle the return value static void resetTableListIndex(SReaderStatus* pStatus) { STableUidList* pList = &pStatus->uidList; @@ -2825,7 +2977,7 @@ static bool moveToNextTableForPreFileSetMem(SReaderStatus* pStatus) { return (pStatus->pProcMemTableIter != NULL); } -static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) { +static int32_t buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo) { SReaderStatus* pStatus = &pReader->status; SSttBlockReader* pSttBlockReader = pStatus->fileIter.pSttBlockReader; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; @@ -2833,7 +2985,10 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf bool asc = ASCENDING_TRAVERSE(pReader->info.order); SDataBlockInfo* pInfo = &pResBlock->info; - blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); + int32_t code = blockDataEnsureCapacity(pResBlock, pScanInfo->numOfRowsInStt); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pInfo->rows = pScanInfo->numOfRowsInStt; pInfo->id.uid = pScanInfo->uid; @@ -2858,6 +3013,7 @@ static void buildCleanBlockFromSttFiles(STsdbReader* pReader, STableBlockScanInf tsdbDebug("%p uid:%" PRId64 " return clean stt block as one, brange:%" PRId64 "-%" PRId64 " rows:%" PRId64 " %s", pReader, pResBlock->info.id.uid, pResBlock->info.window.skey, pResBlock->info.window.ekey, pResBlock->info.rows, pReader->idStr); + return code; } static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanInfo* pScanInfo, @@ -2878,8 +3034,8 @@ static void buildCleanBlockFromDataFiles(STsdbReader* pReader, STableBlockScanIn pInfo->pks[0].val = pBlockInfo->firstPk.val; pInfo->pks[1].val = pBlockInfo->lastPk.val; } else { - memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); - memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); + (void)memcpy(pInfo->pks[0].pData, varDataVal(pBlockInfo->firstPk.pData), varDataLen(pBlockInfo->firstPk.pData)); + (void)memcpy(pInfo->pks[1].pData, varDataVal(pBlockInfo->lastPk.pData), varDataLen(pBlockInfo->lastPk.pData)); pInfo->pks[0].nData = varDataLen(pBlockInfo->firstPk.pData); pInfo->pks[1].nData = varDataLen(pBlockInfo->lastPk.pData); @@ -2950,8 +3106,8 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { // if only require the total rows, no need to load data from stt file if it is clean stt blocks if (pReader->info.execMode == READER_EXEC_ROWS && pScanInfo->cleanSttBlocks) { - buildCleanBlockFromSttFiles(pReader, pScanInfo); - return TSDB_CODE_SUCCESS; + code = buildCleanBlockFromSttFiles(pReader, pScanInfo); + return code; } int64_t st = taosGetTimestampUs(); @@ -3006,11 +3162,16 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; STableBlockScanInfo* pScanInfo = NULL; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pBlockInfo = NULL; SSttBlockReader* pSttBlockReader = pReader->status.fileIter.pSttBlockReader; bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t code = TSDB_CODE_SUCCESS; + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->pIgnoreTables && taosHashGet(*pReader->pIgnoreTables, &pBlockInfo->uid, sizeof(pBlockInfo->uid))) { setBlockAllDumped(&pStatus->fBlockDumpInfo, pBlockInfo->lastKey, pReader->info.order); return code; @@ -3020,13 +3181,13 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { return pReader->code; } - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, pReader->idStr); - if (pScanInfo == NULL) { - return terrno; + code = getTableBlockScanInfo(pReader->status.pTableMap, pBlockInfo->uid, &pScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; } if (pScanInfo->sttKeyInfo.status == STT_FILE_READER_UNINIT) { - initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); } TSDBKEY keyInBuf = getCurrentKeyInBuf(pScanInfo, pReader); @@ -3067,8 +3228,8 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { // return the stt file block ASSERT(pReader->info.execMode == READER_EXEC_ROWS && pSttBlockReader->mergeTree.pIter == NULL); - buildCleanBlockFromSttFiles(pReader, pScanInfo); - return TSDB_CODE_SUCCESS; + code = buildCleanBlockFromSttFiles(pReader, pScanInfo); + return code; } } else { SBlockData* pBData = &pReader->status.fileBlockData; @@ -3080,7 +3241,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); // let's load data from stt files, make sure clear the cleanStt block flag before load the data from stt files - initSttBlockReader(pSttBlockReader, pScanInfo, pReader); + (void) initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. while (hasDataInSttBlock(pScanInfo)) { @@ -3141,10 +3302,17 @@ static int32_t buildBlockFromBufferSeqForPreFileset(STsdbReader* pReader, int64_ continue; } - initMemDataIterator(*pBlockScanInfo, pReader); - initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + int32_t code = initMemDataIterator(*pBlockScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + code = initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3183,10 +3351,17 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en continue; } - initMemDataIterator(*pBlockScanInfo, pReader); - initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + int32_t code = initMemDataIterator(*pBlockScanInfo, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); + code = initDelSkylineIterator(*pBlockScanInfo, pReader->info.order, &pReader->cost); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = buildDataBlockFromBuf(pReader, *pBlockScanInfo, endKey); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -3205,12 +3380,12 @@ static int32_t buildBlockFromBufferSequentially(STsdbReader* pReader, int64_t en // set the correct start position in case of the first/last file block, according to the query time window static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter) { - int64_t lastKey = ASCENDING_TRAVERSE(pReader->info.order) ? INT64_MIN : INT64_MAX; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); + SFileDataBlockInfo* pBlockInfo = NULL; SReaderStatus* pStatus = &pReader->status; SFileBlockDumpInfo* pDumpInfo = &pStatus->fBlockDumpInfo; - if (pBlockInfo) { + int32_t code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + if (code == TSDB_CODE_SUCCESS) { pDumpInfo->totalRows = pBlockInfo->numRow; pDumpInfo->rowIndex = ASCENDING_TRAVERSE(pReader->info.order) ? 0 : pBlockInfo->numRow - 1; } else { @@ -3243,7 +3418,7 @@ static int32_t initForFirstBlockInFile(STsdbReader* pReader, SDataBlockIter* pBl code = initBlockIterator(pReader, pBlockIter, num.numOfBlocks, pTableList); } else { // no block data, only last block exists tBlockDataReset(&pReader->status.fileBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); resetTableListIndex(&pReader->status); } @@ -3353,7 +3528,10 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } tBlockDataReset(pBlockData); - resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + if (code != TSDB_CODE_SUCCESS) { + return code; + } resetTableListIndex(&pReader->status); ERetrieveType type = doReadDataFromSttFiles(pReader); @@ -3622,6 +3800,7 @@ FORCE_INLINE TSDBROW* getValidMemRow(SIterInfo* pIter, const SArray* pDelList, S int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArray* pDelList, STsdbReader* pReader) { SRowMerger* pMerger = &pReader->status.merger; + int32_t code = 0; while (1) { pIter->hasVal = tsdbTbDataIterNext(pIter->iter); @@ -3656,10 +3835,13 @@ int32_t doMergeRowsInBuf(SIterInfo* pIter, uint64_t uid, SRowKey* pCurKey, SArra } } - tsdbRowMergerAdd(pMerger, pRow, pTSchema); + code = tsdbRowMergerAdd(pMerger, pRow, pTSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } - return TSDB_CODE_SUCCESS; + return code; } static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowIndex, SRowKey* pKey, SRowMerger* pMerger, @@ -3677,7 +3859,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd } TSDBROW fRow = tsdbRowFromBlockData(pBlockData, rowIndex); - tsdbRowMergerAdd(pMerger, &fRow, NULL); + int32_t code = tsdbRowMergerAdd(pMerger, &fRow, NULL); rowIndex += step; } @@ -3719,6 +3901,7 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc bool asc = ASCENDING_TRAVERSE(pReader->info.order); int32_t step = asc ? 1 : -1; SVersionRange* pRange = &pReader->info.verRange; + int32_t code = 0; pDumpInfo->rowIndex += step; if ((pDumpInfo->rowIndex <= pBlockData->nRow - 1 && asc) || (pDumpInfo->rowIndex >= 0 && !asc)) { @@ -3730,25 +3913,31 @@ int32_t doMergeRowsInFileBlocks(SBlockData* pBlockData, STableBlockScanInfo* pSc while (1) { CHECK_FILEBLOCK_STATE st; - SFileDataBlockInfo* pFileBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + SFileDataBlockInfo* pFileBlockInfo = NULL; + code = getCurrentBlockInfo(&pReader->status.blockIter, &pFileBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pFileBlockInfo == NULL) { st = CHECK_FILEBLOCK_QUIT; break; } - checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st); - if (st == CHECK_FILEBLOCK_QUIT) { + code = checkForNeighborFileBlock(pReader, pScanInfo, pFileBlockInfo, pMerger, pKey, &st); + if (st == CHECK_FILEBLOCK_QUIT || code != TSDB_CODE_SUCCESS) { break; } } } - return TSDB_CODE_SUCCESS; + return code; } int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanInfo* pScanInfo, SRowMerger* pMerger, int32_t pkSrcSlot, SVersionRange* pVerRange, const char* idStr) { SRowKey* pRowKey = &pScanInfo->lastProcKey; + int32_t code = TSDB_CODE_SUCCESS; while (nextRowFromSttBlocks(pSttBlockReader, pScanInfo, pkSrcSlot, pVerRange)) { SRowKey* pNextKey = getCurrentKeyInSttBlock(pSttBlockReader); @@ -3756,7 +3945,10 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI int32_t ret = pkCompEx(pRowKey, pNextKey); if (ret == 0) { TSDBROW* pRow1 = tMergeTreeGetRow(&pSttBlockReader->mergeTree); - tsdbRowMergerAdd(pMerger, pRow1, NULL); + code = tsdbRowMergerAdd(pMerger, pRow1, NULL); + if (code != TSDB_CODE_SUCCESS) { + break; + } } else { tsdbTrace("uid:%" PRIu64 " last del index:%d, del range:%d, lastKeyInStt:%" PRId64 ", %s", pScanInfo->uid, pScanInfo->sttBlockDelIndex, (int32_t)taosArrayGetSize(pScanInfo->delSkyline), @@ -3765,7 +3957,7 @@ int32_t doMergeRowsInSttBlock(SSttBlockReader* pSttBlockReader, STableBlockScanI } } - return TSDB_CODE_SUCCESS; + return code; } int32_t doMergeMemTableMultiRows(TSDBROW* pRow, SRowKey* pKey, uint64_t uid, SIterInfo* pIter, SArray* pDelList, @@ -4024,7 +4216,11 @@ int32_t doAppendRowFromTSRow(SSDataBlock* pBlock, STsdbReader* pReader, SRow* pT if (colId == pSchema->columns[j].colId) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, pSupInfo->slotId[i]); - tRowGet(pTSRow, pSchema, j, &colVal); + code = tRowGet(pTSRow, pSchema, j, &colVal); + if (code) { + return code; + } + code = doCopyColVal(pColInfoData, outputRowIndex, i, &colVal, pSupInfo); if (code) { return code; @@ -4109,7 +4305,11 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e do { TSDBROW row = {.type = -1}; bool freeTSRow = false; - tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + code = tsdbGetNextRowInMem(pBlockScanInfo, pReader, &row, endKey, &freeTSRow); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (row.type == -1) { break; } @@ -4155,6 +4355,7 @@ int32_t buildDataBlockFromBufImpl(STableBlockScanInfo* pBlockScanInfo, int64_t e // TODO refactor: with createDataBlockScanInfo int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t num) { + int32_t code = TSDB_CODE_SUCCESS; int32_t size = tSimpleHashGetSize(pReader->status.pTableMap); STableBlockScanInfo** p = NULL; @@ -4165,7 +4366,7 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t } if (size < num) { - int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num); + code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num); if (code) { return code; } @@ -4186,42 +4387,43 @@ int32_t tsdbSetTableList2(STsdbReader* pReader, const void* pTableList, int32_t for (int32_t i = 0; i < num; ++i) { pUidList->tableUidList[i] = pList[i].uid; - STableBlockScanInfo* pInfo = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i); - initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader); + STableBlockScanInfo* pInfo = NULL; + code = getPosInBlockInfoBuf(&pReader->blockInfoBuf, i, &pInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = initTableBlockScanInfo(pInfo, pList[i].uid, pReader->status.pTableMap, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } return TDB_CODE_SUCCESS; } -void* tsdbGetIdx2(SMeta* pMeta) { - if (pMeta == NULL) { - return NULL; - } - return metaGetIdx(pMeta); -} - -void* tsdbGetIvtIdx2(SMeta* pMeta) { - if (pMeta == NULL) { - return NULL; - } - return metaGetIvtIdx(pMeta); -} - uint64_t tsdbGetReaderMaxVersion2(STsdbReader* pReader) { return pReader->info.verRange.maxVer; } static int32_t doOpenReaderImpl(STsdbReader* pReader) { SReaderStatus* pStatus = &pReader->status; SDataBlockIter* pBlockIter = &pStatus->blockIter; + int32_t code = TSDB_CODE_SUCCESS; if (pReader->bFilesetDelimited) { getMemTableTimeRange(pReader, &pReader->status.memTableMaxKey, &pReader->status.memTableMinKey); pReader->status.bProcMemFirstFileset = true; } - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + code = initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = resetDataBlockIterator(&pStatus->blockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + if (code != TSDB_CODE_SUCCESS) { + return code; + } - int32_t code = TSDB_CODE_SUCCESS; if (pStatus->fileIter.numOfFiles == 0) { pStatus->loadFromFile = false; } else { @@ -4248,7 +4450,7 @@ static void clearSharedPtr(STsdbReader* p) { p->pSchemaMap = NULL; } -static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { +static int32_t setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->status.pTableMap = pSrc->status.pTableMap; pDst->status.uidList = pSrc->status.uidList; pDst->info.pSchema = pSrc->info.pSchema; @@ -4257,8 +4459,10 @@ static void setSharedPtr(STsdbReader* pDst, const STsdbReader* pSrc) { pDst->pReadSnap->pfSetArray = pSrc->pReadSnap->pfSetArray; if (pDst->info.pSchema) { - tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); + return tsdbRowMergerInit(&pDst->status.merger, pDst->info.pSchema); } + + return TSDB_CODE_SUCCESS; } // ====================================== EXPOSED APIs ====================================== @@ -4266,13 +4470,17 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi SSDataBlock* pResBlock, void** ppReader, const char* idstr, SHashObj** pIgnoreTables) { STimeWindow window = pCond->twindows; SVnodeCfg* pConf = &(((SVnode*)pVnode)->config); + int32_t code = 0; int32_t capacity = pConf->tsdbCfg.maxRows; if (pResBlock != NULL) { - blockDataEnsureCapacity(pResBlock, capacity); + code = blockDataEnsureCapacity(pResBlock, capacity); + if (code != TSDB_CODE_SUCCESS) { + goto _err; + } } - int32_t code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); + code = tsdbReaderCreate(pVnode, pCond, ppReader, capacity, pResBlock, idstr); if (code != TSDB_CODE_SUCCESS) { goto _err; } @@ -4335,7 +4543,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } if (pReader->info.pSchema != NULL) { - tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); + code = tsdbRowMergerInit(&pReader->status.merger, pReader->info.pSchema); + if (code != TSDB_CODE_SUCCESS) { + return code; + } } pReader->pSchemaMap = tSimpleHashInit(8, taosFastHash); @@ -4354,11 +4565,10 @@ int32_t tsdbReaderOpen2(void* pVnode, SQueryTableDataCond* pCond, void* pTableLi } STsdbReader* p = (pReader->innerReader[0] != NULL) ? pReader->innerReader[0] : pReader; - pReader->status.pTableMap = - createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables); - if (pReader->status.pTableMap == NULL) { + + code = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, &pReader->status.uidList, numOfTables, &pReader->status.pTableMap); + if (code != TSDB_CODE_SUCCESS) { *ppReader = NULL; - code = TSDB_CODE_OUT_OF_MEMORY; goto _err; } @@ -4391,7 +4601,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { return; } - tsdbAcquireReader(pReader); + (void)tsdbAcquireReader(pReader); { if (pReader->innerReader[0] != NULL || pReader->innerReader[1] != NULL) { @@ -4430,7 +4640,7 @@ void tsdbReaderClose2(STsdbReader* pReader) { clearBlockScanInfoBuf(&pReader->blockInfoBuf); if (pReader->pFileReader != NULL) { - tsdbDataFileReaderClose(&pReader->pFileReader); + (void) tsdbDataFileReaderClose(&pReader->pFileReader); } SReadCostSummary* pCost = &pReader->cost; @@ -4446,15 +4656,15 @@ void tsdbReaderClose2(STsdbReader* pReader) { destroySttBlockReader(pReader->status.pLDataIterArray, &pCost->sttCost); taosMemoryFreeClear(pReader->status.uidList.tableUidList); - qTrace("tsdb/reader-close: %p, untake snapshot", pReader); + tsdbTrace("tsdb/reader-close: %p, untake snapshot", pReader); void* p = pReader->pReadSnap; if ((p == atomic_val_compare_exchange_ptr((void**)&pReader->pReadSnap, p, NULL)) && (p != NULL)) { tsdbUntakeReadSnap2(pReader, p, true); } - tsem_destroy(&pReader->resumeAfterSuspend); - tsdbReleaseReader(pReader); - tsdbUninitReaderLock(pReader); + (void) tsem_destroy(&pReader->resumeAfterSuspend); + (void) tsdbReleaseReader(pReader); + (void) tsdbUninitReaderLock(pReader); tsdbDebug( "%p :io-cost summary: head-file:%" PRIu64 ", head-file time:%.2f ms, SMA:%" PRId64 @@ -4483,11 +4693,14 @@ static int32_t doSuspendCurrentReader(STsdbReader* pCurrentReader) { SReaderStatus* pStatus = &pCurrentReader->status; if (pStatus->loadFromFile) { - tsdbDataFileReaderClose(&pCurrentReader->pFileReader); + (void) tsdbDataFileReaderClose(&pCurrentReader->pFileReader); SReadCostSummary* pCost = &pCurrentReader->cost; - pStatus->pLDataIterArray = destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost); + destroySttBlockReader(pStatus->pLDataIterArray, &pCost->sttCost); pStatus->pLDataIterArray = taosArrayInit(4, POINTER_BYTES); + if (pStatus->pLDataIterArray == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } } // resetDataBlockScanInfo excluding lastKey @@ -4515,14 +4728,14 @@ int32_t tsdbReaderSuspend2(STsdbReader* pReader) { if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { - doSuspendCurrentReader(pReader->innerReader[0]); + code = doSuspendCurrentReader(pReader->innerReader[0]); } else if (pReader->step == EXTERNAL_ROWS_MAIN) { - doSuspendCurrentReader(pReader); + code = doSuspendCurrentReader(pReader); } else { - doSuspendCurrentReader(pReader->innerReader[1]); + code = doSuspendCurrentReader(pReader->innerReader[1]); } } else { - doSuspendCurrentReader(pReader); + code = doSuspendCurrentReader(pReader); } // make sure only release once @@ -4557,13 +4770,12 @@ static int32_t tsdbSetQueryReseek(void* pQHandle) { code = tsdbTryAcquireReader(pReader); if (code == 0) { if (pReader->flag == READER_STATUS_SUSPEND) { - tsdbReleaseReader(pReader); + code = tsdbReleaseReader(pReader); return code; } - tsdbReaderSuspend2(pReader); - tsdbReleaseReader(pReader); - + code = tsdbReaderSuspend2(pReader); + (void) tsdbReleaseReader(pReader); return code; } else if (code == EBUSY) { return TSDB_CODE_VND_QUERY_BUSY; @@ -4580,7 +4792,7 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { // restore reader's state, task snapshot int32_t numOfTables = tSimpleHashGetSize(pReader->status.pTableMap); if (numOfTables > 0) { - qTrace("tsdb/reader: %p, take snapshot", pReader); + tsdbTrace("tsdb/reader: %p, take snapshot", pReader); code = tsdbTakeReadSnap2(pReader, tsdbSetQueryReseek, &pReader->pReadSnap); if (code != TSDB_CODE_SUCCESS) { goto _err; @@ -4601,10 +4813,16 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) { // we need only one row pPrevReader->resBlockInfo.capacity = 1; - setSharedPtr(pPrevReader, pReader); + code = setSharedPtr(pPrevReader, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } pNextReader->resBlockInfo.capacity = 1; - setSharedPtr(pNextReader, pReader); + code = setSharedPtr(pNextReader, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pReader->step == 0 || pReader->step == EXTERNAL_ROWS_PREV) { code = doOpenReaderImpl(pPrevReader); @@ -4773,12 +4991,17 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { #endif code = tsdbAcquireReader(pReader); - qTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + tsdbTrace("tsdb/read: %p, take read mutex, code: %d", pReader, code); if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + // release reader failure should be suppressed here, to avoid over-write the original error code + (void) tsdbReleaseReader(pReader); return code; } } @@ -4786,7 +5009,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (pReader->innerReader[0] != NULL && pReader->step == 0) { code = doTsdbNextDataBlock2(pReader->innerReader[0], hasNext); if (code) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -4794,8 +5017,8 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (*hasNext) { pStatus = &pReader->innerReader[0]->status; if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; @@ -4817,14 +5040,14 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { code = doTsdbNextDataBlock2(pReader, hasNext); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } if (*hasNext) { if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; } @@ -4840,7 +5063,7 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { code = doTsdbNextDataBlock2(pReader->innerReader[1], hasNext); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -4848,17 +5071,16 @@ int32_t tsdbNextDataBlock2(STsdbReader* pReader, bool* hasNext) { if (*hasNext) { pStatus = &pReader->innerReader[1]->status; if (pStatus->composedDataBlock) { - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); } return code; } } - qTrace("tsdb/read: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); - + tsdbTrace("tsdb/read: %p, unlock read mutex", pReader); + code = tsdbReleaseReader(pReader); return code; } @@ -4910,9 +5132,9 @@ static void doFillNullColSMA(SBlockLoadSuppInfo* pSup, int32_t numOfRows, int32_ } int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, bool* allHave, bool* hasNullSMA) { - SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg; - - int32_t code = 0; + SColumnDataAgg** pBlockSMA = &pDataBlock->pBlockAgg; + SFileDataBlockInfo* pBlockInfo = NULL; + int32_t code = 0; *allHave = false; *pBlockSMA = NULL; @@ -4925,7 +5147,11 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, return TSDB_CODE_SUCCESS; } - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pReader->status.blockIter); + code = getCurrentBlockInfo(&pReader->status.blockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + SBlockLoadSuppInfo* pSup = &pReader->suppInfo; SSDataBlock* pResBlock = pReader->resBlockInfo.pResBlock; @@ -5005,38 +5231,45 @@ int32_t tsdbRetrieveDatablockSMA2(STsdbReader* pReader, SSDataBlock* pDataBlock, return code; } -static SSDataBlock* doRetrieveDataBlock(STsdbReader* pReader) { +static int32_t doRetrieveDataBlock(STsdbReader* pReader, SSDataBlock** pBlock) { SReaderStatus* pStatus = &pReader->status; int32_t code = TSDB_CODE_SUCCESS; - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(&pStatus->blockIter); + SFileDataBlockInfo* pBlockInfo = NULL; + *pBlock = NULL; - if (pReader->code != TSDB_CODE_SUCCESS) { - return NULL; + code = getCurrentBlockInfo(&pStatus->blockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + return code; } - STableBlockScanInfo* pBlockScanInfo = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, pReader->idStr); - if (pBlockScanInfo == NULL) { - return NULL; + if (pReader->code != TSDB_CODE_SUCCESS) { + return pReader->code; + } + + STableBlockScanInfo* pBlockScanInfo = NULL; + code = getTableBlockScanInfo(pStatus->pTableMap, pBlockInfo->uid, &pBlockScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; } code = doLoadFileBlockData(pReader, &pStatus->blockIter, &pStatus->fileBlockData, pBlockScanInfo->uid); if (code != TSDB_CODE_SUCCESS) { tBlockDataReset(&pStatus->fileBlockData); - terrno = code; - return NULL; + return code; } code = copyBlockDataToSDataBlock(pReader, &pBlockScanInfo->lastProcKey); if (code != TSDB_CODE_SUCCESS) { tBlockDataReset(&pStatus->fileBlockData); - terrno = code; - return NULL; } - return pReader->resBlockInfo.pResBlock; + *pBlock = pReader->resBlockInfo.pResBlock; + return code; } -SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { +int32_t tsdbRetrieveDataBlock2(STsdbReader* pReader, SSDataBlock** pBlock, SArray* pIdList) { + *pBlock = NULL; + STsdbReader* pTReader = pReader; if (pReader->type == TIMEWINDOW_RANGE_EXTERNAL) { if (pReader->step == EXTERNAL_ROWS_PREV) { @@ -5050,39 +5283,40 @@ SSDataBlock* tsdbRetrieveDataBlock2(STsdbReader* pReader, SArray* pIdList) { if (pStatus->composedDataBlock || pReader->info.execMode == READER_EXEC_ROWS) { // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); - - return pTReader->resBlockInfo.pResBlock; + *pBlock = pTReader->resBlockInfo.pResBlock; + return TSDB_CODE_SUCCESS; } - SSDataBlock* ret = doRetrieveDataBlock(pTReader); + int32_t code = doRetrieveDataBlock(pTReader, pBlock); - qTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); - tsdbReleaseReader(pReader); + tsdbTrace("tsdb/read-retrieve: %p, unlock read mutex", pReader); + (void) tsdbReleaseReader(pReader); // tsdbReaderSuspend2(pReader); // tsdbReaderResume2(pReader); - - return ret; + return code; } int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { int32_t code = TSDB_CODE_SUCCESS; - qTrace("tsdb/reader-reset: %p, take read mutex", pReader); - tsdbAcquireReader(pReader); + tsdbTrace("tsdb/reader-reset: %p, take read mutex", pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } if (isEmptyQueryTimeWindow(&pReader->info.window) || pReader->pReadSnap == NULL) { tsdbDebug("tsdb reader reset return %p, %s", pReader->pReadSnap, pReader->idStr); - tsdbReleaseReader(pReader); - return TSDB_CODE_SUCCESS; + return tsdbReleaseReader(pReader); } SReaderStatus* pStatus = &pReader->status; @@ -5098,12 +5332,21 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { memset(&pReader->suppInfo.tsColAgg, 0, sizeof(SColumnDataAgg)); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; - tsdbDataFileReaderClose(&pReader->pFileReader); + (void) tsdbDataFileReaderClose(&pReader->pFileReader); int32_t numOfTables = tSimpleHashGetSize(pStatus->pTableMap); - initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); - resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + code = initFilesetIterator(&pStatus->fileIter, pReader->pReadSnap->pfSetArray, pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + code = resetDataBlockIterator(pBlockIter, pReader->info.order, shouldFreePkBuf(&pReader->suppInfo)); + if (code != TSDB_CODE_SUCCESS) { + (void) tsdbReleaseReader(pReader); + return code; + } + resetTableListIndex(&pReader->status); bool asc = ASCENDING_TRAVERSE(pReader->info.order); @@ -5126,7 +5369,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { tsdbError("%p reset reader failed, numOfTables:%d, query range:%" PRId64 " - %" PRId64 " in query %s", pReader, numOfTables, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } @@ -5136,8 +5379,7 @@ int32_t tsdbReaderReset2(STsdbReader* pReader, SQueryTableDataCond* pCond) { pReader, pReader->info.suid, numOfTables, pCond->twindows.skey, pReader->info.window.skey, pReader->info.window.ekey, pReader->idStr); - tsdbReleaseReader(pReader); - + code = tsdbReleaseReader(pReader); return code; } @@ -5161,12 +5403,15 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT pTableBlockInfo->numOfVgroups = 1; // find the start data block in file - tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); - return code; + return tsdbReleaseReader(pReader); } } @@ -5206,8 +5451,13 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT while (true) { if (hasNext) { - SFileDataBlockInfo* pBlockInfo = getCurrentBlockInfo(pBlockIter); - int32_t numOfRows = pBlockInfo->numRow; + SFileDataBlockInfo* pBlockInfo = NULL; + code = getCurrentBlockInfo(pBlockIter, &pBlockInfo); + if (code != TSDB_CODE_SUCCESS) { + break; + } + + int32_t numOfRows = pBlockInfo->numRow; pTableBlockInfo->totalRows += numOfRows; @@ -5243,7 +5493,7 @@ int32_t tsdbGetFileBlocksDistInfo2(STsdbReader* pReader, STableBlockDistInfo* pT } // record the data in stt files - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } @@ -5300,11 +5550,15 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { int64_t rows = 0; SReaderStatus* pStatus = &pReader->status; - tsdbAcquireReader(pReader); + code = tsdbAcquireReader(pReader); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pReader->flag == READER_STATUS_SUSPEND) { code = tsdbReaderResume2(pReader); if (code != TSDB_CODE_SUCCESS) { - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return code; } } @@ -5335,7 +5589,7 @@ int64_t tsdbGetNumOfRowsInMemTable2(STsdbReader* pReader) { pStatus->pTableIter = tSimpleHashIterate(pStatus->pTableMap, pStatus->pTableIter, &iter); } - tsdbReleaseReader(pReader); + (void) tsdbReleaseReader(pReader); return rows; } @@ -5382,12 +5636,16 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs SVersionRange* pRange = &pReader->info.verRange; // lock - taosThreadMutexLock(&pTsdb->mutex); + code = taosThreadMutexLock(&pTsdb->mutex); + if (code != TSDB_CODE_SUCCESS) { + tsdbError("failed to lock tsdb, code:%s", tstrerror(code)); + return code; + } // alloc STsdbReadSnap* pSnap = (STsdbReadSnap*)taosMemoryCalloc(1, sizeof(STsdbReadSnap)); if (pSnap == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5397,7 +5655,7 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pMem = pTsdb->mem; pSnap->pNode = taosMemoryMalloc(sizeof(*pSnap->pNode)); if (pSnap->pNode == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5405,14 +5663,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pNode->pQHandle = pReader; pSnap->pNode->reseek = reseek; - tsdbRefMemTable(pTsdb->mem, pSnap->pNode); + (void)tsdbRefMemTable(pTsdb->mem, pSnap->pNode); } if (pTsdb->imem && (pRange->minVer <= pTsdb->imem->maxVer && pRange->maxVer >= pTsdb->imem->minVer)) { pSnap->pIMem = pTsdb->imem; pSnap->pINode = taosMemoryMalloc(sizeof(*pSnap->pINode)); if (pSnap->pINode == NULL) { - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); code = TSDB_CODE_OUT_OF_MEMORY; goto _exit; } @@ -5420,14 +5678,14 @@ int32_t tsdbTakeReadSnap2(STsdbReader* pReader, _query_reseek_func_t reseek, STs pSnap->pINode->pQHandle = pReader; pSnap->pINode->reseek = reseek; - tsdbRefMemTable(pTsdb->imem, pSnap->pINode); + (void)tsdbRefMemTable(pTsdb->imem, pSnap->pINode); } // fs code = tsdbFSCreateRefSnapshotWithoutLock(pTsdb->pFS, &pSnap->pfSetArray); // unlock - taosThreadMutexUnlock(&pTsdb->mutex); + (void) taosThreadMutexUnlock(&pTsdb->mutex); if (code == TSDB_CODE_SUCCESS) { tsdbTrace("vgId:%d, take read snapshot", TD_VID(pTsdb->pVnode)); @@ -5455,17 +5713,17 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact if (pSnap) { if (pSnap->pMem) { - tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); + (void) tsdbUnrefMemTable(pSnap->pMem, pSnap->pNode, proactive); } if (pSnap->pIMem) { - tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); + (void) tsdbUnrefMemTable(pSnap->pIMem, pSnap->pINode, proactive); } if (pSnap->pNode) taosMemoryFree(pSnap->pNode); if (pSnap->pINode) taosMemoryFree(pSnap->pINode); - tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray); + (void) tsdbFSDestroyRefSnapshot(&pSnap->pfSetArray); taosMemoryFree(pSnap); } @@ -5476,6 +5734,9 @@ void tsdbUntakeReadSnap2(STsdbReader* pReader, STsdbReadSnap* pSnap, bool proact void tsdbReaderSetId2(STsdbReader* pReader, const char* idstr) { taosMemoryFreeClear(pReader->idStr); pReader->idStr = taosStrdup(idstr); + if (pReader->idStr == NULL) { + // no need to do anything + } pReader->status.fileIter.pSttBlockReader->mergeTree.idStr = pReader->idStr; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 4bd9bee050..84f235ab25 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -112,22 +112,28 @@ void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf) { taosArrayDestroy(pBuf->pData); } -void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { +int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pInfo) { + *pInfo = NULL; + int32_t bucketIndex = index / pBuf->numPerBucket; char** pBucket = taosArrayGet(pBuf->pData, bucketIndex); - return (*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo); -} - -STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id) { - STableBlockScanInfo** p = tSimpleHashGet(pTableMap, &uid, sizeof(uid)); - if (p == NULL || *p == NULL) { - terrno = TSDB_CODE_INVALID_PARA; - int32_t size = tSimpleHashGetSize(pTableMap); - tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id); - return NULL; + if (pBucket == NULL) { + return TSDB_CODE_FAILED; } - return *p; + *pInfo = (STableBlockScanInfo*)((*pBucket) + (index % pBuf->numPerBucket) * sizeof(STableBlockScanInfo)); + return TSDB_CODE_SUCCESS; +} + +int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id) { + *pInfo = *(STableBlockScanInfo**)tSimpleHashGet(pTableMap, &uid, sizeof(uid)); + if (pInfo == NULL) { + int32_t size = tSimpleHashGetSize(pTableMap); + tsdbError("failed to locate the uid:%" PRIu64 " in query table uid list, total tables:%d, %s", uid, size, id); + return TSDB_CODE_INVALID_PARA; + } + + return TSDB_CODE_SUCCESS; } int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc) { @@ -146,17 +152,17 @@ int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, in } case TSDB_DATA_TYPE_INT:{ int32_t min = INT32_MIN; - memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } case TSDB_DATA_TYPE_SMALLINT:{ int16_t min = INT16_MIN; - memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } case TSDB_DATA_TYPE_TINYINT:{ int8_t min = INT8_MIN; - memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); + (void)memcpy(&pKey->pks[0].val, &min, tDataTypes[type].bytes); break; } case TSDB_DATA_TYPE_UTINYINT: @@ -245,20 +251,27 @@ int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSH initLastProcKey(pScanInfo, pReader); pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; - tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); + int32_t code = tSimpleHashPut(pTableMap, &pScanInfo->uid, sizeof(uint64_t), &pScanInfo, POINTER_BYTES); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + tsdbTrace("%p check table uid:%" PRId64 " from lastKey:%" PRId64 " %s", pReader, pScanInfo->uid, pScanInfo->lastProcKey.ts, pReader->idStr); - return TSDB_CODE_SUCCESS; + return code; } // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model -SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, - STableUidList* pUidList, int32_t numOfTables) { +int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, + STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj) { + int32_t code = 0; + *pHashObj = NULL; + // allocate buffer in order to load data blocks from file // todo use simple hash instead, optimize the memory consumption SSHashObj* pTableMap = tSimpleHashInit(numOfTables, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT)); if (pTableMap == NULL) { - return NULL; + return terrno; } int64_t st = taosGetTimestampUs(); @@ -267,7 +280,7 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf pUidList->tableUidList = taosMemoryMalloc(numOfTables * sizeof(uint64_t)); if (pUidList->tableUidList == NULL) { tSimpleHashCleanup(pTableMap); - return NULL; + return TSDB_CODE_OUT_OF_MEMORY; } pUidList->currentIndex = 0; @@ -275,8 +288,16 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf for (int32_t j = 0; j < numOfTables; ++j) { pUidList->tableUidList[j] = idList[j].uid; - STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j); - initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader); + STableBlockScanInfo* pScanInfo = NULL; + code = getPosInBlockInfoBuf(pBuf, j, &pScanInfo); + if (code != TSDB_CODE_SUCCESS) { + break; + } + + code = initTableBlockScanInfo(pScanInfo, idList[j].uid, pTableMap, pTsdbReader); + if (code != TSDB_CODE_SUCCESS) { + break; + } } taosSort(pUidList->tableUidList, numOfTables, sizeof(uint64_t), uidComparFunc); @@ -286,7 +307,8 @@ SSHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf (sizeof(STableBlockScanInfo) * numOfTables) / 1024.0, pTsdbReader->cost.createScanInfoList, pTsdbReader->idStr); - return pTableMap; + *pHashObj = pTableMap; + return code; } void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step) { @@ -391,11 +413,13 @@ void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray pIter->pBrinBlockList = pList; } -SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { +int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord) { + *pRecord = NULL; + if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= pIter->block.numOfRecords) { pIter->blockIndex += 1; if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) { - return NULL; + return TSDB_CODE_FAILED; } pIter->pCurrentBlk = taosArrayGet(pIter->pBrinBlockList, pIter->blockIndex); @@ -404,7 +428,7 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { int32_t code = tsdbDataFileReadBrinBlock(pIter->pReader, pIter->pCurrentBlk, &pIter->block); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to read brinBlock from file, code:%s", tstrerror(code)); - return NULL; + return TSDB_CODE_FAILED; } pIter->recordIndex = -1; @@ -412,7 +436,9 @@ SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { pIter->recordIndex += 1; tBrinBlockGet(&pIter->block, pIter->recordIndex, &pIter->record); - return &pIter->record; + *pRecord = &pIter->record; + + return TSDB_CODE_SUCCESS; } void clearBrinBlockIter(SBrinRecordIter* pIter) { tBrinBlockDestroy(&pIter->block); } @@ -670,7 +696,13 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ STombRecord record = {0}; uint64_t uid = pReader->status.uidList.tableUidList[*j]; - STableBlockScanInfo* pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + STableBlockScanInfo* pScanInfo = NULL; + + code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); } @@ -702,7 +734,11 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ } uid = pReader->status.uidList.tableUidList[*j]; - pScanInfo = getTableBlockScanInfo(pReader->status.pTableMap, uid, pReader->idStr); + code = getTableBlockScanInfo(pReader->status.pTableMap, uid, &pScanInfo, pReader->idStr); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + if (pScanInfo->pFileDelData == NULL) { pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); } @@ -806,9 +842,12 @@ int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileRead return doLoadTombDataFromTombBlk(pBlkArray, pReader, pSttFileReader, false); } -void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { +int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver) { if (*ppMemDelData == NULL) { *ppMemDelData = taosArrayInit(4, sizeof(SDelData)); + if (*ppMemDelData == NULL) { + return TSDB_CODE_SUCCESS; + } } SArray* pMemDelData = *ppMemDelData; @@ -836,6 +875,8 @@ void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemT p = p->pNext; } } + + return TSDB_CODE_SUCCESS; } int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, @@ -960,6 +1001,7 @@ int32_t adjustSttDataIters(SArray* pSttFileBlockIterArray, STFileSet* pFileSet) int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArray, STsdb* pTsdb, SMergeTreeConf* pConf, const char* pstr) { int32_t numOfRows = 0; + int32_t code = 0; // no data exists, go to end int32_t numOfLevels = pFileSet->lvlArr->size; @@ -984,7 +1026,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra conf.file[0] = *pSttLevel->fobjArr->data[i]->f; const char* pName = pSttLevel->fobjArr->data[i]->fname; - int32_t code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader); + code = tsdbSttFileReaderOpen(pName, &conf, &pIter->pReader); if (code != TSDB_CODE_SUCCESS) { tsdbError("open stt file reader error. file:%s, code %s, %s", pName, tstrerror(code), pstr); continue; @@ -992,8 +1034,8 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra } if (pIter->pBlockLoadInfo == NULL) { - pIter->pBlockLoadInfo = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols); - if (pIter->pBlockLoadInfo == NULL) { + code = tCreateSttBlockLoadInfo(pConf->pSchema, pConf->pCols, pConf->numOfCols, &pIter->pBlockLoadInfo); + if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to create block load info, code: out of memory, %s", pstr); continue; } @@ -1001,7 +1043,7 @@ int32_t tsdbGetRowsInSttFiles(STFileSet* pFileSet, SArray* pSttFileBlockIterArra // load stt blocks statis for all stt-blocks, to decide if the data of queried table exists in current stt file TStatisBlkArray* pStatisBlkArray = NULL; - int32_t code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlkArray); + code = tsdbSttFileReadStatisBlk(pIter->pReader, (const TStatisBlkArray**)&pStatisBlkArray); if (code != TSDB_CODE_SUCCESS) { tsdbError("failed to load stt block statistics, code:%s, %s", tstrerror(code), pstr); continue; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h index 6c76f8c372..45a2384b65 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.h +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.h @@ -315,32 +315,31 @@ typedef struct SBrinRecordIter { } SBrinRecordIter; int32_t uidComparFunc(const void* p1, const void* p2); +int32_t getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, STableBlockScanInfo** pInfo, const char* id); -STableBlockScanInfo* getTableBlockScanInfo(SSHashObj* pTableMap, uint64_t uid, const char* id); - -SSHashObj* createDataBlockScanInfo(STsdbReader* pReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, - STableUidList* pUidList, int32_t numOfTables); -int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, - STsdbReader* pReader); -void clearBlockScanInfo(STableBlockScanInfo* p); -void destroyAllBlockScanInfo(SSHashObj* pTableMap); -void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step); -void cleanupInfoForNextFileset(SSHashObj* pTableMap); -int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables); -void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf); -void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index); +int32_t createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, + STableUidList* pUidList, int32_t numOfTables, SSHashObj** pHashObj); +int32_t initTableBlockScanInfo(STableBlockScanInfo* pScanInfo, uint64_t uid, SSHashObj* pTableMap, + STsdbReader* pReader); +void clearBlockScanInfo(STableBlockScanInfo* p); +void destroyAllBlockScanInfo(SSHashObj* pTableMap); +void resetAllDataBlockScanInfo(SSHashObj* pTableMap, int64_t ts, int32_t step); +void cleanupInfoForNextFileset(SSHashObj* pTableMap); +int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables); +void clearBlockScanInfoBuf(SBlockInfoBuf* pBuf); +int32_t getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index, STableBlockScanInfo** pRes); // brin records iterator -void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList); -SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter); -void clearBrinBlockIter(SBrinRecordIter* pIter); +void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray* pList); +int32_t getNextBrinRecord(SBrinRecordIter* pIter, SBrinRecord** pRecord); +void clearBrinBlockIter(SBrinRecordIter* pIter); // initialize block iterator API int32_t initBlockIterator(STsdbReader* pReader, SDataBlockIter* pBlockIter, int32_t numOfBlocks, SArray* pTableList); bool blockIteratorNext(SDataBlockIter* pBlockIter, const char* idStr); // load tomb data API (stt/mem only for one table each, tomb data from data files are load for all tables at one time) -void loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); +int32_t loadMemTombData(SArray** ppMemDelData, STbData* pMemTbData, STbData* piMemTbData, int64_t ver); int32_t loadDataFileTombDataForAll(STsdbReader* pReader); int32_t loadSttTombDataForAll(STsdbReader* pReader, SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pLoadInfo); int32_t getNumOfRowsInSttBlock(SSttFileReader* pSttFileReader, SSttBlockLoadInfo* pBlockLoadInfo, @@ -358,10 +357,10 @@ int32_t pkCompEx(SRowKey* p1, SRowKey* p2); int32_t initRowKey(SRowKey* pKey, int64_t ts, int32_t numOfPks, int32_t type, int32_t len, bool asc); void clearRowKey(SRowKey* pKey); -bool shouldFreePkBuf(SBlockLoadSuppInfo *pSupp); -void resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); -void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); -void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); +bool shouldFreePkBuf(SBlockLoadSuppInfo* pSupp); +int32_t resetDataBlockIterator(SDataBlockIter* pIter, int32_t order, bool hasPk); +void clearDataBlockIterator(SDataBlockIter* pIter, bool needFree); +void cleanupDataBlockIterator(SDataBlockIter* pIter, bool hasPk); typedef struct { SArray* pTombData; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index ce4915ca4d..f6c5118c88 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -388,9 +388,10 @@ static int32_t loadDataBlock(SOperatorInfo* pOperator, STableScanBase* pTableSca pCost->totalCheckedRows += pBlock->info.rows; pCost->loadBlocks += 1; - SSDataBlock* p = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, NULL); - if (p == NULL) { - return terrno; + SSDataBlock* p = NULL; + int32_t code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pTableScanInfo->dataReader, &p, NULL); + if (p == NULL || code != TSDB_CODE_SUCCESS) { + return code; } ASSERT(p == pBlock); @@ -1351,7 +1352,12 @@ static SSDataBlock* readPreVersionData(SOperatorInfo* pTableScanOp, uint64_t tbU } if (hasNext) { - /*SSDataBlock* p = */ pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, NULL); + SSDataBlock* p = NULL; + code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pReader, &p, NULL); + if (code != TSDB_CODE_SUCCESS) { + return NULL; + } + doSetTagColumnData(&pTableScanInfo->base, pBlock, pTaskInfo, pBlock->info.rows); pBlock->info.id.groupId = tableListGetTableGroupId(pTableScanInfo->base.pTableListInfo, pBlock->info.id.uid); } @@ -2929,8 +2935,9 @@ static SSDataBlock* doRawScan(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - SSDataBlock* pBlock = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, NULL); - if (pBlock == NULL) { + SSDataBlock* pBlock = NULL; + code = pAPI->tsdReader.tsdReaderRetrieveDataBlock(pInfo->dataReader, &pBlock, NULL); + if (pBlock == NULL || code != TSDB_CODE_SUCCESS) { T_LONG_JMP(pTaskInfo->env, terrno); } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 23bd2b87a1..41d398888e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1139,7 +1139,6 @@ void destroyIntervalOperatorInfo(void* param) { taosArrayDestroy(pInfo->pInterpCols); pInfo->pInterpCols = NULL; taosArrayDestroyEx(pInfo->pPrevValues, freeItem); - pInfo->pPrevValues = NULL; cleanupGroupResInfo(&pInfo->groupResInfo); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 620f5f1248..3ea3e814ed 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -310,7 +310,7 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) { taosArrayDestroy(pInfo->pList); pInfo->pList = NULL; - + if (pInfo->checkRspTmr != NULL) { /*bool ret = */ taosTmrStop(pInfo->checkRspTmr); pInfo->checkRspTmr = NULL; diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 3d8588a55a..009854b45b 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -74,16 +74,16 @@ typedef struct SRaftStore { TdThreadMutex mutex; } SRaftStore; -typedef struct SSyncHbTimerData { +struct SSyncHbTimerData { int64_t syncNodeRid; SSyncTimer* pTimer; SRaftId destId; uint64_t logicClock; int64_t execTime; int64_t rid; -} SSyncHbTimerData; +}; -typedef struct SSyncTimer { +struct SSyncTimer { void* pTimer; TAOS_TMR_CALLBACK timerCb; uint64_t logicClock; @@ -92,7 +92,7 @@ typedef struct SSyncTimer { int64_t timeStamp; SRaftId destId; int64_t hbDataRid; -} SSyncTimer; +}; typedef struct SElectTimerParam { uint64_t logicClock; @@ -106,7 +106,7 @@ typedef struct SPeerState { int64_t lastSendTime; } SPeerState; -typedef struct SSyncNode { +struct SSyncNode { // init by SSyncInfo SyncGroupId vgId; SRaftCfg raftCfg; @@ -116,7 +116,7 @@ typedef struct SSyncNode { // sync io SSyncLogBuffer* pLogBuf; - SWal* pWal; + struct SWal* pWal; const SMsgCb* msgcb; int32_t (*syncSendMSg)(const SEpSet* pEpSet, SRpcMsg* pMsg); int32_t (*syncEqMsg)(const SMsgCb* msgcb, SRpcMsg* pMsg); @@ -234,7 +234,7 @@ typedef struct SSyncNode { bool isStart; -} SSyncNode; +}; // open/close -------------- SSyncNode* syncNodeOpen(SSyncInfo* pSyncInfo, int32_t vnodeVersion); diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index e62ca7d69a..acb9bd20f3 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -103,7 +103,7 @@ typedef void* queue[2]; #define TRANS_MAGIC_NUM 0x5f375a86 #define TRANS_NOVALID_PACKET(src) ((src) != TRANS_MAGIC_NUM ? 1 : 0) -typedef SRpcMsg STransMsg; +typedef struct SRpcMsg STransMsg; typedef SRpcCtx STransCtx; typedef SRpcCtxVal STransCtxVal; typedef SRpcInfo STrans; diff --git a/tests/script/tsim/insert/update2.sim b/tests/script/tsim/insert/update2.sim index d2b5b05267..8ea767da0e 100644 --- a/tests/script/tsim/insert/update2.sim +++ b/tests/script/tsim/insert/update2.sim @@ -120,7 +120,8 @@ endi if $data01 != 2 then return -1 endi -if $data02 != 1 then +if $data02 != 1 then + print expect 1 , actual $data02 return -1 endi if $data03 != 1 then