refactor(stream): do some internal refactor.

This commit is contained in:
Haojun Liao 2023-07-14 17:21:20 +08:00
parent 2fd72500e2
commit eb7e6152ae
16 changed files with 244 additions and 331 deletions

View File

@ -257,7 +257,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_STREAM_SCAN_HISTORY_FINISH, "stream-scan-history-finish", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TRANSFER_STATE, "stream-transfer-state", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECK, "stream-task-check", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT, "stream-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)

View File

@ -123,7 +123,7 @@ typedef struct {
int8_t type;
int32_t srcVgId;
int32_t childId;
int32_t srcTaskId;
int64_t sourceVer;
int64_t reqId;
@ -312,7 +312,7 @@ struct SStreamTask {
SStreamId historyTaskId;
SStreamId streamTaskId;
SArray* pUpstreamInfoList; // SArray<SStreamChildEpInfo*>, // children info
SArray* pRpcMsgList; // SArray<SRpcMsg*>
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
// output
union {
@ -368,7 +368,6 @@ typedef struct SStreamMeta {
SHashObj* pTaskBackendUnique;
TdThreadMutex backendMutex;
// uint64_t checkpointId;
int32_t chkptNotReadyTasks;
SArray* checkpointSaved;
SArray* checkpointInUse;
@ -396,7 +395,8 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t dataSrcVgId;
int32_t type;
int32_t srcVgId;
int32_t upstreamTaskId;
int32_t upstreamChildId;
int32_t upstreamNodeId;
@ -503,24 +503,10 @@ typedef struct {
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointReq;
} SStreamCheckpointReadyMsg;
typedef struct {
SMsgHead msgHead;
int64_t streamId;
int64_t checkpointId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t childId;
} SStreamCheckpointRsp;
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq);
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq);
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pRsp);
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp);
typedef struct {
int64_t streamId;
@ -541,12 +527,12 @@ int32_t tDecodeStreamTaskCheckReq(SDecoder* pDecoder, SStreamTaskCheckReq* pReq)
int32_t tEncodeStreamTaskCheckRsp(SEncoder* pEncoder, const SStreamTaskCheckRsp* pRsp);
int32_t tDecodeStreamTaskCheckRsp(SDecoder* pDecoder, SStreamTaskCheckRsp* pRsp);
int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq);
int32_t tDecodeStreamRetrieveReq(SDecoder* pDecoder, SStreamRetrieveReq* pReq);
void tDeleteStreamRetrieveReq(SStreamRetrieveReq* pReq);
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId);
void tDeleteStreamDispatchReq(SStreamDispatchReq* pReq);
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
@ -556,8 +542,9 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code);
int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq, SRpcMsg* pMsg);
void streamTaskOpenUpstreamInput(SStreamTask* pTask);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId);
void streamTaskInputFail(SStreamTask* pTask);
int32_t streamTryExec(SStreamTask* pTask);
@ -623,16 +610,16 @@ int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask);
int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);
#ifdef __cplusplus
}

View File

@ -80,7 +80,6 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECK_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -751,8 +751,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_PAUSE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RESUME, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_CHECK_POINT_SOURCE, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -221,8 +221,7 @@ int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqCheckStreamStatus(STQ* pTq);
int tqCommit(STQ*);

View File

