refactor: do some internal refactor.
This commit is contained in:
parent
65b81a803f
commit
a3abe25539
|
@ -1272,13 +1272,15 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t taskId = ntohl(pRsp->upstreamTaskId);
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, taskId);
|
||||
tqDebug("recv dispatch rsp, code:%x", pMsg->code);
|
||||
|
||||
int32_t vgId = pTq->pStreamMeta->vgId;
|
||||
tqDebug("vgId:%d recv dispatch rsp, code:%d", vgId, pMsg->code);
|
||||
if (pTask) {
|
||||
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
} else {
|
||||
return -1;
|
||||
tqDebug("vgId:%d failed to find task:0x%x", vgId, taskId);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -137,7 +137,7 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
|
||||
int32_t blockSz = taosArrayGetSize(pBlocks);
|
||||
|
||||
tqDebug("vgId:%d, s-task:%s write results blocks:%d into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
|
||||
tqDebug("vgId:%d, s-task:%s write results %d blocks into table", TD_VID(pVnode), pTask->id.idStr, blockSz);
|
||||
|
||||
void* pBuf = NULL;
|
||||
SArray* tagArray = NULL;
|
||||
|
@ -482,17 +482,15 @@ void tqSinkToTablePipeline(SStreamTask* pTask, void* vnode, int64_t ver, void* d
|
|||
tEncoderClear(&encoder);
|
||||
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||
|
||||
SRpcMsg msg = {
|
||||
.msgType = TDMT_VND_SUBMIT,
|
||||
.pCont = pBuf,
|
||||
.contLen = len,
|
||||
};
|
||||
|
||||
SRpcMsg msg = { .msgType = TDMT_VND_SUBMIT, .pCont = pBuf, .contLen = len };
|
||||
if (tmsgPutToQueue(&pVnode->msgCb, WRITE_QUEUE, &msg) != 0) {
|
||||
tqDebug("failed to put into write-queue since %s", terrstr());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d, s-task:%s write results completed", TD_VID(pVnode), pTask->id.idStr);
|
||||
|
||||
_end:
|
||||
taosArrayDestroy(tagArray);
|
||||
taosArrayDestroy(pVals);
|
||||
|
|
|
@ -126,7 +126,7 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
|
|||
if (pBlock == NULL) {
|
||||
streamTaskInputFail(pTask);
|
||||
status = TASK_INPUT_STATUS__FAILED;
|
||||
qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
pTask->id.idStr);
|
||||
} else {
|
||||
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
|
@ -233,7 +233,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||
qDebug("s-task:%s receive dispatch rsp, status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
|
||||
qDebug("s-task:%s receive dispatch rsp, output status:%d code:%d", pTask->id.idStr, pRsp->inputStatus, code);
|
||||
|
||||
if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||
|
@ -246,13 +246,15 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus);
|
||||
ASSERT(old == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
||||
qDebug("s-task:%s receive dispatch rsp, output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
|
||||
// the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||
// todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms.
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
// TODO: init recover timer
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, need to block output", pTask->id.idStr, pRsp->downstreamTaskId);
|
||||
|
||||
atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL);
|
||||
qError("s-task:%s ignore error, and reset task output status:%d", pTask->id.idStr, pTask->outputStatus);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -385,7 +385,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
|
|||
|
||||
if (pTask->taskLevel == TASK_LEVEL__SINK) {
|
||||
ASSERT(pInput->type == STREAM_INPUT__DATA_BLOCK);
|
||||
qDebug("s-task:%s sink node start to sink result. numOfBlocks:%d", pTask->id.idStr, batchSize);
|
||||
qDebug("s-task:%s sink task start to sink %d blocks", pTask->id.idStr, batchSize);
|
||||
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
|
||||
continue;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue