fix(stream): fix error in checkpoint consensus.

This commit is contained in:
Haojun Liao 2024-06-27 19:25:18 +08:00
parent 48bf5eb4d8
commit 025437df0c
13 changed files with 235 additions and 125 deletions

View File

@ -213,19 +213,21 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
// Request sent by a stream task to the mnode (TDMT_MND_STREAM_CHKPT_CONSEN) that reports the task's latest
// checkpoint id, so that a consensus checkpoint id can be chosen for the stream.
typedef struct SRestoreCheckpointInfo {
SMsgHead head;
int64_t startTs; // creation time of the sending task (pTask->execInfo.created); copied into the rsp so the task can discard responses to requests issued before a restart
int64_t streamId; // id of the stream the task belongs to
int64_t checkpointId; // latest checkpoint id known to the sending task
int32_t taskId; // id of the sending task
int32_t nodeId; // vgId of the sender; used to fill the SMsgHead of the response
} SRestoreCheckpointInfo;
int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq);
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq);
typedef struct SRestoreCheckpointInfoRsp {
int64_t streamId;
int64_t checkpointId;
int32_t taskId;
int64_t streamId;
int64_t checkpointId;
int64_t startTs;
int32_t taskId;
} SRestoreCheckpointInfoRsp;
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo);
@ -238,11 +240,11 @@ typedef struct {
int32_t reqType;
} SStreamTaskRunReq;
typedef struct SCheckpointConsensusInfo {
typedef struct SCheckpointConsensusEntry {
SRestoreCheckpointInfo req;
SRpcMsg rsp;
int64_t ts;
} SCheckpointConsensusInfo;
} SCheckpointConsensusEntry;
#ifdef __cplusplus
}

View File

@ -266,12 +266,13 @@ typedef struct SStreamTaskId {
} SStreamTaskId;
typedef struct SCheckpointInfo {
int64_t startTs;
int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int64_t startTs;
int64_t checkpointId; // latest checkpoint id
int64_t checkpointVer; // latest checkpoint offset in wal
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
SActiveCheckpointInfo* pActiveInfo;
int64_t msgVer;
} SCheckpointInfo;
@ -613,6 +614,12 @@ typedef struct SStreamTaskState {
char* name;
} SStreamTaskState;
// Per-stream bookkeeping kept by the mnode while the tasks of a stream negotiate the checkpoint id to restore from.
typedef struct SCheckpointConsensusInfo {
SArray* pTaskList; // array of SCheckpointConsensusEntry: one entry per received request (the same task may appear more than once)
int64_t checkpointId; // consensus checkpoint id (minimum of the ids reported by the tasks); -1 until it has been generated
int64_t genTs; // timestamp (ms) at which checkpointId was generated; -1 until then
} SCheckpointConsensusInfo;
int32_t streamSetupScheduleTrigger(SStreamTask* pTask);
// dispatch related
@ -747,7 +754,7 @@ void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta);
void streamMetaWUnLock(SStreamMeta* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta);
void streamMetaResetStartInfo(STaskStartInfo* pMeta, int32_t vgId);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
@ -755,7 +762,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask);
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask);
// timer
tmr_h streamTimerGetInstance();

View File

@ -828,6 +828,7 @@ TEST(clientCase, projection_query_tables) {
// printf("error in create db, reason:%s\n", taos_errstr(pRes));
// }
// taos_free_result(pRes);
TAOS_RES* pRes = NULL;
pRes= taos_query(pConn, "use abc1");
taos_free_result(pRes);

View File

@ -133,7 +133,7 @@ int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream)
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
int32_t mndStreamSetRestoreCheckpointId(SArray* pList, int64_t checkpointId);
int32_t mndSendConsensusCheckpointIdRsp(SArray* pList, int64_t checkpointId);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
@ -145,8 +145,14 @@ void mndInitExecInfo();
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
void mndAddConsensusTasks(SArray *pList, const SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg);
int64_t mndGetConsensusCheckpointId(SArray *pList, SStreamObj *pStream);
SCheckpointConsensusInfo *mndGetConsensusInfo(SHashObj *pHash, int64_t streamId);
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg);
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo *pInfo, SStreamObj *pStream);
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo *pInfo, int32_t numOfTasks, int32_t* pTotal);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo *pInfo, SRpcMsg *pMsg, int64_t checkpointId);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
#ifdef __cplusplus
}