@ -833,8 +833,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg));
// sink
if (pTask->outputType == TASK_OUTPUT__SMA) {
pTask->smaSink.vnode = pTq->pVnode;
@ -865,7 +863,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond);
}
streamTaskOpenUpstreamInput(pTask);
streamTaskOpenAllUpstreamInput(pTask);
streamSetupScheduleTrigger(pTask);
SCheckpointInfo* pChkInfo = &pTask->chkInfo;
@ -1276,13 +1274,13 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// even in halt status, the data in inputQ must be processed
int8_t status = pTask->status.taskStatus;
if (status == TASK_STATUS__NORMAL || status == TASK_STATUS__HALT || status == TASK_STATUS__CK) {
tqDebug("vgId:%d s-task:%s start to process block from inputQ, last chk point:%" PRId64, vgId, pTask->id.idStr,
pTask->chkInfo.checkpointVer);
tqDebug("vgId:%d s-task:%s start to process block from inputQ, status:%s, checkpointVer:%" PRId64, vgId,
pTask->id.idStr, streamGetTaskStatusStr(status), pTask->chkInfo.checkpointVer);
streamProcessRunReq(pTask);
} else {
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
tqDebug("vgId:%d s-task:%s ignore run req since not in ready state, status:%s, sched-status:%d", vgId,
pTask->id.idStr, streamGetTaskStatusStr(pTask->status.taskStatus), pTask->status.schedStatus);
pTask->id.idStr, streamGetTaskStatusStr(status), pTask->status.schedStatus);
}
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
@ -1531,7 +1529,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint source msg, code:%s", vgId, tstrerror(code));
goto FAIL;
return code;
}
tDecoderClear(&decoder);
@ -1540,12 +1538,12 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
req.taskId);
goto FAIL;
return TSDB_CODE_SUCCESS;
}
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
return code;
}
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
@ -1561,67 +1559,27 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
total = taosArrayGetSize(pMeta->pTaskList);
taosWUnLockLatch(&pMeta->lock);
qDebug("s-task:%s level:%d receive the checkpoint msg id:%" PRId64 " from mnode, total source checkpoint req:%d",
qDebug("s-task:%s level:%d receive checkpoint source msg from mnode id:%" PRId64 ", total source checkpoint req:%d",
pTask->id.idStr, pTask->info.taskLevel, req.checkpointId, total);
streamProcessCheckpointSourceReq(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;
FAIL:
return code;
}
int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamCheckpointReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointReq(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
return TSDB_CODE_SUCCESS;
}
code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaReleaseTask(pMeta, pTask);
return code;
}
streamProcessCheckpointReq(pTask, &req);
streamProcessCheckpointSourceReq(pTask, &req);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
// downstream task has complete the stream task checkpoint procedure
int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
// if this task is an agg task, rsp this message to upstream directly.
// if this task is an source task, send source rsp to mnode
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
int32_t tqProcessStreamTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0;
SStreamCheckpointRsp req = {0};
SStreamCheckpointReadyMsg req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeStreamCheckpointRsp(&decoder, &req) < 0) {
if (tDecodeStreamCheckpointReadyMsg(&decoder, &req) < 0) {
code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder);
return code;
@ -1630,13 +1588,13 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x , it may have been destroyed already", vgId, req.downstreamTaskId);
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return code;
}
tqDebug("vgId:%d s-task:%s received the checkpoint rsp, handle it", vgId, pTask->id.idStr);
tqDebug("vgId:%d s-task:%s received the checkpoint ready msg, handle it", vgId, pTask->id.idStr);
streamProcessCheckpointRsp(pMeta, pTask);
streamProcessCheckpointReadyMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}

View File

@ -673,10 +673,8 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
return tqProcessTaskRecoverFinishRsp(pVnode->pTq, pMsg);
case TDMT_VND_STREAM_CHECK_POINT_SOURCE:
return tqProcessStreamCheckPointSourceReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT:
return tqProcessStreamCheckPointReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_RSP:
return tqProcessStreamCheckPointRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessStreamTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
default:
vError("unknown msg type:%d in stream queue", pMsg->msgType);
return TSDB_CODE_APP_ERROR;

View File

@ -50,13 +50,12 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamSaveTasks(SStreamMeta* pMeta, int64_t checkpointId);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);
#ifdef __cplusplus

View File

