diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index b3f749d7b9..ae4bc5366d 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -199,6 +199,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0); + // todo: this may be not the first one pTask->chkInfo.startTs = taosGetTimestampMs(); // update the child Id for downstream tasks diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 7906f1d6f4..8219f6ec85 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -942,7 +942,7 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) { } taosArrayClear(pTask->pRspMsgList); - stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, + stDebug("s-task:%s level:%d continue process msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num); return 0; } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 3109bf5967..57f8bd016b 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -580,7 +580,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); if (ver != pTask->chkInfo.checkpointVer) { - stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 " , currentVer:%" PRId64, + stDebug("s-task:%s update checkpointVer(unsaved) from %" PRId64 " to %" PRId64 ", nextProcessVer:%" PRId64, pTask->id.idStr, pTask->chkInfo.checkpointVer, ver, pTask->chkInfo.nextProcessVer); pTask->chkInfo.checkpointVer = ver; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 901484377a..65a7d56923 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -191,11 +191,11 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu } STokenBucket* pBucket = pTask->pTokenBucket; - if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution +// if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution // stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, // pBucket->capacity, pBucket->rate); - return TSDB_CODE_SUCCESS; - } +// return TSDB_CODE_SUCCESS; +// } SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) {