fix(stream): fix syntax error.

This commit is contained in:
Haojun Liao 2024-08-02 18:21:46 +08:00
parent 9d219ad62a
commit 281f636954
1 changed files with 11 additions and 9 deletions

View File

@ -1565,11 +1565,12 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
int64_t endTs, bool ready) {
STaskStartInfo* pStartInfo = &pMeta->startInfo;
STaskId id = {.streamId = streamId, .taskId = taskId};
int32_t vgId = pMeta->vgId;
streamMetaWLock(pMeta);
SStreamTask** p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
@ -1584,7 +1585,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
stDebug(
"vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
"time:%" PRId64 "ms",
pMeta->vgId, taskId, ready, el);
vgId, taskId, ready, el);
streamMetaWUnLock(pMeta);
return 0;
}
@ -1595,10 +1596,11 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
if (code) {
if (code == TSDB_CODE_DUP_KEY) {
stError("vgId:%d record start task result failed, s-task:0x%x already exist start results in meta dst hashmap",
pMeta->vgId, id.taskId);
stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
" already exist start results in meta start task result hashmap",
vgId, id.taskId);
} else {
stError("vgId:%d failed to record start task:0x%x results, start all tasks failed", pMeta->vgId, id.taskId);
stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
}
streamMetaWUnLock(pMeta);
return code;
@ -1613,20 +1615,20 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
", readyTs:%" PRId64 " total elapsed time:%.2fs",
pMeta->vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
pStartInfo->elapsedTime / 1000.0);
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
streamMetaResetStartInfo(pStartInfo, vgId);
streamMetaWUnLock(pMeta);
code = pStartInfo->completeFn(pMeta);
} else {
streamMetaWUnLock(pMeta);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", pMeta->vgId, taskId,
ready, numOfRecv, numOfTotal);
stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
numOfRecv, numOfTotal);
}
return code;