fix(stream): adj result flag
This commit is contained in:
parent
55b5b905b7
commit
7799898896
|
@ -1836,6 +1836,8 @@ static int32_t doStreamTimeSliceNext(SOperatorInfo* pOperator, SSDataBlock** ppR
|
||||||
}
|
}
|
||||||
|
|
||||||
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
|
||||||
|
pInfo->groupResInfo.freeItem = false;
|
||||||
|
|
||||||
pInfo->pUpdated = taosArrayInit(16, sizeof(SWinKey));
|
pInfo->pUpdated = taosArrayInit(16, sizeof(SWinKey));
|
||||||
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
|
||||||
|
|
||||||
|
@ -2023,6 +2025,7 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t keyBytes = sizeof(TSKEY);
|
int32_t keyBytes = sizeof(TSKEY);
|
||||||
|
keyBytes += blockDataGetRowSize(pDownRes) + sizeof(SResultCellData) * taosArrayGetSize(pDownRes->pDataBlock);
|
||||||
if (pPkCol) {
|
if (pPkCol) {
|
||||||
keyBytes += pPkCol->bytes;
|
keyBytes += pPkCol->bytes;
|
||||||
}
|
}
|
||||||
|
@ -2073,9 +2076,6 @@ int32_t createStreamTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode*
|
||||||
pInfo->isHistoryOp = pHandle->fillHistory;
|
pInfo->isHistoryOp = pHandle->fillHistory;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init Info->groupResInfo
|
|
||||||
pInfo->groupResInfo.freeItem = false;
|
|
||||||
|
|
||||||
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
|
pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC;
|
||||||
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
|
setOperatorInfo(pOperator, getStreamOpName(pOperator->operatorType), QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERP_FUNC,
|
||||||
true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
true, OP_NOT_OPENED, pInfo, pTaskInfo);
|
||||||
|
|
|
@ -237,7 +237,7 @@ int32_t getHashSortPrevRow(SStreamFileState* pFileState, const SWinKey* pKey, SW
|
||||||
if (ppBuff) {
|
if (ppBuff) {
|
||||||
pWinStates = (SArray*)(*ppBuff);
|
pWinStates = (SArray*)(*ppBuff);
|
||||||
} else {
|
} else {
|
||||||
qTrace("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
|
qDebug("===stream=== search buff is empty.group id:%" PRId64, pKey->groupId);
|
||||||
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
|
SStreamStateCur* pCur = streamStateFillSeekKeyPrev_rocksdb(pState, pKey);
|
||||||
void* tmpVal = NULL;
|
void* tmpVal = NULL;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
|
|
Loading…
Reference in New Issue