fix(stream): update the msg encoder.

This commit is contained in:
Haojun Liao 2024-11-15 00:19:35 +08:00
parent 4dd4f217c1
commit f65651f6ef
12 changed files with 392 additions and 98 deletions

View File

@ -170,8 +170,8 @@ typedef struct SStreamHbMsg {
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.
} SStreamHbMsg;
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pRsp);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pRsp);
int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq);
int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq);
void tCleanupStreamHbMsg(SStreamHbMsg* pMsg);
typedef struct {
@ -179,6 +179,9 @@ typedef struct {
int32_t msgId;
} SMStreamHbRspMsg;
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp);
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp);
typedef struct SRetrieveChkptTriggerReq {
SMsgHead head;
int64_t streamId;
@ -189,6 +192,9 @@ typedef struct SRetrieveChkptTriggerReq {
int64_t downstreamTaskId;
} SRetrieveChkptTriggerReq;
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq);
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq);
typedef struct SCheckpointTriggerRsp {
int64_t streamId;
int64_t checkpointId;
@ -198,6 +204,9 @@ typedef struct SCheckpointTriggerRsp {
int32_t rspCode;
} SCheckpointTriggerRsp;
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp);
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp);
typedef struct SCheckpointReport {
int64_t streamId;
int32_t taskId;
@ -222,7 +231,7 @@ typedef struct SRestoreCheckpointInfo {
int32_t nodeId;
} SRestoreCheckpointInfo;
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
typedef struct {
@ -232,10 +241,8 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;
int32_t tEncodeStreamTaskRunReq(SEncoder* pEncoder, const SStreamTaskRunReq* pReq);
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq);
#ifdef __cplusplus
}

View File

@ -643,6 +643,11 @@ typedef struct SCheckpointConsensusInfo {
int64_t streamId;
} SCheckpointConsensusInfo;
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
int64_t ts;
} SCheckpointConsensusEntry;
void streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related

View File

@ -1,4 +1,11 @@
aux_source_directory(src COMMON_SRC)
aux_source_directory("src/msg/" MSG_SRC_FILES)
list(
APPEND
COMMON_SRC
${MSG_SRC_FILES}
)
if(TD_ENTERPRISE)
LIST(APPEND COMMON_SRC ${TD_ENTERPRISE_DIR}/src/plugins/common/src/tglobal.c)

View File

@ -46,7 +46,7 @@ if (${TD_LINUX})
target_sources(tmsgTest
PRIVATE
"tmsgTest.cpp"
"../src/tmsg.c"
"../src/msg/tmsg.c"
)
target_include_directories(tmsgTest PUBLIC "${TD_SOURCE_DIR}/include/common/")
target_link_libraries(tmsgTest PUBLIC os util gtest gtest_main)

View File

@ -553,12 +553,37 @@ void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArr
}
void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId) {
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SMStreamHbRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
int32_t ret = 0;
int32_t tlen = 0;
void *buf = NULL;
SMStreamHbRspMsg *pMsg = rsp.pCont;
pMsg->head.vgId = htonl(vgId);
pMsg->msgId = msgId;
const SMStreamHbRspMsg msg = {.msgId = msgId};
tEncodeSize(tEncodeStreamHbRsp, &msg, tlen, ret);
if (ret < 0) {
mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
mError("encode stream hb msg rsp failed, code:%s", tstrerror(terrno));
return;
}
((SMStreamHbRspMsg*)buf)->head.vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamHbRsp(&encoder, &msg)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
mError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
return;
}
tEncoderClear(&encoder);
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = tlen + sizeof(SMsgHead), .pCont = buf};
tmsgSendRsp(&rsp);
pRpcInfo->handle = NULL; // disable auto rsp

View File

