refactor: do some internal refactor.
This commit is contained in:
parent
91de00597d
commit
27b7d1ec88
|
@ -1086,7 +1086,7 @@ int32_t tqProcessTaskRecover2Req(STQ* pTq, int64_t sversion, char* msg, int32_t
|
|||
return -1;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s set the start wal offset to be:%"PRId64, pTask->id.idStr, sversion);
|
||||
qDebug("s-task:%s set start wal scan start ver:%"PRId64, pTask->id.idStr, sversion);
|
||||
walReaderSeekVer(pTask->exec.pWalReader, sversion);
|
||||
pTask->chkInfo.currentVer = sversion;
|
||||
|
||||
|
|
|
@ -2005,7 +2005,7 @@ FETCH_NEXT_BLOCK:
|
|||
// printDataBlock(pBlock, "stream scan recv");
|
||||
return pBlock;
|
||||
} else if (pInfo->blockType == STREAM_INPUT__DATA_SUBMIT) {
|
||||
qDebug("scan mode %d", pInfo->scanMode);
|
||||
qDebug("stream scan mode:%d, %s", pInfo->scanMode, id);
|
||||
switch (pInfo->scanMode) {
|
||||
case STREAM_SCAN_FROM_RES: {
|
||||
pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE;
|
||||
|
@ -2122,7 +2122,7 @@ FETCH_NEXT_BLOCK:
|
|||
pInfo->numOfExec++;
|
||||
pOperator->resultInfo.totalRows += pBlockInfo->rows;
|
||||
|
||||
qDebug("stream scan get source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||
qDebug("stream scan completed, and return source rows:%" PRId64", %s", pBlockInfo->rows, id);
|
||||
if (pBlockInfo->rows > 0) {
|
||||
return pBlock;
|
||||
}
|
||||
|
|
|
@ -296,11 +296,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
|
||||
if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
SStreamDataSubmit* px = (SStreamDataSubmit*)pItem;
|
||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
px->submit.msgLen, px->submit.ver, total, size);
|
||||
|
||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && tInputQueueIsFull(pTask)) {
|
||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort",
|
||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) stop to push data",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
||||
size);
|
||||
streamDataSubmitDestroy(px);
|
||||
|
@ -314,9 +311,12 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
taosFreeQitem(pItem);
|
||||
return code;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||
px->submit.msgLen, px->submit.ver, total, size + px->submit.msgLen/1048576.0);
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
if (/*(pTask->taskLevel == TASK_LEVEL__SOURCE) && */(tInputQueueIsFull(pTask))) {
|
||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (tInputQueueIsFull(pTask))) {
|
||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
||||
size);
|
||||
|
|
|
@ -283,7 +283,7 @@ int32_t streamDispatchOneRecoverFinishReq(SStreamTask* pTask, const SStreamRecov
|
|||
msg.info.noResp = 1;
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
qDebug("s-task:%s dispatch recover finish msg to taskId:%d node %d: recover finish msg", pTask->id.idStr,
|
||||
qDebug("s-task:%s dispatch recover finish msg to downstream taskId:0x%x node %d: recover finish msg", pTask->id.idStr,
|
||||
pReq->taskId, vgId);
|
||||
|
||||
return 0;
|
||||
|
@ -414,7 +414,7 @@ int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pDat
|
|||
|
||||
req.taskId = downstreamTaskId;
|
||||
|
||||
qDebug("s-task:%s (child taskId:%d) fix-dispatch blocks:%d to down stream s-task:%d in vgId:%d", pTask->id.idStr,
|
||||
qDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to down stream s-task:0x%x in vgId:%d", pTask->id.idStr,
|
||||
pTask->selfChildId, numOfBlocks, downstreamTaskId, vgId);
|
||||
|
||||
code = doSendDispatchMsg(pTask, &req, vgId, pEpSet);
|
||||
|
@ -514,7 +514,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
qDebug("s-task:%s start to dispatch msg, set output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
|
||||
SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue);
|
||||
if (pDispatchedBlock == NULL) {
|
||||
|
|
|
@ -236,11 +236,11 @@ int32_t streamScanExec(SStreamTask* pTask, int32_t batchSz) {
|
|||
taosFreeQitem(qRes);
|
||||
return code;
|
||||
}
|
||||
|
||||
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
|
||||
streamDispatchStreamBlock(pTask);
|
||||
}
|
||||
//
|
||||
// if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
// qDebug("s-task:%s scan exec dispatch blocks:%d", pTask->id.idStr, batchCnt);
|
||||
// streamDispatchStreamBlock(pTask);
|
||||
// }
|
||||
|
||||
if (finished) {
|
||||
break;
|
||||
|
@ -438,7 +438,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
|
||||
SArray* pBlockList = pMerged->submits;
|
||||
int32_t numOfBlocks = taosArrayGetSize(pBlockList);
|
||||
qDebug("s-task:%s %p set submit input (merged), batch num:%d", id, pTask, numOfBlocks);
|
||||
qDebug("s-task:%s %p set (merged) submit blocks as a batch, numOfBlocks:%d", id, pTask, numOfBlocks);
|
||||
qSetMultiStreamInput(pExecutor, pBlockList->pData, numOfBlocks, STREAM_INPUT__MERGED_SUBMIT);
|
||||
} else if (pItem->type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
|
||||
|
|
Loading…
Reference in New Issue