diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index f9bba19fbb..d173c83e97 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -28,9 +28,6 @@ } \ } while (0) -extern int32_t streamTimerInit(); -extern void streamTimerCleanUp(); - static SDnode globalDnode = {0}; SDnode *dmInstance() { return &globalDnode; } @@ -169,7 +166,6 @@ int32_t dmInit() { #if defined(USE_S3) if (s3Begin() != 0) return -1; #endif - if (streamTimerInit() != 0) return -1; dInfo("dnode env is initialized"); return 0; @@ -196,10 +192,10 @@ void dmCleanup() { udfStopUdfd(); taosStopCacheRefreshWorker(); dmDiskClose(); + #if defined(USE_S3) s3End(); #endif - streamTimerCleanUp(); dInfo("dnode env is cleaned up"); diff --git a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c index 84465640c0..070d78fd69 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMgmt.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMgmt.c @@ -90,6 +90,7 @@ int32_t dmInitDnode(SDnode *pDnode) { goto _OVER; } #endif + indexInit(tsNumOfCommitThreads); streamMetaInit(); @@ -108,7 +109,9 @@ _OVER: } void dmCleanupDnode(SDnode *pDnode) { - if (pDnode == NULL) return; + if (pDnode == NULL) { + return; + } dmCleanupClient(pDnode); dmCleanupStatusClient(pDnode); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e4c673f533..435b12bc06 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -42,7 +42,6 @@ extern "C" { // clang-format on typedef struct STqOffsetStore STqOffsetStore; -extern void* tqTimer; #define IS_OFFSET_RESET_TYPE(_t) ((_t) < 0) @@ -108,6 +107,7 @@ struct STQ { TTB* pExecStore; TTB* pCheckStore; SStreamMeta* pStreamMeta; + void* tqTimer; }; int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 862d7e1d66..b3cb3c80f7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -26,16 +26,16 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_ static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; } static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; } -int32_t tqTimerInit() { - tqTimer = taosTmrInit(100, 100, 1000, "TQ"); - if (tqTimer == NULL) { +static int32_t tqTimerInit(STQ* pTq) { + pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ"); + if (pTq->tqTimer == NULL) { return -1; } return 0; } -void tqTimerCleanUp() { - taosTmrCleanUp(tqTimer); +static void tqTimerCleanUp(STQ* pTq) { + taosTmrCleanUp(pTq->tqTimer); } void tqDestroyTqHandle(void* data) { @@ -118,7 +118,7 @@ int32_t tqInitialize(STQ* pTq) { return -1; } - tqTimerInit(); + tqTimerInit(pTq); return 0; } @@ -149,7 +149,7 @@ void tqClose(STQ* pTq) { taosMemoryFree(pTq->path); tqMetaClose(pTq); streamMetaClose(pTq->pStreamMeta); - tqTimerCleanUp(); + tqTimerCleanUp(pTq); qDebug("end to close tq"); taosMemoryFree(pTq); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 9b5134d449..1d1eda74f6 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -92,6 +92,9 @@ extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; extern int32_t taskDbWrapperId; +int32_t streamTimerInit(); +void streamTimerCleanUp(); + void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups); diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 10b1b3fbbe..7db4bec6a0 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -21,13 +21,18 @@ void* streamTimer = NULL; int32_t streamTimerInit() { streamTimer = taosTmrInit(1000, 100, 10000, "STREAM"); if (streamTimer == NULL) { + stError("init stream timer failed, code:%s", tstrerror(terrno)); return -1; } + + stInfo("init stream timer, %p", streamTimer); return 0; } void streamTimerCleanUp() { + stInfo("cleanup stream timer, %p", streamTimer); taosTmrCleanUp(streamTimer); + streamTimer = NULL; } char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 6616fddaed..f687841016 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,7 +23,7 @@ #include "ttimer.h" #include "wal.h" -static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; +TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; @@ -61,9 +61,10 @@ static void streamMetaEnvInit() { streamMetaId = taosOpenRef(64, streamMetaCloseImpl); metaRefMgtInit(); + streamTimerInit(); } -void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit); } +void streamMetaInit() { /*taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);*/ streamMetaEnvInit();} void streamMetaCleanup() { taosCloseRef(streamBackendId); @@ -71,6 +72,7 @@ void streamMetaCleanup() { taosCloseRef(streamMetaId); metaRefMgtCleanup(); + streamTimerCleanUp(); } void metaRefMgtInit() { @@ -391,7 +393,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); - pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); +// pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->stopFlag = 0; pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); @@ -1202,7 +1204,7 @@ void metaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); +// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } @@ -1212,7 +1214,7 @@ void metaHbToMnode(void* param, void* tmrId) { metaHeartbeatToMnodeImpl(pMeta); streamMetaRUnLock(pMeta); - taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); +// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } @@ -1262,11 +1264,11 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { // wait for the stream meta hb function stopping if (pMeta->role == NODE_ROLE_LEADER) { - pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; - while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { - taosMsleep(100); - stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); - } +// pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; +// while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) { +// taosMsleep(100); +// stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); +// } } stDebug("vgId:%d start to check all tasks", vgId); @@ -1285,7 +1287,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) { int64_t* pRid = taosMemoryMalloc(sizeof(int64_t)); metaRefMgtAdd(pMeta->vgId, pRid); *pRid = pMeta->rid; - metaHbToMnode(pRid, NULL); +// metaHbToMnode(pRid, NULL); } void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {