refactor: do refactor the sink task.

This commit is contained in:
Haojun Liao 2023-09-21 13:32:57 +08:00
parent e4ac835554
commit ef9efc8a8e
7 changed files with 111 additions and 133 deletions

View File

@ -29,8 +29,10 @@ extern "C" {
#ifndef _STREAM_H_ #ifndef _STREAM_H_
#define _STREAM_H_ #define _STREAM_H_
#define ONE_MB_F (1048576.0) #define ONE_MiB_F (1048576.0)
#define SIZE_IN_MB(_v) ((_v) / ONE_MB_F) #define ONE_KiB_F (1024.0)
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
typedef struct SStreamTask SStreamTask; typedef struct SStreamTask SStreamTask;

View File

@ -270,14 +270,14 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
tqError("s-task:%s failed to put into write-queue since %s", id, terrstr()); tqError("s-task:%s failed to put into write-queue since %s", id, terrstr());
} }
pTask->sinkRecorder.numOfSubmit += 1;
if ((pTask->sinkRecorder.numOfSubmit % 5000) == 0) {
SSinkTaskRecorder* pRec = &pTask->sinkRecorder; SSinkTaskRecorder* pRec = &pTask->sinkRecorder;
pRec->numOfSubmit += 1;
if ((pRec->numOfSubmit % 5000) == 0) {
double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0; double el = (taosGetTimestampMs() - pTask->taskExecInfo.start) / 1000.0;
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64 tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
" submit into dst table, %.2fMiB duration:%.2f Sec.", " submit into dst table, %.2fMiB duration:%.2f Sec.",
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MB(pRec->bytes), el); pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->bytes), el);
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -868,7 +868,6 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
} }
pTask->sinkRecorder.numOfRows += pDataBlock->info.rows; pTask->sinkRecorder.numOfRows += pDataBlock->info.rows;
pTask->sinkRecorder.bytes += pDataBlock->info.rowSize;
} }
taosHashCleanup(pTableIndexMap); taosHashCleanup(pTableIndexMap);

View File

@ -57,7 +57,6 @@ extern SStreamGlobalEnv streamEnv;
extern int32_t streamBackendId; extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId; extern int32_t streamBackendCfWrapperId;
const char* streamGetBlockTypeStr(int32_t type);
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration); void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration);
int32_t streamDispatchStreamBlock(SStreamTask* pTask); int32_t streamDispatchStreamBlock(SStreamTask* pTask);
@ -82,6 +81,11 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask); int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks); int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size);
const char* streamQueueItemGetTypeStr(int32_t type);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen); int32_t streamTaskBuildScanhistoryRspMsg(SStreamTask* pTask, SStreamScanHistoryFinishReq* pReq, void** pBuffer, int32_t* pLen);

View File

@ -166,12 +166,16 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks); taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks); taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(pElem); taosFreeQitem(pElem);
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
return dst; return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst; SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem; SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc); streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(pElem); taosFreeQitem(pElem);
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
return dst; return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) { } else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew(); SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
@ -184,9 +188,11 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem); streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst); taosFreeQitem(dst);
taosFreeQitem(pElem); taosFreeQitem(pElem);
streamQueueItemIncSize((SStreamQueueItem*)pMerged, streamQueueItemGetSize(pElem));
return (SStreamQueueItem*)pMerged; return (SStreamQueueItem*)pMerged;
} else { } else {
stDebug("block type:%s not merged with existed blocks list, type:%d", streamGetBlockTypeStr(pElem->type), dst->type); stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);
return NULL; return NULL;
} }
} }
@ -227,16 +233,3 @@ void streamFreeQitem(SStreamQueueItem* data) {
taosFreeQitem(pBlock); taosFreeQitem(pBlock);
} }
} }
const char* streamGetBlockTypeStr(int32_t type) {
switch (type) {
case STREAM_INPUT__CHECKPOINT:
return "checkpoint";
case STREAM_INPUT__CHECKPOINT_TRIGGER:
return "checkpoint-trigger";
case STREAM_INPUT__TRANS_STATE:
return "trans-state";
default:
return "";
}
}

View File

