fix(stream): send correct rsp to mnode.

This commit is contained in:
Haojun Liao 2023-07-08 09:39:44 +08:00
parent 9c30abf95b
commit a5c19427e7
7 changed files with 137 additions and 78 deletions

View File

@ -474,6 +474,7 @@ typedef struct {
int64_t checkpointId; int64_t checkpointId;
int32_t taskId; int32_t taskId;
int32_t nodeId; int32_t nodeId;
int32_t mnodeId;
int64_t expireTime; int64_t expireTime;
} SStreamCheckpointSourceReq; } SStreamCheckpointSourceReq;
@ -482,6 +483,7 @@ typedef struct {
int64_t checkpointId; int64_t checkpointId;
int32_t taskId; int32_t taskId;
int32_t nodeId; int32_t nodeId;
int32_t mnodeId;
int64_t expireTime; int64_t expireTime;
} SStreamCheckpointSourceRsp; } SStreamCheckpointSourceRsp;
@ -500,7 +502,7 @@ typedef struct {
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
int32_t childId; int32_t childId;
} SStreamTaskCheckpointReq; } SStreamCheckpointReq;
typedef struct { typedef struct {
SMsgHead msgHead; SMsgHead msgHead;
@ -511,13 +513,13 @@ typedef struct {
int32_t upstreamTaskId; int32_t upstreamTaskId;
int32_t upstreamNodeId; int32_t upstreamNodeId;
int32_t childId; int32_t childId;
} SStreamTaskCheckpointRsp; } SStreamCheckpointRsp;
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq); int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq);
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq); int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq);
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp); int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp);
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp); int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp);
typedef struct { typedef struct {
int64_t streamId; int64_t streamId;
@ -613,20 +615,22 @@ SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int32_t taskId);
void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask);
void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); void streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamDoCheckpoint(SStreamMeta* streamMeta);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver); int32_t streamLoadTasks(SStreamMeta* pMeta, int64_t ver);
// checkpoint // checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq); int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq); int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq);
int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamProcessCheckpointRsp(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamTaskReleaseState(SStreamTask* pTask); int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask); int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask);
int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -819,7 +819,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
} }
pTask->pRpcMsgList = taosArrayInit(4, POINTER_BYTES); pTask->pRpcMsgList = taosArrayInit(4, sizeof(SRpcMsg));
// sink // sink
if (pTask->outputType == TASK_OUTPUT__SMA) { if (pTask->outputType == TASK_OUTPUT__SMA) {
@ -1516,16 +1516,18 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
// todo handle this bug: task not in ready state. // todo handle this bug: task not in ready state.
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.taskId);
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, req.taskId); tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
req.taskId);
goto FAIL; goto FAIL;
} }
// backup the rpchandle for rsp code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
SRpcMsg* pRpcMsg = taosMemoryCalloc(1, sizeof(SRpcMsg)); if (code != TSDB_CODE_SUCCESS) {
memcpy(pRpcMsg, (SRpcMsg*)pMsg, sizeof(SRpcMsg)); goto FAIL;
taosArrayPush(pTask->pRpcMsgList, &pRpcMsg); }
// todo: when generating checkpoint, no new tasks are allowed to add into current Vnode // todo: when generating checkpoint, no new tasks are allowed to add into current Vnode
// todo: when generating checkpoint, leader of mnode has transfer to other DNode?
// set the initial value for generating check point // set the initial value for generating check point
int32_t total = 0; int32_t total = 0;
@ -1544,7 +1546,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
FAIL: FAIL:
return code; return code;
} }
@ -1555,7 +1557,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode); int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta; SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTaskCheckpointReq req = {0}; SStreamCheckpointReq req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1572,6 +1574,11 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, SRpcMsg* pMsg) {
goto FAIL; goto FAIL;
} }
code = streamAddCheckpointRspMsg(&req, &pMsg->info, pTask);
if (code != TSDB_CODE_SUCCESS) {
goto FAIL;
}
streamProcessCheckpointReq(pMeta, pTask, &req); streamProcessCheckpointReq(pMeta, pTask, &req);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return code; return code;
@ -1591,7 +1598,7 @@ int32_t tqProcessStreamCheckPointRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int32_t code = 0; int32_t code = 0;
SStreamTaskCheckpointRsp req = {0}; SStreamCheckpointRsp req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);

View File

@ -165,6 +165,9 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu
} }
pInfo->blockType = STREAM_INPUT__DATA_BLOCK; pInfo->blockType = STREAM_INPUT__DATA_BLOCK;
} else if (type == STREAM_INPUT__CHECKPOINT) {
taosArrayPush(pInfo->pBlockLists, input);
pInfo->blockType = STREAM_INPUT__CHECKPOINT;
} else { } else {
ASSERT(0); ASSERT(0);
} }

