diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index a7f0e02815..2567953cfc 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -30,10 +30,10 @@ #include "operator.h" #include "query.h" #include "querytask.h" +#include "storageapi.h" #include "tcompare.h" #include "thash.h" #include "ttypes.h" -#include "storageapi.h" #define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN) #define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP) @@ -697,8 +697,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) { uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0); blockDataEnsureCapacity(pBlock, newSize); - qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", - newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo)); + qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize, + pBlock->info.capacity, GET_TASKID(pTaskInfo)); // todo set the pOperator->resultInfo size } @@ -722,9 +722,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; - SSDataBlock* pBlock = pbInfo->pRes; + SSDataBlock* pBlock = pbInfo->pRes; // set output datablock version pBlock->info.version = pTaskInfo->version; @@ -737,10 +737,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr // clear the existed group id pBlock->info.id.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, + false); void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < + 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); @@ -765,10 +767,12 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // clear the existed group id pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, + false); } else { while (hasRemainResults(pGroupResInfo)) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, true); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, + true); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } @@ -966,10 +970,10 @@ int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t si return TSDB_CODE_SUCCESS; } -int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, - SExprSupp* pSup, SGroupResInfo* pGroupResInfo) { - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - SStorageAPI* pAPI = &pTaskInfo->storageAPI; +int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup, + SGroupResInfo* pGroupResInfo) { + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SStorageAPI* pAPI = &pTaskInfo->storageAPI; SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; @@ -986,8 +990,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa // ASSERT(code == 0); if (code == -1) { // for history - qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, pKey->win.skey, - pKey->win.ekey, pKey->groupId); + qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ", opNum:%d", + pKey->win.skey, pKey->win.ekey, pKey->groupId); pGroupResInfo->index += 1; continue; } @@ -1004,7 +1008,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa pBlock->info.id.groupId = pKey->groupId; void* tbname = NULL; - if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { + if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, + &tbname) < 0) { pBlock->info.parTbName[0] = 0; } else { memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN); diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 6ab39ce81b..47bbbde24a 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -1974,7 +1974,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo int32_t vLen = 0; code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen); - if (code == 0 && key->win.skey != resKey.win.skey) { + if (code == 0 && key->win.skey == resKey.win.skey) { *key = resKey; if (pVal) {