fix(stream): add some logs.

This commit is contained in:
Haojun Liao 2024-01-08 11:33:54 +08:00
parent a024fe960c
commit bb9edde79e
2 changed files with 12 additions and 4 deletions

View File

@ -109,6 +109,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
int64_t st = taosGetTimestampMs();
SStreamTaskNodeUpdateMsg req = {0};
@ -194,7 +195,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s vgId:%d not save since restore not finish", pTask->id.idStr, vgId);
}
tqDebug("s-task:%s start to stop task after save task", pTask->id.idStr);
tqDebug("s-task:%s vgId:%d start to stop task after save task", pTask->id.idStr, vgId);
streamTaskStop(pTask);
// keep the already handled info
@ -202,10 +203,16 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
if (ppHTask != NULL) {
streamTaskStop(*ppHTask);
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d task nodeEp update completed, streamTask and related fill-history task closed, elapsed:%" PRId64
" ms",
pTask->id.idStr, vgId, now-st);
taosHashPut(pMeta->updateInfo.pTasks, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
} else {
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
int64_t now = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d, task nodeEp update completed, streamTask closed, elapsed time:%" PRId64 "ms", pTask->id.idStr,
vgId, now - st);
}
rsp.code = 0;
@ -226,7 +233,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
pMeta->startInfo.tasksWillRestart = 0;
streamMetaWUnLock(pMeta);
} else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed", vgId, numOfTasks);
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0
// for test purpose, to trigger the leader election
taosMSleep(5000);

View File

@ -175,6 +175,7 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
if (qItem == NULL) {
if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(WAIT_FOR_DURATION);
// todo remove it
continue;
}