fix(stream):update the log level.
This commit is contained in:
parent
706f1e4744
commit
c4079332ff
|
@ -1828,13 +1828,13 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||||
taosHashClear(pMeta->updateInfo.pTasks);
|
taosHashClear(pMeta->updateInfo.pTasks);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
|
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", pTask->id.idStr, req.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
|
||||||
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
void* exist = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
|
||||||
if (exist != NULL) {
|
if (exist != NULL) {
|
||||||
tqInfo("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s (vgId:%d) already update in trans:%d, discard the nodeEp update msg", pTask->id.idStr, vgId,
|
||||||
req.transId);
|
req.transId);
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
@ -1859,7 +1859,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMeta->vgId, req.taskId);
|
pMeta->vgId, req.taskId);
|
||||||
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1882,10 +1882,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
if (ppHTask != NULL) {
|
if (ppHTask != NULL) {
|
||||||
streamTaskStop(*ppHTask);
|
streamTaskStop(*ppHTask);
|
||||||
tqInfo("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
|
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
|
||||||
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
|
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
|
||||||
} else {
|
} else {
|
||||||
tqInfo("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
|
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
rsp.code = 0;
|
rsp.code = 0;
|
||||||
|
@ -1897,12 +1897,12 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pMeta->startInfo.startAllTasksFlag = 1;
|
pMeta->startInfo.startAllTasksFlag = 1;
|
||||||
|
|
||||||
if (updateTasks < numOfTasks) {
|
if (updateTasks < numOfTasks) {
|
||||||
tqInfo("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
|
||||||
updateTasks, (numOfTasks - updateTasks));
|
updateTasks, (numOfTasks - updateTasks));
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqInfo("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks, clear the start after nodeUpdate flag", vgId);
|
||||||
pMeta->startInfo.startAllTasksFlag = 0;
|
pMeta->startInfo.startAllTasksFlag = 0;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1912,7 +1912,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
tqInfo("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
taosMsleep(100);
|
taosMsleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1934,11 +1934,11 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
vInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId);
|
||||||
tqResetStreamTaskStatus(pTq);
|
tqResetStreamTaskStatus(pTq);
|
||||||
tqLaunchStreamTaskAsync(pTq);
|
tqLaunchStreamTaskAsync(pTq);
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d, follower node not start stream tasks", vgId);
|
tqInfo("vgId:%d, follower node not start stream tasks", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
Loading…
Reference in New Issue