diff --git a/include/libs/executor/storageapi.h b/include/libs/executor/storageapi.h index edb6ec9cbf..0a33798f0c 100644 --- a/include/libs/executor/storageapi.h +++ b/include/libs/executor/storageapi.h @@ -381,7 +381,7 @@ typedef struct SStateStore { void** ppVal, int32_t* pVLen); int32_t (*streamStateCountWinAdd)(SStreamState* pState, SSessionKey* pKey, void** pVal, int32_t* pVLen); - SUpdateInfo* (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); + int32_t (*updateInfoInit)(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); TSKEY (*updateInfoFillBlockData)(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); bool (*updateInfoIsUpdated)(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); bool (*updateInfoIsTableInserted)(SUpdateInfo* pInfo, int64_t tbUid); @@ -389,12 +389,12 @@ typedef struct SStateStore { void (*updateInfoDestroy)(SUpdateInfo* pInfo); void (*windowSBfDelete)(SUpdateInfo* pInfo, uint64_t count); - void (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count); + int32_t (*windowSBfAdd)(SUpdateInfo* pInfo, uint64_t count); - SUpdateInfo* (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); + int32_t (*updateInfoInitP)(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); void (*updateInfoAddCloseWindowSBF)(SUpdateInfo* pInfo); void (*updateInfoDestoryColseWinSBF)(SUpdateInfo* pInfo); - int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo); + int32_t (*updateInfoSerialize)(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen); int32_t (*updateInfoDeserialize)(void* buf, int32_t bufLen, SUpdateInfo* pInfo); SStreamStateCur* (*streamStateSessionSeekKeyNext)(SStreamState* pState, const SSessionKey* key); diff --git a/include/libs/stream/tstreamUpdate.h b/include/libs/stream/tstreamUpdate.h index 632a82743f..e1b33de429 100644 --- a/include/libs/stream/tstreamUpdate.h +++ b/include/libs/stream/tstreamUpdate.h @@ -15,29 +15,29 @@ #ifndef _TSTREAMUPDATE_H_ #define _TSTREAMUPDATE_H_ +#include "storageapi.h" #include "taosdef.h" #include "tarray.h" #include "tcommon.h" #include "tmsg.h" -#include "storageapi.h" #ifdef __cplusplus extern "C" { #endif -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen); -TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); -bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid); -void updateInfoDestroy(SUpdateInfo *pInfo); -void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo); -void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo); -int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo); -int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo); -void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count); -void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count); -bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); +int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); +int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, SUpdateInfo** ppInfo); +TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol); +bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); +bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid); +void updateInfoDestroy(SUpdateInfo* pInfo); +void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo); +void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo); +int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen); +int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo); +void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count); +int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count); +bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len); #ifdef __cplusplus } diff --git a/include/util/tbloomfilter.h b/include/util/tbloomfilter.h index 23bf5aaafb..f0450cb862 100644 --- a/include/util/tbloomfilter.h +++ b/include/util/tbloomfilter.h @@ -19,6 +19,8 @@ #include "os.h" #include "tencode.h" #include "thash.h" +#include "tlog.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { @@ -35,19 +37,19 @@ typedef struct SBloomFilter { uint64_t size; _hash_fn_t hashFn1; _hash_fn_t hashFn2; - void *buffer; + void* buffer; double errorRate; } SBloomFilter; -SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate); -int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2); -int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len); -int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t h1, uint64_t h2); -void tBloomFilterDestroy(SBloomFilter *pBF); -void tBloomFilterDump(const SBloomFilter *pBF); -bool tBloomFilterIsFull(const SBloomFilter *pBF); -int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder); -SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder); +int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter** ppBF); +int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2); +int32_t tBloomFilterPut(SBloomFilter* pBF, const void* keyBuf, uint32_t len); +int32_t tBloomFilterNoContain(const SBloomFilter* pBF, uint64_t h1, uint64_t h2); +void tBloomFilterDestroy(SBloomFilter* pBF); +void tBloomFilterDump(const SBloomFilter* pBF); +bool tBloomFilterIsFull(const SBloomFilter* pBF); +int32_t tBloomFilterEncode(const SBloomFilter* pBF, SEncoder* pEncoder); +int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF); #ifdef __cplusplus } diff --git a/include/util/tscalablebf.h b/include/util/tscalablebf.h index d3ce2eb23b..f82493aca2 100644 --- a/include/util/tscalablebf.h +++ b/include/util/tscalablebf.h @@ -23,22 +23,22 @@ extern "C" { #endif typedef struct SScalableBf { - SArray *bfArray; // array of bloom filters - uint32_t growth; - uint64_t numBits; - uint32_t maxBloomFilters; - int8_t status; + SArray* bfArray; // array of bloom filters + uint32_t growth; + uint64_t numBits; + uint32_t maxBloomFilters; + int8_t status; _hash_fn_t hashFn1; _hash_fn_t hashFn2; } SScalableBf; -SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate); -int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len); -int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len); -int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len); -void tScalableBfDestroy(SScalableBf *pSBf); -int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder); -SScalableBf *tScalableBfDecode(SDecoder *pDecoder); +int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf); +int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len); +int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len); +int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len); +void tScalableBfDestroy(SScalableBf* pSBf); +int32_t tScalableBfEncode(const SScalableBf* pSBf, SEncoder* pEncoder); +int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf); #ifdef __cplusplus } diff --git a/include/util/tutil.h b/include/util/tutil.h index d1fc11d0fe..31ce34f667 100644 --- a/include/util/tutil.h +++ b/include/util/tutil.h @@ -21,6 +21,7 @@ #include "tdef.h" #include "thash.h" #include "tmd5.h" +#include "tutil.h" #ifdef __cplusplus extern "C" { diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 9547fa12f3..6db3c780e2 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -929,7 +929,7 @@ int32_t initStreamAggSupporter(SStreamAggSupporter* pSup, SExprSupp* pExpSup, i SStreamState* pState, int32_t keySize, int16_t keyType, SStateStore* pStore, SReadHandle* pHandle, STimeWindowAggSupp* pTwAggSup, const char* taskIdStr, SStorageAPI* pApi, int32_t tsIndex); -void initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, +int32_t initDownStream(struct SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic); void getMaxTsWins(const SArray* pAllWins, SArray* pMaxWins); void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList); diff --git a/source/libs/executor/inc/streamexecutorInt.h b/source/libs/executor/inc/streamexecutorInt.h index 8b61b93601..c6053cc96e 100644 --- a/source/libs/executor/inc/streamexecutorInt.h +++ b/source/libs/executor/inc/streamexecutorInt.h @@ -20,6 +20,7 @@ extern "C" { #endif #include "executorInt.h" +#include "tutil.h" void setStreamOperatorState(SSteamOpBasicInfo* pBasicInfo, EStreamType type); bool needSaveStreamOperatorInfo(SSteamOpBasicInfo* pBasicInfo); diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index f0fddc28fc..c7236a88e8 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -17,6 +17,7 @@ #include "function.h" #include "os.h" #include "tname.h" +#include "tutil.h" #include "tdatablock.h" #include "tmsg.h" @@ -1357,11 +1358,13 @@ static void destroyStreamPartitionOperatorInfo(void* param) { taosMemoryFreeClear(param); } -void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) { +int32_t initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup, SExprSupp* pExpr, SExprSupp* pTbnameExpr) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStorageAPI* pAPI = &downstream->pTaskInfo->storageAPI; if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - return; + return code; } SStreamScanInfo* pScanInfo = downstream->info; @@ -1369,8 +1372,9 @@ void initParDownStream(SOperatorInfo* downstream, SPartitionBySupporter* pParSup pScanInfo->pPartScalarSup = pExpr; pScanInfo->pPartTbnameSup = pTbnameExpr; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); + code = pAPI->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, 0, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); } + return code; } SSDataBlock* buildCreateTableBlock(SExprSupp* tbName, SExprSupp* tag) { @@ -1421,6 +1425,7 @@ void freePartItem(void* ptr) { SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo) { int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamPartitionOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamPartitionOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -1511,14 +1516,19 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState); - initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup); + code = initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup, &pInfo->tbnameCalSup); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); + TSDB_CHECK_CODE(code, lino, _error); + return pOperator; _error: pTaskInfo->code = code; destroyStreamPartitionOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index f6c5118c88..678d8d569e 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -2483,27 +2483,54 @@ static void doCheckUpdate(SStreamScanInfo* pInfo, TSKEY endKey, SSDataBlock* pBl } } -int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff) { - int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); - len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup); - *pBuff = taosMemoryCalloc(1, len); - void* buf = *pBuff; - encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup); - pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo); - return len; +int32_t streamScanOperatorEncode(SStreamScanInfo* pInfo, void** pBuff, int32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + int32_t len = 0; + code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len); + TSDB_CHECK_CODE(code, lino, _end); + + len += encodeSTimeWindowAggSupp(NULL, &pInfo->twAggSup); + *pBuff = taosMemoryCalloc(1, len); + if (!(*pBuff)) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } + void* buf = *pBuff; + encodeSTimeWindowAggSupp(&buf, &pInfo->twAggSup); + int32_t tmp = 0; + code = pInfo->stateStore.updateInfoSerialize(buf, len, pInfo->pUpdateInfo, &tmp); + TSDB_CHECK_CODE(code, lino, _end); + + *pLen = len; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } void streamScanOperatorSaveCheckpoint(SStreamScanInfo* pInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!pInfo->pState) { return; } if (needSaveStreamOperatorInfo(&pInfo->basic)) { void* pBuf = NULL; - int32_t len = streamScanOperatorEncode(pInfo, &pBuf); + int32_t len = 0; + code = streamScanOperatorEncode(pInfo, &pBuf, &len); + TSDB_CHECK_CODE(code, lino, _end); pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_CHECKPOINT_NAME, strlen(STREAM_SCAN_OP_CHECKPOINT_NAME), pBuf, len); taosMemoryFree(pBuf); saveStreamOperatorStateComplete(&pInfo->basic); } + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } } // other properties are recovered from the execution plan @@ -3101,17 +3128,35 @@ static void destroyStreamScanOperatorInfo(void* param) { } void streamScanReleaseState(SOperatorInfo* pOperator) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamScanInfo* pInfo = pOperator->info; + void* pBuff = NULL; if (!pInfo->pState) { return; } if (!pInfo->pUpdateInfo) { return; } - int32_t len = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo); - void* pBuff = taosMemoryCalloc(1, len); - pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo); + int32_t len = 0; + code = pInfo->stateStore.updateInfoSerialize(NULL, 0, pInfo->pUpdateInfo, &len); + TSDB_CHECK_CODE(code, lino, _end); + + pBuff = taosMemoryCalloc(1, len); + if (!pBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; +TSDB_CHECK_CODE(code, lino, _end); + } + + int32_t tmp = 0; + code = pInfo->stateStore.updateInfoSerialize(pBuff, len, pInfo->pUpdateInfo, &tmp); + TSDB_CHECK_CODE(code, lino, _end); + pInfo->stateStore.streamStateSaveInfo(pInfo->pState, STREAM_SCAN_OP_STATE_NAME, strlen(STREAM_SCAN_OP_STATE_NAME), pBuff, len); +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } taosMemoryFree(pBuff); } diff --git a/source/libs/executor/src/streamcountwindowoperator.c b/source/libs/executor/src/streamcountwindowoperator.c index 857d048457..14f2dfbada 100644 --- a/source/libs/executor/src/streamcountwindowoperator.c +++ b/source/libs/executor/src/streamcountwindowoperator.c @@ -68,9 +68,7 @@ void destroyStreamCountAggOperatorInfo(void* param) { taosMemoryFreeClear(param); } -bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { - return pAggSup->windowCount != pAggSup->windowSliding; -} +bool isSlidingCountWindow(SStreamAggSupporter* pAggSup) { return pAggSup->windowCount != pAggSup->windowSliding; } void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, SCountWindowInfo* pCurWin, SBuffInfo* pBuffInfo) { @@ -89,13 +87,14 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, ASSERT(pBuffInfo->pCur); pAggSup->stateStore.streamStateCurNext(pAggSup->pState, pBuffInfo->pCur); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, - (void**)&pCurWin->winInfo.pStatePos, &size); + (void**)&pCurWin->winInfo.pStatePos, &size); if (code == TSDB_CODE_FAILED) { pAggSup->stateStore.streamStateCountWinAdd(pAggSup->pState, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); } } else { - pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount); + pBuffInfo->pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin, + pAggSup->windowCount); code = pAggSup->stateStore.streamStateSessionGetKVByCur(pBuffInfo->pCur, &pCurWin->winInfo.sessionWin, (void**)&pCurWin->winInfo.pStatePos, &size); if (code == TSDB_CODE_FAILED) { @@ -107,15 +106,16 @@ void setCountOutputBuf(SStreamAggSupporter* pAggSup, TSKEY ts, uint64_t groupId, pBuffInfo->rebuildWindow = true; } } else { - code = pAggSup->stateStore.streamStateCountWinAddIfNotExist( - pAggSup->pState, &pCurWin->winInfo.sessionWin, pAggSup->windowCount, (void**)&pCurWin->winInfo.pStatePos, &size); + code = pAggSup->stateStore.streamStateCountWinAddIfNotExist(pAggSup->pState, &pCurWin->winInfo.sessionWin, + pAggSup->windowCount, + (void**)&pCurWin->winInfo.pStatePos, &size); } if (code == TSDB_CODE_SUCCESS) { pCurWin->winInfo.isOutput = true; } - pCurWin->pWindowCount= - (COUNT_TYPE*) ((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE))); + pCurWin->pWindowCount = + (COUNT_TYPE*)((char*)pCurWin->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - sizeof(COUNT_TYPE))); if (*pCurWin->pWindowCount == pAggSup->windowCount) { pBuffInfo->rebuildWindow = true; @@ -133,7 +133,7 @@ static int32_t updateCountWindowInfo(SStreamAggSupporter* pAggSup, SCountWindowI int32_t start, int32_t rows, int32_t maxRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { SSessionKey sWinKey = pWinInfo->winInfo.sessionWin; - int32_t num = 0; + int32_t num = 0; for (int32_t i = start; i < rows; i++) { if (pTs[i] < pWinInfo->winInfo.sessionWin.win.ekey) { num++; @@ -181,7 +181,7 @@ void getCountWinRange(SStreamAggSupporter* pAggSup, const SSessionKey* pKey, ESt pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentNext(pAggSup->pState, pKey); } SSessionKey tmpKey = {.groupId = pKey->groupId, .win.ekey = INT64_MIN, .win.skey = INT64_MIN}; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); + int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, NULL, 0); if (code != TSDB_CODE_SUCCESS) { pAggSup->stateStore.streamStateFreeCur(pCur); return; @@ -239,7 +239,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl TSKEY* startTsCols = (int64_t*)pStartTsCol->pData; blockDataEnsureCapacity(pAggSup->pScanBlock, rows * 2); SStreamStateCur* pCur = NULL; - COUNT_TYPE slidingRows = 0; + COUNT_TYPE slidingRows = 0; for (int32_t i = 0; i < rows;) { if (pInfo->ignoreExpiredData && @@ -269,7 +269,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl } getCountWinRange(pAggSup, &curWin.winInfo.sessionWin, STREAM_DELETE_DATA, &range); range.win.skey = TMIN(startTsCols[i], range.win.skey); - range.win.ekey = TMAX(startTsCols[rows-1], range.win.ekey); + range.win.ekey = TMAX(startTsCols[rows - 1], range.win.ekey); uint64_t uid = 0; appendDataToSpecialBlock(pAggSup->pScanBlock, &range.win.skey, &range.win.ekey, &uid, &range.groupId, NULL); break; @@ -289,8 +289,7 @@ static void doStreamCountAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pStUpdated) { code = saveResult(curWin.winInfo, pStUpdated); if (code != TSDB_CODE_SUCCESS) { - qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), - tstrerror(code)); + qError("%s do stream count aggregate impl, set result error, code %s", GET_TASKID(pTaskInfo), tstrerror(code)); break; } } @@ -397,13 +396,14 @@ void* doStreamCountDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato int32_t mapSize = 0; buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { - SSessionKey key = {0}; + SSessionKey key = {0}; SCountWindowInfo curWin = {0}; buf = decodeSSessionKey(buf, &key); SBuffInfo buffInfo = {.rebuildWindow = false, .winBuffOp = NONE_WINDOW, .pCur = NULL}; setCountOutputBuf(&pInfo->streamAggSup, key.win.skey, key.groupId, &curWin, &buffInfo); buf = decodeSResultWindowInfo(buf, &curWin.winInfo, pInfo->streamAggSup.resultRowSize); - tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo)); + tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, + sizeof(SResultWindowInfo)); } // 2.twAggSup @@ -441,9 +441,9 @@ void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) { uint64_t* gpDatas = (uint64_t*)pGroupCol->pData; SRowBuffPos* pPos = NULL; - int32_t size = 0; + int32_t size = 0; for (int32_t i = 0; i < pBlock->info.rows; i++) { - SSessionKey key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]}; + SSessionKey key = {.groupId = gpDatas[i], .win.skey = startDatas[i], .win.ekey = endDatas[i]}; SStreamStateCur* pCur = NULL; if (isSlidingCountWindow(pAggSup)) { pCur = pAggSup->stateStore.streamStateCountSeekKeyPrev(pAggSup->pState, &key, pAggSup->windowCount); @@ -452,7 +452,7 @@ void doResetCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock) { } while (1) { SSessionKey tmpKey = {.groupId = gpDatas[i], .win.skey = INT64_MIN, .win.ekey = INT64_MIN}; - int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void **)&pPos, &size); + int32_t code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &tmpKey, (void**)&pPos, &size); if (code != TSDB_CODE_SUCCESS || tmpKey.win.skey > endDatas[i]) { pAggSup->stateStore.streamStateFreeCur(pCur); break; @@ -482,7 +482,7 @@ void doDeleteCountWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SAr SSessionKey key = {.win.skey = startDatas[i], .win.ekey = endDatas[i], .groupId = gpDatas[i]}; while (1) { SSessionKey curWin = {0}; - int32_t code = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin); + int32_t code = pAggSup->stateStore.streamStateCountGetKeyByRange(pAggSup->pState, &key, &curWin); if (code == TSDB_CODE_FAILED) { break; } @@ -511,11 +511,11 @@ void deleteCountWinState(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SSHa } static SSDataBlock* doStreamCountAgg(SOperatorInfo* pOperator) { - SExprSupp* pSup = &pOperator->exprSupp; - SStreamCountAggOperatorInfo* pInfo = pOperator->info; - SOptrBasicInfo* pBInfo = &pInfo->binfo; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SExprSupp* pSup = &pOperator->exprSupp; + SStreamCountAggOperatorInfo* pInfo = pOperator->info; + SOptrBasicInfo* pBInfo = &pInfo->binfo; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; qDebug("stask:%s %s status: %d", GET_TASKID(pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); if (pOperator->status == OP_EXEC_DONE) { return NULL; @@ -635,7 +635,7 @@ void streamCountReloadState(SOperatorInfo* pOperator) { int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_COUNT_OP_STATE_NAME, strlen(STREAM_COUNT_OP_STATE_NAME), &pBuf, &size); - TSKEY ts = *(TSKEY*)pBuf; + TSKEY ts = *(TSKEY*)pBuf; pInfo->twAggSup.maxTs = TMAX(pInfo->twAggSup.maxTs, ts); taosMemoryFree(pBuf); @@ -650,11 +650,13 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys SExecTaskInfo* pTaskInfo, SReadHandle* pHandle) { SCountWinodwPhysiNode* pCountNode = (SCountWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; - int32_t code = TSDB_CODE_OUT_OF_MEMORY; + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamCountAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamCountAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { - goto _error; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pOperator->pTaskInfo = pTaskInfo; @@ -664,18 +666,14 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pCountNode->window.pExprs, NULL, &numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); } SExprSupp* pExpSup = &pOperator->exprSupp; SExprInfo* pExprInfo = createExprInfo(pCountNode->window.pFuncs, NULL, &numOfCols); SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pExpSup, pExprInfo, numOfCols, pResBlock, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); pInfo->twAggSup = (STimeWindowAggSupp){ .waterMark = pCountNode->window.watermark, @@ -686,12 +684,11 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys }; pInfo->primaryTsIndex = ((SColumnNode*)pCountNode->window.pTspk)->slotId; - code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, - pTaskInfo->streamInfo.pState, sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, - &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, + sizeof(COUNT_TYPE), 0, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, + GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); + TSDB_CHECK_CODE(code, lino, _error); + pInfo->streamAggSup.windowCount = pCountNode->windowCount; pInfo->streamAggSup.windowSliding = pCountNode->windowSliding; @@ -709,7 +706,8 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->dataVersion = 0; pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); if (!pInfo->historyWins) { - goto _error; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pInfo->pCheckpointRes = createSpecialDataBlock(STREAM_CHECKPOINT); @@ -735,8 +733,12 @@ SOperatorInfo* createStreamCountAggOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorStreamStateFn(pOperator, streamCountReleaseState, streamCountReloadState); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); + code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); + TSDB_CHECK_CODE(code, lino, _error); } return pOperator; @@ -747,6 +749,6 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } - diff --git a/source/libs/executor/src/streameventwindowoperator.c b/source/libs/executor/src/streameventwindowoperator.c index 1809d0cc67..e6ff4fe242 100644 --- a/source/libs/executor/src/streameventwindowoperator.c +++ b/source/libs/executor/src/streameventwindowoperator.c @@ -28,9 +28,9 @@ #include "tlog.h" #include "ttime.h" -#define IS_NORMAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) -#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState" -#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint" +#define IS_NORMAL_EVENT_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_EVENT) +#define STREAM_EVENT_OP_STATE_NAME "StreamEventHistoryState" +#define STREAM_EVENT_OP_CHECKPOINT_NAME "StreamEventOperator_Checkpoint" typedef struct SEventWinfowFlag { bool startFlag; @@ -86,10 +86,11 @@ void destroyStreamEventOperatorInfo(void* param) { void setEventWindowFlag(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo) { char* pFlagInfo = (char*)pWinInfo->winInfo.pStatePos->pRowBuff + (pAggSup->resultRowSize - pAggSup->stateKeySize); - pWinInfo->pWinFlag = (SEventWinfowFlag*) pFlagInfo; + pWinInfo->pWinFlag = (SEventWinfowFlag*)pFlagInfo; } -void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, SEventWindowInfo* pWinInfo) { +void setEventWindowInfo(SStreamAggSupporter* pAggSup, SSessionKey* pKey, SRowBuffPos* pPos, + SEventWindowInfo* pWinInfo) { pWinInfo->winInfo.sessionWin = *pKey; pWinInfo->winInfo.pStatePos = pPos; setEventWindowFlag(pAggSup, pWinInfo); @@ -122,22 +123,23 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI pCurWin->winInfo.sessionWin.groupId = groupId; pCurWin->winInfo.sessionWin.win.skey = ts; pCurWin->winInfo.sessionWin.win.ekey = ts; - SStreamStateCur* pCur = pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin); - SSessionKey leftWinKey = {.groupId = groupId}; - void* pVal = NULL; - int32_t len = 0; + SStreamStateCur* pCur = + pAggSup->stateStore.streamStateSessionSeekKeyCurrentPrev(pAggSup->pState, &pCurWin->winInfo.sessionWin); + SSessionKey leftWinKey = {.groupId = groupId}; + void* pVal = NULL; + int32_t len = 0; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &leftWinKey, &pVal, &len); - if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win) ) { + if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &leftWinKey.win)) { bool inWin = isInTimeWindow(&leftWinKey.win, ts, 0); setEventWindowInfo(pAggSup, &leftWinKey, pVal, pCurWin); - if(inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag) ) { + if (inWin || (pCurWin->pWinFlag->startFlag && !pCurWin->pWinFlag->endFlag)) { pCurWin->winInfo.isOutput = !isWindowIncomplete(pCurWin); goto _end; } } pAggSup->stateStore.streamStateFreeCur(pCur); pCur = pAggSup->stateStore.streamStateSessionSeekKeyNext(pAggSup->pState, &pCurWin->winInfo.sessionWin); - SSessionKey rightWinKey = {.groupId = groupId}; + SSessionKey rightWinKey = {.groupId = groupId}; code = pAggSup->stateStore.streamStateSessionGetKVByCur(pCur, &rightWinKey, &pVal, &len); bool inWin = isInTimeWindow(&rightWinKey.win, ts, 0); if (code == TSDB_CODE_SUCCESS && inWinRange(&pAggSup->winRange, &rightWinKey.win) && (inWin || (start && !end))) { @@ -149,7 +151,7 @@ void setEventOutputBuf(SStreamAggSupporter* pAggSup, TSKEY* pTs, uint64_t groupI } } - SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId}; + SSessionKey winKey = {.win.skey = ts, .win.ekey = ts, .groupId = groupId}; pAggSup->stateStore.streamStateSessionAllocWinBuffByNextPosition(pAggSup->pState, pCur, &winKey, &pVal, &len); setEventWindowInfo(pAggSup, &winKey, pVal, pCurWin); pCurWin->pWinFlag->startFlag = start; @@ -164,7 +166,7 @@ _end: if (code != TSDB_CODE_SUCCESS) { SET_SESSION_WIN_KEY_INVALID(pNextWinKey); } - if(pCurWin->winInfo.pStatePos->needFree) { + if (pCurWin->winInfo.pStatePos->needFree) { pAggSup->stateStore.streamStateSessionDel(pAggSup->pState, &pCurWin->winInfo.sessionWin); } pAggSup->stateStore.streamStateFreeCur(pCur); @@ -173,14 +175,14 @@ _end: } int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pWinInfo, SSessionKey* pNextWinKey, - TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, SSHashObj* pResultRows, - SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { + TSKEY* pTsData, bool* starts, bool* ends, int32_t rows, int32_t start, + SSHashObj* pResultRows, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool* pRebuild) { *pRebuild = false; - if (!pWinInfo->pWinFlag->startFlag && !(starts[start]) ) { + if (!pWinInfo->pWinFlag->startFlag && !(starts[start])) { return 1; } - TSKEY maxTs = INT64_MAX; + TSKEY maxTs = INT64_MAX; STimeWindow* pWin = &pWinInfo->winInfo.sessionWin.win; if (pWinInfo->pWinFlag->endFlag) { maxTs = pWin->ekey + 1; @@ -219,7 +221,7 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW memcpy(pWinInfo->winInfo.pStatePos->pKey, &pWinInfo->winInfo.sessionWin, sizeof(SSessionKey)); if (ends[i]) { - if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey ) { + if (pWinInfo->pWinFlag->endFlag && pWin->skey <= pTsData[i] && pTsData[i] < pWin->ekey) { *pRebuild = true; } return i + 1 - start; @@ -230,26 +232,28 @@ int32_t updateEventWindowInfo(SStreamAggSupporter* pAggSup, SEventWindowInfo* pW static int32_t compactEventWindow(SOperatorInfo* pOperator, SEventWindowInfo* pCurWin, SSHashObj* pStUpdated, SSHashObj* pStDeleted, bool addGap) { - SExprSupp* pSup = &pOperator->exprSupp; - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; - int32_t winNum = 0; - SStreamEventAggOperatorInfo* pInfo = pOperator->info; - SResultRow* pCurResult = NULL; - int32_t numOfOutput = pOperator->exprSupp.numOfExprs; - SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; + SExprSupp* pSup = &pOperator->exprSupp; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI; + int32_t winNum = 0; + SStreamEventAggOperatorInfo* pInfo = pOperator->info; + SResultRow* pCurResult = NULL; + int32_t numOfOutput = pOperator->exprSupp.numOfExprs; + SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; while (1) { if (!pCurWin->pWinFlag->startFlag || pCurWin->pWinFlag->endFlag) { break; } SEventWindowInfo nextWinInfo = {0}; getNextSessionWinInfo(pAggSup, pStUpdated, &pCurWin->winInfo, &nextWinInfo.winInfo); - if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) { + if (!IS_VALID_SESSION_WIN(nextWinInfo.winInfo) || + !inWinRange(&pAggSup->winRange, &nextWinInfo.winInfo.sessionWin.win)) { releaseOutputBuf(pAggSup->pState, nextWinInfo.winInfo.pStatePos, &pAggSup->pSessionAPI->stateStore); break; } setEventWindowFlag(pAggSup, &nextWinInfo); - compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, pStDeleted, false); + compactTimeWindow(pSup, pAggSup, &pInfo->twAggSup, pTaskInfo, &pCurWin->winInfo, &nextWinInfo.winInfo, pStUpdated, + pStDeleted, false); pCurWin->pWinFlag->endFlag = nextWinInfo.pWinFlag->endFlag; winNum++; } @@ -275,7 +279,7 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t winRows = 0; SStreamAggSupporter* pAggSup = &pInfo->streamAggSup; SColumnInfoData* pColStart = NULL; - SColumnInfoData* pColEnd = NULL; + SColumnInfoData* pColEnd = NULL; pInfo->dataVersion = TMAX(pInfo->dataVersion, pSDataBlock->info.version); pAggSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow; @@ -290,7 +294,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl return; } - SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock}; + SFilterColumnParam paramStart = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), + .pDataBlock = pSDataBlock->pDataBlock}; code = filterSetDataFromSlotId(pInfo->pStartCondInfo, ¶mStart); if (code != TSDB_CODE_SUCCESS) { qError("set data from start slotId error."); @@ -299,7 +304,8 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t statusStart = 0; filterExecute(pInfo->pStartCondInfo, pSDataBlock, &pColStart, NULL, paramStart.numOfCols, &statusStart); - SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), .pDataBlock = pSDataBlock->pDataBlock}; + SFilterColumnParam paramEnd = {.numOfCols = taosArrayGetSize(pSDataBlock->pDataBlock), + .pDataBlock = pSDataBlock->pDataBlock}; code = filterSetDataFromSlotId(pInfo->pEndCondInfo, ¶mEnd); if (code != TSDB_CODE_SUCCESS) { qError("set data from end slotId error."); @@ -320,20 +326,23 @@ static void doStreamEventAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl int32_t winIndex = 0; bool allEqual = true; SEventWindowInfo curWin = {0}; - SSessionKey nextWinKey = {0}; - setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, &nextWinKey); + SSessionKey nextWinKey = {0}; + setEventOutputBuf(pAggSup, tsCols, groupId, (bool*)pColStart->pData, (bool*)pColEnd->pData, i, rows, &curWin, + &nextWinKey); setSessionWinOutputInfo(pSeUpdated, &curWin.winInfo); bool rebuild = false; - winRows = updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, rows, i, - pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild); + winRows = + updateEventWindowInfo(pAggSup, &curWin, &nextWinKey, tsCols, (bool*)pColStart->pData, (bool*)pColEnd->pData, + rows, i, pAggSup->pResultRows, pSeUpdated, pStDeleted, &rebuild); ASSERT(winRows >= 1); if (rebuild) { uint64_t uid = 0; appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, - &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); + &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); doDeleteEventWindow(pAggSup, pSeUpdated, &curWin.winInfo.sessionWin); - if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) && !isWindowIncomplete(&curWin)) { + if (pInfo->destHasPrimaryKey && curWin.winInfo.isOutput && IS_NORMAL_EVENT_OP(pOperator) && + !isWindowIncomplete(&curWin)) { saveDeleteRes(pInfo->pPkDeleted, curWin.winInfo.sessionWin); } releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore); @@ -603,7 +612,8 @@ void streamEventReleaseState(SOperatorInfo* pOperator) { char* pBuff = taosMemoryCalloc(1, resSize); memcpy(pBuff, pInfo->historyWins->pData, winSize); memcpy(pBuff + winSize, &pInfo->twAggSup.maxTs, sizeof(TSKEY)); - qDebug("===stream=== event window operator relase state. save result count:%d", (int32_t)taosArrayGetSize(pInfo->historyWins)); + qDebug("===stream=== event window operator relase state. save result count:%d", + (int32_t)taosArrayGetSize(pInfo->historyWins)); pInfo->streamAggSup.stateStore.streamStateSaveInfo(pInfo->streamAggSup.pState, STREAM_EVENT_OP_STATE_NAME, strlen(STREAM_EVENT_OP_STATE_NAME), pBuff, resSize); pInfo->streamAggSup.stateStore.streamStateCommit(pInfo->streamAggSup.pState); @@ -647,7 +657,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { qDebug("===stream=== reload state. try process result %" PRId64 ", %" PRIu64 ", index:%d", pSeKeyBuf[i].win.skey, pSeKeyBuf[i].groupId, i); getSessionWindowInfoByKey(pAggSup, pSeKeyBuf + i, &curInfo.winInfo); - //event window has been deleted + // event window has been deleted if (!IS_VALID_SESSION_WIN(curInfo.winInfo)) { continue; } @@ -659,7 +669,7 @@ void streamEventReloadState(SOperatorInfo* pOperator) { compactEventWindow(pOperator, &curInfo, pInfo->pSeUpdated, pInfo->pSeDeleted, false); qDebug("===stream=== reload state. save result %" PRId64 ", %" PRIu64, curInfo.winInfo.sessionWin.win.skey, - curInfo.winInfo.sessionWin.groupId); + curInfo.winInfo.sessionWin.groupId); if (IS_VALID_SESSION_WIN(curInfo.winInfo)) { saveSessionOutputBuf(pAggSup, &curInfo.winInfo); } @@ -693,6 +703,7 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys SStreamEventWinodwPhysiNode* pEventNode = (SStreamEventWinodwPhysiNode*)pPhyNode; int32_t tsSlotId = ((SColumnNode*)pEventNode->window.pTspk)->slotId; int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamEventAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamEventAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -783,20 +794,18 @@ SOperatorInfo* createStreamEventAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamEventAgg, NULL, destroyStreamEventOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamEventReleaseState, streamEventReloadState); - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); + code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); code = filterInitFromNode((SNode*)pEventNode->pStartCond, &pInfo->pStartCondInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); + code = filterInitFromNode((SNode*)pEventNode->pEndCond, &pInfo->pEndCondInfo, 0); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); return pOperator; @@ -804,5 +813,6 @@ _error: destroyStreamEventOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index d3f90c3134..96c6712866 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -28,14 +28,18 @@ #include "tlog.h" #include "ttime.h" -#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) -#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) -#define IS_NORMAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_FINAL_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) +#define IS_MID_INTERVAL_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) +#define IS_NORMAL_INTERVAL_OP(op) \ + ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL || \ + (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_INTERVAL) -#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) -#define IS_NORMAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) +#define IS_FINAL_SESSION_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) +#define IS_NORMAL_SESSION_OP(op) \ + ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION || \ + (op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_FINAL_SESSION) -#define IS_NORMAL_STATE_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) +#define IS_NORMAL_STATE_OP(op) ((op)->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE) #define DEAULT_DELETE_MARK INT64_MAX #define STREAM_INTERVAL_OP_STATE_NAME "StreamIntervalHistoryState" @@ -263,14 +267,16 @@ static void doDeleteWindows(SOperatorInfo* pOperator, SInterval* pInterval, SSDa if (chIds) { int32_t childId = getChildIndex(pBlock); if (pInvalidWins) { - qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, winRes.groupId, childId); + qDebug("===stream===save invalid delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d", winRes.ts, + winRes.groupId, childId); taosHashPut(pInvalidWins, &winRes, sizeof(SWinKey), NULL, 0); } SArray* chArray = *(void**)chIds; int32_t index = taosArraySearchIdx(chArray, &childId, compareInt32Val, TD_EQ); if (index != -1) { - qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, winGpId, childId); + qDebug("===stream===try push delete window:%" PRId64 ",groupId:%" PRId64 ",chId:%d ,continue", win.skey, + winGpId, childId); getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC); continue; } @@ -390,7 +396,7 @@ static void doBuildDeleteResult(SStreamIntervalOperatorInfo* pInfo, SArray* pWin } void destroyFlusedPos(void* pRes) { - SRowBuffPos* pPos = (SRowBuffPos*) pRes; + SRowBuffPos* pPos = (SRowBuffPos*)pRes; if (!pPos->needFree && !pPos->pRowBuff) { taosMemoryFreeClear(pPos->pKey); taosMemoryFree(pPos); @@ -488,8 +494,11 @@ void initIntervalDownStream(SOperatorInfo* downstream, uint16_t type, SStreamInt pScanInfo->windowSup.parentType = type; pScanInfo->windowSup.pIntervalAggSup = &pInfo->aggSup; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = - pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); + int32_t code = pAPI->updateInfoInitP(&pInfo->interval, pInfo->twAggSup.waterMark, pScanInfo->igCheckUpdate, + pScanInfo->pkColType, pScanInfo->pkColLen, &pScanInfo->pUpdateInfo); + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at %d since %s", __func__, __LINE__, tstrerror(code)); + } } pScanInfo->interval = pInfo->interval; @@ -624,17 +633,17 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB blockDataUpdateTsWindow(pBlock, 0); } -static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, SArray* pPullWins, - int32_t numOfCh, SOperatorInfo* pOperator) { +static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFinalMap, SInterval* pInterval, + SArray* pPullWins, int32_t numOfCh, SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; - SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); - TSKEY* tsData = (TSKEY*)pStartCol->pData; - SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); - TSKEY* tsEndData = (TSKEY*)pEndCol->pData; - SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); - uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; - int32_t chId = getChildIndex(pBlock); - bool res = false; + SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX); + TSKEY* tsData = (TSKEY*)pStartCol->pData; + SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX); + TSKEY* tsEndData = (TSKEY*)pEndCol->pData; + SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX); + uint64_t* groupIdData = (uint64_t*)pGroupCol->pData; + int32_t chId = getChildIndex(pBlock); + bool res = false; for (int32_t i = 0; i < pBlock->info.rows; i++) { TSKEY winTs = tsData[i]; while (winTs <= tsEndData[i]) { @@ -650,7 +659,7 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina // pull data is over taosArrayDestroy(chArray); taosHashRemove(pMap, &winRes, sizeof(SWinKey)); - res =true; + res = true; qDebug("===stream===retrive pull data over.window %" PRId64, winRes.ts); void* pFinalCh = taosHashGet(pFinalMap, &winRes, sizeof(SWinKey)); @@ -663,7 +672,8 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; // add pull data request - qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", winRes.ts, winRes.groupId, numOfCh); + qDebug("===stream===prepare final retrive for delete window:%" PRId64 ",groupId:%" PRId64 ", size:%d", + winRes.ts, winRes.groupId, numOfCh); if (IS_MID_INTERVAL_OP(pOperator)) { SStreamIntervalOperatorInfo* pInfo = (SStreamIntervalOperatorInfo*)pOperator->info; taosArrayPush(pInfo->pMidPullDatas, &winRes); @@ -671,7 +681,7 @@ static bool processPullOver(SSDataBlock* pBlock, SHashObj* pMap, SHashObj* pFina taosArrayPush(pInfo->pDelWins, &winRes); addPullWindow(pMap, &winRes, numOfCh); if (pInfo->destHasPrimaryKey) { - tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0); + tSimpleHashPut(pInfo->pDeletedMap, &winRes, sizeof(SWinKey), NULL, 0); } qDebug("===stream===prepare final retrive for delete %" PRId64 ", size:%d", winRes.ts, numOfCh); } @@ -690,7 +700,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i for (int32_t i = 0; i < size; i++) { SWinKey* winKey = taosArrayGet(wins, i); STimeWindow nextWin = getFinalTimeWindow(winKey->ts, &pInfo->interval); - void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); + void* chIds = taosHashGet(pInfo->pPullDataMap, winKey, sizeof(SWinKey)); if (!chIds) { SPullWindowInfo pull = { .window = nextWin, .groupId = winKey->groupId, .calWin.skey = nextWin.skey, .calWin.ekey = nextWin.skey}; @@ -698,7 +708,7 @@ static void addRetriveWindow(SArray* wins, SStreamIntervalOperatorInfo* pInfo, i if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pInfo->pPullDataMap, winKey, pInfo->numOfChild); if (pInfo->destHasPrimaryKey) { - tSimpleHashPut(pInfo->pDeletedMap,winKey, sizeof(SWinKey), NULL, 0); + tSimpleHashPut(pInfo->pDeletedMap, winKey, sizeof(SWinKey), NULL, 0); } qDebug("===stream===prepare retrive for delete %" PRId64 ", size:%d", winKey->ts, pInfo->numOfChild); } @@ -751,7 +761,8 @@ int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, void* pState, SSDat if (pBlock->info.id.groupId == 0) { pBlock->info.id.groupId = groupId; void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, false) < 0) { + if (pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, + false) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -829,9 +840,7 @@ static int32_t getNextQualifiedFinalWindow(SInterval* pInterval, STimeWindow* pN return startPos; } -bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { - return pInfo->primaryPkIndex != -1; -} +bool hasSrcPrimaryKeyCol(SSteamOpBasicInfo* pInfo) { return pInfo->primaryPkIndex != -1; } static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBlock, uint64_t groupId, SSHashObj* pUpdatedMap, SSHashObj* pDeletedMap) { @@ -852,23 +861,22 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex); tsCols = (int64_t*)pColDataInfo->pData; - void* pPkVal = NULL; - int32_t pkLen = 0; + void* pPkVal = NULL; + int32_t pkLen = 0; SColumnInfoData* pPkColDataInfo = NULL; if (hasSrcPrimaryKeyCol(&pInfo->basic)) { pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->basic.primaryPkIndex); } if (pSDataBlock->info.window.skey != tsCols[0] || pSDataBlock->info.window.ekey != tsCols[endRowId]) { - qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 - ",maxKey %" PRId64, - pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + qError("table uid %" PRIu64 " data block timestamp range may not be calculated! minKey %" PRId64 ",maxKey %" PRId64, + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); blockDataUpdateTsWindow(pSDataBlock, pInfo->primaryTsIndex); // timestamp of the data is incorrect if (pSDataBlock->info.window.skey <= 0 || pSDataBlock->info.window.ekey <= 0) { qError("table uid %" PRIu64 " data block timestamp is out of range! minKey %" PRId64 ",maxKey %" PRId64, - pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); + pSDataBlock->info.id.uid, pSDataBlock->info.window.skey, pSDataBlock->info.window.ekey); } } @@ -916,7 +924,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat if (savePullWindow(&pull, pInfo->pPullWins) == TSDB_CODE_SUCCESS) { addPullWindow(pInfo->pPullDataMap, &winRes, pInfo->numOfChild); if (pInfo->destHasPrimaryKey) { - tSimpleHashPut(pInfo->pDeletedMap,&winRes, sizeof(SWinKey), NULL, 0); + tSimpleHashPut(pInfo->pDeletedMap, &winRes, sizeof(SWinKey), NULL, 0); } } } else { @@ -962,7 +970,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDat }; if (pInfo->destHasPrimaryKey && code == TSDB_CODE_SUCCESS && IS_NORMAL_INTERVAL_OP(pOperator)) { - tSimpleHashPut(pDeletedMap,&key, sizeof(SWinKey), NULL, 0); + tSimpleHashPut(pDeletedMap, &key, sizeof(SWinKey), NULL, 0); } if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE && pUpdatedMap) { @@ -1365,7 +1373,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT || pBlock->info.type == STREAM_CLEAR) { - SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); + SArray* delWins = taosArrayInit(8, sizeof(SWinKey)); SHashObj* finalMap = IS_FINAL_INTERVAL_OP(pOperator) ? pInfo->pFinalPullDataMap : NULL; doDeleteWindows(pOperator, &pInfo->interval, pBlock, delWins, pInfo->pUpdatedMap, finalMap); if (IS_FINAL_INTERVAL_OP(pOperator)) { @@ -1399,7 +1407,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { getAllIntervalWindow(pInfo->aggSup.pResultRowHashTable, pInfo->pUpdatedMap); continue; } else if (pBlock->info.type == STREAM_RETRIEVE) { - if(!IS_FINAL_INTERVAL_OP(pOperator)) { + if (!IS_FINAL_INTERVAL_OP(pOperator)) { pInfo->recvRetrive = true; copyDataBlock(pInfo->pMidRetriveRes, pBlock); pInfo->pMidRetriveRes->info.type = STREAM_MID_RETRIEVE; @@ -1525,8 +1533,8 @@ void streamIntervalReloadState(SOperatorInfo* pOperator) { SStreamIntervalOperatorInfo* pInfo = pOperator->info; if (pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL && pOperator->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_MID_INTERVAL) { - int32_t size = 0; - void* pBuf = NULL; + int32_t size = 0; + void* pBuf = NULL; int32_t code = pInfo->stateStore.streamStateGetInfo(pInfo->pState, STREAM_INTERVAL_OP_STATE_NAME, strlen(STREAM_INTERVAL_OP_STATE_NAME), &pBuf, &size); if (code == 0) { @@ -1600,7 +1608,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pAPI->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsIndex); code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str, - pInfo->pState, &pTaskInfo->storageAPI.functionStore); + pInfo->pState, &pTaskInfo->storageAPI.functionStore); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -1755,27 +1763,37 @@ void initDummyFunction(SqlFunctionCtx* pDummy, SqlFunctionCtx* pCtx, int32_t num } } -void initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, - STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) { +int32_t initDownStream(SOperatorInfo* downstream, SStreamAggSupporter* pAggSup, uint16_t type, int32_t tsColIndex, + STimeWindowAggSupp* pTwSup, struct SSteamOpBasicInfo* pBasic) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_STREAM_PARTITION) { SStreamPartitionOperatorInfo* pScanInfo = downstream->info; pScanInfo->tsColIndex = tsColIndex; } if (downstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) { - initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic); - return; + code = initDownStream(downstream->pDownstream[0], pAggSup, type, tsColIndex, pTwSup, pBasic); + return code; } SStreamScanInfo* pScanInfo = downstream->info; pScanInfo->windowSup = (SWindowSupporter){.pStreamAggSup = pAggSup, .gap = pAggSup->gap, .parentType = type}; pScanInfo->pState = pAggSup->pState; if (!pScanInfo->pUpdateInfo) { - pScanInfo->pUpdateInfo = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, - pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen); + code = pAggSup->stateStore.updateInfoInit(60000, TSDB_TIME_PRECISION_MILLI, pTwSup->waterMark, + pScanInfo->igCheckUpdate, pScanInfo->pkColType, pScanInfo->pkColLen, + &pScanInfo->pUpdateInfo); + TSDB_CHECK_CODE(code, lino, _end); } pScanInfo->twAggSup = *pTwSup; pAggSup->pUpdateInfo = pScanInfo->pUpdateInfo; pBasic->primaryPkIndex = pScanInfo->primaryKeyIndex; + +_end: + if (code != TSDB_CODE_SUCCESS) { + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } static TSKEY sesionTs(void* pKey) { @@ -2148,8 +2166,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData TSKEY* endTsCols = (int64_t*)pEndTsCol->pData; - void* pPkVal = NULL; - int32_t pkLen = 0; + void* pPkVal = NULL; + int32_t pkLen = 0; SColumnInfoData* pPkColDataInfo = NULL; if (hasSrcPrimaryKeyCol(&pInfo->basic)) { pPkColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->basic.primaryPkIndex); @@ -2454,8 +2472,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, - &tbname, false) < 0) { + if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname, + false) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -2935,7 +2953,7 @@ void streamSessionSemiReloadState(SOperatorInfo* pOperator) { int32_t size = 0; void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); @@ -2968,7 +2986,7 @@ void streamSessionReloadState(SOperatorInfo* pOperator) { int32_t size = 0; void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_SESSION_OP_STATE_NAME, - strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); + strlen(STREAM_SESSION_OP_STATE_NAME), &pBuf, &size); int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; ASSERT(size == num * sizeof(SSessionKey) + sizeof(TSKEY)); @@ -3018,6 +3036,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SSessionWinodwPhysiNode* pSessionNode = (SSessionWinodwPhysiNode*)pPhyNode; int32_t numOfCols = 0; int32_t code = TSDB_CODE_OUT_OF_MEMORY; + int32_t lino = 0; SStreamSessionAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamSessionAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -3112,8 +3131,12 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh setOperatorStreamStateFn(pOperator, streamSessionReleaseState, streamSessionReloadState); if (downstream) { - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); + code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); + TSDB_CHECK_CODE(code, lino, _error); } return pOperator; @@ -3124,6 +3147,7 @@ _error: taosMemoryFreeClear(pOperator); pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } @@ -3574,7 +3598,7 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl if (!allEqual) { uint64_t uid = 0; appendDataToSpecialBlock(pAggSup->pScanBlock, &curWin.winInfo.sessionWin.win.skey, - &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); + &curWin.winInfo.sessionWin.win.ekey, &uid, &groupId, NULL); tSimpleHashRemove(pSeUpdated, &curWin.winInfo.sessionWin, sizeof(SSessionKey)); doDeleteSessionWindow(pAggSup, &curWin.winInfo.sessionWin); releaseOutputBuf(pAggSup->pState, curWin.winInfo.pStatePos, &pAPI->stateStore); @@ -3681,9 +3705,9 @@ void* doStreamStateDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOperato SSessionKey key = {0}; SResultWindowInfo winfo = {0}; buf = decodeSSessionKey(buf, &key); - pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL, - pAggSup->stateKeySize, compareStateKey, - (void**)&winfo.pStatePos, &pAggSup->resultRowSize); + pAggSup->stateStore.streamStateStateAddIfNotExist(pAggSup->pState, &winfo.sessionWin, NULL, pAggSup->stateKeySize, + compareStateKey, (void**)&winfo.pStatePos, + &pAggSup->resultRowSize); buf = decodeSResultWindowInfo(buf, &winfo, pInfo->streamAggSup.resultRowSize); tSimpleHashPut(pInfo->streamAggSup.pResultRows, &key, sizeof(SSessionKey), &winfo, sizeof(SResultWindowInfo)); } @@ -3877,7 +3901,7 @@ void streamStateReloadState(SOperatorInfo* pOperator) { int32_t size = 0; void* pBuf = NULL; int32_t code = pAggSup->stateStore.streamStateGetInfo(pAggSup->pState, STREAM_STATE_OP_STATE_NAME, - strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); + strlen(STREAM_STATE_OP_STATE_NAME), &pBuf, &size); int32_t num = (size - sizeof(TSKEY)) / sizeof(SSessionKey); qDebug("===stream=== reload state. get result count:%d", num); SSessionKey* pSeKeyBuf = (SSessionKey*)pBuf; @@ -3941,12 +3965,13 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t tsSlotId = ((SColumnNode*)pStateNode->window.pTspk)->slotId; SColumnNode* pColNode = (SColumnNode*)(pStateNode->pStateKey); int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; SStreamStateAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamStateAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; - goto _error; + TSDB_CHECK_CODE(code, lino, _error); } pInfo->stateCol = extractColumnFromColumnNode(pColNode); @@ -3955,9 +3980,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys int32_t numOfScalar = 0; SExprInfo* pScalarExprInfo = createExprInfo(pStateNode->window.pExprs, NULL, &numOfScalar); code = initExprSupp(&pInfo->scalarSupp, pScalarExprInfo, numOfScalar, &pTaskInfo->storageAPI.functionStore); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); } pInfo->twAggSup = (STimeWindowAggSupp){ @@ -3984,9 +4007,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys code = initStreamAggSupporter(&pInfo->streamAggSup, pExpSup, numOfCols, 0, pTaskInfo->streamInfo.pState, keySize, type, &pTaskInfo->storageAPI.stateStore, pHandle, &pInfo->twAggSup, GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI, pInfo->primaryTsIndex); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); + _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->pSeDeleted = tSimpleHashInit(64, hashFn); pInfo->pDelIterator = NULL; @@ -3999,7 +4021,8 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pInfo->dataVersion = 0; pInfo->historyWins = taosArrayInit(4, sizeof(SSessionKey)); if (!pInfo->historyWins) { - goto _error; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } if (pHandle) { pInfo->isHistoryOp = pHandle->fillHistory; @@ -4026,17 +4049,20 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); setOperatorStreamStateFn(pOperator, streamStateReleaseState, streamStateReloadState); - initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup, &pInfo->basic); + code = initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, + &pInfo->twAggSup, &pInfo->basic); + TSDB_CHECK_CODE(code, lino, _error); + code = appendDownstream(pOperator, &downstream, 1); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } + TSDB_CHECK_CODE(code, lino, _error); + return pOperator; _error: destroyStreamStateOperatorInfo(pInfo); taosMemoryFreeClear(pOperator); pTaskInfo->code = code; + qError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); return NULL; } @@ -4203,11 +4229,12 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys .precision = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->node.resType.precision, }; - pInfo->twAggSup = (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, - .calTrigger = pIntervalPhyNode->window.triggerType, - .maxTs = INT64_MIN, - .minTs = INT64_MAX, - .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; + pInfo->twAggSup = + (STimeWindowAggSupp){.waterMark = pIntervalPhyNode->window.watermark, + .calTrigger = pIntervalPhyNode->window.triggerType, + .maxTs = INT64_MIN, + .minTs = INT64_MAX, + .deleteMark = getDeleteMark(&pIntervalPhyNode->window, pIntervalPhyNode->interval)}; ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay"); @@ -4335,7 +4362,7 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS .ts = nextWin.skey, .groupId = groupId, }; - void* chIds = taosHashGet(pInfo->pPullDataMap, &key, sizeof(SWinKey)); + void* chIds = taosHashGet(pInfo->pPullDataMap, &key, sizeof(SWinKey)); int32_t index = -1; SArray* chArray = NULL; int32_t chId = 0; @@ -4407,8 +4434,8 @@ static void doStreamMidIntervalAggImpl(SOperatorInfo* pOperator, SSDataBlock* pS static void addMidRetriveWindow(SArray* wins, SHashObj* pMidPullMap, int32_t numOfChild) { int32_t size = taosArrayGetSize(wins); for (int32_t i = 0; i < size; i++) { - SWinKey* winKey = taosArrayGet(wins, i); - void* chIds = taosHashGet(pMidPullMap, winKey, sizeof(SWinKey)); + SWinKey* winKey = taosArrayGet(wins, i); + void* chIds = taosHashGet(pMidPullMap, winKey, sizeof(SWinKey)); if (!chIds) { addPullWindow(pMidPullMap, winKey, numOfChild); qDebug("===stream===prepare mid operator retrive for delete %" PRId64 ", size:%d", winKey->ts, numOfChild); @@ -4611,5 +4638,6 @@ static SSDataBlock* doStreamMidIntervalAgg(SOperatorInfo* pOperator) { void setStreamOperatorCompleted(SOperatorInfo* pOperator) { setOperatorCompleted(pOperator); - qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), getStreamOpName(pOperator->operatorType), pOperator->status); + qDebug("stask:%s %s status: %d. set completed", GET_TASKID(pOperator->pTaskInfo), + getStreamOpName(pOperator->operatorType), pOperator->status); } diff --git a/source/libs/stream/src/streamUpdate.c b/source/libs/stream/src/streamUpdate.c index 0adb7050c6..61b7e02a4c 100644 --- a/source/libs/stream/src/streamUpdate.c +++ b/source/libs/stream/src/streamUpdate.c @@ -18,6 +18,7 @@ #include "tencode.h" #include "tstreamUpdate.h" #include "ttime.h" +#include "tutil.h" #define DEFAULT_FALSE_POSITIVE 0.01 #define DEFAULT_BUCKET_SIZE 131072 @@ -34,7 +35,8 @@ static int64_t adjustExpEntries(int64_t entries) { return TMIN(DEFAULT_EXPECTED_ENTRIES, entries); } int compareKeyTs(void* pTs1, void* pTs2, void* pPkVal, __compar_fn_t cmpPkFn) { - return compareInt64Val(pTs1, pTs2);; + return compareInt64Val(pTs1, pTs2); + ; } int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpPkFn) { @@ -49,7 +51,7 @@ int compareKeyTsAndPk(void* pValue1, void* pTs, void* pPkVal, __compar_fn_t cmpP int32_t getKeyBuff(TSKEY ts, int64_t tbUid, void* pVal, int32_t len, char* buff) { *(TSKEY*)buff = ts; - memcpy(buff+ sizeof(TSKEY), &tbUid, sizeof(int64_t)); + memcpy(buff + sizeof(TSKEY), &tbUid, sizeof(int64_t)); if (len == 0) { return sizeof(TSKEY) + sizeof(int64_t); } @@ -66,26 +68,40 @@ int32_t getValueBuff(TSKEY ts, char* pVal, int32_t len, char* buff) { return sizeof(TSKEY) + len; } -void windowSBfAdd(SUpdateInfo *pInfo, uint64_t count) { +int32_t windowSBfAdd(SUpdateInfo* pInfo, uint64_t count) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pInfo->numSBFs < count) { count = pInfo->numSBFs; } for (uint64_t i = 0; i < count; ++i) { int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); - SScalableBf *tsSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); - taosArrayPush(pInfo->pTsSBFs, &tsSBF); + SScalableBf* tsSBF = NULL; + code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &tsSBF); + TSDB_CHECK_CODE(code, lino, _error); + void* res = taosArrayPush(pInfo->pTsSBFs, &tsSBF); + if (!res) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); + } } + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -static void clearItemHelper(void *p) { - SScalableBf **pBf = p; +static void clearItemHelper(void* p) { + SScalableBf** pBf = p; tScalableBfDestroy(*pBf); } -void windowSBfDelete(SUpdateInfo *pInfo, uint64_t count) { +void windowSBfDelete(SUpdateInfo* pInfo, uint64_t count) { if (count < pInfo->numSBFs) { for (uint64_t i = 0; i < count; ++i) { - SScalableBf *pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); + SScalableBf* pTsSBFs = taosArrayGetP(pInfo->pTsSBFs, 0); tScalableBfDestroy(pTsSBFs); taosArrayRemove(pInfo->pTsSBFs, 0); } @@ -124,14 +140,19 @@ static int64_t adjustWatermark(int64_t adjInterval, int64_t originInt, int64_t w return watermark; } -SUpdateInfo *updateInfoInitP(SInterval *pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) { - return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen); +int32_t updateInfoInitP(SInterval* pInterval, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, + SUpdateInfo** ppInfo) { + return updateInfoInit(pInterval->interval, pInterval->precision, watermark, igUp, pkType, pkLen, ppInfo); } -SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen) { - SUpdateInfo *pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); +int32_t updateInfoInit(int64_t interval, int32_t precision, int64_t watermark, bool igUp, int8_t pkType, int32_t pkLen, + SUpdateInfo** ppInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SUpdateInfo* pInfo = taosMemoryCalloc(1, sizeof(SUpdateInfo)); if (pInfo == NULL) { - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); } pInfo->pTsBuckets = NULL; pInfo->pTsSBFs = NULL; @@ -145,17 +166,19 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma bfSize = (uint64_t)(pInfo->watermark / pInfo->interval); pInfo->numSBFs = bfSize; - pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void *)); + pInfo->pTsSBFs = taosArrayInit(bfSize, sizeof(void*)); if (pInfo->pTsSBFs == NULL) { updateInfoDestroy(pInfo); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); } windowSBfAdd(pInfo, bfSize); pInfo->pTsBuckets = taosArrayInit(DEFAULT_BUCKET_SIZE, sizeof(TSKEY)); if (pInfo->pTsBuckets == NULL) { updateInfoDestroy(pInfo); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); } TSKEY dumy = 0; @@ -167,31 +190,54 @@ SUpdateInfo *updateInfoInit(int64_t interval, int32_t precision, int64_t waterma } _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(DEFAULT_MAP_CAPACITY, hashFn, true, HASH_NO_LOCK); + if (!pInfo->pMap) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } pInfo->maxDataVersion = 0; pInfo->pkColLen = pkLen; pInfo->pkColType = pkType; pInfo->pKeyBuff = taosMemoryCalloc(1, sizeof(TSKEY) + sizeof(int64_t) + pkLen); + if (!pInfo->pKeyBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pkLen); + if (!pInfo->pValueBuff) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _end); + } if (pkLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; - pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC);; + pInfo->comparePkCol = getKeyComparFunc(pkType, TSDB_ORDER_ASC); + ; } else { pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; } - return pInfo; + (*ppInfo) = pInfo; + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { +static int32_t getSBf(SUpdateInfo* pInfo, TSKEY ts, SScalableBf** ppSBf) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (ts <= 0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); } if (pInfo->minTS < 0) { pInfo->minTS = (TSKEY)(ts / pInfo->interval * pInfo->interval); } int64_t index = (int64_t)((ts - pInfo->minTS) / pInfo->interval); if (index < 0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); } if (index >= pInfo->numSBFs) { uint64_t count = index + 1 - pInfo->numSBFs; @@ -199,22 +245,31 @@ static SScalableBf *getSBf(SUpdateInfo *pInfo, TSKEY ts) { windowSBfAdd(pInfo, count); index = pInfo->numSBFs - 1; } - SScalableBf *res = taosArrayGetP(pInfo->pTsSBFs, index); + SScalableBf* res = taosArrayGetP(pInfo->pTsSBFs, index); if (res == NULL) { int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); - res = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); + code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &res); + TSDB_CHECK_CODE(code, lino, _end); + taosArrayPush(pInfo->pTsSBFs, &res); } - return res; + (*ppSBf) = res; + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -bool updateInfoIsTableInserted(SUpdateInfo *pInfo, int64_t tbUid) { - void *pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); +bool updateInfoIsTableInserted(SUpdateInfo* pInfo, int64_t tbUid) { + void* pVal = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); if (pVal || taosHashGetSize(pInfo->pMap) >= DEFAULT_MAP_SIZE) return true; return false; } -TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) { +TSKEY updateInfoFillBlockData(SUpdateInfo* pInfo, SSDataBlock* pBlock, int32_t primaryTsCol, int32_t primaryKeyCol) { + int32_t code = TSDB_CODE_SUCCESS; if (pBlock == NULL || pBlock->info.rows == 0) return INT64_MIN; TSKEY maxTs = INT64_MIN; void* pPkVal = NULL; @@ -223,14 +278,14 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p int32_t len = 0; int64_t tbUid = pBlock->info.id.uid; - SColumnInfoData *pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); - SColumnInfoData *pPkDataInfo = NULL; + SColumnInfoData* pColDataInfo = taosArrayGet(pBlock->pDataBlock, primaryTsCol); + SColumnInfoData* pPkDataInfo = NULL; if (primaryKeyCol >= 0) { pPkDataInfo = taosArrayGet(pBlock->pDataBlock, primaryKeyCol); } for (int32_t i = 0; i < pBlock->info.rows; i++) { - TSKEY ts = ((TSKEY *)pColDataInfo->pData)[i]; + TSKEY ts = ((TSKEY*)pColDataInfo->pData)[i]; if (maxTs < ts) { maxTs = ts; if (primaryKeyCol >= 0) { @@ -238,17 +293,22 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p maxLen = colDataGetRowLength(pPkDataInfo, i); } } - SScalableBf *pSBf = getSBf(pInfo, ts); + SScalableBf* pSBf = NULL; + int32_t code = getSBf(pInfo, ts, &pSBf); + if (code != TSDB_CODE_SUCCESS) { + uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } if (pSBf) { if (primaryKeyCol >= 0) { pPkVal = colDataGetData(pPkDataInfo, i); len = colDataGetRowLength(pPkDataInfo, i); } int32_t buffLen = getKeyBuff(ts, tbUid, pPkVal, len, pInfo->pKeyBuff); + // we don't care whether the data is updated or not tScalableBfPut(pSBf, pInfo->pKeyBuff, buffLen); } } - void *pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); + void* pMaxTs = taosHashGet(pInfo->pMap, &tbUid, sizeof(int64_t)); if (pMaxTs == NULL || pInfo->comparePkRowFn(pMaxTs, &maxTs, pMaxPkVal, pInfo->comparePkCol) == -1) { int32_t valueLen = getValueBuff(maxTs, pMaxPkVal, maxLen, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tbUid, sizeof(int64_t), pInfo->pValueBuff, valueLen); @@ -256,14 +316,14 @@ TSKEY updateInfoFillBlockData(SUpdateInfo *pInfo, SSDataBlock *pBlock, int32_t p return maxTs; } -bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { +bool updateInfoIsUpdated(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { int32_t res = TSDB_CODE_FAILED; int32_t buffLen = 0; buffLen = getKeyBuff(ts, tableId, pPkVal, len, pInfo->pKeyBuff); - void* *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); + void** pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); uint64_t index = ((uint64_t)tableId) % pInfo->numBuckets; - TSKEY maxTs = *(TSKEY *)taosArrayGet(pInfo->pTsBuckets, index); + TSKEY maxTs = *(TSKEY*)taosArrayGet(pInfo->pTsBuckets, index); if (ts < maxTs - pInfo->watermark) { // this window has been closed. if (pInfo->pCloseWinSBF) { @@ -277,10 +337,15 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p return true; } - SScalableBf *pSBf = getSBf(pInfo, ts); + SScalableBf* pSBf = NULL; + int32_t code = getSBf(pInfo, ts, &pSBf); + if (code != TSDB_CODE_SUCCESS) { + uWarn("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code)); + } int32_t size = taosHashGetSize(pInfo->pMap); - if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1 )) { + if ((!pMapMaxTs && size < DEFAULT_MAP_SIZE) || + (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == -1)) { int32_t valueLen = getValueBuff(ts, pPkVal, len, pInfo->pValueBuff); taosHashPut(pInfo->pMap, &tableId, sizeof(uint64_t), pInfo->pValueBuff, valueLen); // pSBf may be a null pointer @@ -309,7 +374,7 @@ bool updateInfoIsUpdated(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* p return true; } -void updateInfoDestroy(SUpdateInfo *pInfo) { +void updateInfoDestroy(SUpdateInfo* pInfo) { if (pInfo == NULL) { return; } @@ -317,7 +382,7 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { uint64_t size = taosArrayGetSize(pInfo->pTsSBFs); for (uint64_t i = 0; i < size; i++) { - SScalableBf *pSBF = taosArrayGetP(pInfo->pTsSBFs, i); + SScalableBf* pSBF = taosArrayGetP(pInfo->pTsSBFs, i); tScalableBfDestroy(pSBF); } @@ -329,15 +394,19 @@ void updateInfoDestroy(SUpdateInfo *pInfo) { taosMemoryFree(pInfo); } -void updateInfoAddCloseWindowSBF(SUpdateInfo *pInfo) { +void updateInfoAddCloseWindowSBF(SUpdateInfo* pInfo) { if (pInfo->pCloseWinSBF) { return; } int64_t rows = adjustExpEntries(pInfo->interval * ROWS_PER_MILLISECOND); - pInfo->pCloseWinSBF = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE); + int32_t code = tScalableBfInit(rows, DEFAULT_FALSE_POSITIVE, &pInfo->pCloseWinSBF); + if (code != TSDB_CODE_SUCCESS) { + pInfo->pCloseWinSBF = NULL; + uError("%s failed to add close window SBF since %s", __func__, tstrerror(code)); + } } -void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { +void updateInfoDestoryColseWinSBF(SUpdateInfo* pInfo) { if (!pInfo || !pInfo->pCloseWinSBF) { return; } @@ -345,62 +414,124 @@ void updateInfoDestoryColseWinSBF(SUpdateInfo *pInfo) { pInfo->pCloseWinSBF = NULL; } -int32_t updateInfoSerialize(void *buf, int32_t bufLen, const SUpdateInfo *pInfo) { +int32_t updateInfoSerialize(void* buf, int32_t bufLen, const SUpdateInfo* pInfo, int32_t* pLen) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (!pInfo) { - return 0; + return TSDB_CODE_SUCCESS; } SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); - if (tStartEncode(&encoder) < 0) return -1; + if (tStartEncode(&encoder) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } int32_t size = taosArrayGetSize(pInfo->pTsBuckets); - if (tEncodeI32(&encoder, size) < 0) return -1; - for (int32_t i = 0; i < size; i++) { - TSKEY *pTs = (TSKEY *)taosArrayGet(pInfo->pTsBuckets, i); - if (tEncodeI64(&encoder, *pTs) < 0) return -1; + if (tEncodeI32(&encoder, size) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); } - if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) return -1; + for (int32_t i = 0; i < size; i++) { + TSKEY* pTs = (TSKEY*)taosArrayGet(pInfo->pTsBuckets, i); + if (tEncodeI64(&encoder, *pTs) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + } + + if (tEncodeU64(&encoder, pInfo->numBuckets) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } int32_t sBfSize = taosArrayGetSize(pInfo->pTsSBFs); - if (tEncodeI32(&encoder, sBfSize) < 0) return -1; + if (tEncodeI32(&encoder, sBfSize) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf *pSBf = taosArrayGetP(pInfo->pTsSBFs, i); - if (tScalableBfEncode(pSBf, &encoder) < 0) return -1; + SScalableBf* pSBf = taosArrayGetP(pInfo->pTsSBFs, i); + if (tScalableBfEncode(pSBf, &encoder) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } } - if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->interval) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->watermark) < 0) return -1; - if (tEncodeI64(&encoder, pInfo->minTS) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->numSBFs) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + if (tEncodeI64(&encoder, pInfo->interval) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + if (tEncodeI64(&encoder, pInfo->watermark) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + if (tEncodeI64(&encoder, pInfo->minTS) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } - if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) return -1; + if (tScalableBfEncode(pInfo->pCloseWinSBF, &encoder) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } int32_t mapSize = taosHashGetSize(pInfo->pMap); - if (tEncodeI32(&encoder, mapSize) < 0) return -1; - void *pIte = NULL; + if (tEncodeI32(&encoder, mapSize) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + void* pIte = NULL; size_t keyLen = 0; while ((pIte = taosHashIterate(pInfo->pMap, pIte)) != NULL) { - void *key = taosHashGetKey(pIte, &keyLen); - if (tEncodeU64(&encoder, *(uint64_t *)key) < 0) return -1; + void* key = taosHashGetKey(pIte, &keyLen); + if (tEncodeU64(&encoder, *(uint64_t*)key) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } int32_t valueSize = taosHashGetValueSize(pIte); - if (tEncodeBinary(&encoder, (const uint8_t *)pIte, valueSize) < 0) return -1; + if (tEncodeBinary(&encoder, (const uint8_t*)pIte, valueSize) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } } - if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) return -1; + if (tEncodeU64(&encoder, pInfo->maxDataVersion) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } - if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) return -1; - if (tEncodeI8(&encoder, pInfo->pkColType) < 0) return -1; + if (tEncodeI32(&encoder, pInfo->pkColLen) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } + if (tEncodeI8(&encoder, pInfo->pkColType) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _end); + } tEndEncode(&encoder); int32_t tlen = encoder.pos; tEncoderClear(&encoder); - return tlen; + *pLen = tlen; + +_end: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { +int32_t updateInfoDeserialize(void* buf, int32_t bufLen, SUpdateInfo* pInfo) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; ASSERT(pInfo); SDecoder decoder = {0}; tDecoderInit(&decoder, buf, bufLen); @@ -419,10 +550,12 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { int32_t sBfSize = 0; if (tDecodeI32(&decoder, &sBfSize) < 0) return -1; - pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void *)); + pInfo->pTsSBFs = taosArrayInit(sBfSize, sizeof(void*)); for (int32_t i = 0; i < sBfSize; i++) { - SScalableBf *pSBf = tScalableBfDecode(&decoder); - if (!pSBf) return -1; + SScalableBf* pSBf = NULL; + code = tScalableBfDecode(&decoder, &pSBf); + TSDB_CHECK_CODE(code, lino, _error); + taosArrayPush(pInfo->pTsSBFs, &pSBf); } @@ -430,15 +563,20 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { if (tDecodeI64(&decoder, &pInfo->interval) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->watermark) < 0) return -1; if (tDecodeI64(&decoder, &pInfo->minTS) < 0) return -1; - pInfo->pCloseWinSBF = tScalableBfDecode(&decoder); + + code = tScalableBfDecode(&decoder, &pInfo->pCloseWinSBF); + if (code != TSDB_CODE_SUCCESS) { + pInfo->pCloseWinSBF = NULL; + code = TSDB_CODE_SUCCESS; + } int32_t mapSize = 0; if (tDecodeI32(&decoder, &mapSize) < 0) return -1; _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT); pInfo->pMap = taosHashInit(mapSize, hashFn, true, HASH_NO_LOCK); uint64_t uid = 0; - void* pVal = NULL; - int32_t valSize = 0; + void* pVal = NULL; + int32_t valSize = 0; for (int32_t i = 0; i < mapSize; i++) { if (tDecodeU64(&decoder, &uid) < 0) return -1; if (tDecodeBinary(&decoder, (uint8_t**)&pVal, &valSize) < 0) return -1; @@ -454,7 +592,8 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { pInfo->pValueBuff = taosMemoryCalloc(1, sizeof(TSKEY) + pInfo->pkColLen); if (pInfo->pkColLen != 0) { pInfo->comparePkRowFn = compareKeyTsAndPk; - pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC);; + pInfo->comparePkCol = getKeyComparFunc(pInfo->pkColType, TSDB_ORDER_ASC); + ; } else { pInfo->comparePkRowFn = compareKeyTs; pInfo->comparePkCol = NULL; @@ -463,11 +602,16 @@ int32_t updateInfoDeserialize(void *buf, int32_t bufLen, SUpdateInfo *pInfo) { tEndDecode(&decoder); tDecoderClear(&decoder); - return 0; + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -bool isIncrementalTimeStamp(SUpdateInfo *pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { - TSKEY *pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); +bool isIncrementalTimeStamp(SUpdateInfo* pInfo, uint64_t tableId, TSKEY ts, void* pPkVal, int32_t len) { + TSKEY* pMapMaxTs = taosHashGet(pInfo->pMap, &tableId, sizeof(uint64_t)); bool res = true; if (pMapMaxTs && pInfo->comparePkRowFn(pMapMaxTs, &ts, pPkVal, pInfo->comparePkCol) == 1) { res = false; diff --git a/source/util/src/tbloomfilter.c b/source/util/src/tbloomfilter.c index 150faab571..1cf4ecf72e 100644 --- a/source/util/src/tbloomfilter.c +++ b/source/util/src/tbloomfilter.c @@ -22,26 +22,30 @@ #define UNIT_NUM_BITS 64ULL #define UNIT_ADDR_NUM_BITS 6ULL -static FORCE_INLINE bool setBit(uint64_t *buf, uint64_t index) { +static FORCE_INLINE bool setBit(uint64_t* buf, uint64_t index) { uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS; uint64_t old = buf[unitIndex]; buf[unitIndex] |= (1ULL << (index % UNIT_NUM_BITS)); return buf[unitIndex] != old; } -static FORCE_INLINE bool getBit(uint64_t *buf, uint64_t index) { +static FORCE_INLINE bool getBit(uint64_t* buf, uint64_t index) { uint64_t unitIndex = index >> UNIT_ADDR_NUM_BITS; uint64_t mask = 1ULL << (index % UNIT_NUM_BITS); return buf[unitIndex] & mask; } -SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { +int32_t tBloomFilterInit(uint64_t expectedEntries, double errorRate, SBloomFilter** ppBF) { + int32_t code = 0; + int32_t lino = 0; if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } - SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); + SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); if (pBF == NULL) { - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pBF->expectedEntries = expectedEntries; pBF->errorRate = errorRate; @@ -61,12 +65,19 @@ SBloomFilter *tBloomFilterInit(uint64_t expectedEntries, double errorRate) { pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); if (pBF->buffer == NULL) { tBloomFilterDestroy(pBF); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } - return pBF; + (*ppBF) = pBF; + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) { +int32_t tBloomFilterPutHash(SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) { ASSERT(!tBloomFilterIsFull(pBF)); bool hasChange = false; const register uint64_t size = pBF->numBits; @@ -82,7 +93,7 @@ int32_t tBloomFilterPutHash(SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) { return TSDB_CODE_FAILED; } -int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { +int32_t tBloomFilterPut(SBloomFilter* pBF, const void* keyBuf, uint32_t len) { uint64_t h1 = (uint64_t)pBF->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pBF->hashFn2(keyBuf, len); bool hasChange = false; @@ -99,7 +110,7 @@ int32_t tBloomFilterPut(SBloomFilter *pBF, const void *keyBuf, uint32_t len) { return TSDB_CODE_FAILED; } -int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t hash1, uint64_t hash2) { +int32_t tBloomFilterNoContain(const SBloomFilter* pBF, uint64_t hash1, uint64_t hash2) { const register uint64_t size = pBF->numBits; uint64_t cbHash = hash1; for (uint32_t i = 0; i < pBF->hashFunctions; ++i) { @@ -111,7 +122,7 @@ int32_t tBloomFilterNoContain(const SBloomFilter *pBF, uint64_t hash1, uint64_t return TSDB_CODE_FAILED; } -void tBloomFilterDestroy(SBloomFilter *pBF) { +void tBloomFilterDestroy(SBloomFilter* pBF) { if (pBF == NULL) { return; } @@ -119,41 +130,70 @@ void tBloomFilterDestroy(SBloomFilter *pBF) { taosMemoryFree(pBF); } -int32_t tBloomFilterEncode(const SBloomFilter *pBF, SEncoder *pEncoder) { +int32_t tBloomFilterEncode(const SBloomFilter* pBF, SEncoder* pEncoder) { if (tEncodeU32(pEncoder, pBF->hashFunctions) < 0) return -1; if (tEncodeU64(pEncoder, pBF->expectedEntries) < 0) return -1; if (tEncodeU64(pEncoder, pBF->numUnits) < 0) return -1; if (tEncodeU64(pEncoder, pBF->numBits) < 0) return -1; if (tEncodeU64(pEncoder, pBF->size) < 0) return -1; for (uint64_t i = 0; i < pBF->numUnits; i++) { - uint64_t *pUnits = (uint64_t *)pBF->buffer; + uint64_t* pUnits = (uint64_t*)pBF->buffer; if (tEncodeU64(pEncoder, pUnits[i]) < 0) return -1; } if (tEncodeDouble(pEncoder, pBF->errorRate) < 0) return -1; return 0; } -SBloomFilter *tBloomFilterDecode(SDecoder *pDecoder) { - SBloomFilter *pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); +int32_t tBloomFilterDecode(SDecoder* pDecoder, SBloomFilter** ppBF) { + int32_t code = 0; + int32_t lino = 0; + SBloomFilter* pBF = taosMemoryCalloc(1, sizeof(SBloomFilter)); + if (!pBF) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); + } pBF->buffer = NULL; - if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) goto _error; - if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) goto _error; - if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) goto _error; - if (tDecodeU64(pDecoder, &pBF->numBits) < 0) goto _error; - if (tDecodeU64(pDecoder, &pBF->size) < 0) goto _error; + if (tDecodeU32(pDecoder, &pBF->hashFunctions) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU64(pDecoder, &pBF->expectedEntries) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU64(pDecoder, &pBF->numUnits) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU64(pDecoder, &pBF->numBits) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU64(pDecoder, &pBF->size) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } pBF->buffer = taosMemoryCalloc(pBF->numUnits, sizeof(uint64_t)); for (int32_t i = 0; i < pBF->numUnits; i++) { - uint64_t *pUnits = (uint64_t *)pBF->buffer; - if (tDecodeU64(pDecoder, pUnits + i) < 0) goto _error; + uint64_t* pUnits = (uint64_t*)pBF->buffer; + if (tDecodeU64(pDecoder, pUnits + i) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + } + if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } - if (tDecodeDouble(pDecoder, &pBF->errorRate) < 0) goto _error; pBF->hashFn1 = HASH_FUNCTION_1; pBF->hashFn2 = HASH_FUNCTION_2; - return pBF; + (*ppBF) = pBF; + return TSDB_CODE_SUCCESS; _error: tBloomFilterDestroy(pBF); - return NULL; + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return code; } -bool tBloomFilterIsFull(const SBloomFilter *pBF) { return pBF->size >= pBF->expectedEntries; } +bool tBloomFilterIsFull(const SBloomFilter* pBF) { return pBF->size >= pBF->expectedEntries; } diff --git a/source/util/src/tscalablebf.c b/source/util/src/tscalablebf.c index 7af794546b..4a4dfd2653 100644 --- a/source/util/src/tscalablebf.c +++ b/source/util/src/tscalablebf.c @@ -17,6 +17,7 @@ #include "tscalablebf.h" #include "taoserror.h" +#include "tutil.h" #define DEFAULT_GROWTH 2 #define DEFAULT_TIGHTENING_RATIO 0.5 @@ -24,82 +25,120 @@ #define SBF_INVALID -1 #define SBF_VALID 0 -static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate); +static int32_t tScalableBfAddFilter(SScalableBf* pSBf, uint64_t expectedEntries, double errorRate, + SBloomFilter** ppNormalBf); -SScalableBf *tScalableBfInit(uint64_t expectedEntries, double errorRate) { +int32_t tScalableBfInit(uint64_t expectedEntries, double errorRate, SScalableBf** ppSBf) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; const uint32_t defaultSize = 8; if (expectedEntries < 1 || errorRate <= 0 || errorRate >= 1.0) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } - SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); + SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); if (pSBf == NULL) { - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pSBf->maxBloomFilters = DEFAULT_MAX_BLOOMFILTERS; pSBf->status = SBF_VALID; pSBf->numBits = 0; - pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void *)); - if (tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO) == NULL) { + pSBf->bfArray = taosArrayInit(defaultSize, sizeof(void*)); + if (!pSBf->bfArray) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); + } + + SBloomFilter* pNormalBf = NULL; + code = tScalableBfAddFilter(pSBf, expectedEntries, errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); + if (code != TSDB_CODE_SUCCESS) { tScalableBfDestroy(pSBf); - return NULL; + TSDB_CHECK_CODE(code, lino, _error); } pSBf->growth = DEFAULT_GROWTH; pSBf->hashFn1 = HASH_FUNCTION_1; pSBf->hashFn2 = HASH_FUNCTION_2; - return pSBf; + (*ppSBf) = pSBf; + return TSDB_CODE_SUCCESS; + +_error: + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + return code; } -int32_t tScalableBfPutNoCheck(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { +int32_t tScalableBfPutNoCheck(SScalableBf* pSBf, const void* keyBuf, uint32_t len) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pSBf->status == SBF_INVALID) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + int32_t size = taosArrayGetSize(pSBf->bfArray); + SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); + if (!pNormalBf) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } - int32_t size = taosArrayGetSize(pSBf->bfArray); - SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); - ASSERT(pNormalBf); if (tBloomFilterIsFull(pNormalBf)) { - pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, - pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); - if (pNormalBf == NULL) { + code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, + pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); + if (code != TSDB_CODE_SUCCESS) { pSBf->status = SBF_INVALID; - return TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } } return tBloomFilterPut(pNormalBf, keyBuf, len); + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -int32_t tScalableBfPut(SScalableBf *pSBf, const void *keyBuf, uint32_t len) { +int32_t tScalableBfPut(SScalableBf* pSBf, const void* keyBuf, uint32_t len) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (pSBf->status == SBF_INVALID) { - return TSDB_CODE_FAILED; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); - int32_t size = taosArrayGetSize(pSBf->bfArray); + int32_t size = taosArrayGetSize(pSBf->bfArray); for (int32_t i = size - 2; i >= 0; --i) { if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; } } - SBloomFilter *pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); + SBloomFilter* pNormalBf = taosArrayGetP(pSBf->bfArray, size - 1); ASSERT(pNormalBf); if (tBloomFilterIsFull(pNormalBf)) { - pNormalBf = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, - pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO); - if (pNormalBf == NULL) { + code = tScalableBfAddFilter(pSBf, pNormalBf->expectedEntries * pSBf->growth, + pNormalBf->errorRate * DEFAULT_TIGHTENING_RATIO, &pNormalBf); + if (code != TSDB_CODE_SUCCESS) { pSBf->status = SBF_INVALID; - return TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } } return tBloomFilterPutHash(pNormalBf, h1, h2); + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32_t len) { +int32_t tScalableBfNoContain(const SScalableBf* pSBf, const void* keyBuf, uint32_t len) { if (pSBf->status == SBF_INVALID) { return TSDB_CODE_FAILED; } uint64_t h1 = (uint64_t)pSBf->hashFn1(keyBuf, len); uint64_t h2 = (uint64_t)pSBf->hashFn2(keyBuf, len); - int32_t size = taosArrayGetSize(pSBf->bfArray); + int32_t size = taosArrayGetSize(pSBf->bfArray); for (int32_t i = size - 1; i >= 0; --i) { if (tBloomFilterNoContain(taosArrayGetP(pSBf->bfArray, i), h1, h2) != TSDB_CODE_SUCCESS) { return TSDB_CODE_FAILED; @@ -108,24 +147,35 @@ int32_t tScalableBfNoContain(const SScalableBf *pSBf, const void *keyBuf, uint32 return TSDB_CODE_SUCCESS; } -static SBloomFilter *tScalableBfAddFilter(SScalableBf *pSBf, uint64_t expectedEntries, double errorRate) { +static int32_t tScalableBfAddFilter(SScalableBf* pSBf, uint64_t expectedEntries, double errorRate, + SBloomFilter** ppNormalBf) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; if (taosArrayGetSize(pSBf->bfArray) >= pSBf->maxBloomFilters) { - return NULL; + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } - SBloomFilter *pNormalBf = tBloomFilterInit(expectedEntries, errorRate); - if (pNormalBf == NULL) { - return NULL; - } + SBloomFilter* pNormalBf = NULL; + code = tBloomFilterInit(expectedEntries, errorRate, &pNormalBf); + TSDB_CHECK_CODE(code, lino, _error); + if (taosArrayPush(pSBf->bfArray, &pNormalBf) == NULL) { tBloomFilterDestroy(pNormalBf); - return NULL; + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); } pSBf->numBits += pNormalBf->numBits; - return pNormalBf; + (*ppNormalBf) = pNormalBf; + +_error: + if (code != TSDB_CODE_SUCCESS) { + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } -void tScalableBfDestroy(SScalableBf *pSBf) { +void tScalableBfDestroy(SScalableBf* pSBf) { if (pSBf == NULL) { return; } @@ -135,7 +185,7 @@ void tScalableBfDestroy(SScalableBf *pSBf) { taosMemoryFree(pSBf); } -int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { +int32_t tScalableBfEncode(const SScalableBf* pSBf, SEncoder* pEncoder) { if (!pSBf) { if (tEncodeI32(pEncoder, 0) < 0) return -1; return 0; @@ -143,7 +193,7 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { int32_t size = taosArrayGetSize(pSBf->bfArray); if (tEncodeI32(pEncoder, size) < 0) return -1; for (int32_t i = 0; i < size; i++) { - SBloomFilter *pBF = taosArrayGetP(pSBf->bfArray, i); + SBloomFilter* pBF = taosArrayGetP(pSBf->bfArray, i); if (tBloomFilterEncode(pBF, pEncoder) < 0) return -1; } if (tEncodeU32(pEncoder, pSBf->growth) < 0) return -1; @@ -153,30 +203,61 @@ int32_t tScalableBfEncode(const SScalableBf *pSBf, SEncoder *pEncoder) { return 0; } -SScalableBf *tScalableBfDecode(SDecoder *pDecoder) { - SScalableBf *pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); +int32_t tScalableBfDecode(SDecoder* pDecoder, SScalableBf** ppSBf) { + int32_t code = TSDB_CODE_SUCCESS; + int32_t lino = 0; + SScalableBf* pSBf = taosMemoryCalloc(1, sizeof(SScalableBf)); + if (!pSBf) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); + } pSBf->hashFn1 = HASH_FUNCTION_1; pSBf->hashFn2 = HASH_FUNCTION_2; pSBf->bfArray = NULL; int32_t size = 0; - if (tDecodeI32(pDecoder, &size) < 0) goto _error; - if (size == 0) { - tScalableBfDestroy(pSBf); - return NULL; + if (tDecodeI32(pDecoder, &size) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); } - pSBf->bfArray = taosArrayInit(size * 2, sizeof(void *)); + if (size == 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + pSBf->bfArray = taosArrayInit(size * 2, POINTER_BYTES); + if (!pSBf->bfArray) { + code = TSDB_CODE_OUT_OF_MEMORY; + TSDB_CHECK_CODE(code, lino, _error); + } + for (int32_t i = 0; i < size; i++) { - SBloomFilter *pBF = tBloomFilterDecode(pDecoder); - if (!pBF) goto _error; + SBloomFilter* pBF = NULL; + code = tBloomFilterDecode(pDecoder, &pBF); + TSDB_CHECK_CODE(code, lino, _error); taosArrayPush(pSBf->bfArray, &pBF); } - if (tDecodeU32(pDecoder, &pSBf->growth) < 0) goto _error; - if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) goto _error; - if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) goto _error; - if (tDecodeI8(pDecoder, &pSBf->status) < 0) goto _error; - return pSBf; + if (tDecodeU32(pDecoder, &pSBf->growth) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU64(pDecoder, &pSBf->numBits) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeU32(pDecoder, &pSBf->maxBloomFilters) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + if (tDecodeI8(pDecoder, &pSBf->status) < 0) { + code = TSDB_CODE_FAILED; + TSDB_CHECK_CODE(code, lino, _error); + } + (*ppSBf) = pSBf; _error: - tScalableBfDestroy(pSBf); - return NULL; + + if (code != TSDB_CODE_SUCCESS) { + tScalableBfDestroy(pSBf); + uError("%s failed at line %d since %s", __func__, lino, tstrerror(code)); + } + return code; } diff --git a/source/util/test/bloomFilterTest.cpp b/source/util/test/bloomFilterTest.cpp index c51de3c8a4..9d83cbf086 100644 --- a/source/util/test/bloomFilterTest.cpp +++ b/source/util/test/bloomFilterTest.cpp @@ -8,12 +8,15 @@ using namespace std; TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { int64_t ts1 = 1650803518000; - GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 0)); - GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, 1)); - GTEST_ASSERT_EQ(NULL, tBloomFilterInit(100, -0.1)); - GTEST_ASSERT_EQ(NULL, tBloomFilterInit(0, 0.01)); + SBloomFilter* pBFTmp = NULL; + GTEST_ASSERT_NE(0, tBloomFilterInit(100, 0, &pBFTmp)); + GTEST_ASSERT_NE(0, tBloomFilterInit(100, 1, &pBFTmp)); + GTEST_ASSERT_NE(0, tBloomFilterInit(100, -0.1, &pBFTmp)); + GTEST_ASSERT_NE(0, tBloomFilterInit(0, 0.01, &pBFTmp)); - SBloomFilter *pBF1 = tBloomFilterInit(100, 0.005); + SBloomFilter *pBF1 = NULL; + int32_t code = tBloomFilterInit(100, 0.005, &pBF1); + GTEST_ASSERT_EQ(0, code); GTEST_ASSERT_EQ(pBF1->numBits, 1152); GTEST_ASSERT_EQ(pBF1->numUnits, 1152 / 64); int64_t count = 0; @@ -25,16 +28,19 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { } ASSERT_TRUE(tBloomFilterIsFull(pBF1)); - SBloomFilter *pBF2 = tBloomFilterInit(1000 * 10000, 0.1); + SBloomFilter* pBF2 = NULL; + GTEST_ASSERT_EQ(0, tBloomFilterInit(1000 * 10000, 0.1, &pBF2)); GTEST_ASSERT_EQ(pBF2->numBits, 47925312); GTEST_ASSERT_EQ(pBF2->numUnits, 47925312 / 64); - SBloomFilter *pBF3 = tBloomFilterInit(10000 * 10000, 0.001); + SBloomFilter* pBF3 = NULL; + GTEST_ASSERT_EQ(0, tBloomFilterInit(10000 * 10000, 0.001, &pBF3)); GTEST_ASSERT_EQ(pBF3->numBits, 1437758784); GTEST_ASSERT_EQ(pBF3->numUnits, 1437758784 / 64); int64_t size = 10000; - SBloomFilter *pBF4 = tBloomFilterInit(size, 0.001); + SBloomFilter* pBF4 = NULL; + GTEST_ASSERT_EQ(0, tBloomFilterInit(size, 0.001, &pBF4)); for (int64_t i = 0; i < 1000; i++) { int64_t ts = i + ts1; GTEST_ASSERT_EQ(tBloomFilterPut(pBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS); @@ -42,17 +48,17 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { ASSERT_TRUE(!tBloomFilterIsFull(pBF4)); for (int64_t i = 0; i < 1000; i++) { - int64_t ts = i + ts1; - uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); - uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); + int64_t ts = i + ts1; + uint64_t h1 = (uint64_t)pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); + uint64_t h2 = (uint64_t)pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_FAILED); } for (int64_t i = 2000; i < 3000; i++) { - int64_t ts = i + ts1; - uint64_t h1 = (uint64_t) pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); - uint64_t h2 = (uint64_t) pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); - GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_SUCCESS); + int64_t ts = i + ts1; + uint64_t h1 = (uint64_t)pBF4->hashFn1((const char*)&ts, sizeof(int64_t)); + uint64_t h2 = (uint64_t)pBF4->hashFn2((const char*)&ts, sizeof(int64_t)); + GTEST_ASSERT_EQ(tBloomFilterNoContain(pBF4, h1, h2), TSDB_CODE_SUCCESS); } tBloomFilterDestroy(pBF1); @@ -64,12 +70,14 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, normal_bloomFilter) { TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { int64_t ts1 = 1650803518000; - GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 0)); - GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, 1)); - GTEST_ASSERT_EQ(NULL, tScalableBfInit(100, -0.1)); - GTEST_ASSERT_EQ(NULL, tScalableBfInit(0, 0.01)); + SScalableBf* tsSBF = NULL; + GTEST_ASSERT_NE(0, tScalableBfInit(100, 0, &tsSBF)); + GTEST_ASSERT_NE(0, tScalableBfInit(100, 1, &tsSBF)); + GTEST_ASSERT_NE(0, tScalableBfInit(100, -0.1, &tsSBF)); + GTEST_ASSERT_NE(0, tScalableBfInit(0, 0.01, &tsSBF)); - SScalableBf *pSBF1 = tScalableBfInit(100, 0.01); + SScalableBf* pSBF1 = NULL; + GTEST_ASSERT_EQ(0, tScalableBfInit(100, 0.01, &pSBF1)); GTEST_ASSERT_EQ(pSBF1->numBits, 1152); int64_t count = 0; int64_t index = 0; @@ -108,7 +116,7 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { int32_t aSize = taosArrayGetSize(pSBF1->bfArray); int64_t totalBits = 0; for (int64_t i = 0; i < aSize; i++) { - SBloomFilter *pBF = (SBloomFilter *)taosArrayGetP(pSBF1->bfArray, i); + SBloomFilter* pBF = (SBloomFilter*)taosArrayGetP(pSBF1->bfArray, i); ASSERT_TRUE(tBloomFilterIsFull(pBF)); totalBits += pBF->numBits; } @@ -120,7 +128,8 @@ TEST(TD_UTIL_BLOOMFILTER_TEST, scalable_bloomFilter) { } int64_t size = 10000; - SScalableBf *pSBF4 = tScalableBfInit(size, 0.001); + SScalableBf* pSBF4 = NULL; + GTEST_ASSERT_EQ(0, tScalableBfInit(size, 0.001, &pSBF4)); for (int64_t i = 0; i < 1000; i++) { int64_t ts = i + ts1; GTEST_ASSERT_EQ(tScalableBfPut(pSBF4, &ts, sizeof(int64_t)), TSDB_CODE_SUCCESS);