@ -143,10 +143,10 @@ int32_t streamSchedExec(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
static int32_t streamTaskAppendInputBlocks(SStreamTask* pTask, const SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
int8_t status = 0;
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, STREAM_INPUT__DATA_BLOCK, pReq->dataSrcVgId);
SStreamDataBlock* pBlock = createStreamBlockFromDispatchMsg(pReq, pReq->type, pReq->srcVgId);
if (pBlock == NULL) {
streamTaskInputFail(pTask);
status = TASK_INPUT_STATUS__FAILED;
@ -239,9 +239,12 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
qDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
// todo: if current task has received the checkpoint req from the upstream t#1, the msg from t#1 should all blocked
// Current task has received the checkpoint req from the upstream task, from which the message should all be blocked
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
}
streamTaskEnqueueBlocks(pTask, pReq, pRsp);
streamTaskAppendInputBlocks(pTask, pReq, pRsp);
tDeleteStreamDispatchReq(pReq);
if (exec) {
@ -322,7 +325,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
}
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s checkpoint enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
qDebug("s-task:%s level:%d checkpoint(trigger) enqueue inputQ, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr,
pTask->info.taskLevel, total, size);
} else if (type == STREAM_INPUT__GET_RES) { // use the default memory limit, refactor later.
taosWriteQitem(pTask->inputQueue->queue, pItem);
qDebug("s-task:%s data res enqueue, current(blocks:%d, size:%.2fMiB)", pTask->id.idStr, total, size);
@ -357,25 +361,33 @@ void* streamQueueNextItem(SStreamQueue* pQueue) {
void streamTaskInputFail(SStreamTask* pTask) { atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); }
void streamTaskOpenUpstreamInput(SStreamTask* pTask) {
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
if (num == 0) {
return;
}
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
pInfo->dataAllowed = true;
}
}
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId) {
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, taskId);
if (pInfo != NULL) {
pInfo->dataAllowed = false;
}
}
SStreamChildEpInfo * streamTaskGetUpstreamTaskEpInfo(SStreamTask* pTask, int32_t taskId) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
for(int32_t i = 0; i < num; ++i) {
SStreamChildEpInfo* pInfo = taosArrayGet(pTask->pUpstreamInfoList, i);
SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
if (pInfo->taskId == taskId) {
pInfo->dataAllowed = false;
break;
return pInfo;
}
}
return NULL;
}

View File

@ -61,7 +61,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo
return 0;
}
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) {
int32_t tEncodeStreamCheckpointReadyMsg(SEncoder* pEncoder, const SStreamCheckpointReadyMsg* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
@ -74,33 +74,7 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointRe
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->downstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->childId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->downstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->childId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
int32_t tDecodeStreamCheckpointReadyMsg(SDecoder* pDecoder, SStreamCheckpointReadyMsg* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
@ -113,7 +87,7 @@ int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRs
return 0;
}
static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, int32_t childId) {
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
int64_t old = atomic_val_compare_exchange_32(&pTask->checkpointAlignCnt, 0, num);
if (old == 0) {
@ -124,19 +98,27 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
}
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamCheckpoint* pChkpoint = taosAllocateQitem(sizeof(SStreamCheckpoint), DEF_QITEM, sizeof(SSDataBlock));
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pChkpoint->type = checkpointType;
pChkpoint->pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pChkpoint->pBlock == NULL) {
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
}
pChkpoint->pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.type = STREAM_CHECKPOINT;
pBlock->info.version = pTask->checkpointingId;
pBlock->info.rows = 1;
pBlock->info.childId = pTask->info.selfChildId;
pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));//pBlock;
taosArrayPush(pChkpoint->blocks, pBlock);
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pChkpoint) < 0) {
taosFreeQitem(pChkpoint);
return TSDB_CODE_OUT_OF_MEMORY;
@ -146,56 +128,72 @@ static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpoint
return TSDB_CODE_SUCCESS;
}
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
int32_t code = 0;
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
pTask->status.taskStatus = TASK_STATUS__CK;
pTask->checkpointNotReadyTasks = 1;
pTask->checkpointingId = pReq->checkpointId;
pTask->checkpointNotReadyTasks = 1;
// 2. let's dispatch checkpoint msg to downstream task directly and do nothing else.
// 2. put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task already.
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
streamSchedExec(pTask);
return code;
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
}
int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pReq) {
int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId;
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
int64_t checkpointId = pDataBlock->info.version;
// set the task status
pTask->checkpointingId = checkpointId;
pTask->status.taskStatus = TASK_STATUS__CK;
//todo fix race condition: set the status and append checkpoint block
int32_t taskLevel = pTask->info.taskLevel;
if (taskLevel == TASK_LEVEL__SOURCE) {
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG || pTask->info.taskLevel == TASK_LEVEL__SINK);
qDebug("s-task:%s set childIdx:%d, and add checkpoint block into outputQ", pTask->id.idStr,
pTask->info.selfChildId);
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it.
return code;
}
streamFreeQitem((SStreamQueueItem*)pBlock);
streamDispatchStreamBlock(pTask);
} else { // only one task exists
streamProcessCheckpointReadyMsg(pTask);
}
} else if (taskLevel == TASK_LEVEL__SINK) {
// todo: sink node needs alignment??
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
streamSchedExec(pTask);
qDebug("s-task:%s sink task set to checkpoint ready, start to send rsp to upstream", pTask->id.idStr);
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
qDebug("s-task:%s sink task do checkpoint ready, send ready msg to upstream", pTask->id.idStr);
} else {
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) > 0);
// close the inputQ for data from upstream task.
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
// update the child Id for downstream tasks
streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);
// there are still some upstream tasks not send checkpoint request, do nothing and wait for then
int32_t notReady = streamAlignCheckpoint(pTask, checkpointId, childId);
int32_t notReady = streamAlignCheckpoint(pTask);
int32_t num = taosArrayGetSize(pTask->pUpstreamInfoList);
if (notReady > 0) {
qDebug("s-task:%s received checkpoint req, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, notReady, num);
qDebug("s-task:%s received checkpoint block, idx:%d, %d upstream tasks not send checkpoint info yet, total:%d",
pTask->id.idStr, pTask->info.selfChildId, notReady, num);
return 0;
}
qDebug(
"s-task:%s receive one checkpoint req, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to "
"s-task:%s receive one checkpoint block, all %d upstream sent checkpoint msgs, dispatch checkpoint msg to "
"downstream",
pTask->id.idStr, num);
@ -206,8 +204,20 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
// if all upstreams are ready for generating checkpoint, set the status to be TASK_STATUS__CK_READY
// put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
streamSchedExec(pTask);
{
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
ASSERT(pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH || pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH);
int32_t code = taosWriteQitem(pTask->outputQueue->queue, pBlock);
if (code != 0) { // todo failed to add it into the output queue, free it.
return code;
}
streamFreeQitem((SStreamQueueItem*)pBlock);
streamDispatchStreamBlock(pTask);
}
}
return 0;
@ -217,16 +227,17 @@ int32_t streamProcessCheckpointReq(SStreamTask* pTask, SStreamCheckpointReq* pRe
* All down stream tasks have successfully completed the check point task.
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
*/
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask) {
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->checkpointNotReadyTasks, 1);
ASSERT(notReady >= 0);
if (notReady == 0) {
qDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
pTask->id.idStr);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT);
streamSchedExec(pTask);
} else {
int32_t total = streamTaskGetNumOfDownstream(pTask);
qDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total);

