fix(stream): consensus the start checkpoint id, and extract the streamhb related functions into a new file.

This commit is contained in:
Haojun Liao 2024-06-26 10:18:32 +08:00
parent b668532713
commit 1818edcb2b
24 changed files with 543 additions and 353 deletions

View File

@ -3485,12 +3485,6 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
int64_t recoverObjUid;
int32_t taskId;
int32_t hasCheckPoint;
} SMVStreamGatherInfoReq;
typedef struct SVUpdateCheckpointInfoReq {
SMsgHead head;
int64_t streamId;
@ -3516,50 +3510,8 @@ typedef struct {
int64_t suid;
} SMqRebVgReq;
static FORCE_INLINE int tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(pCoder, pReq->subType) < 0) return -1;
if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
static FORCE_INLINE int tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1;
if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
}
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq);
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq);
typedef struct {
char topic[TSDB_TOPIC_FNAME_LEN];

View File

@ -249,6 +249,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_TSMA, "get-table-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TSMA, "get-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TB_WITH_TSMA, "drop-tb-with-tsma", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)
@ -333,7 +334,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESTORE_CHECKPOINT, "stream-restore-checkpoint", NULL, NULL) //unused
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_PAUSE, "stream-task-pause", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RESUME, "stream-task-resume", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_STOP, "stream-task-stop", NULL, NULL)

View File

@ -29,6 +29,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessChkptReportRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen,
bool isLeader, bool restored);

View File

@ -17,6 +17,7 @@
#define TDENGINE_STREAMMSG_H
#include "tmsg.h"
#include "trpc.h"
#ifdef __cplusplus
extern "C" {
@ -204,6 +205,26 @@ typedef struct SCheckpointReport {
int32_t tEncodeStreamTaskChkptReport(SEncoder* pEncoder, const SCheckpointReport* pReq);
int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq);
typedef struct SRestoreCheckpointInfo {
SMsgHead head;
int64_t streamId;
int64_t checkpointId; // latest checkpoint id
int32_t taskId;
int32_t nodeId;
} SRestoreCheckpointInfo;
int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
typedef struct SRestoreCheckpointInfoRsp {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
} SRestoreCheckpointInfoRsp;
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo);
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo);
typedef struct {
SMsgHead head;
int64_t streamId;
@ -211,6 +232,12 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;
typedef struct SCheckpointConsensusInfo {
SRestoreCheckpointInfo req;
SRpcMsg rsp;
int64_t ts;
} SCheckpointConsensusInfo;
#ifdef __cplusplus
}
#endif

View File

@ -17,6 +17,7 @@
#define TDENGINE_TSTREAM_H
#include "os.h"
#include "streamMsg.h"
#include "streamState.h"
#include "tdatablock.h"
#include "tdbInt.h"
@ -24,7 +25,6 @@
#include "tmsgcb.h"
#include "tqueue.h"
#include "ttimer.h"
#include "streammsg.h"
#ifdef __cplusplus
extern "C" {
@ -704,6 +704,7 @@ void streamTaskSetRemoveBackendFiles(SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask);
// source level
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow);
@ -750,10 +751,11 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask);
// timer
tmr_h streamTimerGetInstance();
@ -789,6 +791,8 @@ int32_t streamProcessRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* pReq);
int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq *req);
void streamTaskSendRetrieveRsp(SStreamRetrieveReq *pReq, SRpcMsg* pRsp);
int32_t streamTaskSendLatestCheckpointInfo(SStreamTask* pTask);
#ifdef __cplusplus
}
#endif

View File

