fix(stream): refactor the checkpoint consensus policy.

This commit is contained in:
Haojun Liao 2024-07-04 13:55:50 +08:00
parent 27cb3638c2
commit 653f7a1a43
12 changed files with 223 additions and 166 deletions

View File

@ -251,6 +251,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_CONSEN, "stream-chkpt-consen", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CONSEN_TIMER, "stream-consen-tmr", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_MAX_MSG, "mnd-max", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_MND_MSG)

View File

@ -242,8 +242,8 @@ typedef struct {
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
SRpcMsg rsp;
int64_t ts;
SRpcHandleInfo rspInfo;
int64_t ts;
} SCheckpointConsensusEntry;
#ifdef __cplusplus

View File

@ -616,8 +616,9 @@ typedef struct SStreamTaskState {
typedef struct SCheckpointConsensusInfo {
SArray* pTaskList;
int64_t checkpointId;
int64_t genTs;
// int64_t checkpointId;
// int64_t genTs;
int32_t numOfTasks;
} SCheckpointConsensusInfo;
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);

View File

@ -133,7 +133,8 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId);
int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId,
int64_t checkpointId, SRpcHandleInfo *pRpcInfo);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
@ -146,10 +147,9 @@ void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInf
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg);
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream);
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTasks);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo,
SRpcHandleInfo *pRpcInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);

View File

