refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2023-10-30 14:39:24 +08:00
parent 6e2240a3d1
commit 635243f01a
5 changed files with 4 additions and 11 deletions

View File

@ -159,7 +159,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver); int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
int32_t tqScanWal(STQ* pTq); int32_t tqScanWal(STQ* pTq);
int32_t tqStartStreamTask(STQ* pTq); int32_t tqStartStreamTask(STQ* pTq);
int32_t tqStartStreamTasks(STQ* pTq); int32_t tqResetStreamTaskStatus(STQ* pTq);
int32_t tqStopStreamTasks(STQ* pTq); int32_t tqStopStreamTasks(STQ* pTq);
// tq util // tq util

View File

@ -1990,7 +1990,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) { if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
tqStartStreamTasks(pTq); tqResetStreamTaskStatus(pTq);
tqCheckAndRunStreamTaskAsync(pTq); tqCheckAndRunStreamTaskAsync(pTq);
} else { } else {
vInfo("vgId:%d, follower node not start stream tasks", vgId); vInfo("vgId:%d, follower node not start stream tasks", vgId);

View File

@ -227,7 +227,7 @@ int32_t tqStopStreamTasks(STQ* pTq) {
return 0; return 0;
} }
int32_t tqStartStreamTasks(STQ* pTq) { int32_t tqResetStreamTaskStatus(STQ* pTq) {
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -564,7 +564,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream tasks", pVnode->config.vgId);
tqStartStreamTasks(pVnode->pTq); tqResetStreamTaskStatus(pVnode->pTq);
tqCheckAndRunStreamTaskAsync(pVnode->pTq); tqCheckAndRunStreamTaskAsync(pVnode->pTq);
} }
} else { } else {

View File

@ -271,13 +271,6 @@ int32_t onNormalTaskReady(SStreamTask* pTask) {
ETaskStatus status = streamTaskGetStatus(pTask, &p); ETaskStatus status = streamTaskGetStatus(pTask, &p);
ASSERT(status == TASK_STATUS__READY); ASSERT(status == TASK_STATUS__READY);
// todo refactor: remove this later
// if (pTask->info.fillHistory == 1) {
// stDebug("s-task:%s fill-history is set normal when start it, try to remove it,set it task to be dropping", id);
// pTask->status.taskStatus = TASK_STATUS__DROPPING;
// ASSERT(pTask->hTaskInfo.id.taskId == 0);
// }
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, stDebug("s-task:%s no need to scan-history data, status:%s, sched-status:%d, ready for data from wal ver:%" PRId64,
id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader)); id, p, pTask->status.schedStatus, walReaderGetCurrentVer(pTask->exec.pWalReader));