@ -9249,6 +9249,52 @@ int32_t tDecodeSTqCheckInfo(SDecoder *pDecoder, STqCheckInfo *pInfo) {
}
void tDeleteSTqCheckInfo(STqCheckInfo *pInfo) { taosArrayDestroy(pInfo->colIdList); }
int32_t tEncodeSMqRebVgReq(SEncoder* pCoder, const SMqRebVgReq* pReq) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pReq->leftForVer) < 0) return -1;
if (tEncodeI32(pCoder, pReq->vgId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->oldConsumerId) < 0) return -1;
if (tEncodeI64(pCoder, pReq->newConsumerId) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->subKey) < 0) return -1;
if (tEncodeI8(pCoder, pReq->subType) < 0) return -1;
if (tEncodeI8(pCoder, pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tEncodeI64(pCoder, pReq->suid) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->qmsg) < 0) return -1;
}
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeSMqRebVgReq(SDecoder* pCoder, SMqRebVgReq* pReq) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->leftForVer) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->vgId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->oldConsumerId) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->newConsumerId) < 0) return -1;
if (tDecodeCStrTo(pCoder, pReq->subKey) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->subType) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->withMeta) < 0) return -1;
if (pReq->subType == TOPIC_SUB_TYPE__COLUMN) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
} else if (pReq->subType == TOPIC_SUB_TYPE__TABLE) {
if (tDecodeI64(pCoder, &pReq->suid) < 0) return -1;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeCStr(pCoder, &pReq->qmsg) < 0) return -1;
}
}
tEndDecode(pCoder);
return 0;
}
int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
int32_t nUid = taosArrayGetSize(pRes->uidList);

View File

@ -242,6 +242,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -96,6 +96,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -972,6 +972,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_CONSEN_RSP, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -34,6 +34,7 @@ extern "C" {
#define MND_STREAM_TASK_RESET_NAME "stream-task-reset"
#define MND_STREAM_TASK_UPDATE_NAME "stream-task-update"
#define MND_STREAM_CHKPT_UPDATE_NAME "stream-chkpt-update"
#define MND_STREAM_CHKPT_CONSEN_NAME "stream-chkpt-consen"
typedef struct SStreamTransInfo {
int64_t startTime;
@ -61,6 +62,7 @@ typedef struct SStreamExecInfo {
TdThreadMutex lock;
SHashObj *pTransferStateStreams;
SHashObj *pChkptStreams;
SHashObj *pStreamConsensus;
} SStreamExecInfo;
extern SStreamExecInfo execInfo;
@ -131,6 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndStreamSetRestoreCheckpointId(SArray* pList, int64_t checkpointId);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);
@ -141,6 +145,8 @@ void mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
void mndAddConsensusTasks(SArray *pList, const SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg);
int64_t mndGetConsensusCheckpointId(SArray *pList, SStreamObj *pStream);
#ifdef __cplusplus
}

View File

@ -59,6 +59,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray* pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -117,6 +118,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_CONSEN, mndProcessConsensusCheckpointId);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
@ -2514,7 +2516,7 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
return 0;
}
static void doAddTaskInfo(SArray* pList, SCheckpointReport* pReport) {
static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) {
bool existed = false;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo* p = taosArrayGet(pList ,i);
@ -2586,12 +2588,12 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo));
doAddTaskInfo(pList, &req);
doAddReportStreamTask(pList, &req);
taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} else {
doAddTaskInfo(*pReqTaskList, &req);
doAddReportStreamTask(*pReqTaskList, &req);
}
int32_t total = taosArrayGetSize(*pReqTaskList);
@ -2599,25 +2601,6 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" will be issued soon",
req.streamId, pStream->name, total, req.checkpointId);
// if (pStream != NULL) {
// bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
// if (conflict) {
// mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", req.streamId);
// } else {
// int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, *pReqTaskList);
// if (code == TSDB_CODE_SUCCESS) { // remove this entry
// taosHashRemove(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
//
// int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
// mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", req.streamId,
// numOfStreams);
// } else {
// mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet",
// req.streamId);
// }
// }
// }
}
if (pStream != NULL) {
@ -2639,6 +2622,102 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
SDecoder decoder = {0};
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamTaskLatestChkptInfo(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task consensus-checkpoint msg received");
return -1;
}
tDecoderClear(&decoder);
mDebug("receive stream task consensus-checkpoint msg, vgId:%d, s-task:0x%" PRIx64 "-0x%x, checkpointId:%" PRId64,
req.nodeId, req.streamId, req.taskId, req.checkpointId);
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
taosThreadMutexUnlock(&execInfo.lock);
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
}
}
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SArray **pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
if (pTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(SCheckpointConsensusInfo));
mndAddConsensusTasks(pList, &req, pReq);
taosHashPut(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
} else {
mndAddConsensusTasks(*pTaskList, &req, pReq);
}
int32_t total = taosArrayGetSize(*pTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
// start transaction to set the checkpoint id
int64_t checkpointId = mndGetConsensusCheckpointId(*pTaskList, pStream);
mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64
" will be issued soon",
req.streamId, pStream->name, total, checkpointId);
// start the checkpoint consensus trans
int32_t code = mndStreamSetRestoreCheckpointId(*pTaskList, checkpointId);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosHashRemove(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
int32_t numOfStreams = taosHashGetSize(execInfo.pStreamConsensus);
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", req.streamId, numOfStreams);
} else {
mDebug("stream:0x%" PRIx64 " not start set consensus-checkpointId trans, due to not all task ready",
req.streamId);
}
} else {
mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks);
}
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
// { // start an transaction to set the start checkpoint id
// SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SRestoreCheckpointInfoRsp)};
// rsp.pCont = rpcMallocCont(rsp.contLen);
// SMsgHead *pHead = rsp.pCont;
// pHead->vgId = htonl(req.nodeId);
//
// tmsgSendRsp(&rsp);
// pReq->info.handle = NULL; // disable auto rsp
// }
return 0;
}
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessCreateStreamReq(pReq);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -300,5 +300,3 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("complete clear checkpoints in Dbs");
}