View File

@ -1235,6 +1235,9 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
sdbRelease(pSdb, p);
// clear the consensus checkpoint info
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid);
if (code != -1) {
started += 1;
@ -2627,7 +2630,7 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
SRestoreCheckpointInfo req = {0};
tDecoderInit(&decoder, pReq->pCont, pReq->contLen);
if (tDecodeStreamTaskLatestChkptInfo(&decoder, &req)) {
if (tDecodeRestoreCheckpointInfo(&decoder, &req)) {
tDecoderClear(&decoder);
terrno = TSDB_CODE_INVALID_MSG;
mError("invalid task consensus-checkpoint msg received");
@ -2662,34 +2665,44 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SArray **pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
if (pTaskList == NULL) {
SArray *pList = taosArrayInit(4, sizeof(SCheckpointConsensusInfo));
mndAddConsensusTasks(pList, &req, pReq);
taosHashPut(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES);
SCheckpointConsensusInfo *pInfo = mndGetConsensusInfo(execInfo.pStreamConsensus, req.streamId);
int64_t ckId = mndGetConsensusCheckpointId(pInfo, pStream);
if (ckId != -1) { // consensus checkpoint id already exist
SRpcMsg rsp = {0};
rsp.code = 0;
rsp.info = pReq->info;
rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
rsp.pCont = rpcMallocCont(rsp.contLen);
pTaskList = (SArray **)taosHashGet(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
} else {
mndAddConsensusTasks(*pTaskList, &req, pReq);
SMsgHead *pHead = rsp.pCont;
pHead->vgId = htonl(req.nodeId);
mDebug("stream:0x%" PRIx64 " consensus checkpointId:%" PRId64 " exists, return directly", req.streamId, ckId);
doSendConsensusCheckpointRsp(&req, &rsp, ckId);
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
return TSDB_CODE_SUCCESS;
}
int32_t total = taosArrayGetSize(*pTaskList);
if (total == numOfTasks) { // all tasks has send the reqs
mndAddConsensusTasks(pInfo, &req, pReq);
int32_t total = 0;
if (mndAllTaskSendCheckpointId(pInfo, numOfTasks, &total)) { // all tasks has send the reqs
// start transaction to set the checkpoint id
int64_t checkpointId = mndGetConsensusCheckpointId(*pTaskList, pStream);
int64_t checkpointId = mndGetConsensusCheckpointId(pInfo, pStream);
mInfo("stream:0x%" PRIx64 " %s all %d tasks send latest checkpointId, the consensus-checkpointId is:%" PRId64
" will be issued soon",
req.streamId, pStream->name, total, checkpointId);
req.streamId, pStream->name, numOfTasks, checkpointId);
// start the checkpoint consensus trans
int32_t code = mndStreamSetRestoreCheckpointId(*pTaskList, checkpointId);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosHashRemove(execInfo.pStreamConsensus, &req.streamId, sizeof(req.streamId));
int32_t numOfStreams = taosHashGetSize(execInfo.pStreamConsensus);
mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list, remain:%d", req.streamId, numOfStreams);
int32_t code = mndSendConsensusCheckpointIdRsp(pInfo->pTaskList, checkpointId);
if (code == TSDB_CODE_SUCCESS) {
mndClearConsensusRspEntry(pInfo);
mDebug("clear all waiting for rsp entry for stream:0x%" PRIx64, req.streamId);
} else {
mDebug("stream:0x%" PRIx64 " not start set consensus-checkpointId trans, due to not all task ready",
req.streamId);
mDebug("stream:0x%" PRIx64 " not start send consensus-checkpointId msg, due to not all task ready", req.streamId);
}
} else {
mDebug("stream:0x%" PRIx64 " %d/%d tasks send consensus-checkpointId info", req.streamId, total, numOfTasks);
@ -2700,19 +2713,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pReq) {
}
taosThreadMutexUnlock(&execInfo.lock);
pReq->info.handle = NULL; // disable auto rsp
// { // start an transaction to set the start checkpoint id
// SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SRestoreCheckpointInfoRsp)};
// rsp.pCont = rpcMallocCont(rsp.contLen);
// SMsgHead *pHead = rsp.pCont;
// pHead->vgId = htonl(req.nodeId);
//
// tmsgSendRsp(&rsp);
// pReq->info.handle = NULL; // disable auto rsp
// }
return 0;
}

View File

@ -821,11 +821,12 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) {
int32_t doSendConsensusCheckpointRsp(SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg, int64_t checkpointId) {
int32_t code = 0;
int32_t blen;
SRestoreCheckpointInfoRsp req = {.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId};
SRestoreCheckpointInfoRsp req = {
.streamId = pInfo->streamId, .taskId = pInfo->taskId, .checkpointId = checkpointId, .startTs = pInfo->startTs};
tEncodeSize(tEncodeRestoreCheckpointInfoRsp, &req, blen, code);
if (code < 0) {
@ -848,51 +849,120 @@ static int32_t doSendRestoreCheckpointInfo(SRestoreCheckpointInfo* pInfo, SRpcMs
return code;
}
int32_t mndStreamSetRestoreCheckpointId(SArray* pInfoList, int64_t checkpointId) {
int32_t mndSendConsensusCheckpointIdRsp(SArray* pInfoList, int64_t checkpointId) {
for(int32_t i = 0; i < taosArrayGetSize(pInfoList); ++i) {
SCheckpointConsensusInfo* pInfo = taosArrayGet(pInfoList, i);
doSendRestoreCheckpointInfo(&pInfo->req, &pInfo->rsp, checkpointId);
SCheckpointConsensusEntry* pInfo = taosArrayGet(pInfoList, i);
doSendConsensusCheckpointRsp(&pInfo->req, &pInfo->rsp, checkpointId);
}
return 0;
}
void mndAddConsensusTasks(SArray* pList, const SRestoreCheckpointInfo* pInfo, SRpcMsg* pMsg) {
bool existed = false;
for(int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo* p = taosArrayGet(pList ,i);
if (p->taskId == pInfo->taskId) {
existed = true;
break;
}
SCheckpointConsensusInfo* mndGetConsensusInfo(SHashObj* pHash, int64_t streamId) {
void* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) {
return (SCheckpointConsensusInfo*)pInfo;
}
if (!existed) {
SCheckpointConsensusInfo info = {0};
memcpy(&info.req, pInfo, sizeof(info.req));
SCheckpointConsensusInfo p = {
.genTs = -1, .checkpointId = -1, .pTaskList = taosArrayInit(4, sizeof(SCheckpointConsensusEntry))};
taosHashPut(pHash, &streamId, sizeof(streamId), &p, sizeof(p));
info.rsp.code = 0;
info.rsp.info = pMsg->info;
info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
info.rsp.pCont = rpcMallocCont(info.rsp.contLen);
SMsgHead *pHead = info.rsp.pCont;
pHead->vgId = htonl(pInfo->nodeId);
taosArrayPush(pList, &info);
}
void* pChkptInfo = (SCheckpointConsensusInfo*)taosHashGet(pHash, &streamId, sizeof(streamId));
return pChkptInfo;
}
int64_t mndGetConsensusCheckpointId(SArray* pList, SStreamObj* pStream) {
int64_t checkpointId = INT64_MAX;
// Whether or not an entry for this task already exists, always add the request to the info list, because the
// response has to be sent manually; discarding the message may lead to lost connections.
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo, SRpcMsg *pMsg) {
SCheckpointConsensusEntry info = {0};
memcpy(&info.req, pRestoreInfo, sizeof(info.req));
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
SCheckpointConsensusInfo *pInfo = taosArrayGet(pList, i);
if (pInfo->req.checkpointId < checkpointId) {
checkpointId = pInfo->req.checkpointId;
mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name,
pInfo->req.taskId, pInfo->req.nodeId, pInfo->req.checkpointId);
info.rsp.code = 0;
info.rsp.info = pMsg->info;
info.rsp.contLen = sizeof(SRestoreCheckpointInfoRsp) + sizeof(SMsgHead);
info.rsp.pCont = rpcMallocCont(info.rsp.contLen);
SMsgHead *pHead = info.rsp.pCont;
pHead->vgId = htonl(pRestoreInfo->nodeId);
taosArrayPush(pInfo->pTaskList, &info);
}
// Ordering callback for taosArraySort(): arranges SCheckpointConsensusEntry items by ascending req.taskId.
// Returns -1, 0 or 1 when the first entry's task id is smaller than, equal to or greater than the second's.
static int32_t entryComparFn(const void* p1, const void* p2) {
  const int32_t lhsTaskId = ((const SCheckpointConsensusEntry*)p1)->req.taskId;
  const int32_t rhsTaskId = ((const SCheckpointConsensusEntry*)p2)->req.taskId;

  if (lhsTaskId < rhsTaskId) {
    return -1;
  }

  return (lhsTaskId > rhsTaskId) ? 1 : 0;
}
/**
 * Check whether every task of a stream has reported its latest checkpoint id.
 *
 * Requests are appended to pInfo->pTaskList unconditionally, so one task may own several entries. The list is
 * therefore sorted by task id and the number of distinct task ids is counted.
 *
 * NOTE: pInfo->pTaskList is sorted in place as a side effect.
 *
 * @param pInfo       consensus info of the stream
 * @param numOfTasks  number of tasks the stream is expected to have
 * @param pTotal      optional output. Receives the raw number of entries when the list is still shorter than
 *                    numOfTasks (or empty), otherwise the number of distinct task ids found.
 * @return true only when the number of distinct reporting tasks equals numOfTasks.
 */
bool mndAllTaskSendCheckpointId(SCheckpointConsensusInfo* pInfo, int32_t numOfTasks, int32_t* pTotal) {
  int32_t numOfExisted = taosArrayGetSize(pInfo->pTaskList);

  // An empty list can never represent a completed round. Bail out before the first entry is dereferenced below;
  // without this guard an empty list combined with numOfTasks <= 0 would read element 0 of an empty array.
  if (numOfExisted <= 0 || numOfExisted < numOfTasks) {
    if (pTotal != NULL) {
      *pTotal = numOfExisted;
    }
    return false;
  }

  // Group the entries of the same task together so that distinct ids can be counted in a single pass.
  taosArraySort(pInfo->pTaskList, entryComparFn);

  int32_t num = 1;
  int32_t taskId = ((SCheckpointConsensusEntry*)taosArrayGet(pInfo->pTaskList, 0))->req.taskId;
  for (int32_t i = 1; i < numOfExisted; ++i) {
    SCheckpointConsensusEntry* pe = taosArrayGet(pInfo->pTaskList, i);
    if (pe->req.taskId != taskId) {
      num += 1;
      taskId = pe->req.taskId;
    }
  }

  if (pTotal != NULL) {
    *pTotal = num;
  }

  ASSERT(num <= numOfTasks);
  return num == numOfTasks;
}
/**
 * Return the consensus checkpoint id of a stream, generating it on first use.
 *
 * If an id has already been generated (genTs > 0) the cached value is returned. Otherwise, once every task of the
 * stream has reported, the smallest reported checkpoint id is chosen, cached in pInfo together with the generation
 * timestamp, and returned.
 *
 * @return the consensus checkpoint id, or -1 while not all tasks have reported yet.
 *
 * NOTE(review): the assertion requires a cached id to be strictly positive; confirm that tasks never report 0.
 */
int64_t mndGetConsensusCheckpointId(SCheckpointConsensusInfo* pInfo, SStreamObj* pStream) {
  // fast path: the consensus id was generated by an earlier call
  if (pInfo->genTs > 0) {
    ASSERT(pInfo->checkpointId > 0);
    return pInfo->checkpointId;
  }

  int32_t expected = mndGetNumOfStreamTasks(pStream);
  if (!mndAllTaskSendCheckpointId(pInfo, expected, NULL)) {
    return -1;
  }

  // the consensus is the minimum checkpoint id among all collected entries
  int64_t minChkptId = INT64_MAX;
  int32_t numOfEntries = taosArrayGetSize(pInfo->pTaskList);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    SCheckpointConsensusEntry *pCur = taosArrayGet(pInfo->pTaskList, idx);
    if (pCur->req.checkpointId >= minChkptId) {
      continue;
    }

    minChkptId = pCur->req.checkpointId;
    mTrace("stream:0x%" PRIx64 " %s task:0x%x vgId:%d latest checkpointId:%" PRId64, pStream->uid, pStream->name,
           pCur->req.taskId, pCur->req.nodeId, pCur->req.checkpointId);
  }

  pInfo->checkpointId = minChkptId;
  pInfo->genTs = taosGetTimestampMs();
  return minChkptId;
}
}
// Destroy the list of collected request/response entries of a stream and store the value returned by
// taosArrayDestroy() back into pInfo->pTaskList (presumably NULL -- TODO confirm).
// The cached checkpointId and genTs of pInfo are left untouched.
void mndClearConsensusRspEntry(SCheckpointConsensusInfo* pInfo) {
pInfo->pTaskList = taosArrayDestroy(pInfo->pTaskList);
}
/**
 * Drop the consensus-checkpoint record of a stream from the hash table and log how many records remain.
 *
 * @param pHash     table keyed by stream id that holds the per-stream consensus info
 * @param streamId  id of the stream whose record is removed
 * @return always TSDB_CODE_SUCCESS
 *
 * NOTE(review): the pTaskList array owned by the removed record is not destroyed here; verify that the hash
 * table has a free callback installed, otherwise the array may leak.
 */
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
  taosHashRemove(pHash, &streamId, sizeof(streamId));

  int32_t remaining = taosHashGetSize(pHash);
  mDebug("drop stream:0x%" PRIx64 " in consensus-checkpointId list after new checkpoint generated, remain:%d",
         streamId, remaining);

  return TSDB_CODE_SUCCESS;
}

View File

@ -25,7 +25,7 @@
#define sndDebug(...) do { if (sndDebugFlag & DEBUG_DEBUG) { taosPrintLog("SND ", DEBUG_DEBUG, sndDebugFlag, __VA_ARGS__);}} while (0)
// clang-format on
int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
int32_t sndBuildStreamTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__AGG && taosArrayGetSize(pTask->upstreamInfo.pList) != 0);
int32_t code = streamTaskInit(pTask, pSnode->pMeta, &pSnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) {
@ -71,8 +71,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
startRsync();
pSnode->msgCb = pOption->msgCb;
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndExpandTask, tqExpandStreamTask, SNODE_HANDLE,
taosGetTimestampMs(), tqStartTaskCompleteCallback);
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskBuild *)sndBuildStreamTask, tqExpandStreamTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
if (pSnode->pMeta == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;

View File

@ -139,9 +139,11 @@ void tqClose(STQ* pTq) {
taosHashCleanup(pTq->pCheckInfo);
taosMemoryFree(pTq->path);
tqMetaClose(pTq);
int32_t vgId = pTq->pStreamMeta->vgId;
streamMetaClose(pTq->pStreamMeta);
qDebug("end to close tq");
qDebug("vgId:%d end to close tq", vgId);
taosMemoryFree(pTq);
}

View File

@ -146,7 +146,7 @@ int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int3
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
int32_t code = streamTaskSendConsensusChkptMsg(pTask);
int32_t code = streamTaskSendRestoreChkptMsg(pTask);
streamMetaReleaseTask(pMeta, pTask);
return code;
}
@ -675,8 +675,11 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored
if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
} else { // failed to get the task.
tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
vgId, pReq->taskId);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
tqError(
"vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, numOfTasks:%d, it may have been "
"dropped already",
vgId, pReq->taskId, numOfTasks);
}
streamMetaWUnLock(pMeta);
@ -729,7 +732,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaResetStartInfo(&pMeta->startInfo, pMeta->vgId);
streamMetaWUnLock(pMeta);
tqInfo("vgId:%d, follower node not start stream tasks or stream is disabled", vgId);
}
@ -877,7 +880,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
ASSERT(pTask->status.downstreamReady == 0);
tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId);
// tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, pTask->id.streamId, pTask->id.taskId);
} else {
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
}
@ -1114,7 +1116,7 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
bool updateCheckpointId = false;
int64_t now = taosGetTimestampMs();
SRestoreCheckpointInfoRsp req = {0};
@ -1133,8 +1135,19 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already", pMeta->vgId,
req.taskId);
tqError("vgId:%d process restore checkpointId req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, req.taskId);
streamMetaAddFailedTask(pMeta, req.streamId, req.taskId);
return TSDB_CODE_SUCCESS;
}
// discard the rsp from before restart
if (req.startTs < pTask->execInfo.created) {
tqWarn("s-task:%s vgId:%d create time:%" PRId64 " recv expired consensus checkpointId:%" PRId64
" from task createTs:%" PRId64 ", discard",
pTask->id.idStr, pMeta->vgId, pTask->execInfo.created, req.checkpointId, req.startTs);
streamMetaAddFailedTaskSelf(pTask, now);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
@ -1148,23 +1161,11 @@ int32_t tqStreamProcessConsensusChkptRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tqDebug("s-task:%s vgId:%d update the checkpoint from %" PRId64 " to %" PRId64, pTask->id.idStr, vgId,
pTask->chkInfo.checkpointId, req.checkpointId);
pTask->chkInfo.checkpointId = req.checkpointId;
updateCheckpointId = true;
streamMetaSaveTask(pMeta, pTask);
}
taosThreadMutexUnlock(&pTask->lock);
if (updateCheckpointId) {
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
streamMetaWUnLock(pMeta);
}
// todo: set the update transId, and discard with less transId.
if (pMeta->role == NODE_ROLE_LEADER) {
/*code = */tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
/*code = */ tqStreamStartOneTaskAsync(pMeta, pTask->pMsgCb, req.streamId, req.taskId);
} else {
tqDebug("vgId:%d follower not start task:%s", vgId, pTask->id.idStr);
}

