From f06117dbc915a02d086dbc22c46fe85e40858d6c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Dec 2023 17:32:29 +0800 Subject: [PATCH] fix:[TD-27644]heartbeat closed before snode close leading to snode is hanged --- include/libs/stream/tstream.h | 3 -- source/dnode/mgmt/node_mgmt/src/dmEnv.c | 6 ++++ source/dnode/vnode/src/inc/vnodeInt.h | 2 -- source/dnode/vnode/src/tq/tq.c | 36 ------------------- source/dnode/vnode/src/vnd/vnodeModule.c | 8 ----- source/libs/stream/inc/streamInt.h | 7 +--- source/libs/stream/src/stream.c | 44 +++++++----------------- source/libs/stream/src/streamDispatch.c | 4 +-- source/libs/stream/src/streamMeta.c | 6 ++-- source/libs/stream/src/streamStart.c | 16 ++++----- 10 files changed, 32 insertions(+), 100 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 7f65ef8358..00d2bee880 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -216,9 +216,6 @@ int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem); SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue); #endif -int32_t streamInit(); -void streamCleanUp(); - SStreamDataSubmit* streamDataSubmitNew(SPackedData* pData, int32_t type); void streamDataSubmitDestroy(SStreamDataSubmit* pDataSubmit); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index 6f13abcebc..e0503c83c6 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -28,6 +28,9 @@ } \ } while (0) +extern int32_t streamTimerInit(); +extern void streamTimerCleanUp(); + static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -166,6 +169,7 @@ int32_t dmInit() { #if defined(USE_S3) if (s3Begin() != 0) return -1; #endif + if (streamTimerInit() != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -194,6 +198,8 @@ void dmCleanup() { #if defined(USE_S3) s3End(); #endif + streamTimerCleanUp(); + dInfo("dnode env is cleaned up"); taosCleanupCfg(); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 8a4cbb5fd0..7ed0b5103f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -223,8 +223,6 @@ int32_t tsdbDeleteTableData(STsdb* pTsdb, int64_t version, tb_uid_t suid, tb_uid int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg); // tq -int tqInit(); -void tqCleanUp(); STQ* tqOpen(const char* path, SVnode* pVnode); void tqNotifyClose(STQ*); void tqClose(STQ*); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index e9943d2abf..ee76a27414 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -17,12 +17,6 @@ #include "vnd.h" #include "tqCommon.h" -typedef struct { - int8_t inited; -} STqMgmt; - -static STqMgmt tqMgmt = {0}; - // 0: not init // 1: already inited // 2: wait to be inited or cleaup @@ -32,36 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_ static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } -int32_t tqInit() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 2); - if (old != 2) break; - } - - if (old == 0) { - if (streamInit() < 0) { - return -1; - } - atomic_store_8(&tqMgmt.inited, 1); - } - - return 0; -} - -void tqCleanUp() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 2); - if (old != 2) break; - } - - if (old == 1) { - streamCleanUp(); - atomic_store_8(&tqMgmt.inited, 0); - } -} - void tqDestroyTqHandle(void* data) { STqHandle* pData = (STqHandle*)data; qDestroyTask(pData->execHandle.task); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 4e3cee42c6..44fcbefba7 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -39,12 +39,6 @@ int vnodeInit(int nthreads) { if (walInit() < 0) { return -1; } - if (tqInit() < 0) { - return -1; - } - if (s3Init() < 0) { - return -1; - } return 0; } @@ -58,7 +52,5 @@ void vnodeCleanup() { vnodeAsyncDestroy(&vnodeAsyncHandle[1]); walCleanUp(); - tqCleanUp(); smaCleanUp(); - s3CleanUp(); } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 7b8dae8be7..0df36ec391 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -57,11 +57,6 @@ typedef struct { SSDataBlock* pBlock; } SStreamTrigger; -typedef struct SStreamGlobalEnv { - int8_t inited; - void* timer; -} SStreamGlobalEnv; - typedef struct SStreamContinueExecInfo { SEpSet epset; int32_t taskId; @@ -92,7 +87,7 @@ struct SStreamQueue { int8_t status; }; -extern SStreamGlobalEnv streamEnv; +extern void* streamTimer; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 1c874f34de..1bef42bf14 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -16,38 +16,18 @@ #include "streamInt.h" #include "ttimer.h" -SStreamGlobalEnv streamEnv; +void* streamTimer = NULL; -int32_t streamInit() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&streamEnv.inited, 0, 2); - if (old != 2) break; +int32_t streamTimerInit() { + streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); + if (streamTimer == NULL) { + return -1; } - - if (old == 0) { - streamEnv.timer = taosTmrInit(1000, 100, 10000, "STREAM"); - if (streamEnv.timer == NULL) { - atomic_store_8(&streamEnv.inited, 0); - return -1; - } - atomic_store_8(&streamEnv.inited, 1); - } - return 0; } -void streamCleanUp() { - int8_t old; - while (1) { - old = atomic_val_compare_exchange_8(&streamEnv.inited, 1, 2); - if (old != 2) break; - } - - if (old == 1) { - taosTmrCleanUp(streamEnv.timer); - atomic_store_8(&streamEnv.inited, 0); - } +void streamTimerCleanUp() { + taosTmrCleanUp(streamTimer); } char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { @@ -77,7 +57,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { if (pTrigger == NULL) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -88,7 +68,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { stError("s-task:%s failed to prepare retrieve data trigger, code:%s, try again in %dms", id, "out of memory", nextTrigger); - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -97,7 +77,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { int32_t code = streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pTrigger); if (code != TSDB_CODE_SUCCESS) { - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); return; } @@ -105,7 +85,7 @@ static void streamSchedByTimer(void* param, void* tmrId) { } } - taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamEnv.timer, &pTask->schedInfo.pTimer); + taosTmrReset(streamSchedByTimer, nextTrigger, pTask, streamTimer, &pTask->schedInfo.pTimer); } int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { @@ -115,7 +95,7 @@ int32_t streamSetupScheduleTrigger(SStreamTask* pTask) { stDebug("s-task:%s setup scheduler trigger, delay:%" PRId64 " ms", pTask->id.idStr, pTask->info.triggerParam); - pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamEnv.timer); + pTask->schedInfo.pTimer = taosTmrStart(streamSchedByTimer, (int32_t)pTask->info.triggerParam, pTask, streamTimer); pTask->schedInfo.status = TASK_TRIGGER_STATUS__INACTIVE; } diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 6247d4ed53..1a67b08749 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -506,9 +506,9 @@ void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) { waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount); if (pTask->msgInfo.pTimer != NULL) { - taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->msgInfo.pTimer); + taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamTimer, &pTask->msgInfo.pTimer); } else { - pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamEnv.timer); + pTask->msgInfo.pTimer = taosTmrStart(doRetryDispatchData, waitDuration, pTask, streamTimer); } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index bd23e41a84..807f120cb7 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -369,7 +369,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamEnv.timer); + pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->stopFlag = 0; pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); @@ -1099,7 +1099,7 @@ void metaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } @@ -1215,7 +1215,7 @@ void metaHbToMnode(void* param, void* tmrId) { _end: clearHbMsg(&hbMsg, pIdList); - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index a4c448f678..4c748d58bc 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -114,7 +114,7 @@ static void doReExecScanhistory(void* param, void* tmrId) { // release the task. streamMetaReleaseTask(pTask->pMeta, pTask); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } } @@ -137,9 +137,9 @@ int32_t streamReExecScanHistoryFuture(SStreamTask* pTask, int32_t idleDuration) stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks*0.1, ref); if (pTask->schedHistoryInfo.pTimer == NULL) { - pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer); + pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer); } else { - taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamEnv.timer, &pTask->schedHistoryInfo.pTimer); + taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer); } return TSDB_CODE_SUCCESS; @@ -485,7 +485,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%"PRId64", retry in 100ms, ref:%d ", id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref); - pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamEnv.timer); + pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer); } } @@ -726,7 +726,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { pHTaskInfo->tickCount -= 1; if (pHTaskInfo->tickCount > 0) { - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -754,7 +754,7 @@ static void tryLaunchHistoryTask(void* param, void* tmrId) { stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d", pTask->id.idStr, p, hTaskId, pHTaskInfo->waitInterval, pHTaskInfo->retryTimes); - taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamEnv.timer, &pHTaskInfo->pTimer); + taosTmrReset(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer); streamMetaReleaseTask(pMeta, pTask); return; } @@ -815,7 +815,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { streamTaskInitForLaunchHTask(&pTask->hTaskInfo); if (pTask->hTaskInfo.pTimer == NULL) { int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); - pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer); + pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer); if (pTask->hTaskInfo.pTimer == NULL) { atomic_sub_fetch_32(&pTask->status.timerActive, 1); stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", pTask->id.idStr, @@ -828,7 +828,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) { } else { // timer exists ASSERT(pTask->status.timerActive >= 1); stDebug("s-task:%s set timer active flag, task timer not null", pTask->id.idStr); - taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamEnv.timer, &pTask->hTaskInfo.pTimer); + taosTmrReset(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer); } return TSDB_CODE_SUCCESS;