diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 63dceb7dc6..14a98d94f9 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -319,11 +319,13 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } taosThreadMutexLock(&pTask->lock); + pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); streamMetaReleaseTask(pStreamMeta, pTask); taosThreadMutexUnlock(&pTask->lock); + streamMetaReleaseTask(pStreamMeta, pTask); continue; } diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 4cf3b3239d..53fef3b7e3 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -838,6 +838,7 @@ void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillS if (hasPrevWindow(pFillSup)) { setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo); pFillInfo->pos = FILL_POS_END; + resetFillWindow(&pFillSup->next); pFillSup->next.key = pFillSup->cur.key; pFillSup->next.pRowVal = pFillSup->cur.pRowVal; pFillInfo->preRowKey = INT64_MIN; diff --git a/source/libs/executor/src/streamtimewindowoperator.c b/source/libs/executor/src/streamtimewindowoperator.c index a6326be62f..4ead0df159 100644 --- a/source/libs/executor/src/streamtimewindowoperator.c +++ b/source/libs/executor/src/streamtimewindowoperator.c @@ -1535,6 +1535,7 @@ void destroyStreamSessionAggOperatorInfo(void* param) { SStreamSessionAggOperatorInfo* pInfo = (SStreamSessionAggOperatorInfo*)param; cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); @@ -2861,6 +2862,7 @@ void destroyStreamStateOperatorInfo(void* param) { cleanupBasicInfo(&pInfo->binfo); destroyStreamAggSupporter(&pInfo->streamAggSup); cleanupGroupResInfo(&pInfo->groupResInfo); + cleanupExprSupp(&pInfo->scalarSupp); if (pInfo->pChildren != NULL) { int32_t size = taosArrayGetSize(pInfo->pChildren); for (int32_t i = 0; i < size; i++) { @@ -2874,6 +2876,7 @@ void destroyStreamStateOperatorInfo(void* param) { taosArrayDestroy(pInfo->historyWins); tSimpleHashCleanup(pInfo->pSeUpdated); tSimpleHashCleanup(pInfo->pSeDeleted); + pInfo->pUpdated = taosArrayDestroy(pInfo->pUpdated); blockDataDestroy(pInfo->pCheckpointRes); taosMemoryFreeClear(param); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3d5e0a8f29..c58da7d3f9 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -308,7 +308,7 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) { if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) { ASSERT(status == TASK_STATUS__HALT || status == TASK_STATUS__DROPPING); } else { - ASSERT(status == TASK_STATUS__SCAN_HISTORY); + ASSERT(status == TASK_STATUS__NORMAL); pStreamTask->status.taskStatus = TASK_STATUS__HALT; qDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr); }