@ -607,7 +607,7 @@ int32_t mndStreamSetChkptIdAction(SMnode *pMnode, STrans *pTrans, SStreamTask* p
tEncoderInit(&encoder, abuf, tlen);
code = tEncodeRestoreCheckpointInfo(&encoder, &req);
tEncoderClear(&encoder);
if (code == -1) {
if (code < 0) {
taosMemoryFree(pBuf);
return code;
}

View File

@ -1009,21 +1009,34 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t code = 0;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
SStreamTaskRunReq req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode task run req, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
// extracted submit data from wal files for all tasks
if (pReq->reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
if (req.reqType == STREAM_EXEC_T_EXTRACT_WAL_DATA) {
return tqScanWal(pTq);
}
int32_t code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
code = tqStreamTaskProcessRunReq(pTq->pStreamMeta, pMsg, vnodeIsRoleLeader(pTq->pVnode));
if (code) {
tqError("vgId:%d failed to create task run req, code:%s", TD_VID(pTq->pVnode), tstrerror(code));
return code;
}
// let's continue scan data in the wal files
if (pReq->reqType >= 0 || pReq->reqType == STREAM_EXEC_T_RESUME_TASK) {
if (req.reqType >= 0 || req.reqType == STREAM_EXEC_T_RESUME_TASK) {
code = tqScanWalAsync(pTq, false); // it's ok to failed
if (code) {
tqError("vgId:%d failed to start scan wal file, code:%s", pTq->pStreamMeta->vgId, tstrerror(code));
@ -1297,7 +1310,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
SStreamCheckpointReadyMsg* pReq = (SStreamCheckpointReadyMsg*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
@ -1318,10 +1331,23 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
SRetrieveChkptTriggerReq req = {0};
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from s-task:0x%" PRId64, vgId,
req.downstreamTaskId);
return TSDB_CODE_STREAM_NOT_LEADER;
}

View File

@ -828,14 +828,25 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
}
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) {
SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t code = 0;
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder;
int32_t type = pReq->reqType;
int32_t vgId = pMeta->vgId;
int32_t code = 0;
SStreamTaskRunReq req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if ((code = tDecodeStreamTaskRunReq(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode task run req, code:%s", pMeta->vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
int32_t type = req.reqType;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
code = streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
code = streamMetaStartOneTask(pMeta, req.streamId, req.taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
code = streamMetaStartAllTasks(pMeta);
@ -847,11 +858,11 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
code = streamMetaStopAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return code;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask != NULL && (code == 0)) {
char* pStatus = NULL;
@ -873,7 +884,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
}
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId, &pTask);
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if ((pTask != NULL) && (code == 0)) { // even in halt status, the data in inputQ must be processed
char* p = NULL;
if (streamTaskReadyToRun(pTask, &p)) {
@ -890,7 +901,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return 0;
} else { // NOTE: pTask->status.schedStatus is not updated since it is not be handled by the run exec.
// todo add one function to handle this
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, pReq->taskId);
tqError("vgId:%d failed to found s-task, taskId:0x%x may have been dropped", vgId, req.taskId);
return code;
}
}
@ -976,25 +987,36 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
}
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*)pMsg->pCont;
SRetrieveChkptTriggerReq req = {0};
SStreamTask* pTask = NULL;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->upstreamTaskId, &pTask);
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRetrieveChkptTriggerReq(&decoder, &req) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger req received", pMeta->vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
int32_t code = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError("vgId:%d process retrieve checkpoint trigger, checkpointId:%" PRId64
" from s-task:0x%x, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->checkpointId, (int32_t)pReq->downstreamTaskId, pReq->upstreamTaskId);
pMeta->vgId, req.checkpointId, (int32_t)req.downstreamTaskId, req.upstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("s-task:0x%x recv retrieve checkpoint-trigger msg from downstream s-task:0x%x, checkpointId:%" PRId64,
pReq->upstreamTaskId, (int32_t)pReq->downstreamTaskId, pReq->checkpointId);
req.upstreamTaskId, (int32_t)req.downstreamTaskId, req.checkpointId);
if (pTask->status.downstreamReady != 1) {
tqError("s-task:%s not ready for checkpoint-trigger retrieve from 0x%x, since downstream not ready",
pTask->id.idStr, (int32_t)pReq->downstreamTaskId);
pTask->id.idStr, (int32_t)req.downstreamTaskId);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_STREAM_TASK_IVLD_STATUS);
streamMetaReleaseTask(pMeta, pTask);
return code;
@ -1006,19 +1028,19 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
if (checkpointId != pReq->checkpointId) {
if (checkpointId != req.checkpointId) {
tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
" req:%" PRId64,
pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId);
pTask->id.idStr, req.downstreamTaskId, checkpointId, req.checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_INVALID_MSG;
}
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
if (streamTaskAlreadySendTrigger(pTask, req.downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
(int32_t)req.downstreamTaskId, checkpointId, transId);
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
@ -1032,7 +1054,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"sending checkpoint-source/trigger",
pTask->id.idStr, recv, total);
}
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
} else { // upstream not recv the checkpoint-source/trigger till now
@ -1044,7 +1066,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
"s-task:%s not recv checkpoint-source from mnode or checkpoint-trigger from upstream yet, wait for all "
"upstream sending checkpoint-source/trigger",
pTask->id.idStr);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
code = streamTaskSendCheckpointTriggerMsg(pTask, req.downstreamTaskId, req.downstreamNodeId, &pMsg->info,
TSDB_CODE_ACTION_IN_PROGRESS);
}
@ -1053,23 +1075,34 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SCheckpointTriggerRsp rsp = {0};
SStreamTask* pTask = NULL;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SDecoder decoder = {0};
SStreamTask* pTask = NULL;
int32_t code = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId, &pTask);
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeCheckpointTriggerRsp(&decoder, &rsp) < 0) {
tDecoderClear(&decoder);
tqError("vgId:%d invalid retrieve checkpoint-trigger rsp received", pMeta->vgId);
return TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&decoder);
int32_t code = streamMetaAcquireTask(pMeta, rsp.streamId, rsp.taskId, &pTask);
if (pTask == NULL || (code != 0)) {
tqError(
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pRsp->taskId);
pMeta->vgId, rsp.taskId);
return code;
}
tqDebug(
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
"checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
pTask->id.idStr, rsp.upstreamTaskId, rsp.checkpointId, rsp.transId);
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
code = streamTaskProcessCheckpointTriggerRsp(pTask, &rsp);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -1199,7 +1232,22 @@ int32_t doProcessDummyRspMsg(SStreamMeta* UNUSED_PARAM(pMeta), SRpcMsg* pMsg) {
}
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return streamProcessHeartbeatRsp(pMeta, pMsg->pCont);
SMStreamHbRspMsg rsp;
int32_t len = 0;
int32_t code = 0;
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pMsg->pCont, len);
code = tDecodeStreamHbRsp(&decoder, &rsp);
if (code < 0) {
terrno = TSDB_CODE_INVALID_MSG;
tDecoderClear(&decoder);
tqError("vgId:%d failed to parse hb rsp msg, code:%s", pMeta->vgId, tstrerror(terrno));
return terrno;
}
tDecoderClear(&decoder);
return streamProcessHeartbeatRsp(pMeta, &rsp);
}
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
@ -1233,7 +1281,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, (uint8_t*)msg, len);
if (tDecodeRestoreCheckpointInfo(&decoder, &req) < 0) {
if ((code = tDecodeRestoreCheckpointInfo(&decoder, &req)) < 0) {
tqError("vgId:%d failed to decode set consensus checkpointId req, code:%s", vgId, tstrerror(code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;

View File

@ -161,33 +161,52 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
if (pBuf == NULL) {
int32_t ret = 0;
int32_t tlen = 0;
void* buf = NULL;
SEncoder encoder;
SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
.upstreamTaskId = pTask->id.taskId,
.taskId = dstTaskId,
.rspCode = code};
if (code == TSDB_CODE_SUCCESS) {
req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
req.transId = pTask->chkInfo.pActiveInfo->transId;
} else {
req.checkpointId = -1;
req.transId = -1;
}
tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
if (ret < 0) {
stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
return ret;
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
return terrno;
}
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
pRsp->rspCode = code;
if (code == TSDB_CODE_SUCCESS) {
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
} else {
pRsp->checkpointId = -1;
pRsp->transId = -1;
tEncoderInit(&encoder, abuf, tlen);
if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
return ret;
}
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
return ret;
}
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -1115,23 +1134,46 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
return TSDB_CODE_INVALID_PARA;
}
SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
if (pReq == NULL) {
code = terrno;
int32_t ret = 0;
int32_t tlen = 0;
void* buf = NULL;
SRpcMsg rpcMsg = {0};
SEncoder encoder;
SRetrieveChkptTriggerReq req =
{
.streamId = pTask->id.streamId,
.downstreamTaskId = pTask->id.taskId,
.downstreamNodeId = vgId,
.upstreamTaskId = pUpstreamTask->taskId,
.upstreamNodeId = pUpstreamTask->nodeId,
.checkpointId = checkpointId,
};
tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
if (ret < 0) {
stError("encode stream hb msg rsp failed, code:%s", tstrerror(code));
}
buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
continue;
}
pReq->head.vgId = htonl(pUpstreamTask->nodeId);
pReq->streamId = pTask->id.streamId;
pReq->downstreamTaskId = pTask->id.taskId;
pReq->downstreamNodeId = vgId;
pReq->upstreamTaskId = pUpstreamTask->taskId;
pReq->upstreamNodeId = pUpstreamTask->nodeId;
pReq->checkpointId = checkpointId;
((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(vgId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SRpcMsg rpcMsg = {0};
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
continue;
}
tEncoderClear(&encoder);
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
if (code == TSDB_CODE_SUCCESS) {

View File

@ -605,6 +605,98 @@ void tCleanupStreamHbMsg(SStreamHbMsg* pMsg) {
pMsg->numOfTasks = -1;
}
int32_t tEncodeStreamHbRsp(SEncoder* pEncoder, const SMStreamHbRspMsg* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->msgId));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamHbRsp(SDecoder* pDecoder, SMStreamHbRspMsg* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->msgId));
tEndDecode(pDecoder);
_exit:
return code;
}
int32_t tEncodeRetrieveChkptTriggerReq(SEncoder* pEncoder, const SRetrieveChkptTriggerReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->checkpointId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamNodeId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->upstreamTaskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->downstreamNodeId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->downstreamTaskId));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeRetrieveChkptTriggerReq(SDecoder* pDecoder, SRetrieveChkptTriggerReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->checkpointId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamNodeId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->upstreamTaskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->downstreamNodeId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->downstreamTaskId));
tEndDecode(pDecoder);
_exit:
return code;
}
int32_t tEncodeCheckpointTriggerRsp(SEncoder* pEncoder, const SCheckpointTriggerRsp* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->streamId));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pRsp->checkpointId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->upstreamTaskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->transId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pRsp->rspCode));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeCheckpointTriggerRsp(SDecoder* pDecoder, SCheckpointTriggerRsp* pRsp) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->streamId));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pRsp->checkpointId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->upstreamTaskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->transId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pRsp->rspCode));
tEndDecode(pDecoder);
_exit:
return code;
}
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
int32_t code = 0;
int32_t lino;
@ -830,11 +922,7 @@ int32_t tEncodeRestoreCheckpointInfo(SEncoder* pEncoder, const SRestoreCheckpoin
tEndEncode(pEncoder);
_exit:
if (code) {
return code;
} else {
return pEncoder->pos;
}
return code;
}
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
@ -853,3 +941,31 @@ int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo*
_exit:
return code;
}
int32_t tEncodeStreamTaskRunReq (SEncoder* pEncoder, const SStreamTaskRunReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartEncode(pEncoder));
TAOS_CHECK_EXIT(tEncodeI64(pEncoder, pReq->streamId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->taskId));
TAOS_CHECK_EXIT(tEncodeI32(pEncoder, pReq->reqType));
tEndEncode(pEncoder);
_exit:
return code;
}
int32_t tDecodeStreamTaskRunReq(SDecoder* pDecoder, SStreamTaskRunReq* pReq) {
int32_t code = 0;
int32_t lino;
TAOS_CHECK_EXIT(tStartDecode(pDecoder));
TAOS_CHECK_EXIT(tDecodeI64(pDecoder, &pReq->streamId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->taskId));
TAOS_CHECK_EXIT(tDecodeI32(pDecoder, &pReq->reqType));
tEndDecode(pDecoder);
_exit:
return code;
}

