refactor: do some internal refactor.
This commit is contained in:
parent
0e189f70a1
commit
0d38f389ab
|
@ -254,7 +254,6 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY, "stream-scan-history", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
|
||||
|
|
|
@ -274,7 +274,7 @@ typedef struct SStreamStatus {
|
|||
int8_t schedStatus;
|
||||
int8_t keepTaskStatus;
|
||||
bool transferState;
|
||||
bool appendTranstateBlock; // has append the transfer state data block already
|
||||
bool appendTranstateBlock; // has append the transfer state data block already, todo: remove it
|
||||
int8_t timerActive; // timer is active
|
||||
int8_t pauseAllowed; // allowed task status to be set to be paused
|
||||
} SStreamStatus;
|
||||
|
@ -582,6 +582,7 @@ int32_t streamProcessRunReq(SStreamTask* pTask);
|
|||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pMsg, bool exec);
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
|
||||
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
|
||||
|
||||
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
|
||||
|
||||
|
@ -629,7 +630,6 @@ int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* p
|
|||
int32_t streamSourceScanHistoryData(SStreamTask* pTask);
|
||||
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask);
|
||||
|
||||
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask);
|
||||
int32_t appendTranstateIntoInputQ(SStreamTask* pTask);
|
||||
|
||||
// agg level
|
||||
|
|
|
@ -742,7 +742,6 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TRANSFER_STATE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TRIGGER, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -250,7 +250,6 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg);
|
|||
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqProcessTaskScanHistoryFinishRsp(STQ* pTq, SRpcMsg* pMsg);
|
||||
int32_t tqCheckLogInWal(STQ* pTq, int64_t version);
|
||||
|
|
|
@ -928,6 +928,8 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
|
|||
pTask->pMsgCb = &pTq->pVnode->msgCb;
|
||||
pTask->pMeta = pTq->pStreamMeta;
|
||||
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
|
||||
// backup the initial status, and set it to be TASK_STATUS__INIT
|
||||
pTask->chkInfo.version = ver;
|
||||
pTask->chkInfo.currentVer = ver;
|
||||
|
@ -1272,7 +1274,8 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
if (done) {
|
||||
pTask->tsInfo.step2Start = taosGetTimestampMs();
|
||||
streamTaskEndScanWAL(pTask);
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, 0.0);
|
||||
appendTranstateIntoInputQ(pTask);
|
||||
} else {
|
||||
STimeWindow* pWindow = &pTask->dataRange.window;
|
||||
tqDebug("s-task:%s level:%d verRange:%" PRId64 " - %" PRId64 " window:%" PRId64 "-%" PRId64
|
||||
|
@ -1337,44 +1340,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// notify the downstream tasks to transfer executor state after handle all history blocks.
|
||||
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
||||
SStreamTransferReq req = {0};
|
||||
|
||||
SDecoder decoder;
|
||||
tDecoderInit(&decoder, (uint8_t*)pReq, len);
|
||||
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId, req.downstreamTaskId);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
|
||||
if (pTask == NULL) {
|
||||
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed", req.downstreamTaskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t remain = streamAlignTransferState(pTask);
|
||||
if (remain > 0) {
|
||||
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
// transfer the ownership of executor state
|
||||
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
|
||||
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
|
||||
|
||||
pTask->status.transferState = true;
|
||||
|
||||
streamSchedExec(pTask);
|
||||
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1706,6 +1671,8 @@ int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
|
||||
int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
||||
STQ* pTq = pVnode->pTq;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
|
||||
SMsgHead* msgStr = pMsg->pCont;
|
||||
char* msgBody = POINTER_SHIFT(msgStr, sizeof(SMsgHead));
|
||||
int32_t msgLen = pMsg->contLen - sizeof(SMsgHead);
|
||||
|
@ -1722,7 +1689,9 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
|||
tDecoderClear(&decoder);
|
||||
|
||||
int32_t taskId = req.taskId;
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.taskId);
|
||||
tqDebug("vgId:%d receive dispatch msg to s-task:0x%"PRIx64"-0x%x", vgId, req.streamId, taskId);
|
||||
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, taskId);
|
||||
if (pTask != NULL) {
|
||||
SRpcMsg rsp = {.info = pMsg->info, .code = 0};
|
||||
streamProcessDispatchMsg(pTask, &req, &rsp, false);
|
||||
|
@ -1739,7 +1708,7 @@ int32_t vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
|
|||
|
||||
FAIL:
|
||||
if (pMsg->info.handle == NULL) {
|
||||
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", pTq->pStreamMeta->vgId, taskId);
|
||||
tqError("s-task:0x%x vgId:%d msg handle is null, abort enqueue dispatch msg", vgId, taskId);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -210,14 +210,21 @@ int32_t doSetOffsetForWalReader(SStreamTask *pTask, int32_t vgId) {
|
|||
}
|
||||
|
||||
static void checkForFillHistoryVerRange(SStreamTask* pTask, int64_t ver) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if ((pTask->info.fillHistory == 1) && ver > pTask->dataRange.range.maxVer) {
|
||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
|
||||
", not scan wal anymore, set the transfer state flag",
|
||||
pTask->id.idStr, ver, pTask->dataRange.range.maxVer);
|
||||
if (!pTask->status.appendTranstateBlock) {
|
||||
pTask->status.appendTranstateBlock = true;
|
||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64
|
||||
", not scan wal anymore, add transfer-state block into inputQ",
|
||||
id, ver, pTask->dataRange.range.maxVer);
|
||||
|
||||
double el = (taosGetTimestampMs() - pTask->tsInfo.step2Start) / 1000.0;
|
||||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
appendTranstateIntoInputQ(pTask);
|
||||
/*int32_t code = */streamSchedExec(pTask);
|
||||
} else {
|
||||
qWarn("s-task:%s fill-history scan WAL, currentVer:%" PRId64 " reach the maximum ver:%" PRId64 ", not scan wal",
|
||||
id, ver, pTask->dataRange.range.maxVer);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -264,7 +271,7 @@ int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
if ((pTask->info.fillHistory == 1) && pTask->status.transferState) {
|
||||
if ((pTask->info.fillHistory == 1) && pTask->status.appendTranstateBlock) {
|
||||
ASSERT(status == TASK_STATUS__NORMAL);
|
||||
// the maximum version of data in the WAL has reached already, the step2 is done
|
||||
tqDebug("s-task:%s fill-history reach the maximum ver:%" PRId64 ", not scan wal anymore", pTask->id.idStr,
|
||||
|
|
|
@ -660,8 +660,6 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
|
|||
return tqProcessTaskRetrieveRsp(pVnode->pTq, pMsg);
|
||||
case TDMT_VND_STREAM_SCAN_HISTORY:
|
||||
return tqProcessTaskScanHistory(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_TRANSFER_STATE:
|
||||
return tqProcessTaskTransferStateReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH:
|
||||
return tqProcessTaskScanHistoryFinishReq(pVnode->pTq, pMsg);
|
||||
case TDMT_STREAM_SCAN_HISTORY_FINISH_RSP:
|
||||
|
|
|
@ -62,6 +62,7 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
|||
int32_t streamAddEndScanHistoryMsg(SStreamTask* pTask, SRpcHandleInfo* pRpcInfo, SStreamScanHistoryFinishReq* pReq);
|
||||
int32_t streamNotifyUpstreamContinue(SStreamTask* pTask);
|
||||
int32_t streamTaskFillHistoryFinished(SStreamTask* pTask);
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask);
|
||||
|
||||
extern int32_t streamBackendId;
|
||||
extern int32_t streamBackendCfWrapperId;
|
||||
|
|
|
@ -213,6 +213,10 @@ static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDisp
|
|||
qError("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
pTask->id.idStr);
|
||||
} else {
|
||||
if (pBlock->type == STREAM_INPUT__TRANS_STATE) {
|
||||
pTask->status.appendTranstateBlock = true;
|
||||
}
|
||||
|
||||
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
// input queue is full, upstream is blocked now
|
||||
status = (code == TSDB_CODE_SUCCESS) ? TASK_INPUT_STATUS__NORMAL : TASK_INPUT_STATUS__BLOCKED;
|
||||
|
@ -379,6 +383,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
// use the default memory limit, refactor later.
|
||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
if (type != STREAM_INPUT__GET_RES && type != STREAM_INPUT__CHECKPOINT && pTask->triggerParam != 0) {
|
||||
|
@ -421,4 +427,16 @@ SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t
|
|||
}
|
||||
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
|
||||
int32_t num = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
||||
if (num == 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
for(int32_t i = 0; i < num; ++i) {
|
||||
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamEpInfoList, i);
|
||||
pInfo->dataAllowed = true;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -38,6 +38,7 @@ static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatc
|
|||
if (tStartEncode(pEncoder) < 0) return -1;
|
||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
|
||||
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
|
||||
|
@ -91,6 +92,7 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
if (tStartDecode(pDecoder) < 0) return -1;
|
||||
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
|
||||
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
|
||||
|
@ -115,8 +117,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
|
||||
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
|
||||
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
|
||||
int64_t dstTaskId, int32_t type) {
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
pReq->srcVgId = vgId;
|
||||
pReq->upstreamTaskId = pTask->id.taskId;
|
||||
|
@ -456,8 +458,8 @@ static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* p
|
|||
|
||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
|
||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
|
||||
|
||||
code = streamAddBlockIntoDispatchMsg(pDataBlock, &req);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosArrayDestroyP(req.data, taosMemoryFree);
|
||||
taosArrayDestroy(req.dataLen);
|
||||
|
@ -720,14 +722,16 @@ int32_t streamNotifyUpstreamContinue(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
|
||||
const char* id = pTask->id.idStr;
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
// dispatch message failed: network error, or node not available.
|
||||
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp>inputStatus will be set
|
||||
// flag. here we need to retry dispatch this message to downstream task immediately. handle the case the failure
|
||||
// happened too fast.
|
||||
// todo handle the shuffle dispatch failure
|
||||
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
|
||||
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", id, pRsp->downstreamTaskId,
|
||||
tstrerror(code), ++pTask->msgInfo.retryCount);
|
||||
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
}
|
||||
|
@ -735,22 +739,35 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
qDebug("s-task:%s recv dispatch rsp, downstream task input status:%d code:%d", pTask->id.idStr, pRsp->inputStatus,
|
||||
code);
|
||||
qDebug("s-task:%s recv dispatch rsp from 0x%x, downstream task input status:%d code:%d", id, pRsp->downstreamTaskId,
|
||||
pRsp->inputStatus, code);
|
||||
|
||||
// there are other dispatch message not response yet
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||
qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp);
|
||||
qDebug("s-task:%s is shuffle, left waiting rsp %d", id, leftRsp);
|
||||
if (leftRsp > 0) {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||
SStreamDataBlock* p = pTask->msgInfo.pData;
|
||||
if (p->type == STREAM_INPUT__TRANS_STATE) {
|
||||
qDebug("s-task:%s dispatch transtate msg to downstream successfully, start to transfer state", id);
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
||||
pTask->msgInfo.retryCount = 0;
|
||||
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
||||
qDebug("s-task:%s output status is set to:%d", pTask->id.idStr, pTask->outputInfo.status);
|
||||
qDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
|
||||
|
||||
// the input queue of the (down stream) task that receive the output data is full,
|
||||
// so the TASK_INPUT_STATUS_BLOCKED is rsp
|
||||
|
@ -758,7 +775,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
|
||||
pTask->id.idStr, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else { // pipeline send data in output queue
|
||||
// this message has been sent successfully, let's try next one.
|
||||
|
@ -767,8 +784,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
|
||||
if (pTask->msgInfo.blockingTs != 0) {
|
||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
|
||||
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
|
||||
pTask->id.idStr, pRsp->downstreamTaskId, el);
|
||||
qDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", id,
|
||||
pRsp->downstreamTaskId, el);
|
||||
pTask->msgInfo.blockingTs = 0;
|
||||
|
||||
// put data into inputQ of current task is also allowed
|
||||
|
|
|
@ -287,7 +287,7 @@ static void waitForTaskIdle(SStreamTask* pTask, SStreamTask* pStreamTask) {
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
SStreamMeta* pMeta = pTask->pMeta;
|
||||
|
||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||
|
@ -301,7 +301,7 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
pStreamTask->id.idStr);
|
||||
}
|
||||
|
||||
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.transferState == true);
|
||||
ASSERT(pStreamTask->historyTaskId.taskId == pTask->id.taskId && pTask->status.appendTranstateBlock == true);
|
||||
|
||||
STimeWindow* pTimeWindow = &pStreamTask->dataRange.window;
|
||||
|
||||
|
@ -383,11 +383,9 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
if (!pTask->status.transferState) {
|
||||
return code;
|
||||
}
|
||||
ASSERT(pTask->status.appendTranstateBlock == 1);
|
||||
|
||||
int32_t level = pTask->info.taskLevel;
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
|
@ -513,14 +511,12 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
|
||||
// transfer the ownership of executor state
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
qDebug("s-task:%s open transfer state flag for source task", id);
|
||||
qDebug("s-task:%s add transfer-state block into outputQ", id);
|
||||
} else {
|
||||
qDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", id);
|
||||
qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
|
||||
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
|
||||
}
|
||||
|
||||
pTask->status.transferState = true;
|
||||
|
||||
// dispatch the tran-state block to downstream task immediately
|
||||
int32_t type = pTask->outputInfo.type;
|
||||
if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) &&
|
||||
|
@ -639,16 +635,7 @@ int32_t streamTaskEndScanWAL(SStreamTask* pTask) {
|
|||
qDebug("s-task:%s scan-history from WAL stage(step 2) ended, elapsed time:%.2fs", id, el);
|
||||
|
||||
// 1. notify all downstream tasks to transfer executor state after handle all history blocks.
|
||||
// pTask->status.transferState = true;
|
||||
appendTranstateIntoInputQ(pTask);
|
||||
|
||||
// 2. do transfer stream task operator states.
|
||||
// todo remove this
|
||||
// int32_t code = streamDoTransferStateToStreamTask(pTask);
|
||||
// if (code != TSDB_CODE_SUCCESS) { // todo handle error
|
||||
// return code;
|
||||
// }
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -667,35 +654,36 @@ int32_t streamTryExec(SStreamTask* pTask) {
|
|||
}
|
||||
|
||||
// todo the task should be commit here
|
||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
// if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
||||
// fill-history WAL scan has completed
|
||||
if (pTask->status.transferState) {
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
return code;
|
||||
}
|
||||
// if (pTask->status.transferState) {
|
||||
// code = streamTransferStateToStreamTask(pTask);
|
||||
// if (code != TSDB_CODE_SUCCESS) {
|
||||
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
// return code;
|
||||
// }
|
||||
|
||||
// the schedStatus == TASK_SCHED_STATUS__ACTIVE, streamSchedExec cannot be executed, so execute once again by
|
||||
// call this function (streamExecForAll) directly.
|
||||
// code = streamExecForAll(pTask);
|
||||
// if (code < 0) {
|
||||
// do nothing
|
||||
// }
|
||||
}
|
||||
// code = streamExecForAll(pTask);
|
||||
// if (code < 0) {
|
||||
// do nothing
|
||||
// }
|
||||
// }
|
||||
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
|
||||
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||
} else {
|
||||
// atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
// qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id,
|
||||
// streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||
// } else {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
qDebug("s-task:%s exec completed, status:%s, sched-status:%d", id, streamGetTaskStatusStr(pTask->status.taskStatus),
|
||||
pTask->status.schedStatus);
|
||||
|
||||
if ((!streamTaskShouldStop(&pTask->status)) && (!streamTaskShouldPause(&pTask->status))) {
|
||||
if (!(taosQueueEmpty(pTask->inputQueue->queue) || streamTaskShouldStop(&pTask->status) ||
|
||||
streamTaskShouldPause(&pTask->status))) {
|
||||
streamSchedExec(pTask);
|
||||
}
|
||||
}
|
||||
// }
|
||||
} else {
|
||||
qDebug("s-task:%s already started to exec by other thread, status:%s, sched-status:%d", id,
|
||||
streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
|
||||
|
|
|
@ -372,49 +372,6 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doDispatchTransferMsg(SStreamTask* pTask, const SStreamTransferReq* pReq, int32_t vgId, SEpSet* pEpSet) {
|
||||
void* buf = NULL;
|
||||
int32_t code = -1;
|
||||
SRpcMsg msg = {0};
|
||||
|
||||
int32_t tlen;
|
||||
tEncodeSize(tEncodeStreamScanHistoryFinishReq, pReq, tlen, code);
|
||||
if (code < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
((SMsgHead*)buf)->vgId = htonl(vgId);
|
||||
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||
|
||||
SEncoder encoder;
|
||||
tEncoderInit(&encoder, abuf, tlen);
|
||||
if ((code = tEncodeStreamScanHistoryFinishReq(&encoder, pReq)) < 0) {
|
||||
if (buf) {
|
||||
rpcFreeCont(buf);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
msg.contLen = tlen + sizeof(SMsgHead);
|
||||
msg.pCont = buf;
|
||||
msg.msgType = TDMT_STREAM_TRANSFER_STATE;
|
||||
msg.info.noResp = 1;
|
||||
|
||||
tmsgSendReq(pEpSet, &msg);
|
||||
qDebug("s-task:%s level:%d, status:%s dispatch transfer state msg to taskId:0x%x (vgId:%d)", pTask->id.idStr,
|
||||
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), pReq->downstreamTaskId, vgId);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
||||
SStreamDataBlock* pTranstate = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
|
||||
if (pTranstate == NULL) {
|
||||
|
@ -442,6 +399,8 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pTask->status.appendTranstateBlock = true;
|
||||
|
||||
qDebug("s-task:%s set sched-status:%d, prev:%d", pTask->id.idStr, TASK_SCHED_STATUS__INACTIVE, pTask->status.schedStatus);
|
||||
pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
|
||||
streamSchedExec(pTask);
|
||||
|
@ -449,27 +408,6 @@ int32_t appendTranstateIntoInputQ(SStreamTask* pTask) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t streamDispatchTransferStateMsg(SStreamTask* pTask) {
|
||||
SStreamTransferReq req = { .streamId = pTask->id.streamId, .childId = pTask->info.selfChildId };
|
||||
|
||||
// serialize
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
|
||||
doDispatchTransferMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
|
||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||
|
||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||
req.downstreamTaskId = pVgInfo->taskId;
|
||||
doDispatchTransferMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
// agg
|
||||
int32_t streamTaskScanHistoryPrepare(SStreamTask* pTask) {
|
||||
pTask->numOfWaitingUpstream = taosArrayGetSize(pTask->pUpstreamEpInfoList);
|
||||
|
|
Loading…
Reference in New Issue