fix(stream): remove invalid timer controllers.
This commit is contained in:
parent
f9d5c0d403
commit
dc0c5539a0
|
@ -19,8 +19,6 @@
|
||||||
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
#define MAX_REPEAT_SCAN_THRESHOLD 3
|
||||||
#define SCAN_WAL_IDLE_DURATION 100
|
#define SCAN_WAL_IDLE_DURATION 100
|
||||||
|
|
||||||
void* tqTimer = NULL;
|
|
||||||
|
|
||||||
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
||||||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||||
|
@ -98,9 +96,9 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
|
||||||
pParam->pTq = pTq;
|
pParam->pTq = pTq;
|
||||||
pParam->numOfTasks = numOfTasks;
|
pParam->numOfTasks = numOfTasks;
|
||||||
if (pMeta->scanInfo.scanTimer == NULL) {
|
if (pMeta->scanInfo.scanTimer == NULL) {
|
||||||
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, tqTimer);
|
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer);
|
||||||
} else {
|
} else {
|
||||||
taosTmrReset(doStartScanWal, idleDuration, pParam, tqTimer, &pMeta->scanInfo.scanTimer);
|
taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
int32_t streamBackendId = 0;
|
int32_t streamBackendId = 0;
|
||||||
int32_t streamBackendCfWrapperId = 0;
|
int32_t streamBackendCfWrapperId = 0;
|
||||||
|
@ -64,7 +64,7 @@ static void streamMetaEnvInit() {
|
||||||
streamTimerInit();
|
streamTimerInit();
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaInit() { /*taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);*/ streamMetaEnvInit();}
|
void streamMetaInit() { taosThreadOnce(&streamMetaModuleInit, streamMetaEnvInit);}
|
||||||
|
|
||||||
void streamMetaCleanup() {
|
void streamMetaCleanup() {
|
||||||
taosCloseRef(streamBackendId);
|
taosCloseRef(streamBackendId);
|
||||||
|
@ -393,7 +393,7 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
|
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
|
||||||
metaRefMgtAdd(pMeta->vgId, pRid);
|
metaRefMgtAdd(pMeta->vgId, pRid);
|
||||||
|
|
||||||
// pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
|
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
|
||||||
pMeta->pHbInfo->tickCounter = 0;
|
pMeta->pHbInfo->tickCounter = 0;
|
||||||
pMeta->pHbInfo->stopFlag = 0;
|
pMeta->pHbInfo->stopFlag = 0;
|
||||||
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
|
||||||
|
@ -1204,7 +1204,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
|
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
|
||||||
// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||||
taosReleaseRef(streamMetaId, rid);
|
taosReleaseRef(streamMetaId, rid);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1214,7 +1214,7 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
metaHeartbeatToMnodeImpl(pMeta);
|
metaHeartbeatToMnodeImpl(pMeta);
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
|
|
||||||
// taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
|
||||||
taosReleaseRef(streamMetaId, rid);
|
taosReleaseRef(streamMetaId, rid);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -345,16 +345,17 @@ int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
|
||||||
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||||
ASSERT(p->state == TASK_STATUS__READY);
|
ASSERT(p->state == TASK_STATUS__READY);
|
||||||
|
|
||||||
|
int8_t schedStatus = pTask->status.schedStatus;
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
int64_t startVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
|
||||||
if (startVer == -1) {
|
if (startVer == -1) {
|
||||||
startVer = pTask->chkInfo.nextProcessVer;
|
startVer = pTask->chkInfo.nextProcessVer;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
|
stDebug("s-task:%s status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, id, p->name, schedStatus,
|
||||||
id, p->name, pTask->status.schedStatus, startVer);
|
startVer);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p->name, pTask->status.schedStatus);
|
stDebug("s-task:%s level:%d status:%s sched-status:%d", id, pTask->info.taskLevel, p->name, schedStatus);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -305,8 +305,8 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t walSkipFetchBody(SWalReader *pRead) {
|
int32_t walSkipFetchBody(SWalReader *pRead) {
|
||||||
wDebug("vgId:%d, skip fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, skip fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64
|
||||||
", applied ver:%" PRId64", 0x%"PRIx64,
|
", applied:%" PRId64 ", 0x%" PRIx64,
|
||||||
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
pRead->pWal->cfg.vgId, pRead->pHead->head.version, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer,
|
||||||
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
|
pRead->pWal->vers.lastVer, pRead->pWal->vers.appliedVer, pRead->readerId);
|
||||||
|
|
||||||
|
@ -325,11 +325,11 @@ int32_t walFetchBody(SWalReader *pRead) {
|
||||||
int64_t ver = pReadHead->version;
|
int64_t ver = pReadHead->version;
|
||||||
int32_t vgId = pRead->pWal->cfg.vgId;
|
int32_t vgId = pRead->pWal->cfg.vgId;
|
||||||
int64_t id = pRead->readerId;
|
int64_t id = pRead->readerId;
|
||||||
|
SWalVer *pVer = &pRead->pWal->vers;
|
||||||
|
|
||||||
wDebug("vgId:%d, fetch body %" PRId64 ", first ver:%" PRId64 ", commit ver:%" PRId64 ", last ver:%" PRId64
|
wDebug("vgId:%d, fetch body:%" PRId64 ", first:%" PRId64 ", commit:%" PRId64 ", last:%" PRId64 ", applied:%" PRId64
|
||||||
", applied ver:%" PRId64 ", 0x%" PRIx64,
|
", 0x%" PRIx64,
|
||||||
vgId, ver, pRead->pWal->vers.firstVer, pRead->pWal->vers.commitVer, pRead->pWal->vers.lastVer,
|
vgId, ver, pVer->firstVer, pVer->commitVer, pVer->lastVer, pVer->appliedVer, id);
|
||||||
pRead->pWal->vers.appliedVer, id);
|
|
||||||
|
|
||||||
if (pRead->capacity < pReadHead->bodyLen) {
|
if (pRead->capacity < pReadHead->bodyLen) {
|
||||||
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
SWalCkHead *ptr = (SWalCkHead *)taosMemoryRealloc(pRead->pHead, sizeof(SWalCkHead) + pReadHead->bodyLen);
|
||||||
|
|
Loading…
Reference in New Issue