View File

@ -575,10 +575,12 @@ void mndInitExecInfo() {
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pTransferStateStreams = taosHashInit(32, fn, true, HASH_NO_LOCK);
execInfo.pChkptStreams = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
execInfo.pStreamConsensus = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
execInfo.pNodeList = taosArrayInit(4, sizeof(SNodeEntry));
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
}
void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
@ -817,4 +819,80 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
taosArrayDestroy(pDropped);
return TSDB_CODE_SUCCESS;
}
static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) {
int32_t code = 0;
int32_t blen;
SRestoreCheckpointInfoRsp req = {.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId};
tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code);
if (code < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
tEncodeRestoreCheckpointInfoRsp(&encoder, &req);
SMsgHead *pMsgHead = (SMsgHead *)pMsg->pCont;
pMsgHead->contLen = htonl(tlen);
pMsgHead->vgId = htonl(pInfo->nodeId);
tEncoderClear(&encoder);
tmsgSendRsp(pMsg);
return code;
}
int32_t mndStreamSetRestoreCheckpointId(SArray* pInfoList, int64_t checkpointId) {
for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) {
SCheckpointConsensusInfo* pInfo = taosArrayGet(pInfoList, i);
doSendRestoreCheckpointInfo(&pInfo->req, &pInfo->rsp, checkpointId);
}
return 0;
}
void mndAddConsensusTasks(SArray* pList, const SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg) {
bool existed = false;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo* p = taosArrayGet(pList ,i);
if (p->taskId == pInfo->taskId) {
existed = true;
break;
}
}
if (!existed) {
SCheckpointConsensusInfo info = {0};
memcpy(&info.req, pInfo, sizeof(info.req));
info.rsp.code = 0;
info.rsp.info = pMsg->info;
info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
info.rsp.pCont = rpcMallocCont(info.rsp.contLen);
SMsgHead *pHead = info.rsp.pCont;
pHead->vgId = htonl(pInfo->nodeId);
taosArrayPush(pList, &info);
}
}
int64_t mndGetConsensusCheckpointId(SArray* pList, SStreamObj* pStream) {
int64_t checkpointId = INT64_MAX;
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SCheckpointConsensusInfo *pInfo = taosArrayGet(pList, i);
if (pInfo->req.checkpointId < checkpointId) {
checkpointId = pInfo->req.checkpointId;
mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name,
pInfo->req.taskId, pInfo->req.nodeId, pInfo->req.checkpointId);
}
}
return checkpointId;
}

View File

@ -155,6 +155,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP:
return tqStreamProcessConsensusChkptRsp(pSnode->pMeta, pMsg);
default:
ASSERT(0);
}

View File

@ -262,6 +262,7 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamReqCheckpointRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqBuildStreamTask(void* pTq, SStreamTask* pTask, int64_t ver);

View File

@ -1029,7 +1029,6 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
//
return 0;
}
@ -1273,3 +1272,7 @@ int32_t tqProcessTaskCheckpointReadyRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskChkptReportRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessChkptReportRsp(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskConsensusChkptRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessConsensusChkptRsp(pTq->pStreamMeta, pMsg);
}

View File

