From 810eb22eb284513d968449148914ca7e6aa77358 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 14:15:18 +0800 Subject: [PATCH 1/3] fix(stream): generate the checkpoint id after checking the max checkpoint id from vnode. --- source/dnode/mnode/impl/src/mndStream.c | 36 ++++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 566e1a28c3..ed492fe254 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -821,19 +821,47 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { SStreamObj *pStream = NULL; void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; - int64_t maxChkpId = 0; + int64_t maxChkptId = 0; while (1) { pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream); if (pIter == NULL) break; - maxChkpId = TMAX(maxChkpId, pStream->checkpointId); + maxChkptId = TMAX(maxChkptId, pStream->checkpointId); mDebug("stream:%p, %s id:%" PRIx64 "checkpoint %" PRId64 "", pStream, pStream->name, pStream->uid, pStream->checkpointId); sdbRelease(pSdb, pStream); } - mDebug("generated checkpoint %" PRId64 "", maxChkpId + 1); - return maxChkpId + 1; + { // check the max checkpoint id from all vnodes. 
+ int64_t maxCheckpointId = -1; + taosThreadMutexLock(&execInfo.lock); + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { + STaskId *p = taosArrayGet(execInfo.pTaskList, i); + + STaskStatusEntry *pEntry = taosHashGet(execInfo.pTaskMap, p, sizeof(*p)); + if (pEntry == NULL) { + continue; + } + + if (pEntry->checkpointInfo.failed) { + continue; + } + + if (maxCheckpointId < pEntry->checkpointInfo.latestId) { + maxCheckpointId = pEntry->checkpointInfo.latestId; + } + } + + taosThreadMutexUnlock(&execInfo.lock); + if (maxCheckpointId > maxChkptId) { + mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId, + maxCheckpointId); + maxChkptId = maxCheckpointId; + } + } + + mDebug("generated checkpoint %" PRId64 "", maxChkptId + 1); + return maxChkptId + 1; } static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { From 8bdd43c3057b53d7e5ad7e08efe39598d966ca49 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 14:42:21 +0800 Subject: [PATCH 2/3] fix(stream): wait for 50ms when no token in bucket for sink task. 
--- v2 (review): doStreamExecTask() now returns right after scheduling the idle retry for EXEC_AFTER_IDLE, so a NULL input block is never carried into the code that follows (the pre-patch code always returned when pInput == NULL); the sink token-bucket debug log now reports the actual 50ms wait (MIN_INVOKE_INTERVAL) instead of 10ms. include/libs/stream/tstream.h | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 2 +- source/libs/stream/inc/streamInt.h | 7 ++++++- source/libs/stream/src/streamExec.c | 19 +++++++++++++------ source/libs/stream/src/streamQueue.c | 18 +++++++++--------- source/libs/stream/src/streamStart.c | 2 +- 6 files changed, 34 insertions(+), 22 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8bced20ca3..5e647c0f9e 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -247,11 +247,11 @@ typedef enum { TASK_SCANHISTORY_CONT = 0x1, TASK_SCANHISTORY_QUIT = 0x2, TASK_SCANHISTORY_REXEC = 0x3, -} EScanHistoryRet; +} EScanHistoryCode; typedef struct { - EScanHistoryRet ret; - int32_t idleTime; + EScanHistoryCode ret; + int32_t idleTime; } SScanhistoryDataInfo; typedef struct { @@ -811,7 +811,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); -int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration); +int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration); bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t latestVer); int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 30ca4c7a36..ef3e643926 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -936,7 +936,7 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) { atomic_store_32(&pTask->status.inScanHistorySentinel, 0); if (retInfo.ret == TASK_SCANHISTORY_REXEC) { - streamReExecScanHistoryFuture(pTask, retInfo.idleTime); + streamExecScanHistoryInFuture(pTask, retInfo.idleTime); } else { SStreamTaskState* p = 
streamTaskGetStatus(pTask); ETaskStatus s = p->state; diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index ff56ae76b0..d1cc4fb710 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -87,6 +87,11 @@ struct SStreamQueue { int8_t status; }; +typedef enum { + EXEC_CONTINUE = 0x0, + EXEC_AFTER_IDLE = 0x1, +} EExtractDataCode; + extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; @@ -125,7 +130,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo); int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask); void streamClearChkptReadyMsg(SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, +EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize); int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 24d1bf7603..d55382be83 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -20,6 +20,7 @@ #define STREAM_RESULT_DUMP_THRESHOLD 300 #define STREAM_RESULT_DUMP_SIZE_THRESHOLD (1048576 * 1) // 1MiB result data #define STREAM_SCAN_HISTORY_TIMESLICE 1000 // 1000 ms +#define MIN_INVOKE_INTERVAL 50 // 50ms static int32_t streamTransferStateDoPrepare(SStreamTask* pTask); @@ -580,16 +581,22 @@ int32_t doStreamExecTask(SStreamTask* pTask) { return 0; } - if (taosGetTimestampMs() - pTask->status.lastExecTs < 50) { + if (taosGetTimestampMs() - pTask->status.lastExecTs < MIN_INVOKE_INTERVAL) { stDebug("s-task:%s invoke with high frequency, idle and retry exec in 50ms", id); - setTaskSchedInfo(pTask, 50); + setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); return 0; } - /*int32_t code = */ 
streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); - if (pInput == NULL) { - ASSERT(numOfBlocks == 0); - return 0; + EExtractDataCode ret = streamTaskGetDataFromInputQ(pTask, &pInput, &numOfBlocks, &blockSize); + if (ret == EXEC_AFTER_IDLE) { + ASSERT(pInput == NULL && numOfBlocks == 0); + setTaskSchedInfo(pTask, MIN_INVOKE_INTERVAL); + return 0; + } else { + if (pInput == NULL) { + ASSERT(numOfBlocks == 0); + return 0; + } } // dispatch checkpoint msg to all downstream tasks diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 0936d410bf..9f79501471 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -145,7 +145,7 @@ const char* streamQueueItemGetTypeStr(int32_t type) { } } -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, +EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { const char* id = pTask->id.idStr; int32_t taskLevel = pTask->info.taskLevel; @@ -157,13 +157,13 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu // no available token in bucket for sink task, let's wait for a little bit if (taskLevel == TASK_LEVEL__SINK && (!streamTaskExtractAvailableToken(pTask->outputInfo.pTokenBucket, id))) { - stDebug("s-task:%s no available token in bucket for sink data, wait for 10ms", id); + stDebug("s-task:%s no available token in bucket for sink data, wait for 50ms", id); - return TSDB_CODE_SUCCESS; + return EXEC_AFTER_IDLE; } while (1) { if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputq.queue); @@ -179,7 +179,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu 
streamTaskPutbackToken(pTask->outputInfo.pTokenBucket); } - return TSDB_CODE_SUCCESS; + 
return EXEC_CONTINUE; } // do not merge blocks for sink node and check point data block @@ -196,7 +196,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu *blockSize = 0; *numOfBlocks = 1; *pInput = qItem; - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); @@ -205,7 +205,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } streamQueueProcessFail(pTask->inputq.queue); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } } else { if (*pInput == NULL) { @@ -226,7 +226,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } streamQueueProcessFail(pTask->inputq.queue); - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } *pInput = newRet; @@ -243,7 +243,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - return TSDB_CODE_SUCCESS; + return EXEC_CONTINUE; } } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 3abca307da..276997e5a8 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -109,7 +109,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { } } -int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) { +int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) { int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE; if (numOfTicks <= 0) { numOfTicks = 1; From 5ae8b68b176a3890d31d10c3c7b603f3d31f0e7a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 16 Apr 2024 15:37:51 +0800 Subject: [PATCH 3/3] fix(stream): fix deadlock. 
--- source/dnode/mnode/impl/src/mndStream.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index ed492fe254..364cc062d1 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -817,7 +817,7 @@ _OVER: return terrno; } -int64_t mndStreamGenChkpId(SMnode *pMnode) { +int64_t mndStreamGenChkptId(SMnode *pMnode, bool lock) { SStreamObj *pStream = NULL; void *pIter = NULL; SSdb *pSdb = pMnode->pSdb; @@ -834,7 +834,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { { // check the max checkpoint id from all vnodes. int64_t maxCheckpointId = -1; - taosThreadMutexLock(&execInfo.lock); + if (lock) { + taosThreadMutexLock(&execInfo.lock); + } + for (int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) { STaskId *p = taosArrayGet(execInfo.pTaskList, i); @@ -852,7 +855,10 @@ int64_t mndStreamGenChkpId(SMnode *pMnode) { } } - taosThreadMutexUnlock(&execInfo.lock); + if (lock) { + taosThreadMutexUnlock(&execInfo.lock); + } + if (maxCheckpointId > maxChkptId) { mDebug("max checkpointId in mnode:%" PRId64 ", smaller than max checkpointId in vnode:%" PRId64, maxChkptId, maxCheckpointId); @@ -872,7 +878,7 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) { } SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg)); - pMsg->checkpointId = mndStreamGenChkpId(pMnode); + pMsg->checkpointId = mndStreamGenChkptId(pMnode, true); int32_t size = sizeof(SMStreamDoCheckpointMsg); SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_BEGIN_CHECKPOINT, .pCont = pMsg, .contLen = size}; @@ -2329,7 +2335,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) { int32_t total = taosArrayGetSize(*pReqTaskList); if (total == numOfTasks) { // all tasks has send the reqs - int64_t checkpointId = mndStreamGenChkpId(pMnode); + int64_t checkpointId = mndStreamGenChkptId(pMnode, false); mInfo("stream:0x%" PRIx64 
" all tasks req checkpoint, start checkpointId:%" PRId64, req.streamId, checkpointId); if (pStream != NULL) { // TODO:handle error