fix get error

This commit is contained in:
yihaoDeng 2023-08-18 16:54:33 +08:00
parent c26d625152
commit 0537323bac
2 changed files with 22 additions and 17 deletions

View File

@ -30,10 +30,10 @@
#include "operator.h"
#include "query.h"
#include "querytask.h"
#include "storageapi.h"
#include "tcompare.h"
#include "thash.h"
#include "ttypes.h"
#include "storageapi.h"
#define SET_REVERSE_SCAN_FLAG(runtime) ((runtime)->scanFlag = REVERSE_SCAN)
#define GET_FORWARD_DIRECTION_FACTOR(ord) (((ord) == TSDB_ORDER_ASC) ? QUERY_ASC_FORWARD_STEP : QUERY_DESC_FORWARD_STEP)
@ -697,8 +697,8 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS
if (pBlock->info.rows + pRow->numOfRows > pBlock->info.capacity) {
uint32_t newSize = pBlock->info.rows + pRow->numOfRows + ((numOfRows - i) > 1 ? 1 : 0);
blockDataEnsureCapacity(pBlock, newSize);
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s",
newSize, pBlock->info.capacity, GET_TASKID(pTaskInfo));
qDebug("datablock capacity not sufficient, expand to required:%d, current capacity:%d, %s", newSize,
pBlock->info.capacity, GET_TASKID(pTaskInfo));
// todo set the pOperator->resultInfo size
}
@ -737,10 +737,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr
// clear the existed group id
pBlock->info.id.groupId = 0;
ASSERT(!pbInfo->mergeResultBlock);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
false);
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) <
0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);
@ -765,10 +767,12 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG
// clear the existed group id
pBlock->info.id.groupId = 0;
if (!pbInfo->mergeResultBlock) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, false);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
false);
} else {
while (hasRemainResults(pGroupResInfo)) {
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold, true);
doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold,
true);
if (pBlock->info.rows >= pOperator->resultInfo.threshold) {
break;
}
@ -966,8 +970,8 @@ int32_t saveSessionDiscBuf(void* pState, SSessionKey* key, void* buf, int32_t si
return TSDB_CODE_SUCCESS;
}
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock,
SExprSupp* pSup, SGroupResInfo* pGroupResInfo) {
int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDataBlock* pBlock, SExprSupp* pSup,
SGroupResInfo* pGroupResInfo) {
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
SStorageAPI* pAPI = &pTaskInfo->storageAPI;
@ -986,8 +990,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
// ASSERT(code == 0);
if (code == -1) {
// for history
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64, pKey->win.skey,
pKey->win.ekey, pKey->groupId);
qWarn("===stream===not found session result key:%" PRId64 ", ekey:%" PRId64 ", groupId:%" PRIu64 ", opNum:%d",
pKey->win.skey, pKey->win.ekey, pKey->groupId);
pGroupResInfo->index += 1;
continue;
}
@ -1004,7 +1008,8 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, void* pState, SSDa
pBlock->info.id.groupId = pKey->groupId;
void* tbname = NULL;
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) {
if (pAPI->stateStore.streamStateGetParName((void*)pTaskInfo->streamInfo.pState, pBlock->info.id.groupId,
&tbname) < 0) {
pBlock->info.parTbName[0] = 0;
} else {
memcpy(pBlock->info.parTbName, tbname, TSDB_TABLE_NAME_LEN);

View File

@ -1974,7 +1974,7 @@ int32_t streamStateSessionGet_rocksdb(SStreamState* pState, SSessionKey* key, vo
int32_t vLen = 0;
code = streamStateSessionGetKVByCur_rocksdb(pCur, &resKey, &tmp, &vLen);
if (code == 0 && key->win.skey != resKey.win.skey) {
if (code == 0 && key->win.skey == resKey.win.skey) {
*key = resKey;
if (pVal) {