@ -121,7 +121,6 @@ int32_t tqStreamTaskStartAsync(SStreamMeta* pMeta, SMsgCb* cb, bool restart) {
int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
@ -132,6 +131,26 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
}
int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
if (numOfTasks == 0) {
tqDebug("vgId:%d no stream tasks existed to run", vgId);
return 0;
}
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t code = streamTaskSendConsensusChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -706,7 +725,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
if (isLeader && !tsDisableStream) {
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
ASSERT(0);
streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta);
@ -724,10 +744,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t vgId = pMeta->vgId;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask);
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
streamMetaStartAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
restartStreamTasks(pMeta, isLeader);
@ -854,7 +874,8 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
} else if (pState->state == TASK_STATUS__UNINIT) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
// tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
}
@ -1076,6 +1097,70 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
}
streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId;
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
bool updateCheckpointId = false;
SRestoreCheckpointInfoRsp req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
rsp.info.handle = NULL;
if (tDecodeRestoreCheckpointInfoRsp(&decoder, &req) < 0) {
// rsp.code = TSDB_CODE_MSG_DECODE_ERROR; // disable it temporarily
tqError("vgId:%d failed to decode restore task checkpointId, code:%s", vgId, tstrerror(rsp.code));
tDecoderClear(&decoder);
return TSDB_CODE_SUCCESS;
}
tDecoderClear(&decoder);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s vgId:%d checkpointId:%" PRId64 " restore to consensus-checkpointId:%" PRId64 " from mnode",
pTask->id.idStr, vgId, pTask->chkInfo.checkpointId, req.checkpointId);
taosThreadMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if ((pTask->chkInfo.checkpointId != req.checkpointId) && req.checkpointId != 0) {
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
pTask->chkInfo.checkpointId, req.checkpointId);
pTask->chkInfo.checkpointId = req.checkpointId;
updateCheckpointId = true;
streamMetaSaveTask(pMeta, pTask);
}
taosThreadMutexUnlock(&pTask->lock);
if (updateCheckpointId) {
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pMeta);
}
// todo: set the update transId, and discard with less transId.
if (pMeta->role == NODE_ROLE_LEADER) {
/*code = */tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
} else {
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -647,6 +647,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
tqProcessTaskResetReq(pVnode->pTq, pMsg);
}
} break;
case TDMT_MND_STREAM_CHKPT_CONSEN_RSP: {
if (pVnode->restored) {
tqProcessTaskConsensusChkptRsp(pVnode->pTq, pMsg);
}
} break;
case TDMT_VND_ALTER_CONFIRM:
needCommit = pVnode->config.hashChange;
if (vnodeProcessAlterConfirmReq(pVnode, ver, pReq, len, pRsp) < 0) {

View File

@ -157,6 +157,7 @@ extern void* streamTimer;
extern int32_t streamBackendId;
extern int32_t streamBackendCfWrapperId;
extern int32_t taskDbWrapperId;
extern int32_t streamMetaId;
int32_t streamTimerInit();
void streamTimerCleanUp();
@ -211,7 +212,12 @@ void* streamQueueNextItem(SStreamQueue* pQueue);
void streamFreeQitem(SStreamQueueItem* data);
int32_t streamQueueGetItemSize(const SStreamQueue* pQueue);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaRemoveDB(void* arg, char* key);
void streamMetaHbToMnode(void* param, void* tmrId);
SMetaHbInfo* createMetaHbInfo(int64_t* pRid);
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta);
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount);
int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();

View File

