diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index eeb4d6d4f3..65c869433b 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -29,8 +29,10 @@ extern "C" { #ifndef _STREAM_H_ #define _STREAM_H_ -#define ONE_MB_F (1048576.0) -#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F) +#define ONE_MiB_F (1048576.0) +#define ONE_KiB_F (1024.0) +#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F) +#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F) typedef struct SStreamTask SStreamTask; diff --git a/source/dnode/vnode/src/tq/tqSink.c b/source/dnode/vnode/src/tq/tqSink.c index f700294e79..0ca28277e7 100644 --- a/source/dnode/vnode/src/tq/tqSink.c +++ b/source/dnode/vnode/src/tq/tqSink.c @@ -270,14 +270,14 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2* tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); } - pTask->sinkRecorder.numOfSubmit += 1; + SSinkTaskRecorder* pRec = &pTask->sinkRecorder; - if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) { - SSinkTaskRecorder* pRec = &pTask->sinkRecorder; + pRec->numOfSubmit += 1; + if ((pRec->numOfSubmit % 5000) == 0) { double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0; tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 " submit into dst table, %.2fMiB duration:%.2f Sec.", - pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MB(pRec->bytes), el); + pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el); } return TSDB_CODE_SUCCESS; @@ -868,7 +868,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) { } pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; - pTask->sinkRecorder.bytes += pDataBlock->info.rowSize; } taosHashCleanup(pTableIndexMap); diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index edf36d2a1c..40cadd3387 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -57,7 +57,6 @@ extern SStreamGlobalEnv streamEnv; extern int32_t streamBackendId; extern int32_t streamBackendCfWrapperId; -const char* streamGetBlockTypeStr(int32_t type); void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); int32_t streamDispatchStreamBlock(SStreamTask* pTask); @@ -81,7 +80,12 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); -int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); +int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue); +int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem); +void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size); +const char* streamQueueItemGetTypeStr(int32_t type); + SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); diff --git a/source/libs/stream/src/streamData.c b/source/libs/stream/src/streamData.c index 8cf9ea1bbc..51487e5588 100644 --- a/source/libs/stream/src/streamData.c +++ b/source/libs/stream/src/streamData.c @@ -166,12 +166,16 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks); taosFreeQitem(pElem); + + streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); return dst; } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; streamMergeSubmit(pMerged, pBlockSrc); taosFreeQitem(pElem); + + streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem)); return dst; } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); @@ -184,9 +188,11 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); taosFreeQitem(dst); taosFreeQitem(pElem); + + streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem)); return (SStreamQueueItem*)pMerged; } else { - stDebug("block type:%s not merged with existed blocks list, type:%d", streamGetBlockTypeStr(pElem->type), dst->type); + stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type); return NULL; } } @@ -227,16 +233,3 @@ void streamFreeQitem(SStreamQueueItem* data) { taosFreeQitem(pBlock); } } - -const char* streamGetBlockTypeStr(int32_t type) { - switch (type) { - case STREAM_INPUT__CHECKPOINT: - return "checkpoint"; - case STREAM_INPUT__CHECKPOINT_TRIGGER: - return "checkpoint-trigger"; - case STREAM_INPUT__TRANS_STATE: - return "trans-state"; - default: - return ""; - } -} \ No newline at end of file diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 8219f6ec85..8baf411c83 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -500,7 +500,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { const char* id = pTask->id.idStr; int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue); if (numOfElems > 0) { - double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 57f8bd016b..712b0fe610 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -72,7 +72,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray* } stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, - SIZE_IN_MB(size)); + SIZE_IN_MiB(size)); int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); if (code != TSDB_CODE_SUCCESS) { // back pressure and record position @@ -163,7 +163,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i taosArrayPush(pRes, &block); stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, - pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); + pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size)); // current output should be dispatched to down stream nodes if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { @@ -553,8 +553,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) { ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); + // here only handle the data block sink operation if (type == STREAM_INPUT__DATA_BLOCK) { - stDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks); + int32_t blockSize = streamQueueItemGetSize(pInput); + pTask->sinkRecorder.bytes += blockSize; + + stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize)); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); continue; } @@ -574,7 +578,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, - SIZE_IN_MB(resSize), totalBlocks); + SIZE_IN_MiB(resSize), totalBlocks); // update the currentVer if processing the submit blocks. ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); @@ -590,8 +594,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { // todo other thread may change the status // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. if (type == STREAM_INPUT__CHECKPOINT) { - stDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, - streamGetTaskStatusStr(pTask->status.taskStatus)); + stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr, + streamGetTaskStatusStr(pTask->status.taskStatus)); streamTaskBuildCheckpoint(pTask); return 0; } diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 65a7d56923..882c57383e 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -102,64 +102,6 @@ void streamQueueProcessFail(SStreamQueue* queue) { atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); } -#if 0 -bool streamQueueResEmpty(const SStreamQueueRes* pRes) { - // - return true; -} -int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; } -SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; } -SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) { - SStreamQueueNode* pRet = pRes->head; - pRes->head = pRes->head->next; - return pRet; -} - -void streamQueueResClear(SStreamQueueRes* pRes) { - while (pRes->head) { - SStreamQueueNode* pNode = pRes->head; - streamFreeQitem(pRes->head->item); - pRes->head = pNode; - } -} - -SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) { - int64_t size = 0; - SStreamQueueNode* head = NULL; - - while (pTail) { - SStreamQueueNode* pTmp = pTail->next; - pTail->next = head; - head = pTail; - pTail = pTmp; - size++; - } - - return (SStreamQueueRes){.head = head, .size = size}; -} - -bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); } -int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) { - SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode)); - pNode->item = pItem; - SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead); - while (1) { - pNode->next = pHead; - SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode); - if (pOld == pHead) { - break; - } - } - return 0; -} - -SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) { - SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL); - if (pNode) return streamQueueBuildRes(pNode); - return (SStreamQueueRes){0}; -} -#endif - bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY; if (isFull) { @@ -167,7 +109,7 @@ bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { } int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; - double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize((STaosQueue*)pQueue)); return (size >= threahold); } @@ -178,38 +120,74 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) { return numOfItems1 + numOfItems2; } +int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue) { + return taosQueueItemSize(pQueue->pQueue); +} + +int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) { + STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode)); + return p->dataSize; +} + +void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) { + STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode)); + p->dataSize += size; +} + +const char* streamQueueItemGetTypeStr(int32_t type) { + switch (type) { + case STREAM_INPUT__CHECKPOINT: + return "checkpoint"; + case STREAM_INPUT__CHECKPOINT_TRIGGER: + return "checkpoint-trigger"; + case STREAM_INPUT__TRANS_STATE: + return "trans-state"; + default: + return "datablock"; + } +} + int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t retryTimes = 0; int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; + int32_t taskLevel = pTask->info.taskLevel; + *numOfBlocks = 0; - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one - while (1) { - if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { - stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); - return TSDB_CODE_SUCCESS; - } - - STokenBucket* pBucket = pTask->pTokenBucket; -// if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution -// stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, -// pBucket->capacity, pBucket->rate); -// return TSDB_CODE_SUCCESS; -// } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); - if (qItem == NULL) { - stDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); - return TSDB_CODE_SUCCESS; - } - - stDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type); - - *numOfBlocks = 1; - *pInput = qItem; - return TSDB_CODE_SUCCESS; - } - } + // if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one + // while (1) { + // if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { + // stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); + // return TSDB_CODE_SUCCESS; + // } + // + // STokenBucket* pBucket = pTask->pTokenBucket; + // // if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this + // execution + // // stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", + // pTask->id.idStr, + // // pBucket->capacity, pBucket->rate); + // // return TSDB_CODE_SUCCESS; + // // } + // + // SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); + // if (qItem == NULL) { + // if (++retryTimes < MAX_RETRY_TIMES) { + // taosMsleep(10); + // continue; + // } + // + // return TSDB_CODE_SUCCESS; + // } + // + // stDebug("s-task:%s sink task handle block, type:%s", id, streamQueueItemGetTypeStr(qItem->type)); + // pTask->sinkRecorder.bytes += streamQueueItemGetSize(qItem); + // + // *numOfBlocks = 1; + // *pInput = qItem; + // return TSDB_CODE_SUCCESS; + // } + // } while (1) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { @@ -219,20 +197,18 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); if (qItem == NULL) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) { taosMsleep(10); - stDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); continue; } - - stDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); return TSDB_CODE_SUCCESS; } // do not merge blocks for sink node and check point data block - if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || - qItem->type == STREAM_INPUT__TRANS_STATE) { - const char* p = streamGetBlockTypeStr(qItem->type); + int8_t type = qItem->type; + if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || + type == STREAM_INPUT__TRANS_STATE) { + const char* p = streamQueueItemGetTypeStr(qItem->type); if (*pInput == NULL) { stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); @@ -285,7 +261,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) if (type == STREAM_INPUT__DATA_SUBMIT) { SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) { - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace( "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -304,15 +280,15 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return code; } - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, - msgLen, ver, total, size + SIZE_IN_MB(msgLen)); + msgLen, ver, total, size + SIZE_IN_MiB(msgLen)); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { if (streamQueueIsFull(pQueue, true)) { - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); @@ -326,7 +302,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return code; } - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || type == STREAM_INPUT__TRANS_STATE) { @@ -336,9 +312,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return code; } - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, - pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); + pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size); } else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later. int32_t code = taosWriteQitem(pQueue, pItem); @@ -347,7 +323,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem) return code; } - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); } else { ASSERT(0); @@ -372,7 +348,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc } int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); // let's wait for there are enough space to hold this result pBlock stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, total, size); @@ -382,7 +358,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc int32_t code = taosWriteQitem(pQueue, pBlock); int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); - double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); + double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue)); if (code != 0) { stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", pTask->id.idStr, total + 1, size, tstrerror(code));