Merge pull request #19153 from taosdata/fix/TD-21217
fix:remove window operator assert
This commit is contained in:
commit
4e8ed70fa4
|
@ -907,7 +907,7 @@ static void removeDeleteResults(SHashObj* pUpdatedMap, SArray* pDelWins) {
|
|||
}
|
||||
|
||||
bool isOverdue(TSKEY ekey, STimeWindowAggSupp* pTwSup) {
|
||||
ASSERT(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0);
|
||||
ASSERTS(pTwSup->maxTs == INT64_MIN || pTwSup->maxTs > 0, "maxts should greater than 0");
|
||||
return pTwSup->maxTs != INT64_MIN && ekey < pTwSup->maxTs - pTwSup->waterMark;
|
||||
}
|
||||
|
||||
|
@ -1396,7 +1396,6 @@ static int32_t getAllIntervalWindow(SSHashObj* pHashMap, SHashObj* resWins) {
|
|||
while ((pIte = tSimpleHashIterate(pHashMap, pIte, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||
uint64_t groupId = *(uint64_t*)key;
|
||||
ASSERT(keyLen == GET_RES_WINDOW_KEY_LEN(sizeof(TSKEY)));
|
||||
TSKEY ts = *(int64_t*)((char*)key + sizeof(uint64_t));
|
||||
SResultRowPosition* pPos = (SResultRowPosition*)pIte;
|
||||
int32_t code = saveWinResult(ts, pPos->pageId, pPos->offset, groupId, resWins);
|
||||
|
@ -1547,7 +1546,7 @@ static void closeChildIntervalWindow(SOperatorInfo* pOperator, SArray* pChildren
|
|||
for (int32_t i = 0; i < size; i++) {
|
||||
SOperatorInfo* pChildOp = taosArrayGetP(pChildren, i);
|
||||
SStreamIntervalOperatorInfo* pChInfo = pChildOp->info;
|
||||
ASSERT(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE);
|
||||
ASSERTS(pChInfo->twAggSup.calTrigger == STREAM_TRIGGER_AT_ONCE, "children trigger type should be at once");
|
||||
pChInfo->twAggSup.maxTs = TMAX(pChInfo->twAggSup.maxTs, maxTs);
|
||||
closeStreamIntervalWindow(pChInfo->aggSup.pResultRowHashTable, &pChInfo->twAggSup, &pChInfo->interval, NULL, NULL,
|
||||
NULL, pOperator);
|
||||
|
@ -1767,8 +1766,6 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
|
|||
.maxTs = INT64_MIN,
|
||||
};
|
||||
|
||||
ASSERT(as.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
|
||||
pInfo->win = pTaskInfo->window;
|
||||
pInfo->inputOrder = (pPhyNode->window.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
pInfo->resultTsOrder = (pPhyNode->window.outputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
|
||||
|
@ -2252,7 +2249,6 @@ static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pB
|
|||
return;
|
||||
}
|
||||
blockDataEnsureCapacity(pBlock, size - (*pIndex));
|
||||
ASSERT(3 <= taosArrayGetSize(pBlock->pDataBlock));
|
||||
SColumnInfoData* pStartTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pEndTs = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
|
||||
SColumnInfoData* pGroupId = (SColumnInfoData*)taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
|
||||
|
@ -2359,7 +2355,6 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p
|
|||
SResultRow* pResult = NULL;
|
||||
int32_t forwardRows = 0;
|
||||
|
||||
ASSERT(pSDataBlock->pDataBlock != NULL);
|
||||
SColumnInfoData* pColDataInfo = taosArrayGet(pSDataBlock->pDataBlock, pInfo->primaryTsIndex);
|
||||
tsCols = (int64_t*)pColDataInfo->pData;
|
||||
|
||||
|
@ -2482,7 +2477,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
ASSERT(IS_FINAL_OP(pInfo));
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
|
@ -2543,7 +2537,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
pInfo->numOfDatapack++;
|
||||
printDataBlock(pBlock, IS_FINAL_OP(pInfo) ? "interval final recv" : "interval semi recv");
|
||||
|
||||
ASSERT(pBlock->info.type != STREAM_INVERT);
|
||||
if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) {
|
||||
pInfo->binfo.pRes->info.type = pBlock->info.type;
|
||||
} else if (pBlock->info.type == STREAM_DELETE_DATA || pBlock->info.type == STREAM_DELETE_RESULT ||
|
||||
|
@ -2633,7 +2626,6 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
doBuildPullDataBlock(pInfo->pPullWins, &pInfo->pullIndex, pInfo->pPullDataRes);
|
||||
if (pInfo->pPullDataRes->info.rows != 0) {
|
||||
// process the rest of the data
|
||||
ASSERT(IS_FINAL_OP(pInfo));
|
||||
printDataBlock(pInfo->pPullDataRes, IS_FINAL_OP(pInfo) ? "interval final" : "interval semi");
|
||||
return pInfo->pPullDataRes;
|
||||
}
|
||||
|
@ -2688,7 +2680,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
.deleteMarkSaved = 0,
|
||||
.calTriggerSaved = 0,
|
||||
};
|
||||
ASSERT(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
ASSERTS(pInfo->twAggSup.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
||||
pInfo->primaryTsIndex = ((SColumnNode*)pIntervalPhyNode->window.pTspk)->slotId;
|
||||
size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES;
|
||||
initResultSizeInfo(&pOperator->resultInfo, 4096);
|
||||
|
@ -2713,7 +2705,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
|
||||
initStreamFunciton(pOperator->exprSupp.pCtx, pOperator->exprSupp.numOfExprs);
|
||||
|
||||
ASSERT(numOfCols > 0);
|
||||
initExecTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pTaskInfo->window);
|
||||
|
||||
pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
|
||||
|
@ -2724,6 +2715,9 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
pInfo->pChildren = NULL;
|
||||
if (numOfChild > 0) {
|
||||
pInfo->pChildren = taosArrayInit(numOfChild, sizeof(void*));
|
||||
if (!pInfo->pChildren) {
|
||||
goto _error;
|
||||
}
|
||||
for (int32_t i = 0; i < numOfChild; i++) {
|
||||
SOperatorInfo* pChildOp = createStreamFinalIntervalOperatorInfo(NULL, pPhyNode, pTaskInfo, 0);
|
||||
if (pChildOp) {
|
||||
|
@ -2746,7 +2740,6 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream,
|
|||
// semi interval operator does not catch result
|
||||
pInfo->isFinal = false;
|
||||
pOperator->name = "StreamSemiIntervalOperator";
|
||||
ASSERT(pInfo->aggSup.currentPageId == -1);
|
||||
}
|
||||
|
||||
if (!IS_FINAL_OP(pInfo) || numOfChild == 0) {
|
||||
|
@ -3162,15 +3155,6 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
|
|||
}
|
||||
}
|
||||
|
||||
void deleteWindow(SArray* pWinInfos, int32_t index, FDelete fp) {
|
||||
ASSERT(index >= 0 && index < taosArrayGetSize(pWinInfos));
|
||||
if (fp) {
|
||||
void* ptr = taosArrayGet(pWinInfos, index);
|
||||
fp(ptr);
|
||||
}
|
||||
taosArrayRemove(pWinInfos, index);
|
||||
}
|
||||
|
||||
static void doDeleteTimeWindows(SStreamAggSupporter* pAggSup, SSDataBlock* pBlock, SArray* result) {
|
||||
SColumnInfoData* pStartTsCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
|
||||
TSKEY* startDatas = (TSKEY*)pStartTsCol->pData;
|
||||
|
@ -3218,7 +3202,6 @@ static int32_t copyUpdateResult(SSHashObj* pStUpdated, SArray* pUpdated) {
|
|||
int32_t iter = 0;
|
||||
while ((pIte = tSimpleHashIterate(pStUpdated, pIte, &iter)) != NULL) {
|
||||
void* key = tSimpleHashGetKey(pIte, &keyLen);
|
||||
ASSERT(keyLen == sizeof(SSessionKey));
|
||||
taosArrayPush(pUpdated, key);
|
||||
}
|
||||
taosArraySort(pUpdated, sessionKeyCompareAsc);
|
||||
|
@ -3279,7 +3262,6 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
|
|||
SStreamAggSupporter* pAggSup = &pInfo->streamAggSup;
|
||||
int32_t numOfOutput = pSup->numOfExprs;
|
||||
int32_t numOfChildren = taosArrayGetSize(pInfo->pChildren);
|
||||
ASSERT(pInfo->pChildren);
|
||||
|
||||
for (int32_t i = 0; i < size; i++) {
|
||||
SSessionKey* pWinKey = taosArrayGet(pWinArray, i);
|
||||
|
@ -3380,7 +3362,6 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
|
|||
void initGroupResInfoFromArrayList(SGroupResInfo* pGroupResInfo, SArray* pArrayList) {
|
||||
pGroupResInfo->pRows = pArrayList;
|
||||
pGroupResInfo->index = 0;
|
||||
ASSERT(pGroupResInfo->index <= getNumOfTotalRes(pGroupResInfo));
|
||||
}
|
||||
|
||||
void doBuildSessionResult(SOperatorInfo* pOperator, SStreamState* pState, SGroupResInfo* pGroupResInfo,
|
||||
|
@ -4811,7 +4792,6 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t numOfCols = 0;
|
||||
SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols);
|
||||
ASSERT(numOfCols > 0);
|
||||
|
||||
SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
|
||||
SInterval interval = {
|
||||
|
@ -4831,7 +4811,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys
|
|||
.deleteMark = getDeleteMark(pIntervalPhyNode),
|
||||
};
|
||||
|
||||
ASSERT(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY);
|
||||
ASSERTS(twAggSupp.calTrigger != STREAM_TRIGGER_MAX_DELAY, "trigger type should not be max delay");
|
||||
|
||||
pOperator->pTaskInfo = pTaskInfo;
|
||||
pInfo->interval = interval;
|
||||
|
|
Loading…
Reference in New Issue