@ -443,13 +443,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
// if (restored && (pStatus->state != TASK_STATUS__CK)) {
// stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
// " failed",
// pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
// taosThreadMutexUnlock(&pTask->lock);
// return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
// }
if (restored && (pStatus->state != TASK_STATUS__CK)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
@ -1103,3 +1103,46 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
s3DeleteObjects((const char**)&tmp, 1);
return 0;
}
int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
ASSERT(pTask->pBackend == NULL);
SRestoreCheckpointInfo req = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pInfo->checkpointId};
tEncodeSize(tEncodeStreamTaskLatestChkptInfo, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskLatestChkptInfo(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen);
stDebug("s-task:%s vgId:%d send task latest-checkpoint-id to mnode:%" PRId64 " to reach the consensus checkpointId",
id, vgId, pInfo->checkpointId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;
}

View File

@ -27,10 +27,8 @@ static TdThreadOnce streamMetaModuleInit = PTHREAD_ONCE_INIT;
int32_t streamBackendId = 0;
int32_t streamBackendCfWrapperId = 0;
int32_t streamMetaId = 0;
int32_t taskDbWrapperId = 0;
static void metaHbToMnode(void* param, void* tmrId);
static int32_t streamMetaBegin(SStreamMeta* pMeta);
static void streamMetaCloseImpl(void* arg);
@ -39,14 +37,6 @@ typedef struct {
SHashObj* pTable;
} SMetaRefMgt;
struct SMetaHbInfo {
tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter;
int32_t hbCount;
int64_t hbStart;
};
typedef struct STaskInitTs {
int64_t start;
int64_t end;
@ -352,12 +342,6 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
goto _err;
}
pMeta->pHbInfo = taosMemoryCalloc(1, sizeof(SMetaHbInfo));
if (pMeta->pHbInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
// task list
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
if (pMeta->pTaskList == NULL) {
@ -400,9 +384,12 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTas
memcpy(pRid, &pMeta->rid, sizeof(pMeta->rid));
metaRefMgtAdd(pMeta->vgId, pRid);
pMeta->pHbInfo->hbTmr = taosTmrStart(metaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pMeta->pHbInfo->tickCounter = 0;
pMeta->pHbInfo->stopFlag = 0;
pMeta->pHbInfo = createMetaHbInfo(pRid);
if (pMeta->pHbInfo == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pMeta->qHandle = taosInitScheduler(32, 1, "stream-chkp", NULL);
pMeta->bkdChkptMgt = bkdMgtCreate(tpath);
@ -960,213 +947,6 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
taosArrayDestroy(pRecycleList);
}
static bool waitForEnoughDuration(SMetaHbInfo* pInfo) {
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
pInfo->tickCounter = 0;
return true;
}
return false;
}
static bool existInHbMsg(SStreamHbMsg* pMsg, SDownstreamTaskEpset* pTaskEpset) {
int32_t numOfExisted = taosArrayGetSize(pMsg->pUpdateNodes);
for (int k = 0; k < numOfExisted; ++k) {
if (pTaskEpset->nodeId == *(int32_t*)taosArrayGet(pMsg->pUpdateNodes, k)) {
return true;
}
}
return false;
}
static void addUpdateNodeIntoHbMsg(SStreamTask* pTask, SStreamHbMsg* pMsg) {
SStreamMeta* pMeta = pTask->pMeta;
taosThreadMutexLock(&pTask->lock);
int32_t num = taosArrayGetSize(pTask->outputInfo.pNodeEpsetUpdateList);
for (int j = 0; j < num; ++j) {
SDownstreamTaskEpset* pTaskEpset = taosArrayGet(pTask->outputInfo.pNodeEpsetUpdateList, j);
bool exist = existInHbMsg(pMsg, pTaskEpset);
if (!exist) {
taosArrayPush(pMsg->pUpdateNodes, &pTaskEpset->nodeId);
stDebug("vgId:%d nodeId:%d added into hb update list, total:%d", pMeta->vgId, pTaskEpset->nodeId,
(int32_t)taosArrayGetSize(pMsg->pUpdateNodes));
}
}
taosArrayClear(pTask->outputInfo.pNodeEpsetUpdateList);
taosThreadMutexUnlock(&pTask->lock);
}
static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
SStreamHbMsg hbMsg = {0};
SEpSet epset = {0};
bool hasMnodeEpset = false;
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
hbMsg.vgId = pMeta->vgId;
hbMsg.pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
hbMsg.pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pId->streamId, .taskId = pId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (pTask == NULL) {
continue;
}
// not report the status of fill-history task
if ((*pTask)->info.fillHistory == 1) {
continue;
}
STaskStatusEntry entry = {
.id = id,
.status = streamTaskGetStatus(*pTask)->state,
.nodeId = hbMsg.vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
.startTime = (*pTask)->execInfo.readyTs,
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.hTaskId = (*pTask)->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB((*pTask)->execInfo.inputDataSize),
.outputTotal = SIZE_IN_MiB((*pTask)->execInfo.outputDataSize),
.procsThroughput = SIZE_IN_KiB((*pTask)->execInfo.procsThroughput),
.outputThroughput = SIZE_IN_KiB((*pTask)->execInfo.outputThroughput),
.startCheckpointId = (*pTask)->execInfo.startCheckpointId,
.startCheckpointVer = (*pTask)->execInfo.startCheckpointVer,
};
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {
entry.sinkQuota = (*pTask)->outputInfo.pTokenBucket->quotaRate;
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
}
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) {
entry.checkpointInfo.failed = ((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId;
if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.pActiveInfo->transId);
}
}
if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
if (entry.processedVer < 0) {
entry.processedVer = (*pTask)->chkInfo.processedVer;
}
walReaderValidVersionRange((*pTask)->exec.pWalReader, &entry.verRange.minVer, &entry.verRange.maxVer);
}
addUpdateNodeIntoHbMsg(*pTask, &hbMsg);
taosArrayPush(hbMsg.pTaskStatus, &entry);
if (!hasMnodeEpset) {
epsetAssign(&epset, &(*pTask)->info.mnodeEpset);
hasMnodeEpset = true;
}
}
hbMsg.numOfTasks = taosArrayGetSize(hbMsg.pTaskStatus);
if (hasMnodeEpset) {
int32_t code = 0;
int32_t tlen = 0;
tEncodeSize(tEncodeStreamHbMsg, &hbMsg, tlen, code);
if (code < 0) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
void* buf = rpcMallocCont(tlen);
if (buf == NULL) {
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
goto _end;
}
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamHbMsg(&encoder, &hbMsg)) < 0) {
rpcFreeCont(buf);
stError("vgId:%d encode stream hb msg failed, code:%s", pMeta->vgId, tstrerror(code));
goto _end;
}
tEncoderClear(&encoder);
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_HEARTBEAT, buf, tlen);
pMeta->pHbInfo->hbCount += 1;
stDebug("vgId:%d build and send hb to mnode, numOfTasks:%d total:%d", pMeta->vgId, hbMsg.numOfTasks,
pMeta->pHbInfo->hbCount);
tmsgSendReq(&epset, &msg);
} else {
stDebug("vgId:%d no tasks and no mnd epset, not send stream hb to mnode", pMeta->vgId);
}
_end:
tCleanupStreamHbMsg(&hbMsg);
return TSDB_CODE_SUCCESS;
}
void metaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
return;
}
// need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP;
stDebug("vgId:%d jump out of meta timer", pMeta->vgId);
taosReleaseRef(streamMetaId, rid);
return;
}
// not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role);
taosReleaseRef(streamMetaId, rid);
pMeta->pHbInfo->hbStart = 0;
return;
}
// set the hb start time
if (pMeta->pHbInfo->hbStart == 0) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs();
}
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
return;
}
stDebug("vgId:%d build stream task hb, leader:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER));
streamMetaRLock(pMeta);
metaHeartbeatToMnodeImpl(pMeta);
streamMetaRUnLock(pMeta);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr);
taosReleaseRef(streamMetaId, rid);
}
bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
streamMetaRLock(pMeta);
@ -1192,14 +972,19 @@ bool streamMetaTaskInTimer(SStreamMeta* pMeta) {
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
int64_t startTs = 0;
int32_t sendCount = 0;
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), pMeta->pHbInfo->hbStart, pMeta->pHbInfo->hbCount);
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
streamMetaWaitForHbTmrQuit(pMeta);
streamMetaWLock(pMeta);
pMeta->closeFlag = true;
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasksMap, pIter);
@ -1214,15 +999,6 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
streamMetaWUnLock(pMeta);
// wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP;
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
}
stDebug("vgId:%d start to check all tasks for closing", vgId);
int64_t st = taosGetTimestampMs();
@ -1239,7 +1015,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid);
*pRid = pMeta->rid;
metaHbToMnode(pRid, NULL);
streamMetaHbToMnode(pRid, NULL);
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
@ -1328,7 +1104,7 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
streamMetaReleaseTask(pMeta, pTask);
}
metaHeartbeatToMnodeImpl(pMeta);
streamMetaSendHbHelper(pMeta);
pMeta->sendMsgBeforeClosing = false;
return pTaskList;
}
@ -1378,16 +1154,17 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
return TSDB_CODE_SUCCESS;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) {
// restore the checkpoint id by negotiating the latest consensus checkpoint id
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status, start ts:%"PRId64, vgId, numOfTasks, now);
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%"PRId64, vgId, numOfTasks, now);
if (numOfTasks == 0) {
stInfo("vgId:%d no tasks to be started", pMeta->vgId);
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
return TSDB_CODE_SUCCESS;
}
@ -1398,11 +1175,11 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
return TSDB_CODE_SUCCESS;
}
// broadcast the check downstream tasks msg
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
numOfTasks = taosArrayGetSize(pTaskList);
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization , when the operation of check downstream tasks status is executed far quickly.
// initialization, when the operation of check downstream tasks status is executed far quickly.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
@ -1412,19 +1189,18 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
continue;
}
if (pTask->pBackend == NULL) { // TODO: add test cases for this
code = expandFn(pTask);
if ((pTask->pBackend == NULL) && (pTask->info.fillHistory == 1 || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
}
} else {
stDebug("s-task:0x%x vgId:%d fill-history task backend has initialized already", pTaskId->taskId, vgId);
}
streamMetaReleaseTask(pMeta, pTask);
}
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
@ -1457,16 +1233,28 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expa
continue;
}
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
code = ret;
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// negotiate the consensus checkpoint id for current task
ASSERT(pTask->pBackend == NULL);
code = streamTaskSendConsensusChkptMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
}
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
// initialization, when the operation of check downstream tasks status is executed far quickly.
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
taosArrayDestroy(pTaskList);
return code;
@ -1527,7 +1315,7 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
return true;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
@ -1548,7 +1336,7 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
ASSERT(pTask->status.downstreamReady == 0);
if (pTask->pBackend == NULL) {
code = expandFn(pTask);
code = pMeta->expandTaskFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
}

