enh(stream): remove time controller.

This commit is contained in:
Haojun Liao 2024-01-17 14:55:15 +08:00
parent 3d98fc6a67
commit 1ea671e373
5 changed files with 11 additions and 17 deletions

View File

@ -888,6 +888,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);

View File

@ -107,7 +107,6 @@ struct STQ {
TTB* pExecStore;
TTB* pCheckStore;
SStreamMeta* pStreamMeta;
void* tqTimer;
};
int32_t tEncodeSTqHandle(SEncoder* pEncoder, const STqHandle* pHandle);

View File

@ -26,18 +26,6 @@ static FORCE_INLINE bool tqIsHandleExec(STqHandle* pHandle) { return TMQ_HANDLE_
static FORCE_INLINE void tqSetHandleExec(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_EXEC; }
static FORCE_INLINE void tqSetHandleIdle(STqHandle* pHandle) { pHandle->status = TMQ_HANDLE_STATUS_IDLE; }
static int32_t tqTimerInit(STQ* pTq) {
pTq->tqTimer = taosTmrInit(100, 100, 1000, "TQ");
if (pTq->tqTimer == NULL) {
return -1;
}
return 0;
}
static void tqTimerCleanUp(STQ* pTq) {
taosTmrCleanUp(pTq->tqTimer);
}
void tqDestroyTqHandle(void* data) {
STqHandle* pData = (STqHandle*)data;
qDestroyTask(pData->execHandle.task);
@ -118,7 +106,6 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
tqTimerInit(pTq);
return 0;
}
@ -149,7 +136,6 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
tqTimerCleanUp(pTq);
qDebug("end to close tq");
taosMemoryFree(pTq);

View File

@ -95,10 +95,14 @@ int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration) {
pParam->pTq = pTq;
pParam->numOfTasks = numOfTasks;
tmr_h pTimer = streamTimerGetInstance();
ASSERT(pTimer);
if (pMeta->scanInfo.scanTimer == NULL) {
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTq->tqTimer);
pMeta->scanInfo.scanTimer = taosTmrStart(doStartScanWal, idleDuration, pParam, pTimer);
} else {
taosTmrReset(doStartScanWal, idleDuration, pParam, pTq->tqTimer, &pMeta->scanInfo.scanTimer);
taosTmrReset(doStartScanWal, idleDuration, pParam, pTimer, &pMeta->scanInfo.scanTimer);
}
return TSDB_CODE_SUCCESS;

View File

@ -35,6 +35,10 @@ void streamTimerCleanUp() {
streamTimer = NULL;
}
tmr_h streamTimerGetInstance() {
return streamTimer;
}
char* createStreamTaskIdStr(int64_t streamId, int32_t taskId) {
char buf[128] = {0};
sprintf(buf, "0x%" PRIx64 "-0x%x", streamId, taskId);