View File

@ -50,9 +50,9 @@ int32_t tEncodeStreamRetrieveReq(SEncoder* pEncoder, const SStreamRetrieveReq* p
int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData); int32_t streamDispatchAllBlocks(SStreamTask* pTask, const SStreamDataBlock* pData);
int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet); int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId); int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask);
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id); int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, const char* id);
SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem); SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem* pElem);

View File

@ -21,6 +21,7 @@ int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->mnodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1; if (tEncodeI64(pEncoder, pReq->expireTime) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
@ -32,6 +33,7 @@ int32_t tDecodeStreamCheckpointSourceReq(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->mnodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->expireTime) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
@ -59,7 +61,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo
return 0; return 0;
} }
int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpointReq* pReq) { int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamCheckpointReq* pReq) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1; if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
@ -72,7 +74,7 @@ int32_t tEncodeStreamCheckpointReq(SEncoder* pEncoder, const SStreamTaskCheckpoi
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq* pReq) { int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamCheckpointReq* pReq) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
@ -85,7 +87,7 @@ int32_t tDecodeStreamCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpointReq*
return 0; return 0;
} }
int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpointRsp* pRsp) { int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamCheckpointRsp* pRsp) {
if (tStartEncode(pEncoder) < 0) return -1; if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1; if (tEncodeI64(pEncoder, pRsp->checkpointId) < 0) return -1;
@ -98,7 +100,7 @@ int32_t tEncodeStreamCheckpointRsp(SEncoder* pEncoder, const SStreamTaskCheckpoi
return pEncoder->pos; return pEncoder->pos;
} }
int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamTaskCheckpointRsp* pRsp) { int32_t tDecodeStreamCheckpointRsp(SDecoder* pDecoder, SStreamCheckpointRsp* pRsp) {
if (tStartDecode(pDecoder) < 0) return -1; if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1; if (tDecodeI64(pDecoder, &pRsp->checkpointId) < 0) return -1;
@ -124,7 +126,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
} }
static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) { static int32_t streamTaskDispatchCheckpointMsg(SStreamTask* pTask, uint64_t checkpointId) {
SStreamTaskCheckpointReq req = { SStreamCheckpointReq req = {
.streamId = pTask->id.streamId, .streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId, .upstreamTaskId = pTask->id.taskId,
.upstreamNodeId = pTask->info.nodeId, .upstreamNodeId = pTask->info.nodeId,
@ -208,7 +210,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamMeta* pMeta, SStreamTask* pTask,
return code; return code;
} }
int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTaskCheckpointReq* pReq) { int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStreamCheckpointReq* pReq) {
int32_t code; int32_t code;
int64_t checkpointId = pReq->checkpointId; int64_t checkpointId = pReq->checkpointId;
int32_t childId = pReq->childId; int32_t childId = pReq->childId;

View File

@ -415,7 +415,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamTaskCheckpointReq* pReq, int32_t nodeId, int32_t streamDispatchCheckpointMsg(SStreamTask* pTask, const SStreamCheckpointReq* pReq, int32_t nodeId,
SEpSet* pEpSet) { SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
@ -476,70 +476,36 @@ int32_t streamDispatchScanHistoryFinishMsg(SStreamTask* pTask) {
// this function is usually invoked by sink/agg task // this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) { int32_t streamTaskSendCheckpointRsp(SStreamTask* pTask, int32_t vgId) {
// int32_t code = 0;
// int32_t len;
//
// // todo set upstreamTaskId Info
// const SStreamTaskCheckpointRsp rsp = {
// .streamId = pTask->id.streamId, .downstreamTaskId = pTask->id.taskId, .downstreamNodeId = vgId};
//
// SEncoder encoder;
// tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code);
// if (code < 0) {
// qError("vgId:%d failed to encode checkpoint rsp, task:0x%x", vgId, rsp.downstreamTaskId);
// return -1;
// }
//
// void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
// ((SMsgHead*)buf)->vgId = htonl(rsp.upstreamNodeId);
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
// tEncoderInit(&encoder, (uint8_t*)abuf, len);
// tEncodeStreamCheckpointRsp(&encoder, &rsp);
// tEncoderClear(&encoder);
int32_t num = taosArrayGetSize(pTask->pRpcMsgList); int32_t num = taosArrayGetSize(pTask->pRpcMsgList);
ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num); ASSERT(taosArrayGetSize(pTask->pUpstreamEpInfoList) == num);
qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel, qDebug("s-task:%s level:%d checkpoint completed msg sent to %d upstream tasks", pTask->id.idStr, pTask->info.taskLevel,
num); num);
for(int32_t i = 0; i < num; ++i) { for(int32_t i = 0; i < num; ++i) {
SRpcMsg* pMsg = taosArrayGetP(pTask->pRpcMsgList, 0); SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, i);
tmsgSendRsp(pMsg); tmsgSendRsp(pMsg);
} }
int8_t prev = pTask->status.taskStatus;
pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to upstream, set status:%s, prev:%s", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// this function is only invoked by source task, and send rsp to mnode // this function is only invoked by source task, and send rsp to mnode
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask, int32_t vgId) { int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE); ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pRpcMsgList) == 1);
// int32_t code = 0; SRpcMsg* pMsg = taosArrayGet(pTask->pRpcMsgList, 0);
// int32_t len;
// SEncoder encoder;
// SStreamMeta* pMeta = pTask->pMeta;
// const SStreamCheckpointSourceRsp rsp = {
// .streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pMeta->checkpointId};
//
// tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
// if (code < 0) {
// qError("vgId:%d failed to encode source checkpoint rsp, task:0x%x", vgId, pTask->id.taskId);
// return -1;
// }
//
// void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
// ((SMsgHead*)buf)->vgId = htonl(rsp.nodeId);
//
// void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
// tEncoderInit(&encoder, (uint8_t*)abuf, len);
// tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
// tEncoderClear(&encoder);
// SRpcMsg rspMsg = *pTask->rpcMsg;
ASSERT(taosArrayGetSize(pTask->pRpcMsgList) == 0); tmsgSendRsp(pMsg);
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); int8_t prev = pTask->status.taskStatus;
tmsgSendRsp(taosArrayGetP(pTask->pRpcMsgList, 0)); pTask->status.taskStatus = TASK_STATUS__NORMAL;
qDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode, set status:%s, prev:%s", pTask->id.idStr,
pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus), streamGetTaskStatusStr(prev));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -598,7 +564,7 @@ int32_t streamAddBlockIntoDispatchMsg(const SSDataBlock* pBlock, SStreamDispatch
} }
int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId, int32_t doDispatchScanHistoryFinishMsg(SStreamTask* pTask, const SStreamScanHistoryFinishReq* pReq, int32_t vgId,
SEpSet* pEpSet) { SEpSet* pEpSet) {
void* buf = NULL; void* buf = NULL;
int32_t code = -1; int32_t code = -1;
SRpcMsg msg = {0}; SRpcMsg msg = {0};
@ -759,3 +725,80 @@ void doRetryDispatchData(void* param, void* tmrId) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS); streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} }
} }
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamCheckpointSourceRsp rsp = {
.checkpointId = pReq->checkpointId,
.taskId = pReq->taskId,
.nodeId = pReq->nodeId,
.streamId = pReq->streamId,
.expireTime = pReq->expireTime,
.mnodeId = pReq->mnodeId,
};
tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
if (code < 0) {
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->mnodeId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg);
return TSDB_CODE_SUCCESS;
}
int32_t streamAddCheckpointRspMsg(SStreamCheckpointReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
SStreamCheckpointRsp rsp = {
.checkpointId = pReq->checkpointId,
.downstreamTaskId = pReq->downstreamTaskId,
.downstreamNodeId = pReq->downstreamNodeId,
.streamId = pReq->streamId,
.upstreamTaskId = pReq->upstreamTaskId,
.upstreamNodeId = pReq->upstreamNodeId,
};
tEncodeSize(tEncodeStreamCheckpointRsp, &rsp, len, code);
if (code < 0) {
return code;
}
void* pBuf = rpcMallocCont(sizeof(SMsgHead) + len);
if (pBuf == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
}
((SMsgHead*)pBuf)->vgId = htonl(pReq->upstreamTaskId);
void* abuf = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
tEncodeStreamCheckpointRsp(&encoder, &rsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
taosArrayPush(pTask->pRpcMsgList, &rspMsg);
return TSDB_CODE_SUCCESS;
}

View File

@ -546,7 +546,7 @@ int32_t streamTryExec(SStreamTask* pTask) {
// send check point response to upstream task // send check point response to upstream task
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
streamTaskSendCheckpointSourceRsp(pTask, pMeta->vgId); streamTaskSendCheckpointSourceRsp(pTask);
} else { } else {
streamTaskSendCheckpointRsp(pTask, pMeta->vgId); streamTaskSendCheckpointRsp(pTask, pMeta->vgId);
} }