View File

@ -23,6 +23,7 @@ SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pRe
pData->type = blockType;
pData->srcVgId = srcVg;
pData->srcTaskId = pReq->upstreamTaskId;
int32_t blockNum = pReq->blockNum;
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
@ -59,16 +60,15 @@ SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamT
return NULL;
}
pStreamBlocks->srcTaskId = pTask->id.taskId;
pStreamBlocks->type = STREAM_INPUT__DATA_BLOCK;
pStreamBlocks->blocks = pRes;
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmit = (SStreamDataSubmit*)pItem;
pStreamBlocks->childId = pTask->info.selfChildId;
pStreamBlocks->sourceVer = pSubmit->ver;
} else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)pItem;
pStreamBlocks->childId = pTask->info.selfChildId;
pStreamBlocks->sourceVer = pMerged->ver;
}

View File

@ -26,8 +26,13 @@ typedef struct SBlockName {
char parTbName[TSDB_TABLE_NAME_LEN];
} SBlockName;
typedef struct {
int32_t taskId;
SRpcMsg msg;
SEpSet epset;
} SStreamChkptReadyInfo;
static void doRetryDispatchData(void* param, void* tmrId);
static int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* pReq);
static int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatchReq* pReq);
static int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet);
@ -46,7 +51,8 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamTaskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->dataSrcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->srcVgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->upstreamNodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->blockNum) < 0) return -1;
@ -70,15 +76,16 @@ int32_t tDecodeStreamDispatchReq(SDecoder* pDecoder, SStreamDispatchReq* pReq) {
return 0;
}
int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, int32_t numOfBlocks,
int64_t dstTaskId) {
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId;
pReq->dataSrcVgId = vgId;
pReq->srcVgId = vgId;
pReq->upstreamTaskId = pTask->id.taskId;
pReq->upstreamChildId = pTask->info.selfChildId;
pReq->upstreamNodeId = pTask->info.nodeId;
pReq->blockNum = numOfBlocks;
pReq->taskId = dstTaskId;
pReq->type = type;
pReq->data = taosArrayInit(numOfBlocks, POINTER_BYTES);
pReq->dataLen = taosArrayInit(numOfBlocks, sizeof(int32_t));
@ -238,9 +245,8 @@ int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pR
return 0;
}
static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
static int32_t doDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData) {
int32_t code = 0;
int32_t numOfBlocks = taosArrayGetSize(pData->blocks);
ASSERT(numOfBlocks != 0);
@ -248,7 +254,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc
SStreamDispatchReq req = {0};
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId);
code = tInitStreamDispatchReq(&req, pTask, pData->srcVgId, numOfBlocks, downstreamTaskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
@ -289,7 +295,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc
for (int32_t i = 0; i < vgSz; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId);
code = tInitStreamDispatchReq(&pReqs[i], pTask, pData->srcVgId, 0, pVgInfo->taskId, pData->type);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL_SHUFFLE_DISPATCH;
}
@ -299,7 +305,7 @@ static int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBloc
SSDataBlock* pDataBlock = taosArrayGet(pData->blocks, i);
// TODO: do not use broadcast
if (pDataBlock->info.type == STREAM_DELETE_RESULT) {
if (pDataBlock->info.type == STREAM_DELETE_RESULT || pDataBlock->info.type == STREAM_CHECKPOINT) {
for (int32_t j = 0; j < vgSz; j++) {
if (streamAddBlockIntoDispatchMsg(pDataBlock, &pReqs[j]) < 0) {
goto FAIL_SHUFFLE_DISPATCH;
@ -359,7 +365,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
int32_t numOfElems = taosQueueItemSize(pTask->outputQueue->queue);
if (numOfElems > 0) {
qDebug("s-task:%s try to dispatch intermediate result block to downstream, elem in outputQ:%d", pTask->id.idStr,
qDebug("s-task:%s try to dispatch intermediate block to downstream, elem in outputQ:%d", pTask->id.idStr,
numOfElems);
}
@ -382,12 +388,12 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
}
pTask->msgInfo.pData = pBlock;
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK);
ASSERT(pBlock->type == STREAM_INPUT__DATA_BLOCK || pBlock->type == STREAM_INPUT__CHECKPOINT_TRIGGER);
int32_t retryCount = 0;
while (1) {
int32_t code = streamDispatchAllBlocks(pTask, pBlock);
int32_t code = doDispatchAllBlocks(pTask, pBlock);
if (code == TSDB_CODE_SUCCESS) {
break;
}
@ -415,81 +421,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
static int32_t doDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId,
SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamCheckpointReq, pReq, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(nodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReq(&encoder, pReq)) < 0) {
rpcFreeCont(buf);
return code;
}
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_STREAM_TASK_CHECKPOINT, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d, vgId:%d) dispatch checkpoint msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)",
pTask->id.idStr, pTask->info.taskLevel, pTask->info.nodeId, pReq->streamId, pReq->downstreamTaskId, nodeId);
tmsgSendReq(pEpSet, &msg);
return 0;
}
int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) {
SStreamCheckpointReq req = {
.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId,
.downstreamNodeId = pTask->info.nodeId,
.downstreamTaskId = pTask->id.taskId,
.childId = pTask->info.selfChildId,
.checkpointId = checkpointId,
};
// serialize
if (pTask->outputType == TASK_OUTPUT__FIXED_DISPATCH) {
req.downstreamNodeId = pTask->fixedEpDispatcher.nodeId;
req.downstreamTaskId = pTask->fixedEpDispatcher.taskId;
doDispatchCheckpointMsg(pTask, &req, pTask->fixedEpDispatcher.nodeId, &pTask->fixedEpDispatcher.epSet);
} else if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
pTask->notReadyTasks = numOfVgs;
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
qDebug("s-task:%s dispatch %d checkpoint msg to downstream", pTask->id.idStr, numOfVgs);
for (int32_t i = 0; i < numOfVgs; i++) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId;
doDispatchCheckpointMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // no need to dispatch msg to downstream task
qDebug("s-task:%s no down stream task, not dispatch checkpoint msg to downstream", pTask->id.idStr);
streamProcessCheckpointRsp(NULL, pTask);
}
return 0;
}
int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
SStreamScanHistoryFinishReq req = {.streamId = pTask->id.streamId, .childId = pTask->info.selfChildId};
@ -514,32 +445,31 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->pReadyMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamInfoList) == num);
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr,
pTask->info.taskLevel, num);
for (int32_t i = 0; i < num; ++i) {
SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i);
tmsgSendRsp(pMsg);
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
tmsgSendReq(&pInfo->epset, &pInfo->msg);
qDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId);
}
taosArrayClear(pTask->pRpcMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream", pTask->id.idStr, pTask->info.taskLevel);
taosArrayClear(pTask->pReadyMsgList);
qDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel, num);
return TSDB_CODE_SUCCESS;
}
// this function is only invoked by source task, and send rsp to mnode
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pRpcMsgList) == 1);
SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, 0);
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pReadyMsgList) == 1);
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0);
tmsgSendRsp(pMsg);
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pTask->pRpcMsgList);
taosArrayClear(pTask->pReadyMsgList);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
return TSDB_CODE_SUCCESS;
@ -550,7 +480,8 @@ int32_t tEncodeStreamDispatchReq(SEncoder* pEncoder, const SStreamDispatchReq* p
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamTaskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->dataSrcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->type) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->srcVgId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->upstreamNodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->blockNum) < 0) return -1;
@ -753,7 +684,7 @@ void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
ASSERT(pTask->outputStatus == TASK_OUTPUT_STATUS__WAIT);
int32_t code = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
int32_t code = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (code != TSDB_CODE_SUCCESS) {
qDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
@ -794,49 +725,73 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg);
SStreamChkptReadyInfo info = {0};
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo;
qDebug("s-task:%s add checkpoint rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pRpcMsgList));
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
}
taosArrayPush(pTask->pReadyMsgList, &info);
qDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList));
return TSDB_CODE_SUCCESS;
}
int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
int32_t code = 0;
int32_t tlen = 0;
void* buf = NULL;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return TSDB_CODE_SUCCESS;
}
SStreamCheckpointRsp rsp = {
.checkpointId = pReq->checkpointId,
.downstreamTaskId = pReq->downstreamTaskId,
.downstreamNodeId = pReq->downstreamNodeId,
.streamId = pReq->streamId,
.upstreamTaskId = pReq->upstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
};
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code);
SStreamCheckpointReadyMsg req = {0};
req.downstreamNodeId = pTask->pMeta->vgId;
req.downstreamTaskId = pTask->id.taskId;
req.streamId = pTask->id.streamId;
req.checkpointId = checkpointId;
req.childId = pInfo->childId;
req.upstreamNodeId = pInfo->nodeId;
req.upstreamTaskId = pInfo->taskId;
tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
rpcFreeCont(buf);
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamNodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamCheckpointRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg);
ASSERT(req.upstreamTaskId != 0);
return TSDB_CODE_SUCCESS;
SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet};
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
qDebug("s-task:%s (level:%d) prepare checkpoint ready msg to upstream s-task:0x%" PRIx64 ":0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.downstreamNodeId, index);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
}
taosArrayPush(pTask->pReadyMsgList, &info);
return 0;
}
// todo record the idle time for dispatch data
@ -848,7 +803,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// happened too fast. todo handle the shuffle dispatch failure
qError("s-task:%s failed to dispatch msg to task:0x%x, code:%s, retry cnt:%d", pTask->id.idStr,
pRsp->downstreamTaskId, tstrerror(code), ++pTask->msgInfo.retryCount);
int32_t ret = streamDispatchAllBlocks(pTask, pTask->msgInfo.pData);
int32_t ret = doDispatchAllBlocks(pTask, pTask->msgInfo.pData);
if (ret != TSDB_CODE_SUCCESS) {
}