View File

@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "streamMsg.h"
#include "os.h"
#include "streammsg.h"
#include "tstream.h"
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamUpstreamEpInfo* pInfo) {
@ -77,7 +77,6 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
return pEncoder->pos;
}
int32_t tEncodeStreamTaskUpdateMsg(SEncoder* pEncoder, const SStreamTaskNodeUpdateMsg* pMsg) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pMsg->streamId) < 0) return -1;
@ -624,4 +623,42 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
if (tDecodeI8(pDecoder, &pReq->dropHTask) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->nodeId) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1;
if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1;
tEndEncode(pCoder);
return 0;
}
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1;
if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1;
tEndDecode(pCoder);
return 0;
}

View File

@ -832,6 +832,34 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->hTaskId = pSrc->hTaskId;
}
STaskStatusEntry streamTaskGetStatusEntry(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
STaskStatusEntry entry = {
.id = streamTaskGetTaskId(pTask),
.status = streamTaskGetStatus(pTask)->state,
.nodeId = pMeta->vgId,
.stage = pMeta->stage,
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize(pTask->inputq.queue)),
.startTime = pExecInfo->readyTs,
.checkpointInfo.latestId = pTask->chkInfo.checkpointId,
.checkpointInfo.latestVer = pTask->chkInfo.checkpointVer,
.checkpointInfo.latestTime = pTask->chkInfo.checkpointTime,
.checkpointInfo.latestSize = 0,
.checkpointInfo.remoteBackup = 0,
.hTaskId = pTask->hTaskInfo.id.taskId,
.procsTotal = SIZE_IN_MiB(pExecInfo->inputDataSize),
.outputTotal = SIZE_IN_MiB(pExecInfo->outputDataSize),
.procsThroughput = SIZE_IN_KiB(pExecInfo->procsThroughput),
.outputThroughput = SIZE_IN_KiB(pExecInfo->outputThroughput),
.startCheckpointId = pExecInfo->startCheckpointId,
.startCheckpointVer = pExecInfo->startCheckpointVer,
};
return entry;
}
static int32_t taskPauseCallback(SStreamTask* pTask, void* param) {
SStreamMeta* pMeta = pTask->pMeta;

View File

@ -199,9 +199,7 @@ static int32_t doHandleWaitingEvent(SStreamTaskSM* pSM, const char* pEventName,
static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent event) {
SStreamTaskSM* pSM = pTask->status.pSM;
bool removed = false;
taosThreadMutexLock(&pTask->lock);
bool removed = false;
int32_t num = taosArrayGetSize(pSM->pWaitingEventList);
for (int32_t i = 0; i < num; ++i) {
SFutureHandleEventInfo* pInfo = taosArrayGet(pSM->pWaitingEventList, i);
@ -218,7 +216,6 @@ static int32_t removeEventInWaitingList(SStreamTask* pTask, EStreamTaskEvent eve
stDebug("s-task:%s failed to remove event:%s in waiting list", pTask->id.idStr, StreamTaskEventList[event].name);
}
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}