View File

@ -83,13 +83,36 @@ int32_t streamTrySchedExec(SStreamTask* pTask) {
}
int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int32_t taskId, int32_t execType) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) {
int32_t code = 0;
int32_t tlen = 0;
SStreamTaskRunReq req = {.streamId = streamId, .taskId = taskId, .reqType = execType};
tEncodeSize(tEncodeStreamTaskRunReq, &req, tlen, code);
if (code < 0) {
stError("s-task:0x%" PRIx64 " vgId:%d encode stream task run req failed, code:%s", streamId, vgId, tstrerror(code));
return code;
}
void* buf = rpcMallocCont(tlen + sizeof(SMsgHead));
if (buf == NULL) {
stError("vgId:%d failed to create msg to start stream task:0x%x exec, type:%d, code:%s", vgId, taskId, execType,
tstrerror(terrno));
return terrno;
}
((SMsgHead*)buf)->vgId = vgId;
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskRunReq(&encoder, &req)) < 0) {
rpcFreeCont(buf);
tEncoderClear(&encoder);
stError("s-task:%s vgId:%d encode stream task checkpoint-report msg failed, code:%s", taskId, vgId, tstrerror(code));
return code;
}
tEncoderClear(&encoder);
if (streamId != 0) {
stDebug("vgId:%d create msg to for task:0x%x, exec type:%d, %s", vgId, taskId, execType,
streamTaskGetExecType(execType));
@ -97,13 +120,8 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
stDebug("vgId:%d create msg to exec, type:%d, %s", vgId, execType, streamTaskGetExecType(execType));
}
pRunReq->head.vgId = vgId;
pRunReq->streamId = streamId;
pRunReq->taskId = taskId;
pRunReq->reqType = execType;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
int32_t code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = buf, .contLen = tlen + sizeof(SMsgHead)};
code = tmsgPutToQueue(pMsgCb, STREAM_QUEUE, &msg);
if (code) {
stError("vgId:%d failed to put msg into stream queue, code:%s, %x", vgId, tstrerror(code), taskId);
}