@ -177,6 +177,15 @@ static void mndStreamCheckNode(SMnode *pMnode) {
}
}
static void mndStreamConsensusChkpt(SMnode *pMnode) {
int32_t contLen = 0;
void *pReq = mndBuildTimerMsg(&contLen);
if (pReq != NULL) {
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_CONSEN_TIMER, .pCont = pReq, .contLen = contLen};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}
static void mndPullupTelem(SMnode *pMnode) {
mTrace("pullup telem msg");
int32_t contLen = 0;
@ -308,7 +317,6 @@ static int32_t minCronTime() {
min = TMIN(min, tsCompactPullupInterval);
min = TMIN(min, tsMqRebalanceInterval);
min = TMIN(min, tsStreamCheckpointInterval);
min = TMIN(min, 6); // checkpointRemain
min = TMIN(min, tsStreamNodeCheckInterval);
min = TMIN(min, tsArbHeartBeatIntervalSec);
min = TMIN(min, tsArbCheckSyncIntervalSec);
@ -353,6 +361,10 @@ void mndDoTimerPullupTask(SMnode *pMnode, int64_t sec) {
mndStreamCheckNode(pMnode);
}
if (sec % 5 == 0) {
mndStreamConsensusChkpt(pMnode);
}
if (sec % tsTelemInterval == (TMIN(60, (tsTelemInterval - 1)))) {
mndPullupTelem(pMnode);
}

View File

@ -59,7 +59,8 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
static int32_t extractNodeListFromStream(SMnode *pMnode, SArray *pNodeList);
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
static int32_t mndProcessCheckpointReport(SRpcMsg *pReq);
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq);
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg);
static int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
@ -123,6 +124,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CONSEN_TIMER, mndProcessConsensusInTmr);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RESUME_STREAM, mndProcessResumeStreamReq);
@ -803,7 +805,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
mDebug("stream stream:%s start to register tasks into task nodeList and set initial checkpointId", createReq.name);
saveTaskAndNodeInfoIntoBuf(&streamObj, &execInfo);
mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid);
// mndRegisterConsensusChkptId(execInfo.pStreamConsensus, streamObj.uid);
taosThreadMutexUnlock(&execInfo.lock);
// execute creation
@ -2625,12 +2627,42 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
return 0;
}
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, bool* pAllEqual) {
int32_t num = 0;
int64_t chkId = INT64_MAX;
*pAllEqual = true;
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pTaskList); ++i) {
STaskId* p = taosArrayGet(execInfo.pTaskList, i);
if (p->streamId != streamId) {
continue;
}
num += 1;
STaskStatusEntry* pe = taosHashGet(execInfo.pTaskMap, p, sizeof(*p));
if (chkId != INT64_MAX && chkId != pe->checkpointInfo.latestId) {
*pAllEqual = false;
}
if (chkId > pe->checkpointInfo.latestId) {
chkId = pe->checkpointInfo.latestId;
}
}
if (num < numOfTasks) { // not all task send info to mnode through hbMsg, no valid checkpoint Id
return -1;
}
return chkId;
}
static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) {
SMnode *pMnode = pMsg->info.node;
SDecoder decoder = {0};
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
tDecoderInit(&decoder, pMsg->pCont, pMsg->contLen);
if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
tDecoderClear(&decoder);
@ -2647,80 +2679,155 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
taosThreadMutexLock(&execInfo.lock);
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
// mnode handle the create stream transaction too slow may cause this problem
if (pStream == NULL) {
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
mWarn("failed to find the stream:0x%" PRIx64 ", not handle consensus-checkpointId", req.streamId);
// not in meta-store yet, try to acquire the task in exec buffer
// the checkpoint req arrives too soon before the completion of the create stream trans.
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (p == NULL) {
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle the checkpoint-report", req.streamId);
mError("failed to find the stream:0x%" PRIx64 " in buf, not handle consensus-checkpointId", req.streamId);
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
mndSendQuickConsensusChkptIdRsp(&req, terrno, req.streamId, 0, &pMsg->info);
taosThreadMutexUnlock(&execInfo.lock);
pMsg->info.handle = NULL; // disable auto rsp
return -1;
} else {
mDebug("s-task:0x%" PRIx64 "-0x%x in buf not in mnode/meta, create stream trans may not complete yet",
req.streamId, req.taskId);
// todo wait for stream is created
}
}
mInfo("vgId:%d meta-stored checkpointId for stream:0x%" PRIx64 " %s is:%" PRId64, req.nodeId, req.streamId,
pStream->name, pStream->checkpointId);
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
if (ckId != -1) { // consensus checkpoint id already exist
SRpcMsg rsp = {0};
rsp.code = 0;
rsp.info = pReq->info;
rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, 0, &pMsg->info);
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
pMsg->info.handle = NULL; // disable auto rsp
return TSDB_CODE_SUCCESS;
}
mndAddConsensusTasks(pInfo, &req, pReq);
bool allEqual = true;
int64_t chkId = getConsensusId(req.streamId, numOfTasks, &allEqual);
int32_t total = 0;
if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs
// start transaction to set the checkpoint id
int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream);
mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64
" will be issued soon",
req.streamId, pStream->name, numOfTasks, checkpointId);
// some tasks not send hbMsg to mnode yet, wait for 5s.
if (chkId == -1) {
mDebug(
"not all task send hbMsg yet, add into list and wait for 10s to check the consensus-checkpointId again, "
"s-task:0x%x", req.taskId);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
taosThreadMutexUnlock(&execInfo.lock);
// start the checkpoint consensus trans
int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId);
if (code == TSDB_CODE_SUCCESS) {
mndClearConsensusRspEntry(pInfo);
mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId);
} else {
mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId);
}
} else {
mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks);
pMsg->info.handle = NULL; // disable auto rsp
return 0;
}
if (chkId == req.checkpointId) {
mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64, req.nodeId, req.streamId,
pStream->name, pStream->checkpointId);
mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info);
taosThreadMutexUnlock(&execInfo.lock);
pMsg->info.handle = NULL; // disable auto rsp
return 0;
}
// wait for 5s and check again
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId, numOfTasks);
mndAddConsensusTasks(pInfo, &req, &pMsg->info);
if (pStream != NULL) {
mndReleaseStream(pMnode, pStream);
}
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
pMsg->info.handle = NULL; // disable auto rsp
return 0;
}
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
int64_t now = taosGetTimestampMs();
int64_t streamId = -1; // todo: fix only one
mDebug("start to process consensus-checkpointId in tmr");
taosThreadMutexLock(&execInfo.lock);
void *pIter = NULL;
while ((pIter = taosHashIterate(execInfo.pStreamConsensus, pIter)) != NULL) {
SCheckpointConsensusInfo *pInfo = (SCheckpointConsensusInfo *)pIter;
int32_t j = 0;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
SArray *pList = taosArrayInit(4, sizeof(int32_t));
for (; j < num; ++j) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, j);
if ((now - pe->ts) > 10 * 1000) {
bool allEqual = true;
int64_t chkId = getConsensusId(pe->req.streamId, pInfo->numOfTasks, &allEqual);
if (chkId == -1) {
mDebug("tasks send hbMsg for stream:0x%" PRIx64 ", wait for next round", pe->req.streamId);
break;
}
if (allEqual) {
mDebug("all has identical checkpointId for stream:0x%"PRIx64" send checkpointId to s-task:0x%x",
pe->req.streamId, pe->req.taskId);
mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo);
} else {
ASSERT(chkId <= pe->req.checkpointId);
mndSendQuickConsensusChkptIdRsp(&pe->req, TSDB_CODE_SUCCESS, pe->req.streamId, chkId, &pe->rspInfo);
}
taosArrayPush(pList, &pe->req.taskId);
streamId = pe->req.streamId;
} else {
mDebug("s-task:0x%x sendTs:%" PRId64 " wait %2.fs already, wait for next round to check", pe->req.taskId,
(now - pe->ts)/ 1000.0, pe->ts);
}
}
if (taosArrayGetSize(pList) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
int32_t *taskId = taosArrayGet(pList, i);
for (int32_t k = 0; k < taosArrayGetSize(pInfo->pTaskList); ++k) {
SCheckpointConsensusEntry *pe = taosArrayGet(pInfo->pTaskList, k);
if (pe->req.taskId == *taskId) {
taosArrayRemove(pInfo->pTaskList, k);
break;
}
}
}
}
taosArrayDestroy(pList);
if (taosArrayGetSize(pInfo->pTaskList) == 0) {
mndClearConsensusRspEntry(pInfo);
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, streamId);
}
}
taosThreadMutexUnlock(&execInfo.lock);
mDebug("end to process consensus-checkpointId in tmr");
return TSDB_CODE_SUCCESS;
}
static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
int32_t code = mndProcessCreateStreamReq(pReq);
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {

View File

@ -848,7 +848,7 @@ int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMs
return -1;
}
int32_t tlen = sizeof(SMsgHead) + blen;
int32_t tlen = sizeof(SMsgHead) + blen;
void *abuf = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
@ -863,22 +863,30 @@ int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMs
return code;
}
int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) {
for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) {
SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i);
doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId);
}
return 0;
int32_t mndSendQuickConsensusChkptIdRsp(SRestoreCheckpointInfo *pReq, int32_t code, int64_t streamId,
int64_t checkpointId, SRpcHandleInfo *pRpcInfo) {
SRpcMsg rsp = {.code = code, .info = *pRpcInfo, .contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(pReq->nodeId);
mDebug("stream:0x%" PRIx64 " consensus-checkpointId:%" PRId64 " exists, s-task:0x%x send to vnode",
streamId, checkpointId, pReq->taskId);
return doSendConsensusCheckpointRsp(pReq, &rsp, checkpointId);
}
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) {
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId, int32_t numOfTasks) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) {
return (SCheckpointConsensusInfo*)pInfo;
}
SCheckpointConsensusInfo p = {
.genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))};
.pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry)),
.numOfTasks = numOfTasks,
};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
@ -887,87 +895,15 @@ SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId)
// no matter existed or not, add the request into info list anyway, since we need to send rsp mannually
// discard the msg may lead to the lost of connections.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) {
SCheckpointConsensusEntry info = {0};
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcHandleInfo* pRpcInfo) {
SCheckpointConsensusEntry info = {.ts = taosGetTimestampMs(), .rspInfo = *pRpcInfo};
memcpy(&info.req, pRestoreInfo, sizeof(info.req));
info.rsp.code = 0;
info.rsp.info = pMsg->info;
info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
info.rsp.pCont = rpcMallocCont(info.rsp.contLen);
SMsgHead *pHead = info.rsp.pCont;
pHead->vgId = htonl(pRestoreInfo->nodeId);
taosArrayPush(pInfo->pTaskList, &info);
}
static int32_t entryComparFn(const void* p1, const void* p2) {
const SCheckpointConsensusEntry* pe1 = p1;
const SCheckpointConsensusEntry* pe2 = p2;
if (pe1->req.taskId == pe2->req.taskId) {
return 0;
}
return pe1->req.taskId < pe2->req.taskId? -1:1;
}
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) {
int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList);
if (numOfExisted < numOfTasks) {
if (pTotal != NULL) {
*pTotal = numOfExisted;
}
return false;
}
taosArraySort(pInfo->pTaskList, entryComparFn);
int32_t num = 1;
int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId;
for(int32_t i = 1; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i);
if (pe->req.taskId != taskId) {
num += 1;
taskId = pe->req.taskId;
}
}
if (pTotal != NULL) {
*pTotal = num;
}
ASSERT(num <= numOfTasks);
return num == numOfTasks;
}
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
if (pInfo->genTs > 0) { // there is no checkpoint ever generated if the checkpointId is 0.
mDebug("existed consensus-checkpointId:%" PRId64 " for stream:0x%" PRIx64 " %s exist, and return",
pInfo->checkpointId, pStream->uid, pStream->name);
return pInfo->checkpointId;
}
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
if (!mndAllTaskSendCheckpointId(pInfo, numOfTasks, NULL)) {
return -1;
}
int64_t checkpointId = INT64_MAX;
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pTaskList); ++i) {
SCheckpointConsensusEntry *pEntry = taosArrayGet(pInfo->pTaskList, i);
if (pEntry->req.checkpointId < checkpointId) {
checkpointId = pEntry->req.checkpointId;
mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name,
pEntry->req.taskId, pEntry->req.nodeId, pEntry->req.checkpointId);
}
}
pInfo->checkpointId = checkpointId;
pInfo->genTs = taosGetTimestampMs();
return checkpointId;
int32_t num = taosArrayGetSize(pInfo->pTaskList);
mDebug("s-task:0x%x added into consensus-checkpointId list, stream:0x%" PRIx64 " total waiting:%d",
pRestoreInfo->taskId, pRestoreInfo->streamId, num);
}
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
@ -982,15 +918,15 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
return TSDB_CODE_SUCCESS;
}
int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
ASSERT(pInfo == NULL);
SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0);
mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId);
return TSDB_CODE_SUCCESS;
}
//int32_t mndRegisterConsensusChkptId(SHashObj* pHash, int64_t streamId) {
// void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
// ASSERT(pInfo == NULL);
//
// SCheckpointConsensusInfo p = {.genTs = taosGetTimestampMs(), .checkpointId = 0, .pTaskList = NULL};
// taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
//
// SCheckpointConsensusInfo* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
// ASSERT(pChkptInfo->genTs > 0 && pChkptInfo->checkpointId == 0);
// mDebug("s-task:0x%" PRIx64 " set the initial consensus-checkpointId:0", streamId);
// return TSDB_CODE_SUCCESS;
//}

