diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index 95a4474460..2a600d08ef 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -19,8 +19,6 @@ #define MAX_REPEAT_SCAN_THRESHOLD 3 #define SCAN_WAL_IDLE_DURATION 100 -void* tqTimer = NULL; - static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); @@ -98,9 +96,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) { pParam->pTq = pTq; pParam->numOfTasks = numOfTasks; if (pMeta->scanInfo.scanTimer == NULL) { - pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, tqTimer); + pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer); } else { - taosTmrReset(doStartScanWal, idleDuration, pParam, tqTimer, &pMeta->scanInfo.scanTimer); + taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index f687841016..15256ececf 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -23,7 +23,7 @@ #include "ttimer.h" #include "wal.h" -TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; +static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT; int32_t streamBackendId = 0; int32_t streamBackendCfWrapperId = 0; @@ -64,7 +64,7 @@ static void streamMetaEnvInit() { streamTimerInit(); } -void streamMetaInit() { /*taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);*/ streamMetaEnvInit();} +void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);} void streamMetaCleanup() { taosCloseRef(streamBackendId); @@ -393,7 +393,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid)); metaRefMgtAdd(pMeta->vgId, pRid); -// pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); + pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pMeta->pHbInfo->tickCounter = 0; pMeta->pHbInfo->stopFlag = 0; pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL); @@ -1204,7 +1204,7 @@ void metaHbToMnode(void* param, void* tmrId) { } if (!waitForEnoughDuration(pMeta->pHbInfo)) { -// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); return; } @@ -1214,7 +1214,7 @@ void metaHbToMnode(void* param, void* tmrId) { metaHeartbeatToMnodeImpl(pMeta); streamMetaRUnLock(pMeta); -// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); + taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr); taosReleaseRef(streamMetaId, rid); } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index 52c3c87431..c852215d16 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -345,16 +345,17 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) { SStreamTaskState* p = streamTaskGetStatus(pTask); ASSERT(p->state == TASK_STATUS__READY); + int8_t schedStatus = pTask->status.schedStatus; if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader); if (startVer == -1) { startVer = pTask->chkInfo.nextProcessVer; } - stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, - id, p->name, pTask->status.schedStatus, startVer); + stDebug("s-task:%s status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p->name, schedStatus, + startVer); } else { - stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p->name, pTask->status.schedStatus); + stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p->name, schedStatus); } return TSDB_CODE_SUCCESS; diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 1660a0ecf0..6748d161ae 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -305,8 +305,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) { } int32_t walSkipFetchBody(SWalReader *pRead) { - wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64", 0x%"PRIx64, + wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 + ", applied:%" PRId64 ", 0x%" PRIx64, pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId); @@ -325,11 +325,11 @@ int32_t walFetchBody(SWalReader *pRead) { int64_t ver = pReadHead->version; int32_t vgId = pRead->pWal->cfg.vgId; int64_t id = pRead->readerId; + SWalVer *pVer = &pRead->pWal->vers; - wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64 - ", applied ver:%" PRId64 ", 0x%" PRIx64, - vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer, - pRead->pWal->vers.appliedVer, id); + wDebug("vgId:%d, fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64 + ", 0x%" PRIx64, + vgId, ver, pVer->firstVer, pVer->commitVer, pVer->lastVer, pVer->appliedVer, id); if (pRead->capacity < pReadHead->bodyLen) { SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);