View File

@ -57,8 +57,7 @@ static int32_t doDumpResult(SStreamTask* pTask, SStreamQueueItem* pItem, SArray*
return TSDB_CODE_SUCCESS;
}
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize,
int32_t* totalBlocks) {
static int32_t streamTaskExecImpl(SStreamTask* pTask, SStreamQueueItem* pItem, int64_t* totalSize, int32_t* totalBlocks) {
int32_t code = TSDB_CODE_SUCCESS;
void* pExecutor = pTask->exec.pExecutor;
@ -413,9 +412,9 @@ static void doSetStreamInputBlock(SStreamTask* pTask, const void* pInput, int64_
const SStreamRefDataBlock* pRefBlock = (const SStreamRefDataBlock*)pInput;
qSetMultiStreamInput(pExecutor, pRefBlock->pBlock, 1, STREAM_INPUT__DATA_BLOCK);
} else if (pItem->type == STREAM_INPUT__CHECKPOINT) {
} else if (pItem->type == STREAM_INPUT__CHECKPOINT || pItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
const SStreamCheckpoint* pCheckpoint = (const SStreamCheckpoint*)pInput;
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, STREAM_INPUT__CHECKPOINT);
qSetMultiStreamInput(pExecutor, pCheckpoint->pBlock, 1, pItem->type);
} else {
ASSERT(0);
@ -434,9 +433,7 @@ int32_t streamExecForAll(SStreamTask* pTask) {
SStreamQueueItem* pInput = NULL;
// merge multiple input data if possible in the input queue.
qDebug("s-task:%s start to extract data block from inputQ", id);
/*int32_t code = */ extractBlocksFromInputQ(pTask, &pInput, &batchSize, id);
extractBlocksFromInputQ(pTask, &pInput, &batchSize);
if (pInput == NULL) {
ASSERT(batchSize == 0);
if (pTask->info.fillHistory && pTask->status.transferState) {
@ -451,8 +448,11 @@ int32_t streamExecForAll(SStreamTask* pTask) {
}
int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
ASSERT(getNumOfItemsInputQ(pTask) == 1);
// dispatch checkpoint msg to all downstream tasks
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
@ -462,21 +462,9 @@ int32_t streamExecForAll(SStreamTask* pTask) {
qDebug("s-task:%s sink task start to sink %d blocks", id, batchSize);
streamTaskOutputResultBlock(pTask, (SStreamDataBlock*)pInput);
continue;
} else { // pInput->type == STREAM_INPUT__CHECKPOINT, for sink task, do nothing.
ASSERT(pTask->status.taskStatus == TASK_STATUS__CK);
pTask->status.taskStatus = TASK_STATUS__CK_READY;
return 0;
}
}
// dispatch checkpoint msg to all downstream tasks
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
qDebug("s-task:%s start to dispatch checkpoint msg to downstream", id);
streamTaskDispatchCheckpointMsg(pTask, pTask->checkpointingId);
return 0;
}
int64_t st = taosGetTimestampMs();
const SStreamQueueItem* pItem = pInput;
@ -565,7 +553,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
} else {
code = streamTaskSendCheckpointRsp(pTask);
code = streamTaskSendCheckpointReadyMsg(pTask);
}
if (code != TSDB_CODE_SUCCESS) {

View File

@ -167,13 +167,14 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
}
#endif
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id) {
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
int32_t retryTimes = 0;
int32_t MAX_RETRY_TIMES = 5;
const char* id = pTask->id.idStr;
while (1) {
if (streamTaskShouldPause(&pTask->status)) {
qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
return TSDB_CODE_SUCCESS;
}
@ -181,17 +182,24 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
if (qItem == NULL) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
taosMsleep(10);
qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
continue;
}
qDebug("===stream===break batchSize:%d", *numOfBlocks);
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
return TSDB_CODE_SUCCESS;
}
// do not merge blocks for sink node and check point data block
if ((pTask->info.taskLevel == TASK_LEVEL__SINK) ||
(qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) {
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
qDebug("s-task:%s checkpoint msg extracted, start to process immediately", id);
} else {
qDebug("s-task:%s sink task handle result block one-by-one", id);
}
*numOfBlocks = 1;
*pInput = qItem;
return TSDB_CODE_SUCCESS;

View File

@ -132,7 +132,7 @@ int32_t streamTaskCheckDownstreamTasks(SStreamTask* pTask) {
}
} else {
pTask->status.downstreamReady = 1;
qDebug("s-task:%s (vgId:%d) no downstream tasks, set downstream checked, try to launch scan-history-data, status:%s",
qDebug("s-task:%s (vgId:%d) set downstream checked since no downstream, try to launch scan-history-data, status:%s",
pTask->id.idStr, pTask->info.nodeId, streamGetTaskStatusStr(pTask->status.taskStatus));
streamTaskLaunchScanHistory(pTask);

View File

@ -242,7 +242,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
}
pTask->pRpcMsgList = taosArrayDestroy(pTask->pRpcMsgList);
pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);
if (pTask->id.idStr != NULL) {
taosMemoryFree((void*)pTask->id.idStr);