From 14e591c802c93f0d90729d4ffa8d1819e37c5f8d Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 18 Jul 2023 14:51:32 +0800 Subject: [PATCH 1/3] mem leak --- source/libs/executor/src/timewindowoperator.c | 8 +- .../tsim/stream/checkpointInterval0.sim | 89 +++++++++++++++++-- 2 files changed, 86 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 769be6268c..69aaef48f4 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -2594,8 +2594,6 @@ int32_t doStreamIntervalEncodeOpState(void** buf, int32_t len, SOperatorInfo* pO while ((pIte = tSimpleHashIterate(pInfo->aggSup.pResultRowHashTable, pIte, &iter)) != NULL) { void* key = taosHashGetKey(pIte, &keyLen); tlen += encodeSWinKey(buf, key); - SRowBuffPos* pPos = *(void**)pIte; - tlen += encodeSRowBuffPos(buf, pPos); } // 2.twAggSup @@ -2655,10 +2653,10 @@ void doStreamIntervalDecodeOpState(void* buf, int32_t len, SOperatorInfo* pOpera buf = taosDecodeFixedI32(buf, &mapSize); for (int32_t i = 0; i < mapSize; i++) { SWinKey key = {0}; - SRowBuffPos* pPos = taosMemoryCalloc(1, sizeof(SRowBuffPos)); - pPos->pKey = taosMemoryCalloc(1, sizeof(SWinKey)); buf = decodeSWinKey(buf, &key); - buf = decodeSRowBuffPos(buf, pPos); + SRowBuffPos* pPos = NULL; + int32_t resSize = pInfo->aggSup.resultRowSize; + pInfo->stateStore.streamStateAddIfNotExist(pInfo->pState, &key, (void**)&pPos, &resSize); tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), &pPos, POINTER_BYTES); } diff --git a/tests/script/tsim/stream/checkpointInterval0.sim b/tests/script/tsim/stream/checkpointInterval0.sim index 5bc8222a54..1c212eb2a7 100644 --- a/tests/script/tsim/stream/checkpointInterval0.sim +++ b/tests/script/tsim/stream/checkpointInterval0.sim @@ -14,6 +14,7 @@ sql use test; sql create table t1(ts timestamp, a int, b int , c int, d double); sql create stream streams0 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt as select _wstart, count(*) c1, sum(a) from t1 interval(10s); +sql create stream streams1 trigger window_close IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt1 as select _wstart, count(*) c1, sum(a) from t1 interval(10s); sql insert into t1 values(1648791213000,1,2,3,1.0); sql insert into t1 values(1648791213001,2,2,3,1.1); @@ -45,6 +46,23 @@ if $data02 != 3 then goto loop0 endi +$loop_count = 0 + +loop01: +sleep 1000 + +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 0 then + print =====rows=$rows expect 1 + goto loop01 +endi + print waiting for checkpoint generation 1 ...... sleep 25000 @@ -126,6 +144,36 @@ if $data12 != 4 then goto loop2 endi + +$loop_count = 0 + +loop3: +sleep 1000 + +print select * from streamt1; +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 2 + goto loop3 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop3 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop3 +endi + print step 2 print restart taosd 02 ...... @@ -136,7 +184,7 @@ system sh/exec.sh -n dnode1 -s start sql insert into t1 values(1648791223004,5,2,3,1.1); -loop20: +loop4: sleep 1000 sql select * from streamt; @@ -148,29 +196,58 @@ endi if $rows != 2 then print =====rows=$rows expect 2 - goto loop20 + goto loop4 endi # row 0 if $data01 != 3 then print =====data01=$data01 - goto loop20 + goto loop4 endi if $data02 != 6 then print =====data02=$data02 - goto loop20 + goto loop4 endi # row 1 if $data11 != 2 then print =====data11=$data11 - goto loop20 + goto loop4 endi if $data12 != 9 then print =====data12=$data12 - goto loop20 + goto loop4 +endi + +$loop_count = 0 + +loop5: +sleep 1000 + +print select * from streamt1; +sql select * from streamt1; + +$loop_count = $loop_count + 1 +if $loop_count == 10 then + return -1 +endi + +if $rows != 1 then + print =====rows=$rows expect 2 + goto loop5 +endi + +# row 0 +if $data01 != 3 then + print =====data01=$data01 + goto loop5 +endi + +if $data02 != 6 then + print =====data02=$data02 + goto loop5 endi print end--------------------------------- From 901b7d8eccbf670192c8e15cb5de770a71faf459 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 18 Jul 2023 15:18:57 +0800 Subject: [PATCH 2/3] fix(stream): if a source task is set to be non-normal, no data should be put into inputQ anymore. --- include/libs/stream/tstream.h | 4 ++-- source/dnode/snode/src/snode.c | 1 + source/dnode/vnode/src/tq/tq.c | 12 ++++++++---- source/dnode/vnode/src/tq/tqRestore.c | 16 +++++++++++++--- source/libs/stream/src/streamCheckpoint.c | 18 ++++++------------ source/libs/stream/src/streamExec.c | 2 +- source/libs/stream/src/streamTask.c | 1 + 7 files changed, 32 insertions(+), 22 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 278c19bdf7..cea80fe4ec 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -310,6 +310,7 @@ struct SStreamTask { SStreamId streamTaskId; SArray* pUpstreamInfoList; // SArray, // children info SArray* pReadyMsgList; // SArray + TdThreadMutex lock; // secure the operation of set task status and puting data into inputQ // output union { @@ -566,6 +567,7 @@ int32_t streamCheckHistoryTaskDownstream(SStreamTask* pTask); int32_t streamTaskScanHistoryDataComplete(SStreamTask* pTask); int32_t streamStartRecoverTask(SStreamTask* pTask, int8_t igUntreated); void streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask); +int32_t streamTaskGetInputQItems(const SStreamTask* pTask); bool streamTaskRecoverScanStep1Finished(SStreamTask* pTask); bool streamTaskRecoverScanStep2Finished(SStreamTask* pTask); @@ -615,9 +617,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamAlignTransferState(SStreamTask* pTask); - int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask); -int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId); #ifdef __cplusplus } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index a959060ee2..534704e462 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -76,6 +76,7 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) { pTask->outputStatus = TASK_OUTPUT_STATUS__NORMAL; pTask->pMsgCb = &pSnode->msgCb; pTask->pMeta = pSnode->pMeta; + taosThreadMutexInit(&pTask->lock, NULL); pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1); if (pTask->pState == NULL) { diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89beaadf72..456d1ec2c7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -928,16 +928,20 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; - tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 - " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", - vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, - pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); + taosThreadMutexInit(&pTask->lock, NULL); if (pTask->chkInfo.checkpointId != 0) { + // checkpoint ver is the kept version, handled data should be the next version. + pTask->chkInfo.currentVer = pTask->chkInfo.checkpointVer + 1; tqInfo("s-task:%s restore from the checkpointId:%" PRId64 " ver:%" PRId64 " currentVer:%" PRId64, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer); } + tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " currentVer:%" PRId64 + " child id:%d, level:%d, scan-history:%d, trigger:%" PRId64 " ms", + vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->currentVer, + pTask->info.selfChildId, pTask->info.taskLevel, pTask->info.fillHistory, pTask->triggerParam); + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRestore.c b/source/dnode/vnode/src/tq/tqRestore.c index 9eae7c66e7..70277a8356 100644 --- a/source/dnode/vnode/src/tq/tqRestore.c +++ b/source/dnode/vnode/src/tq/tqRestore.c @@ -268,17 +268,25 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - int32_t numOfItemsInQ = taosQueueItemSize(pTask->inputQueue->queue); + int32_t numOfItems = streamTaskGetInputQItems(pTask); // append the data for the stream SStreamQueueItem* pItem = NULL; code = extractMsgFromWal(pTask->exec.pWalReader, (void**) &pItem, pTask->id.idStr); - if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItemsInQ == 0)) { // failed, continue + if ((code != TSDB_CODE_SUCCESS || pItem == NULL) && (numOfItems == 0)) { // failed, continue streamMetaReleaseTask(pStreamMeta, pTask); continue; } + taosThreadMutexLock(&pTask->lock); + + if (pTask->status.taskStatus != TASK_STATUS__NORMAL) { + tqDebug("s-task:%s not ready for submit block from wal, status:%s", pTask->id.idStr, pStatus); + taosThreadMutexUnlock(&pTask->lock); + continue; + } + if (pItem != NULL) { noDataInWal = false; code = tAppendDataToInputQueue(pTask, pItem); @@ -292,7 +300,9 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) { } } - if ((code == TSDB_CODE_SUCCESS) || (numOfItemsInQ > 0)) { + taosThreadMutexUnlock(&pTask->lock); + + if ((code == TSDB_CODE_SUCCESS) || (numOfItems > 0)) { code = streamSchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 7358d16bad..1fc2e96161 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -133,13 +133,18 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); // 1. set task status to be prepared for check point, no data are allowed to put into inputQ. + taosThreadMutexLock(&pTask->lock); + pTask->status.taskStatus = TASK_STATUS__CK; pTask->checkpointingId = pReq->checkpointId; pTask->checkpointNotReadyTasks = streamTaskGetNumOfDownstream(pTask); // 2. let's dispatch checkpoint msg to downstream task directly and do nothing else. put the checkpoint block into // inputQ, to make sure all blocks with less version have been handled by this task already. - return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER); + taosThreadMutexUnlock(&pTask->lock); + + return code; } static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) { @@ -180,15 +185,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc streamFreeQitem((SStreamQueueItem*)pBlock); } } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { - // todo: sink node needs alignment?? - /* ASSERT(pTask->status.taskStatus == TASK_STATUS__CK); - pTask->status.taskStatus = TASK_STATUS__CK_READY; - - // update the child Id for downstream tasks - streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId); - qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", id); - streamFreeQitem((SStreamQueueItem*)pBlock); - } else {*/ ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); // update the child Id for downstream tasks @@ -204,8 +200,6 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc return code; } - - if (taskLevel == TASK_LEVEL__SINK) { pTask->status.taskStatus = TASK_STATUS__CK_READY; qDebug("s-task:%s process checkpoint block, all %d upstreams sent checkpoint msgs, send ready msg to upstream", diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index cad0d58925..b896b47ee4 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -295,7 +295,7 @@ int32_t streamBatchExec(SStreamTask* pTask, int32_t batchLimit) { } #endif -static int32_t getNumOfItemsInputQ(const SStreamTask* pTask) { +int32_t streamTaskGetInputQItems(const SStreamTask* pTask) { int32_t numOfItems1 = taosQueueItemSize(pTask->inputQueue->queue); int32_t numOfItems2 = taosQallItemSize(pTask->inputQueue->qall); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index dec2768975..77f3202c79 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -244,6 +244,7 @@ void tFreeStreamTask(SStreamTask* pTask) { } pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList); + taosThreadMutexDestroy(&pTask->lock); if (pTask->id.idStr != NULL) { taosMemoryFree((void*)pTask->id.idStr); From bada18f5f02455b1e4a131e8dd4f3616419611d1 Mon Sep 17 00:00:00 2001 From: liuyao <54liuyao@163.com> Date: Tue, 18 Jul 2023 17:59:42 +0800 Subject: [PATCH 3/3] heap use after free --- source/libs/executor/src/timewindowoperator.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 69aaef48f4..7a743f8d33 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3816,14 +3816,13 @@ void* decodeSSessionKey(void* buf, SSessionKey* key) { int32_t encodeSResultWindowInfo(void** buf, SResultWindowInfo* key, int32_t outLen) { int32_t tlen = 0; tlen += taosEncodeFixedBool(buf, key->isOutput); - tlen += taosEncodeBinary(buf, key->pOutputBuf, outLen); tlen += encodeSSessionKey(buf, &key->sessionWin); return tlen; } void* decodeSResultWindowInfo(void* buf, SResultWindowInfo* key, int32_t outLen) { buf = taosDecodeFixedBool(buf, &key->isOutput); - buf = taosDecodeBinary(buf, &key->pOutputBuf, outLen); + key->pOutputBuf = NULL; buf = decodeSSessionKey(buf, &key->sessionWin); return buf; }