enh(stream): add some logs.

This commit is contained in:
Haojun Liao 2023-10-13 19:45:29 +08:00
parent f667bf3866
commit ffa798c4fa
2 changed files with 11 additions and 10 deletions

View File

@ -2111,11 +2111,12 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
if (p == NULL && p1 == NULL) {
mDebug("stream:0x%"PRIx64" %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
mndReleaseStream(pMnode, pStream);
continue;
}
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans", pStream->uid, pStream->name);
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
if (code != TSDB_CODE_SUCCESS) {
sdbCancelFetch(pSdb, pIter);

View File

@ -301,6 +301,8 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
if (status == TASK_STATUS__SCAN_HISTORY) {
stDebug("s-task:%s enter into scan-history data stage, status:%s", id, str);
streamTaskStartScanHistory(pTask);
// start the related fill-history task, when current task is ready
streamLaunchFillHistoryTask(pTask);
} else {
// fill-history tasks are not allowed to reach here.
if (pTask->info.fillHistory == 1) {
@ -312,9 +314,6 @@ static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
streamTaskEnablePause(pTask);
}
}
// when current stream task is ready, check the related fill history task.
streamLaunchFillHistoryTask(pTask);
}
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
@ -370,14 +369,15 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
"s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, not send check again, "
"roll-back needed",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else if (pRsp->status == TASK_SELF_NEW_STAGE) {
stError(
"s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, not send check "
"again, roll-back needed",
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
} else {
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
if (pRsp->status == TASK_SELF_NEW_STAGE) {
stError(
"s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, continue check "
"till downstream nodeUpdate",
id, pRsp->oldStage, (int32_t)pTask->pMeta->stage);
}
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, retry in 100ms, ref:%d ", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);