From 3847ed2dc9ae52ae6384a217509787ef7bde0934 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 4 Dec 2024 09:43:51 +0800 Subject: [PATCH 1/9] refactor: 1) adjust wal scan interval to be 500ms, 2) record the checkpointVer to be timestamp for force_window_close, 3) do some other internal refactor. --- source/dnode/vnode/src/tq/tqSink.c | 8 +- source/dnode/vnode/src/tq/tqStreamTask.c | 106 ++++++++---------- source/libs/stream/src/streamBackendRocksdb.c | 1 - source/libs/stream/src/streamExec.c | 65 ++++++----- source/libs/stream/src/streamSched.c | 13 ++- 5 files changed, 96 insertions(+), 97 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index be41f7e99e..51ef02de56 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -13,9 +13,7 @@ * along with this program. If not, see . */ -#include #include "tcommon.h" -#include "tmsg.h" #include "tq.h" #define IS_NEW_SUBTB_RULE(_t) (((_t)->ver >= SSTREAM_TASK_SUBTABLE_CHANGED_VER) && ((_t)->subtableWithoutMd5 != 1)) @@ -50,7 +48,7 @@ static int32_t doPutSinkTableInfoIntoCache(SSHashObj* pSinkTableMap, STableSinkI static bool doGetSinkTableInfoFromCache(SSHashObj* pTableInfoMap, uint64_t groupId, STableSinkInfo** pInfo); static int32_t doRemoveSinkTableInfoInCache(SSHashObj* pSinkTableMap, uint64_t groupId, const char* id); static int32_t checkTagSchema(SStreamTask* pTask, SVnode* pVnode); -static void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); +static void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs); static int32_t handleResultBlockMsg(SStreamTask* pTask, SSDataBlock* pDataBlock, int32_t index, SVnode* pVnode, int64_t earlyTs); @@ -1062,7 +1060,7 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { return; } - reubuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); + rebuildAndSendMultiResBlock(pTask, pBlocks, pVnode, earlyTs); } } @@ -1165,7 +1163,7 @@ int32_t doBuildAndSendDeleteMsg(SVnode* pVnode, char* stbFullName, SSDataBlock* return TSDB_CODE_SUCCESS; } -void reubuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { +void rebuildAndSendMultiResBlock(SStreamTask* pTask, const SArray* pBlocks, SVnode* pVnode, int64_t earlyTs) { int32_t code = 0; const char* id = pTask->id.idStr; int32_t vgId = pTask->pMeta->vgId; diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 29372c5da7..bc7e2e28e3 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -17,19 +17,20 @@ #include "vnd.h" #define MAX_REPEAT_SCAN_THRESHOLD 3 -#define SCAN_WAL_IDLE_DURATION 100 +#define SCAN_WAL_IDLE_DURATION 500 // idle for 500ms to do next wal scan typedef struct SBuildScanWalMsgParam { int64_t metaId; int32_t numOfTasks; } SBuildScanWalMsgParam; -static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); +static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); static int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems, bool* pSucc); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); +static int32_t doScanWalAsync(STQ* pTq, bool ckPause); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. int32_t tqScanWal(STQ* pTq) { @@ -37,12 +38,11 @@ int32_t tqScanWal(STQ* pTq) { int32_t vgId = pMeta->vgId; int64_t st = taosGetTimestampMs(); int32_t numOfTasks = 0; - bool shouldIdle = true; tqDebug("vgId:%d continue to check if data in wal are available, scanCounter:%d", vgId, pMeta->scanInfo.scanCounter); // check all tasks - int32_t code = doScanWalForAllTasks(pMeta, &shouldIdle); + int32_t code = doScanWalForAllTasks(pMeta); if (code) { tqError("vgId:%d failed to start all tasks, try next time, code:%s", vgId, tstrerror(code)); return code; @@ -133,10 +133,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { } int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { - int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; bool alreadyRestored = pTq->pVnode->restored; - int32_t numOfTasks = 0; + int32_t code = 0; // do not launch the stream tasks, if it is a follower or not restored vnode. if (!(vnodeIsRoleLeader(pTq->pVnode) && alreadyRestored)) { @@ -144,47 +143,8 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) { } streamMetaWLock(pMeta); - - numOfTasks = taosArrayGetSize(pMeta->pTaskList); - if (numOfTasks == 0) { - tqDebug("vgId:%d no stream tasks existed to run", vgId); - streamMetaWUnLock(pMeta); - return 0; - } - - if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not scan wal", vgId); - streamMetaWUnLock(pMeta); - return 0; - } - - pMeta->scanInfo.scanCounter += 1; - if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { - pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; - } - - if (pMeta->scanInfo.scanCounter > 1) { - tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); - streamMetaWUnLock(pMeta); - return 0; - } - - int32_t numOfPauseTasks = pMeta->numOfPausedTasks; - if (ckPause && numOfTasks == numOfPauseTasks) { - tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); - - // reset the counter value, since we do not launch the scan wal operation. - pMeta->scanInfo.scanCounter = 0; - streamMetaWUnLock(pMeta); - return 0; - } - - tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, - numOfTasks, alreadyRestored); - - int32_t code = streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); + code = doScanWalAsync(pTq, ckPause); streamMetaWUnLock(pMeta); - return code; } @@ -368,11 +328,8 @@ int32_t doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfIt return code; } -int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { - *pScanIdle = true; - bool noDataInWal = true; +int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta) { int32_t vgId = pStreamMeta->vgId; - int32_t numOfTasks = taosArrayGetSize(pStreamMeta->pTaskList); if (numOfTasks == 0) { return TSDB_CODE_SUCCESS; @@ -410,8 +367,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - *pScanIdle = false; - // seek the stored version and extract data from WAL code = setWalReaderStartOffset(pTask, vgId); if (code != TSDB_CODE_SUCCESS) { @@ -437,7 +392,6 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) { - noDataInWal = false; code = streamTrySchedExec(pTask); if (code != TSDB_CODE_SUCCESS) { streamMetaReleaseTask(pStreamMeta, pTask); @@ -449,11 +403,47 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { streamMetaReleaseTask(pStreamMeta, pTask); } - // all wal are checked, and no new data available in wal. - if (noDataInWal) { - *pScanIdle = true; - } - taosArrayDestroy(pTaskList); return TSDB_CODE_SUCCESS; } + +int32_t doScanWalAsync(STQ* pTq, bool ckPause) { + SStreamMeta* pMeta = pTq->pStreamMeta; + bool alreadyRestored = pTq->pVnode->restored; + int32_t vgId = pMeta->vgId; + int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); + + if (numOfTasks == 0) { + tqDebug("vgId:%d no stream tasks existed to run", vgId); + return 0; + } + + if (pMeta->startInfo.startAllTasks) { + tqTrace("vgId:%d in restart procedure, not scan wal", vgId); + return 0; + } + + pMeta->scanInfo.scanCounter += 1; + if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { + pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; + } + + if (pMeta->scanInfo.scanCounter > 1) { + tqDebug("vgId:%d wal read task has been launched, remain scan times:%d", vgId, pMeta->scanInfo.scanCounter); + return 0; + } + + int32_t numOfPauseTasks = pMeta->numOfPausedTasks; + if (ckPause && numOfTasks == numOfPauseTasks) { + tqDebug("vgId:%d ignore all submit, all streams had been paused, reset the walScanCounter", vgId); + + // reset the counter value, since we do not launch the scan wal operation. + pMeta->scanInfo.scanCounter = 0; + return 0; + } + + tqDebug("vgId:%d create msg to start wal scan to launch stream tasks, numOfTasks:%d, vnd restored:%d", vgId, + numOfTasks, alreadyRestored); + + return streamTaskSchedTask(&pTq->pVnode->msgCb, vgId, 0, 0, STREAM_EXEC_T_EXTRACT_WAL_DATA); +} diff --git a/source/libs/stream/src/streamBackendRocksdb.c b/source/libs/stream/src/streamBackendRocksdb.c index 09f4e95376..394c6408ba 100644 --- a/source/libs/stream/src/streamBackendRocksdb.c +++ b/source/libs/stream/src/streamBackendRocksdb.c @@ -4378,7 +4378,6 @@ void streamStateParTagSeekKeyNext_rocksdb(SStreamState* pState, const int64_t gr } int32_t streamStateParTagGetKVByCur_rocksdb(SStreamStateCur* pCur, int64_t* pGroupId, const void** pVal, int32_t* pVLen) { - stDebug("streamStateFillGetKVByCur_rocksdb"); if (!pCur) { return -1; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 318720b5b0..85f287f301 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -522,7 +522,10 @@ static int32_t doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int if (pItem->type == STREAM_INPUT__GET_RES) { const SStreamTrigger* pTrigger = (const SStreamTrigger*)pInput; code = qSetMultiStreamInput(pExecutor, pTrigger->pBlock, 1, STREAM_INPUT__DATA_BLOCK); - + if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) { + stDebug("s-task:%s set force_window_close as source block, skey:%"PRId64, id, pTrigger->pBlock->info.window.skey); + (*pVer) = pTrigger->pBlock->info.window.skey; + } } else if (pItem->type == STREAM_INPUT__DATA_SUBMIT) { const SStreamDataSubmit* pSubmit = (const SStreamDataSubmit*)pInput; code = qSetMultiStreamInput(pExecutor, &pSubmit->submit, 1, STREAM_INPUT__DATA_SUBMIT); @@ -671,7 +674,7 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock doRecordThroughput(&pTask->execInfo, totalBlocks, totalSize, blockSize, st, pTask->id.idStr); - // update the currentVer if processing the submit blocks. + // update the currentVer if processing the submitted blocks. if (!(pInfo->checkpointVer <= pInfo->nextProcessVer && ver >= pInfo->checkpointVer)) { stError("s-task:%s invalid info, checkpointVer:%" PRId64 ", nextProcessVer:%" PRId64 " currentVer:%" PRId64, id, pInfo->checkpointVer, pInfo->nextProcessVer, ver); @@ -688,6 +691,34 @@ static int32_t doStreamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pBlock return code; } +// do nothing after sync executor state to storage backend, untill checkpoint is completed. +static int32_t doHandleChkptBlock(SStreamTask* pTask) { + int32_t code = 0; + const char* id = pTask->id.idStr; + + streamMutexLock(&pTask->lock); + SStreamTaskState pState = streamTaskGetStatus(pTask); + if (pState.state == TASK_STATUS__CK) { // todo other thread may change the status + stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); + code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue + } else { // todo refactor + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { + code = streamTaskSendCheckpointSourceRsp(pTask); + } else { + code = streamTaskSendCheckpointReadyMsg(pTask); + } + + if (code != TSDB_CODE_SUCCESS) { + // todo: let's retry send rsp to upstream/mnode + stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0, + tstrerror(code)); + } + } + + streamMutexUnlock(&pTask->lock); + return code; +} + int32_t flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointBlock) { const char* id = pTask->id.idStr; @@ -832,36 +863,16 @@ static int32_t doStreamExecTask(SStreamTask* pTask) { } } - if (type != STREAM_INPUT__CHECKPOINT) { + if (type == STREAM_INPUT__CHECKPOINT) { + code = doHandleChkptBlock(pTask); + streamFreeQitem(pInput); + return code; + } else { code = doStreamTaskExecImpl(pTask, pInput, numOfBlocks); streamFreeQitem(pInput); if (code) { return code; } - } else { // todo other thread may change the status - // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. - streamMutexLock(&pTask->lock); - SStreamTaskState pState = streamTaskGetStatus(pTask); - if (pState.state == TASK_STATUS__CK) { - stDebug("s-task:%s checkpoint block received, set status:%s", id, pState.name); - code = streamTaskBuildCheckpoint(pTask); // ignore this error msg, and continue - } else { // todo refactor - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - code = streamTaskSendCheckpointSourceRsp(pTask); - } else { - code = streamTaskSendCheckpointReadyMsg(pTask); - } - - if (code != TSDB_CODE_SUCCESS) { - // todo: let's retry send rsp to upstream/mnode - stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%d, code:%s", id, 0, - tstrerror(code)); - } - } - - streamMutexUnlock(&pTask->lock); - streamFreeQitem(pInput); - return code; } } } diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 468c5f2139..bf402234ba 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -214,7 +214,6 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig // check whether the time window gaps exist or not int64_t now = taosGetTimestamp(precision); - int64_t ekey = pTrigger->pBlock->info.window.skey + pTask->info.interval.interval; // there are gaps, needs to be filled STimeWindow w = pTrigger->pBlock->info.window; @@ -226,13 +225,15 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if (ekey + pTask->info.watermark + pTask->info.interval.interval > now) { + if (w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) { int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); - *pNextTrigger = ekey + pTask->info.watermark + pTask->info.interval.interval - now; + *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); - stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d", id, num, prev, - *pNextTrigger); + + pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval; + stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id, + num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer); return code; } else { stDebug("s-task:%s gap exist for force_window_close, current force_window_skey:%" PRId64, id, w.skey); @@ -289,7 +290,7 @@ void streamTaskSchedHelper(void* param, void* tmrId) { } if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { - nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 seec + nextTrigger = TRIGGER_RECHECK_INTERVAL; // retry in 10 sec stDebug("s-task:%s in checkpoint procedure, not retrieve result, next:%dms", id, TRIGGER_RECHECK_INTERVAL); } else { if (pTask->info.trigger == STREAM_TRIGGER_FORCE_WINDOW_CLOSE && pTask->info.taskLevel == TASK_LEVEL__SOURCE) { From 067d2bd5c7b1b3b2f62e8fd45318206e98a27442 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 4 Dec 2024 22:19:37 +0800 Subject: [PATCH 2/9] fix(stream): fix deadlock when update checkpoint info failed. --- source/libs/stream/src/streamCheckpoint.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d619351a93..3ca283ce98 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -634,9 +634,11 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pInfo->processedVer <= pReq->checkpointVer); if (!valid) { - stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 - " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64, - pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, pReq->checkpointVer); + stFatal("s-task:%s invalid checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64 + " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it", + id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, + pReq->checkpointVer); + streamMutexUnlock(&pTask->lock); return TSDB_CODE_STREAM_INTERNAL_ERROR; } From 406e8777005e02a38b330e15fa818f192cba0998 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Dec 2024 00:11:51 +0800 Subject: [PATCH 3/9] fix(stream): send chkpt-source rsp to mnd even if it is failed. --- source/dnode/vnode/src/tq/tq.c | 116 ++++++++++++--------------------- 1 file changed, 41 insertions(+), 75 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd78f62cae..937bc6e268 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1108,91 +1108,76 @@ _OVER: return code; } +// always return success to mnode +//todo: handle failure of build and send msg to mnode +static void doSendChkptSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, int32_t code, + int32_t taskId) { + SRpcMsg rsp = {0}; + int32_t ret = streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &rsp, code); + if (ret) { // suppress the error in build checkpoint source rsp + tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", taskId, tstrerror(ret)); + } + tmsgSendRsp(&rsp); // error occurs +} + // no matter what kinds of error happened, make sure the mnode will receive the success execution code. int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) { - int32_t vgId = TD_VID(pTq->pVnode); - SStreamMeta* pMeta = pTq->pStreamMeta; - char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); - int32_t len = pMsg->contLen - sizeof(SMsgHead); - int32_t code = 0; + int32_t vgId = TD_VID(pTq->pVnode); + SStreamMeta* pMeta = pTq->pStreamMeta; + char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); + int32_t len = pMsg->contLen - sizeof(SMsgHead); + int32_t code = 0; + SStreamCheckpointSourceReq req = {0}; + SDecoder decoder = {0}; + SStreamTask* pTask = NULL; + int64_t checkpointId = 0; // disable auto rsp to mnode pRsp->info.handle = NULL; - SStreamCheckpointSourceReq req = {0}; - SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msg, len); if (tDecodeStreamCheckpointSourceReq(&decoder, &req) < 0) { code = TSDB_CODE_MSG_DECODE_ERROR; tDecoderClear(&decoder); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode, } + tDecoderClear(&decoder); if (!vnodeIsRoleLeader(pTq->pVnode)) { tqDebug("vgId:%d not leader, ignore checkpoint-source msg, s-task:0x%x", vgId, req.taskId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode } if (!pTq->pVnode->restored) { tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64 ", transId:%d s-task:0x%x ignore it", vgId, req.checkpointId, req.transId, req.taskId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:0x%x failed to build checkpoint-source rsp, code:%s", req.taskId, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; // always return success to mnode, , todo: handle failure of build and send msg to mnode + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; // always return success to mnode } - SStreamTask* pTask = NULL; code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); if (pTask == NULL || code != 0) { tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64 " transId:%d it may have been destroyed", vgId, req.taskId, req.checkpointId, req.transId); - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } if (pTask->status.downstreamReady != 1) { - streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); // record the latest failed checkpoint id + // record the latest failed checkpoint id + streamTaskSetFailedChkptInfo(pTask, req.transId, req.checkpointId); tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64 ", transId:%d set it failed", pTask->id.idStr, req.checkpointId, req.transId); + streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; // todo retry handle error } @@ -1207,14 +1192,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } } else { @@ -1226,7 +1204,6 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) // check if the checkpoint msg already sent or not. if (status == TASK_STATUS__CK) { - int64_t checkpointId = 0; streamTaskGetActiveCheckpointInfo(pTask, NULL, &checkpointId); tqWarn("s-task:%s repeatly recv checkpoint-source msg checkpointId:%" PRId64 @@ -1235,7 +1212,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SYN_PROPOSE_NOT_READY, req.taskId); return TSDB_CODE_SUCCESS; } else { // checkpoint already finished, and not in checkpoint status if (req.checkpointId <= pTask->chkInfo.checkpointId) { @@ -1245,15 +1222,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) streamMutexUnlock(&pTask->lock); streamMetaReleaseTask(pMeta, pTask); - - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - - tmsgSendRsp(&rsp); // error occurs - + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); return TSDB_CODE_SUCCESS; } } @@ -1264,7 +1233,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) if (code) { qError("s-task:%s (vgId:%d) failed to process checkpoint-source req, code:%s", pTask->id.idStr, vgId, tstrerror(code)); - return code; + streamMetaReleaseTask(pMeta, pTask); + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); + return TSDB_CODE_SUCCESS; } if (req.mndTrigger) { @@ -1279,13 +1250,8 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp) code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask); if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {0}; - int32_t ret = streamTaskBuildCheckpointSourceRsp(&req, &pMsg->info, &rsp, TSDB_CODE_SUCCESS); - if (ret) { // suppress the error in build checkpointsource rsp - tqError("s-task:%s failed to build checkpoint-source rsp, code:%s", pTask->id.idStr, tstrerror(code)); - } - tmsgSendRsp(&rsp); // error occurs - return TSDB_CODE_SUCCESS; + streamTaskSetCheckpointFailed(pTask); // set the checkpoint failed + doSendChkptSourceRsp(&req, &pMsg->info, TSDB_CODE_SUCCESS, req.taskId); } streamMetaReleaseTask(pMeta, pTask); From 43ad35f91165825a41c4f4f15ffecb6a747407e1 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Dec 2024 09:20:39 +0800 Subject: [PATCH 4/9] refactor: adjust the lock procedure for unregister stream tasks. --- source/dnode/vnode/src/sma/smaRollup.c | 3 ++- source/dnode/vnode/src/tqCommon/tqCommon.c | 3 --- source/libs/stream/src/streamCheckpoint.c | 6 ------ source/libs/stream/src/streamMeta.c | 6 ------ 4 files changed, 2 insertions(+), 16 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 80c04a3276..bcacac20a2 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -254,11 +254,12 @@ static void tdRSmaTaskInit(SStreamMeta *pMeta, SRSmaInfoItem *pItem, SStreamTask } static void tdRSmaTaskRemove(SStreamMeta *pMeta, int64_t streamId, int32_t taskId) { + streamMetaWLock(pMeta); + int32_t code = streamMetaUnregisterTask(pMeta, streamId, taskId); if (code != 0) { smaError("vgId:%d, rsma task:%" PRIi64 ",%d drop failed since %s", pMeta->vgId, streamId, taskId, tstrerror(code)); } - streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); if (streamMetaCommit(pMeta) < 0) { // persist to disk diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1ea524dc78..68fb651566 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -718,8 +718,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } } - streamMetaWUnLock(pMeta); - // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); @@ -737,7 +735,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } // commit the update - streamMetaWLock(pMeta); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 3ca283ce98..641f41daa9 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -591,7 +591,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV { // destroy the related fill-history tasks // drop task should not in the meta-lock, and drop the related fill-history task now - streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); @@ -599,7 +598,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV id, vgId, pReq->taskId, numOfTasks); } - streamMetaWLock(pMeta); if (pReq->dropRelHTask) { code = streamMetaCommit(pMeta); } @@ -675,8 +673,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV return TSDB_CODE_SUCCESS; } - streamMetaWUnLock(pMeta); - // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); @@ -685,9 +681,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV (int32_t)pReq->hTaskId, numOfTasks); } - streamMetaWLock(pMeta); code = streamMetaCommit(pMeta); - return TSDB_CODE_SUCCESS; } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index a6f87711bf..23a98ef3ae 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -501,8 +501,6 @@ _err: void streamMetaInitBackend(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); if (pMeta->streamBackend == NULL) { - streamMetaWUnLock(pMeta); - while (1) { streamMetaWLock(pMeta); pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); @@ -908,8 +906,6 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t int32_t code = 0; STaskId id = {.streamId = streamId, .taskId = taskId}; - streamMetaWLock(pMeta); - code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask); if (code == 0) { // desc the paused task counter @@ -958,10 +954,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t } streamMetaReleaseTask(pMeta, pTask); - streamMetaWUnLock(pMeta); } else { stDebug("vgId:%d failed to find the task:0x%x, it may have been dropped already", vgId, taskId); - streamMetaWUnLock(pMeta); } return 0; From 1d700079d44ff1d83b583ead05de361840cae2d8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Dec 2024 18:56:49 +0800 Subject: [PATCH 5/9] refactor: 1) add update task nodeep trans for every involved streams. 2) do some internal refactor for conflict check. --- source/dnode/mnode/impl/src/mndStream.c | 11 ++-- source/dnode/mnode/impl/src/mndStreamTrans.c | 55 +++++++++++--------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 5d41e1506c..0a107518df 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -1931,11 +1931,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange sdbCancelFetch(pSdb, pIter); return terrno = code; } - - code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); - if (code) { - mError("failed to register trans, transId:%d, and continue", pTrans->id); - } } if (!includeAllNodes) { @@ -1951,6 +1946,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid, pStream->name, pTrans->id); + // NOTE: for each stream, we register one trans entry for task update + code = mndStreamRegisterTrans(pTrans, MND_STREAM_TASK_UPDATE_NAME, pStream->uid); + if (code) { + mError("failed to register trans, transId:%d, and continue", pTrans->id); + } + code = mndStreamSetUpdateEpsetAction(pMnode, pStream, pChangeInfo, pTrans); // todo: not continue, drop all and retry again diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index 905a73ad48..a1f87f4d72 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -35,7 +35,12 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) size_t keyLen = 0; void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); - int32_t num = 0; + int32_t numOfChkpt = 0; + int32_t numOfTaskUpdate = 0; + + if (pNumOfActiveChkpt != NULL) { + *pNumOfActiveChkpt = 0; + } if (pList == NULL) { return terrno; @@ -50,15 +55,15 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) void *pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, - pEntry->startTime); + mDebug("transId:%d stream:0x%" PRIx64 " %s startTs:%" PRId64 " cleared since finished", pEntry->transId, + pEntry->streamId, pEntry->name, pEntry->startTime); void* p = taosArrayPush(pList, &info); if (p == NULL) { return terrno; } } else { if (strcmp(pEntry->name, MND_STREAM_CHECKPOINT_NAME) == 0) { - num++; + numOfChkpt++; } mndReleaseTrans(pMnode, pTrans); } @@ -78,48 +83,34 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } } - mDebug("clear %d finished stream-trans, remained:%d, active checkpoint trans:%d", size, - taosHashGetSize(execInfo.transMgmt.pDBTrans), num); + mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d, update trans:%d", size, + taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt, numOfTaskUpdate); taosArrayDestroy(pList); if (pNumOfActiveChkpt != NULL) { - *pNumOfActiveChkpt = num; + *pNumOfActiveChkpt = numOfChkpt; } return 0; } -// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. -// For a given stream: -// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. -// 2. create/drop/reset/update trans are conflict with any other trans. -int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { - if (lock) { - streamMutexLock(&execInfo.lock); - } - +static int32_t doStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName) { int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); if (num <= 0) { - if (lock) { - streamMutexUnlock(&execInfo.lock); - } return 0; } + // if any task updates exist, any other stream trans are not allowed to be created int32_t code = mndStreamClearFinishedTrans(pMnode, NULL); if (code) { - mError("failed to clear finish trans, code:%s", tstrerror(code)); + mError("failed to clear finish trans, code:%s, and continue", tstrerror(code)); } SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, &streamId, sizeof(streamId)); if (pEntry != NULL) { SStreamTransInfo tInfo = *pEntry; - if (lock) { - streamMutexUnlock(&execInfo.lock); - } - if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0) && (strcmp(pTransName, MND_STREAM_RESTART_NAME) != 0)) { @@ -141,11 +132,25 @@ int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char mDebug("stream:0x%" PRIx64 " no conflict trans existed, continue create trans", streamId); } + return TSDB_CODE_SUCCESS; +} + +// * Transactions of different streams are not related. Here only check the conflict of transaction for a given stream. +// For a given stream: +// 1. checkpoint trans is conflict with any other trans except for the drop and reset trans. +// 2. create/drop/reset/update trans are conflict with any other trans. +int32_t mndStreamTransConflictCheck(SMnode *pMnode, int64_t streamId, const char *pTransName, bool lock) { + if (lock) { + streamMutexLock(&execInfo.lock); + } + + int32_t code = doStreamTransConflictCheck(pMnode, streamId, pTransName); + if (lock) { streamMutexUnlock(&execInfo.lock); } - return 0; + return code; } int32_t mndStreamGetRelTrans(SMnode *pMnode, int64_t streamId) { From 85c7d513c5bb555ba128240651ea75b152992477 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 6 Dec 2024 19:17:35 +0800 Subject: [PATCH 6/9] refactor(stream): remove unused variables. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index a1f87f4d72..da55a14d74 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -36,7 +36,6 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) void *pIter = NULL; SArray *pList = taosArrayInit(4, sizeof(SKeyInfo)); int32_t numOfChkpt = 0; - int32_t numOfTaskUpdate = 0; if (pNumOfActiveChkpt != NULL) { *pNumOfActiveChkpt = 0; @@ -84,7 +83,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d, update trans:%d", size, - taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt, numOfTaskUpdate); + taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt); taosArrayDestroy(pList); From 06f91b64e69012dfbc38548b9551409c06d6797c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Dec 2024 11:51:42 +0800 Subject: [PATCH 7/9] fix(stream): fix syntax error. --- source/dnode/mnode/impl/src/mndStreamTrans.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index da55a14d74..a1e104aeca 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -82,7 +82,7 @@ int32_t mndStreamClearFinishedTrans(SMnode *pMnode, int32_t *pNumOfActiveChkpt) } } - mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d, update trans:%d", size, + mDebug("clear %d finished stream-trans, active trans:%d, active checkpoint trans:%d", size, taosHashGetSize(execInfo.transMgmt.pDBTrans), numOfChkpt); taosArrayDestroy(pList); From cea647daf6905b12cab2e2ee606a0de8b315c6ba Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 7 Dec 2024 12:21:28 +0800 Subject: [PATCH 8/9] refactor: limit the number of items in inputq to be 5120 --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamSched.c | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 863bc76c79..8f9e4a311c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -37,7 +37,7 @@ extern "C" { #define META_HB_CHECK_INTERVAL 200 #define META_HB_SEND_IDLE_COUNTER 25 // send hb every 5 sec #define STREAM_TASK_KEY_LEN ((sizeof(int64_t)) << 1) -#define STREAM_TASK_QUEUE_CAPACITY 20480 +#define STREAM_TASK_QUEUE_CAPACITY 5120 #define STREAM_TASK_QUEUE_CAPACITY_IN_SIZE (30) // clang-format off diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index bf402234ba..0436ae7ee4 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -225,7 +225,8 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if (w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) { + if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || + streamQueueIsFull(pTask->inputq.queue)) { int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; From b715df1ac45fd49c2069ae10ca3234b54f0362f9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 8 Dec 2024 00:20:28 +0800 Subject: [PATCH 9/9] fix(stream): adjust schedule interval. --- source/libs/stream/src/streamSched.c | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/source/libs/stream/src/streamSched.c b/source/libs/stream/src/streamSched.c index 0436ae7ee4..1d0c882003 100644 --- a/source/libs/stream/src/streamSched.c +++ b/source/libs/stream/src/streamSched.c @@ -192,6 +192,7 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig const char* id = pTask->id.idStr; int8_t precision = pTask->info.interval.precision; SStreamTrigger* pTrigger = NULL; + bool isFull = false; while (1) { code = streamCreateForcewindowTrigger(&pTrigger, pTask->info.delaySchedParam, &pTask->info.interval, @@ -225,13 +226,15 @@ static int32_t doCreateForceWindowTrigger(SStreamTask* pTask, int32_t* pNextTrig } pTask->status.latestForceWindow = w; - if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || - streamQueueIsFull(pTask->inputq.queue)) { + isFull = streamQueueIsFull(pTask->inputq.queue); + + if ((w.ekey + pTask->info.watermark + pTask->info.interval.interval > now) || isFull) { int64_t prev = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); + if (!isFull) { + *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; + } - *pNextTrigger = w.ekey + pTask->info.watermark + pTask->info.interval.interval - now; *pNextTrigger = convertTimePrecision(*pNextTrigger, precision, TSDB_TIME_PRECISION_MILLI); - pTask->chkInfo.nextProcessVer = w.ekey + pTask->info.interval.interval; stDebug("s-task:%s generate %d time window(s), trigger delay adjust from %" PRId64 " to %d, set ver:%" PRId64, id, num, prev, *pNextTrigger, pTask->chkInfo.nextProcessVer);