Merge pull request #14067 from taosdata/feature/TD-16738
fix(tmq): tmq assert
This commit is contained in:
commit
e3c1cef69b
|
@ -903,7 +903,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
// NOTE: this operator does never check if current status is done or not
|
||||
SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
|
||||
SStreamBlockScanInfo* pInfo = pOperator->info;
|
||||
int32_t rows = 0;
|
||||
|
||||
pTaskInfo->code = pOperator->fpSet._openFn(pOperator);
|
||||
if (pTaskInfo->code != TSDB_CODE_SUCCESS || pOperator->status == OP_EXEC_DONE) {
|
||||
|
@ -1023,9 +1022,6 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
pTaskInfo->code = terrno;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
rows = pBlockInfo->rows;
|
||||
|
||||
// currently only the tbname pseudo column
|
||||
if (pInfo->numOfPseudoExpr > 0) {
|
||||
addTagPseudoColumnData(&pInfo->readHandle, pInfo->pPseudoExpr, pInfo->numOfPseudoExpr, pInfo->pRes);
|
||||
|
@ -1033,14 +1029,16 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
|
||||
doFilter(pInfo->pCondition, pInfo->pRes);
|
||||
blockDataUpdateTsWindow(pInfo->pRes, pInfo->primaryTsIndex);
|
||||
break;
|
||||
if (pBlockInfo->rows > 0) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// record the scan action.
|
||||
pInfo->numOfExec++;
|
||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||
|
||||
if (rows == 0) {
|
||||
if (pBlockInfo->rows == 0) {
|
||||
pOperator->status = OP_EXEC_DONE;
|
||||
} else if (pInfo->pUpdateInfo) {
|
||||
pInfo->tsArrayIndex = 0;
|
||||
|
@ -1056,7 +1054,7 @@ static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) {
|
|||
}
|
||||
}
|
||||
|
||||
return (rows == 0) ? NULL : pInfo->pRes;
|
||||
return (pBlockInfo->rows == 0) ? NULL : pInfo->pRes;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1270,7 +1270,8 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
|
|||
}
|
||||
|
||||
bool isCloseWindow(STimeWindow *pWin, STimeWindowAggSupp* pSup) {
|
||||
return pWin->ekey < pSup->maxTs - pSup->waterMark;
|
||||
ASSERT(pSup->maxTs == INT64_MIN || pSup->maxTs > 0);
|
||||
return pSup->maxTs != INT64_MIN && pWin->ekey < pSup->maxTs - pSup->waterMark;
|
||||
}
|
||||
|
||||
static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup, SInterval* pInterval,
|
||||
|
@ -2141,7 +2142,9 @@ void compactFunctions(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx, int3
|
|||
static void rebuildIntervalWindow(SStreamFinalIntervalOperatorInfo* pInfo, SExprSupp* pSup, SArray* pWinArray, int32_t groupId,
|
||||
int32_t numOfOutput, SExecTaskInfo* pTaskInfo) {
|
||||
int32_t size = taosArrayGetSize(pWinArray);
|
||||
ASSERT(pInfo->pChildren);
|
||||
if (!pInfo->pChildren) {
|
||||
return;
|
||||
}
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
STimeWindow* pParentWin = taosArrayGet(pWinArray, i);
|
||||
SResultRow* pCurResult = NULL;
|
||||
|
@ -2339,6 +2342,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
SStreamFinalIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
setInputDataBlock(pChildOp, pChildOp->exprSupp.pCtx, pBlock, pChInfo->order, MAIN_SCAN, true);
|
||||
doHashInterval(pChildOp, pBlock, pBlock->info.groupId, NULL);
|
||||
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, pBlock->info.window.ekey);
|
||||
}
|
||||
maxTs = TMAX(maxTs, pBlock->info.window.ekey);
|
||||
}
|
||||
|
@ -2406,7 +2410,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
initResultRowInfo(&pInfo->binfo.resultRowInfo);
|
||||
pInfo->pChildren = NULL;
|
||||
if (numOfChild > 0) {
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(SOperatorInfo));
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void *));
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
|
||||
if (pChildOp) {
|
||||
|
|
Loading…
Reference in New Issue