@ -500,7 +500,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue); int32_t numOfElems = streamQueueGetNumOfItems(pTask->outputInfo.queue);
if (numOfElems > 0) { if (numOfElems > 0) {
double size = SIZE_IN_MB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pTask->outputInfo.queue->pQueue));
stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size); stDebug("s-task:%s start to dispatch intermediate block to downstream, elem in outputQ:%d, size:%.2fMiB", id, numOfElems, size);
} }

View File

@ -72,7 +72,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
} }
stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks, stDebug("s-task:%s dump stream result data blocks, num:%d, size:%.2fMiB", pTask->id.idStr, numOfBlocks,
SIZE_IN_MB(size)); SIZE_IN_MiB(size));
int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks); int32_t code = doOutputResultBlockImpl(pTask, pStreamBlocks);
if (code != TSDB_CODE_SUCCESS) { // back pressure and record position if (code != TSDB_CODE_SUCCESS) { // back pressure and record position
@ -163,7 +163,7 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, i
taosArrayPush(pRes, &block); taosArrayPush(pRes, &block);
stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s (child %d) executed and get %d result blocks, size:%.2fMiB", pTask->id.idStr,
pTask->info.selfChildId, numOfBlocks, SIZE_IN_MB(size)); pTask->info.selfChildId, numOfBlocks, SIZE_IN_MiB(size));
// current output should be dispatched to down stream nodes // current output should be dispatched to down stream nodes
if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { if (numOfBlocks >= STREAM_RESULT_DUMP_THRESHOLD || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
@ -553,8 +553,12 @@ int32_t streamExecForAll(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT); ASSERT(type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__CHECKPOINT);
// here only handle the data block sink operation
if (type == STREAM_INPUT__DATA_BLOCK) { if (type == STREAM_INPUT__DATA_BLOCK) {
stDebug("s-task:%s sink task start to sink %d blocks", id, numOfBlocks); int32_t blockSize = streamQueueItemGetSize(pInput);
pTask->sinkRecorder.bytes += blockSize;
stDebug("s-task:%s sink task start to sink %d blocks, size:%.2fKiB", id, numOfBlocks, SIZE_IN_KiB(blockSize));
doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput); doOutputResultBlockImpl(pTask, (SStreamDataBlock*)pInput);
continue; continue;
} }
@ -574,7 +578,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - st) / 1000.0; double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, stDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el,
SIZE_IN_MB(resSize), totalBlocks); SIZE_IN_MiB(resSize), totalBlocks);
// update the currentVer if processing the submit blocks. // update the currentVer if processing the submit blocks.
ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer); ASSERT(pTask->chkInfo.checkpointVer <= pTask->chkInfo.nextProcessVer && ver >= pTask->chkInfo.checkpointVer);
@ -590,7 +594,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
// todo other thread may change the status // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed. // do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) { if (type == STREAM_INPUT__CHECKPOINT) {
stDebug("s-task:%s checkpoint block received, set the status:%s", pTask->id.idStr, stDebug("s-task:%s checkpoint block received, set status:%s", pTask->id.idStr,
streamGetTaskStatusStr(pTask->status.taskStatus)); streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskBuildCheckpoint(pTask); streamTaskBuildCheckpoint(pTask);
return 0; return 0;

View File

@ -102,64 +102,6 @@ void streamQueueProcessFail(SStreamQueue* queue) {
atomic_store_8(&queue->status, STREAM_QUEUE__FAILED); atomic_store_8(&queue->status, STREAM_QUEUE__FAILED);
} }
#if 0
bool streamQueueResEmpty(const SStreamQueueRes* pRes) {
//
return true;
}
int64_t streamQueueResSize(const SStreamQueueRes* pRes) { return pRes->size; }
SStreamQueueNode* streamQueueResFront(SStreamQueueRes* pRes) { return pRes->head; }
SStreamQueueNode* streamQueueResPop(SStreamQueueRes* pRes) {
SStreamQueueNode* pRet = pRes->head;
pRes->head = pRes->head->next;
return pRet;
}
void streamQueueResClear(SStreamQueueRes* pRes) {
while (pRes->head) {
SStreamQueueNode* pNode = pRes->head;
streamFreeQitem(pRes->head->item);
pRes->head = pNode;
}
}
SStreamQueueRes streamQueueBuildRes(SStreamQueueNode* pTail) {
int64_t size = 0;
SStreamQueueNode* head = NULL;
while (pTail) {
SStreamQueueNode* pTmp = pTail->next;
pTail->next = head;
head = pTail;
pTail = pTmp;
size++;
}
return (SStreamQueueRes){.head = head, .size = size};
}
bool streamQueueHasTask(const SStreamQueue1* pQueue) { return atomic_load_ptr(pQueue->pHead); }
int32_t streamQueuePush(SStreamQueue1* pQueue, SStreamQueueItem* pItem) {
SStreamQueueNode* pNode = taosMemoryMalloc(sizeof(SStreamQueueNode));
pNode->item = pItem;
SStreamQueueNode* pHead = atomic_load_ptr(pQueue->pHead);
while (1) {
pNode->next = pHead;
SStreamQueueNode* pOld = atomic_val_compare_exchange_ptr(pQueue->pHead, pHead, pNode);
if (pOld == pHead) {
break;
}
}
return 0;
}
SStreamQueueRes streamQueueGetRes(SStreamQueue1* pQueue) {
SStreamQueueNode* pNode = atomic_exchange_ptr(pQueue->pHead, NULL);
if (pNode) return streamQueueBuildRes(pNode);
return (SStreamQueueRes){0};
}
#endif
bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) { bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) {
bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY; bool isFull = taosQueueItemSize((STaosQueue*)pQueue) >= STREAM_TASK_QUEUE_CAPACITY;
if (isFull) { if (isFull) {
@ -167,7 +109,7 @@ bool streamQueueIsFull(const STaosQueue* pQueue, bool inputQ) {
} }
int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE; int32_t threahold = (inputQ) ? STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE : STREAM_TASK_OUTPUT_QUEUE_CAPACITY_IN_SIZE;
double size = SIZE_IN_MB(taosQueueMemorySize((STaosQueue*)pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize((STaosQueue*)pQueue));
return (size >= threahold); return (size >= threahold);
} }
@ -178,38 +120,74 @@ int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue) {
return numOfItems1 + numOfItems2; return numOfItems1 + numOfItems2;
} }
int32_t streamQueueGetNumOfItemsInQueue(const SStreamQueue* pQueue) {
return taosQueueItemSize(pQueue->pQueue);
}
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem) {
STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode));
return p->dataSize;
}
void streamQueueItemIncSize(const SStreamQueueItem* pItem, int32_t size) {
STaosQnode* p = (STaosQnode*)((char*) pItem - sizeof(STaosQnode));
p->dataSize += size;
}
const char* streamQueueItemGetTypeStr(int32_t type) {
switch (type) {
case STREAM_INPUT__CHECKPOINT:
return "checkpoint";
case STREAM_INPUT__CHECKPOINT_TRIGGER:
return "checkpoint-trigger";
case STREAM_INPUT__TRANS_STATE:
return "trans-state";
default:
return "datablock";
}
}
int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
int32_t retryTimes = 0; int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5; int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t taskLevel = pTask->info.taskLevel;
*numOfBlocks = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one // if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
while (1) { // while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { // if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); // stDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS; // return TSDB_CODE_SUCCESS;
} // }
//
STokenBucket* pBucket = pTask->pTokenBucket; // STokenBucket* pBucket = pTask->pTokenBucket;
// if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this execution // // if (!streamTaskHasAvailableToken(pBucket)) { // no available token in th bucket, ignore this
// stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit", pTask->id.idStr, // execution
// pBucket->capacity, pBucket->rate); // // stInfo("s-task:%s no available token for sink, capacity:%d, rate:%d token/sec, quit",
// return TSDB_CODE_SUCCESS; // pTask->id.idStr,
// } // // pBucket->capacity, pBucket->rate);
// // return TSDB_CODE_SUCCESS;
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); // // }
if (qItem == NULL) { //
stDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); // SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
return TSDB_CODE_SUCCESS; // if (qItem == NULL) {
} // if (++retryTimes < MAX_RETRY_TIMES) {
// taosMsleep(10);
stDebug("s-task:%s sink task handle block one-by-one, type:%d", id, qItem->type); // continue;
// }
*numOfBlocks = 1; //
*pInput = qItem; // return TSDB_CODE_SUCCESS;
return TSDB_CODE_SUCCESS; // }
} //
} // stDebug("s-task:%s sink task handle block, type:%s", id, streamQueueItemGetTypeStr(qItem->type));
// pTask->sinkRecorder.bytes += streamQueueItemGetSize(qItem);
//
// *numOfBlocks = 1;
// *pInput = qItem;
// return TSDB_CODE_SUCCESS;
// }
// }
while (1) { while (1) {
if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) { if (streamTaskShouldPause(&pTask->status) || streamTaskShouldStop(&pTask->status)) {
@ -219,20 +197,18 @@ int32_t streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInpu
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue); SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputInfo.queue);
if (qItem == NULL) { if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { if ((taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__SINK) && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10); taosMsleep(10);
stDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue; continue;
} }
stDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// do not merge blocks for sink node and check point data block // do not merge blocks for sink node and check point data block
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER || int8_t type = qItem->type;
qItem->type == STREAM_INPUT__TRANS_STATE) { if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
const char* p = streamGetBlockTypeStr(qItem->type); type == STREAM_INPUT__TRANS_STATE) {
const char* p = streamQueueItemGetTypeStr(qItem->type);
if (*pInput == NULL) { if (*pInput == NULL) {
stDebug("s-task:%s %s msg extracted, start to process immediately", id, p); stDebug("s-task:%s %s msg extracted, start to process immediately", id, p);
@ -285,7 +261,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
if (type == STREAM_INPUT__DATA_SUBMIT) { if (type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem; SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) { if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace( stTrace(
"s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data", "s-task:%s inputQ is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
@ -304,15 +280,15 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return code; return code;
} }
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already. // use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
msgLen, ver, total, size + SIZE_IN_MB(msgLen)); msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
if (streamQueueIsFull(pQueue, true)) { if (streamQueueIsFull(pQueue, true)) {
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort", stTrace("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size); pTask->id.idStr, STREAM_TASK_QUEUE_CAPACITY, STREAM_TASK_INPUT_QUEUE_CAPACITY_IN_SIZE, total, size);
@ -326,7 +302,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return code; return code;
} }
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size); stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER || } else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
type == STREAM_INPUT__TRANS_STATE) { type == STREAM_INPUT__TRANS_STATE) {
@ -336,9 +312,9 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return code; return code;
} }
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s level:%d %s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr,
pTask->info.taskLevel, streamGetBlockTypeStr(type), total, size); pTask->info.taskLevel, streamQueueItemGetTypeStr(type), total, size);
} else if (type == STREAM_INPUT__GET_RES) { } else if (type == STREAM_INPUT__GET_RES) {
// use the default memory limit, refactor later. // use the default memory limit, refactor later.
int32_t code = taosWriteQitem(pQueue, pItem); int32_t code = taosWriteQitem(pQueue, pItem);
@ -347,7 +323,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
return code; return code;
} }
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size); stDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
} else { } else {
ASSERT(0); ASSERT(0);
@ -372,7 +348,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
} }
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
// let's wait for there are enough space to hold this result pBlock // let's wait for there are enough space to hold this result pBlock
stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr, stDebug("s-task:%s outputQ is full, wait for 500ms and retry, outputQ items:%d, size:%.2fMiB", pTask->id.idStr,
total, size); total, size);
@ -382,7 +358,7 @@ int32_t streamTaskPutDataIntoOutputQ(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t code = taosWriteQitem(pQueue, pBlock); int32_t code = taosWriteQitem(pQueue, pBlock);
int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue); int32_t total = streamQueueGetNumOfItems(pTask->outputInfo.queue);
double size = SIZE_IN_MB(taosQueueMemorySize(pQueue)); double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
if (code != 0) { if (code != 0) {
stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost", stError("s-task:%s failed to put res into outputQ, outputQ items:%d, size:%.2fMiB code:%s, result lost",
pTask->id.idStr, total + 1, size, tstrerror(code)); pTask->id.idStr, total + 1, size, tstrerror(code));