diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 4d5e18520c..d4ca0aba62 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -19,7 +19,7 @@ // message process int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart); int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId); -int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored); +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader); int32_t tqStreamTaskProcessDispatchReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDispatchRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveReq(SStreamMeta* pMeta, SRpcMsg* pMsg); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index c4c0aaf742..0fa32acfbb 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -137,6 +137,7 @@ enum { STREAM_QUEUE__SUCESS = 1, STREAM_QUEUE__FAILED, STREAM_QUEUE__PROCESSING, + STREAM_QUEUE__CHKPTFAILED, }; typedef enum EStreamTaskEvent { diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6eee8c510b..b15ac447b0 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -157,7 +157,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { case TDMT_STREAM_TASK_DROP: return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: - return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); + return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true, true); case TDMT_VND_STREAM_TASK_RESET: return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg->pCont); case TDMT_STREAM_TASK_PAUSE: diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 63727e5c45..e1298b11ea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1364,7 +1364,8 @@ int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { - return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, pTq->pVnode->restored); + return tqStreamTaskProcessUpdateReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, pMsg, + pTq->pVnode->restored, (pTq->pStreamMeta->role == NODE_ROLE_LEADER)); } int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) { diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index b34ea78f64..7c65f805e9 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -87,6 +87,8 @@ static void doStartScanWal(void* param, void* tmrId) { tmr_h pTimer = NULL; SBuildScanWalMsgParam* pParam = (SBuildScanWalMsgParam*)param; + tqDebug("start to do scan wal in tmr, metaRid:%" PRId64, pParam->metaId); + SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, pParam->metaId); if (pMeta == NULL) { tqError("metaRid:%" PRId64 " not valid now, stream meta has been freed", pParam->metaId); @@ -131,7 +133,7 @@ static void doStartScanWal(void* param, void* tmrId) { } if (pMeta->startInfo.startAllTasks) { - tqTrace("vgId:%d in restart procedure, not ready to scan wal", vgId); + tqDebug("vgId:%d in restart procedure, not ready to scan wal", vgId); goto _end; } @@ -154,7 +156,7 @@ static void doStartScanWal(void* param, void* tmrId) { goto _end; } - tqTrace("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); + tqDebug("vgId:%d create msg to start wal scan, numOfTasks:%d", vgId, numOfTasks); #if 0 // wait for the vnode is freed, and invalid read may occur. @@ -317,9 +319,13 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return false; } - // check if input queue is full or not + // check whether input queue is full or not if (streamQueueIsFull(pTask->inputq.queue)) { - tqTrace("s-task:%s input queue is full, do nothing", pTask->id.idStr); + tqTrace("s-task:%s input queue is full, launch task without scanning wal", pTask->id.idStr); + int32_t code = streamTrySchedExec(pTask); + if (code) { + tqError("s-task:%s failed to start task while inputQ is full", pTask->id.idStr); + } return false; } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 2c48ada0fa..fb3582c5ff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -139,7 +139,7 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream } // this is to process request from transaction, always return true. -int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) { +int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored, bool isLeader) { int32_t vgId = pMeta->vgId; char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); @@ -298,14 +298,19 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks); - if (restored) { + if (restored && isLeader) { tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId); pMeta->startInfo.tasksWillRestart = 1; } if (updateTasks < numOfTasks) { - tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, - updateTasks, (numOfTasks - updateTasks)); + if (isLeader) { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId, + updateTasks, (numOfTasks - updateTasks)); + } else { + tqDebug("vgId:%d closed tasks:%d, unclosed:%d, follower not restart tasks", vgId, updateTasks, + (numOfTasks - updateTasks)); + } } else { if ((code = streamMetaCommit(pMeta)) < 0) { // always return true @@ -316,17 +321,21 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamMetaClearSetUpdateTaskListComplete(pMeta); - if (!restored) { - tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); - } else { - tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); + if (isLeader) { + if (!restored) { + tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId); + } else { + tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId); #if 0 taosMSleep(5000);// for test purpose, to trigger the leader election #endif - code = tqStreamTaskStartAsync(pMeta, cb, true); - if (code) { - tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); + code = tqStreamTaskStartAsync(pMeta, cb, true); + if (code) { + tqError("vgId:%d async start all tasks, failed, code:%s", vgId, tstrerror(code)); + } } + } else { + tqDebug("vgId:%d follower nodes not restart tasks", vgId); } } diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index d9778a6a05..0e5aaac58c 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -144,6 +144,8 @@ struct SStreamQueue { STaosQall* qall; void* qItem; int8_t status; + STaosQueue* pChkptQueue; + void* qChkptItem; }; struct SStreamQueueItem { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index b7039d372d..b093f808c0 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1098,6 +1098,8 @@ static void chkptReadyMsgSendMonitorFn(void* param, void* tmrId) { pActiveInfo = pTask->chkInfo.pActiveInfo; pTmrInfo = &pActiveInfo->chkptReadyMsgTmr; + stDebug("s-task:%s acquire task, refId:%" PRId64, id, taskRefId); + // check the status every 100ms if (streamTaskShouldStop(pTask)) { streamCleanBeforeQuitTmr(pTmrInfo, param); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 054f88ec3d..1015917f61 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -915,8 +915,35 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) { } } +static bool shouldNotCont(SStreamTask* pTask) { + int32_t level = pTask->info.taskLevel; + SStreamQueue* pQueue = pTask->inputq.queue; + ETaskStatus status = streamTaskGetStatus(pTask).state; + + // 1. task should jump out + bool quit = (status == TASK_STATUS__STOP) || (status == TASK_STATUS__PAUSE) || (status == TASK_STATUS__DROPPING); + + // 2. checkpoint procedure, the source task's checkpoint queue is empty, not read from ordinary queue + bool emptyCkQueue = (taosQueueItemSize(pQueue->pChkptQueue) == 0); + + // 3. no data in ordinary queue + bool emptyBlockQueue = (streamQueueGetNumOfItems(pQueue) == 0); + + if (quit) { + return true; + } else { + if (status == TASK_STATUS__CK && level == TASK_LEVEL__SOURCE) { + // in checkpoint procedure, we only check whether the controller queue is empty or not + return emptyCkQueue; + } else { // otherwise, if the block queue is empty, not continue. + return emptyBlockQueue && emptyCkQueue; + } + } +} + int32_t streamResumeTask(SStreamTask* pTask) { const char* id = pTask->id.idStr; + int32_t level = pTask->info.taskLevel; int32_t code = 0; if (pTask->status.schedStatus != TASK_SCHED_STATUS__ACTIVE) { @@ -929,11 +956,10 @@ int32_t streamResumeTask(SStreamTask* pTask) { if (code) { stError("s-task:%s failed to exec stream task, code:%s, continue", id, tstrerror(code)); } - // check if continue + streamMutexLock(&pTask->lock); - int32_t numOfItems = streamQueueGetNumOfItems(pTask->inputq.queue); - if ((numOfItems == 0) || streamTaskShouldStop(pTask) || streamTaskShouldPause(pTask)) { + if (shouldNotCont(pTask)) { atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE); streamTaskClearSchedIdleInfo(pTask); streamMutexUnlock(&pTask->lock); diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 401aa7530d..954e41f288 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -32,11 +32,12 @@ typedef struct SQueueReader { static bool streamTaskExtractAvailableToken(STokenBucket* pBucket, const char* id); static void streamTaskPutbackToken(STokenBucket* pBucket); static void streamTaskConsumeQuota(STokenBucket* pBucket, int32_t bytes); +static void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id); static void streamQueueCleanup(SStreamQueue* pQueue) { SStreamQueueItem* qItem = NULL; while (1) { - streamQueueNextItem(pQueue, &qItem); + streamQueueNextItemInSourceQ(pQueue, &qItem, TASK_STATUS__READY, NULL); if (qItem == NULL) { break; } @@ -47,7 +48,9 @@ static void streamQueueCleanup(SStreamQueue* pQueue) { int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { *pQ = NULL; + int32_t code = 0; + int32_t lino = 0; SStreamQueue* pQueue = taosMemoryCalloc(1, sizeof(SStreamQueue)); if (pQueue == NULL) { @@ -55,24 +58,26 @@ int32_t streamQueueOpen(int64_t cap, SStreamQueue** pQ) { } code = taosOpenQueue(&pQueue->pQueue); - if (code) { - taosMemoryFreeClear(pQueue); - return code; - } + TSDB_CHECK_CODE(code, lino, _error); code = taosAllocateQall(&pQueue->qall); - if (code) { - taosCloseQueue(pQueue->pQueue); - taosMemoryFree(pQueue); - return code; - } + TSDB_CHECK_CODE(code, lino, _error); + + code = taosOpenQueue(&pQueue->pChkptQueue); + TSDB_CHECK_CODE(code, lino, _error); pQueue->status = STREAM_QUEUE__SUCESS; + taosSetQueueCapacity(pQueue->pQueue, cap); taosSetQueueMemoryCapacity(pQueue->pQueue, cap * 1024); *pQ = pQueue; return code; + +_error: + streamQueueClose(pQueue, 0); + stError("failed to open stream queue at line:%d, code:%s", lino, tstrerror(code)); + return code; } void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { @@ -82,6 +87,11 @@ void streamQueueClose(SStreamQueue* pQueue, int32_t taskId) { taosFreeQall(pQueue->qall); taosCloseQueue(pQueue->pQueue); + pQueue->pQueue = NULL; + + taosCloseQueue(pQueue->pChkptQueue); + pQueue->pChkptQueue = NULL; + taosMemoryFree(pQueue); } @@ -94,6 +104,7 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } else { pQueue->qItem = NULL; (void) taosGetQitem(pQueue->qall, &pQueue->qItem); + if (pQueue->qItem == NULL) { (void) taosReadAllQitems(pQueue->pQueue, pQueue->qall); (void) taosGetQitem(pQueue->qall, &pQueue->qItem); @@ -103,6 +114,56 @@ void streamQueueNextItem(SStreamQueue* pQueue, SStreamQueueItem** pItem) { } } +void streamQueueNextItemInSourceQ(SStreamQueue* pQueue, SStreamQueueItem** pItem, ETaskStatus status, const char* id) { + *pItem = NULL; + int8_t flag = atomic_exchange_8(&pQueue->status, STREAM_QUEUE__PROCESSING); + + if (flag == STREAM_QUEUE__CHKPTFAILED) { + *pItem = pQueue->qChkptItem; + return; + } + + if (flag == STREAM_QUEUE__FAILED) { + *pItem = pQueue->qItem; + return; + } + + pQueue->qChkptItem = NULL; + taosReadQitem(pQueue->pChkptQueue, (void**)&pQueue->qChkptItem); + if (pQueue->qChkptItem != NULL) { + stDebug("s-task:%s read data from checkpoint queue, status:%d", id, status); + *pItem = pQueue->qChkptItem; + return; + } + + // if in checkpoint status, not read data from ordinary input q. + if (status == TASK_STATUS__CK) { + stDebug("s-task:%s in checkpoint status, not read data in block queue, status:%d", id, status); + return; + } + + // let's try the ordinary input q + pQueue->qItem = NULL; + int32_t code = taosGetQitem(pQueue->qall, &pQueue->qItem); + if (code) { + stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code)); + } + + if (pQueue->qItem == NULL) { + code = taosReadAllQitems(pQueue->pQueue, pQueue->qall); + if (code) { + stError("s-task:%s failed to get all items in inputq, code:%s", id, tstrerror(code)); + } + + code = taosGetQitem(pQueue->qall, &pQueue->qItem); + if (code) { + stError("s-task:%s failed to get item in inputq, code:%s", id, tstrerror(code)); + } + } + + *pItem = streamQueueCurItem(pQueue); +} + void streamQueueProcessSuccess(SStreamQueue* queue) { if (atomic_load_8(&queue->status) != STREAM_QUEUE__PROCESSING) { stError("invalid queue status:%d, expect:%d", atomic_load_8(&queue->status), STREAM_QUEUE__PROCESSING); @@ -110,6 +171,7 @@ void streamQueueProcessSuccess(SStreamQueue* queue) { } queue->qItem = NULL; + queue->qChkptItem = NULL; atomic_store_8(&queue->status, STREAM_QUEUE__SUCESS); } @@ -121,6 +183,14 @@ void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } +void streamQueueGetSourceChkptFailed(SStreamQueue* pQueue) { + if (atomic_load_8(&pQueue->status) != STREAM_QUEUE__PROCESSING) { + stError("invalid queue status:%d, expect:%d", atomic_load_8(&pQueue->status), STREAM_QUEUE__PROCESSING); + return; + } + atomic_store_8(&pQueue->status, STREAM_QUEUE__CHKPTFAILED); +} + bool streamQueueIsFull(const SStreamQueue* pQueue) { int32_t numOfItems = streamQueueGetNumOfItems(pQueue); if (numOfItems >= STREAM_TASK_QUEUE_CAPACITY) { @@ -175,8 +245,9 @@ const char* streamQueueItemGetTypeStr(int32_t type) { EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, int32_t* blockSize) { - const char* id = pTask->id.idStr; - int32_t taskLevel = pTask->info.taskLevel; + const char* id = pTask->id.idStr; + int32_t taskLevel = pTask->info.taskLevel; + SStreamQueue* pQueue = pTask->inputq.queue; *pInput = NULL; *numOfBlocks = 0; @@ -189,13 +260,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte } while (1) { - if (streamTaskShouldPause(pTask) || streamTaskShouldStop(pTask)) { - stDebug("s-task:%s task should pause, extract input blocks:%d", id, *numOfBlocks); + ETaskStatus status = streamTaskGetStatus(pTask).state; + if (status == TASK_STATUS__PAUSE || status == TASK_STATUS__STOP) { + stDebug("s-task:%s task should pause/stop, extract input blocks:%d", id, *numOfBlocks); return EXEC_CONTINUE; } SStreamQueueItem* qItem = NULL; - streamQueueNextItem(pTask->inputq.queue, (SStreamQueueItem**)&qItem); + if (taskLevel == TASK_LEVEL__SOURCE) { + streamQueueNextItemInSourceQ(pQueue, &qItem, status, id); + } else { + streamQueueNextItem(pQueue, &qItem); + } + if (qItem == NULL) { // restore the token to bucket if (*numOfBlocks > 0) { @@ -225,14 +302,19 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte *numOfBlocks = 1; *pInput = qItem; return EXEC_CONTINUE; - } else { // previous existed blocks needs to be handle, before handle the checkpoint msg block + } else { // previous existed blocks needs to be handled, before handle the checkpoint msg block stDebug("s-task:%s %s msg extracted, handle previous blocks, numOfBlocks:%d", id, p, *numOfBlocks); *blockSize = streamQueueItemGetSize(*pInput); if (taskLevel == TASK_LEVEL__SINK) { streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputq.queue); + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && + (taskLevel == TASK_LEVEL__SOURCE)) { + streamQueueGetSourceChkptFailed(pQueue); + } else { + streamQueueProcessFail(pQueue); + } return EXEC_CONTINUE; } } else { @@ -252,7 +334,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte streamTaskConsumeQuota(pTask->outputInfo.pTokenBucket, *blockSize); } - streamQueueProcessFail(pTask->inputq.queue); + streamQueueProcessFail(pQueue); return EXEC_CONTINUE; } @@ -260,7 +342,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte } *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputq.queue); + streamQueueProcessSuccess(pQueue); if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { stDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); @@ -279,6 +361,7 @@ EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueIte int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) { int8_t type = pItem->type; STaosQueue* pQueue = pTask->inputq.queue->pQueue; + int32_t level = pTask->info.taskLevel; int32_t total = streamQueueGetNumOfItems(pTask->inputq.queue) + 1; if (type == STREAM_INPUT__DATA_SUBMIT) { @@ -326,15 +409,28 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) { - int32_t code = taosWriteQitem(pQueue, pItem); - if (code != TSDB_CODE_SUCCESS) { - streamFreeQitem(pItem); - return code; - } - double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); - stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); + int32_t code = 0; + if ((type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__CHECKPOINT) && (level == TASK_LEVEL__SOURCE)) { + STaosQueue* pChkptQ = pTask->inputq.queue->pChkptQueue; + code = taosWriteQitem(pChkptQ, pItem); + + double size = SIZE_IN_MiB(taosQueueMemorySize(pChkptQ)); + int32_t num = taosQueueItemSize(pChkptQ); + + stDebug("s-task:%s level:%d %s checkpoint enqueue ctrl queue, total in queue:%d, size:%.2fMiB, data queue:%d", + pTask->id.idStr, pTask->info.taskLevel, streamQueueItemGetTypeStr(type), num, size, (total - 1)); + } else { + code = taosWriteQitem(pQueue, pItem); + if (code != TSDB_CODE_SUCCESS) { + streamFreeQitem(pItem); + return code; + } + + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); + stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, + pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); + } } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem);