fix(stream): fix syntax error.

This commit is contained in:
Haojun Liao 2024-05-29 21:59:26 +08:00
parent becc4cb368
commit 0946f25e5f
3 changed files with 4 additions and 4 deletions

View File

@ -1156,7 +1156,7 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
taosArrayPush(pList, &in);
int32_t currentSize = taosArrayGetSize(pList);
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %" PRId64 "s(%" PRId64
mDebug("stream:%s (uid:0x%" PRIx64 ") checkpoint interval beyond threshold: %ds(%" PRId64
"s) beyond threshold:%d",
pStream->name, pStream->uid, tsStreamCheckpointInterval, duration / 1000, currentSize);

View File

@ -1038,7 +1038,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
taosThreadMutexUnlock(&pTask->lock);
stWarn("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
// todo handle the agg task failure, add test case

View File

@ -562,13 +562,13 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
if (streamQueueIsFull(pTask->outputq.queue)) {
stWarn("s-task:%s outputQ is full, idle for 500ms and retry", id);
stTrace("s-task:%s outputQ is full, idle for 500ms and retry", id);
streamTaskSetIdleInfo(pTask, 1000);
return 0;
}
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
stWarn("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
stTrace("s-task:%s downstream task inputQ blocked, idle for 1sec and retry", id);
streamTaskSetIdleInfo(pTask, 1000);
return 0;
}