View File

@ -43,14 +43,14 @@ int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProce
char *p = streamTaskGetStatus(pTask)->name;
if (pTask->info.fillHistory) {
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, p, pTask->info.fillHistory,
(int32_t)pTask->streamTaskId.taskId, pTask->info.delaySchedParam);
} else {
sndInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
sndInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
SNODE_HANDLE, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,

View File

@ -279,7 +279,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
int32_t updateTasks = taosHashGetSize(pMeta->updateInfo.pTasks);
pMeta->startInfo.tasksWillRestart = 1;
if (restored) {
tqDebug("vgId:%d s-task:0x%x update epset transId:%d, set the restart flag", vgId, req.taskId, req.transId);
pMeta->startInfo.tasksWillRestart = 1;
}
if (updateTasks < numOfTasks) {
tqDebug("vgId:%d closed tasks:%d, unclosed:%d, all tasks will be started when nodeEp update completed", vgId,
@ -292,8 +295,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaClearUpdateTaskList(pMeta);
if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0;
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
} else {
tqDebug("vgId:%d all %d task(s) nodeEp updated and closed, transId:%d", vgId, numOfTasks, req.transId);
#if 0

View File

@ -666,7 +666,11 @@ void rspMonitorFn(void* param, void* tmrId) {
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheckRsp(pInfo, true, id);
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
// not record the failed of the current task if try to close current vnode
if (!pMeta->closeFlag) {
addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
}
streamMetaReleaseTask(pMeta, pTask);
return;

View File

@ -447,14 +447,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (restored && (pStatus->state != TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,

View File

@ -168,7 +168,9 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
continue;
}
taosThreadMutexLock(&(*pTask)->lock);
STaskStatusEntry entry = streamTaskGetStatusEntry(*pTask);
taosThreadMutexUnlock(&(*pTask)->lock);
entry.inputRate = entry.inputQUsed * 100.0 / (2 * STREAM_TASK_QUEUE_CAPACITY_IN_SIZE);
if ((*pTask)->info.taskLevel == TASK_LEVEL__SINK) {