View File

@ -65,6 +65,11 @@ struct SActiveCheckpointInfo {
tmr_h pSendReadyMsgTmr;
};
// NOTE(review): nothing visible here reads or writes this struct; inProcess presumably flags that a
// consensus-checkpoint negotiation is in progress -- confirm against the users of this type.
struct SConsensusCheckpoint {
int8_t inProcess;
};
typedef struct {
int8_t type;
SSDataBlock* pBlock;

View File

@ -1108,7 +1108,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
return 0;
}
int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) {
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
int32_t code;
int32_t tlen = 0;
int32_t vgId = pTask->pMeta->vgId;
@ -1118,9 +1118,14 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) {
ASSERT(pTask->pBackend == NULL);
SRestoreCheckpointInfo req = {
.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId, .checkpointId = pInfo->checkpointId};
.streamId = pTask->id.streamId,
.taskId = pTask->id.taskId,
.nodeId = vgId,
.checkpointId = pInfo->checkpointId,
.startTs = pTask->execInfo.created,
};
tEncodeSize(tEncodeStreamTaskLatestChkptInfo, &req, tlen, code);
tEncodeSize(tEncodeRestoreCheckpointInfo, &req, tlen, code);
if (code < 0) {
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id failed, code:%s", id, vgId, tstrerror(code));
return -1;
@ -1135,7 +1140,7 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) {
SEncoder encoder;
tEncoderInit(&encoder, buf, tlen);
if ((code = tEncodeStreamTaskLatestChkptInfo(&encoder, &req)) < 0) {
if ((code = tEncodeRestoreCheckpointInfo(&encoder, &req)) < 0) {
rpcFreeCont(buf);
stError("s-task:%s vgId:%d encode stream task latest-checkpoint-id msg failed, code:%s", id, vgId, tstrerror(code));
return -1;
@ -1144,8 +1149,8 @@ int32_t streamTaskSendConsensusChkptMsg(SStreamTask* pTask) {
SRpcMsg msg = {0};
initRpcMsg(&msg, TDMT_MND_STREAM_CHKPT_CONSEN, buf, tlen);
stDebug("s-task:%s vgId:%d send task latest-checkpoint-id to mnode:%" PRId64 " to reach the consensus checkpointId",
id, vgId, pInfo->checkpointId);
stDebug("s-task:%s vgId:%d send latest checkpointId:%" PRId64 " to mnode to get the consensus checkpointId", id, vgId,
pInfo->checkpointId);
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
return 0;

View File

@ -473,7 +473,7 @@ void streamMetaClear(SStreamMeta* pMeta) {
}
void streamMetaClose(SStreamMeta* pMeta) {
stDebug("start to close stream meta");
stDebug("vgId:%d start to close stream meta", pMeta->vgId);
if (pMeta == NULL) {
return;
}
@ -489,11 +489,13 @@ void streamMetaClose(SStreamMeta* pMeta) {
void streamMetaCloseImpl(void* arg) {
SStreamMeta* pMeta = arg;
stDebug("start to do-close stream meta");
if (pMeta == NULL) {
return;
}
int32_t vgId = pMeta->vgId;
stDebug("vgId:%d start to do-close stream meta", vgId);
streamMetaWLock(pMeta);
streamMetaClear(pMeta);
streamMetaWUnLock(pMeta);
@ -526,7 +528,7 @@ void streamMetaCloseImpl(void* arg) {
taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
stDebug("end to close stream meta");
stDebug("vgId:%d end to close stream meta", vgId);
}
// todo let's check the status for each task
@ -888,13 +890,16 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
stDebug("s-task:0x%" PRIx64 "-0x%x vgId:%d loaded from meta file, checkpointId:%" PRId64 " checkpointVer:%" PRId64,
pTask->id.streamId, pTask->id.taskId, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer);
// do duplicate task check.
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) {
code = pMeta->buildTaskFn(pMeta->ahandle, pTask, pTask->chkInfo.checkpointVer + 1);
if (code < 0) {
stError("failed to expand s-task:0x%" PRIx64 ", code:%s, continue", id.taskId, tstrerror(terrno));
stError("failed to load s-task:0x%"PRIx64", code:%s, continue", id.taskId, tstrerror(terrno));
tFreeStreamTask(pTask);
continue;
}
@ -978,7 +983,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t sendCount = 0;
streamMetaGetHbSendInfo(pMeta->pHbInfo, &startTs, &sendCount);
stDebug("vgId:%d notify all stream tasks that the vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
stInfo("vgId:%d notify all stream tasks that current vnode is closing. isLeader:%d startHb:%" PRId64 ", totalHb:%d",
vgId, (pMeta->role == NODE_ROLE_LEADER), startTs, sendCount);
// wait for the stream meta hb function stopping
@ -1020,7 +1025,7 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
streamMetaHbToMnode(pRid, NULL);
}
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
taosHashClear(pStartInfo->pReadyTaskSet);
taosHashClear(pStartInfo->pFailedTaskSet);
pStartInfo->tasksWillRestart = 0;
@ -1028,6 +1033,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
// reset the sentinel flag value to be 0
pStartInfo->startAllTasks = 0;
stDebug("vgId:%d clear all start-all-task info", vgId);
}
void streamMetaRLock(SStreamMeta* pMeta) {
@ -1249,7 +1255,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
// negotiate the consensus checkpoint id for current task
ASSERT(pTask->pBackend == NULL);
code = streamTaskSendConsensusChkptMsg(pTask);
code = streamTaskSendRestoreChkptMsg(pTask);
// this task may have no checkpoint, but other tasks may have generated a checkpoint already?
streamMetaReleaseTask(pMeta, pTask);
@ -1420,7 +1426,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
// print the initialization elapsed time and info
displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
streamMetaResetStartInfo(pStartInfo);
streamMetaResetStartInfo(pStartInfo, pMeta->vgId);
streamMetaWUnLock(pMeta);
pStartInfo->completeFn(pMeta);

View File

@ -629,8 +629,9 @@ int32_t tDecodeStreamTaskChkptReport(SDecoder* pDecoder, SCheckpointReport* pReq
return 0;
}
int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
int32_t tEncodeRestoreCheckpointInfo (SEncoder* pEncoder, const SRestoreCheckpointInfo* pReq) {
if (tStartEncode(pEncoder) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->startTs) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
@ -639,8 +640,9 @@ int32_t tEncodeStreamTaskLatestChkptInfo (SEncoder* pEncoder, const SRestoreChec
return pEncoder->pos;
}
int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
int32_t tDecodeRestoreCheckpointInfo(SDecoder* pDecoder, SRestoreCheckpointInfo* pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->startTs) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->streamId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->checkpointId) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->taskId) < 0) return -1;
@ -651,6 +653,7 @@ int32_t tDecodeStreamTaskLatestChkptInfo(SDecoder* pDecoder, SRestoreCheckpointI
int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpointInfoRsp* pInfo) {
if (tStartEncode(pCoder) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->startTs) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->streamId) < 0) return -1;
if (tEncodeI32(pCoder, pInfo->taskId) < 0) return -1;
if (tEncodeI64(pCoder, pInfo->checkpointId) < 0) return -1;
@ -660,6 +663,7 @@ int32_t tEncodeRestoreCheckpointInfoRsp(SEncoder* pCoder, const SRestoreCheckpoi
int32_t tDecodeRestoreCheckpointInfoRsp(SDecoder* pCoder, SRestoreCheckpointInfoRsp* pInfo) {
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->startTs) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->streamId) < 0) return -1;
if (tDecodeI32(pCoder, &pInfo->taskId) < 0) return -1;
if (tDecodeI64(pCoder, &pInfo->checkpointId) < 0) return -1;