fix(stream): check the checkpoint-report transId and checkpointId, and identify the expired checkpoint-report info.
This commit is contained in:
parent
84eaed0bbb
commit
efd33aa4d7
|
@ -2416,8 +2416,8 @@ static bool validateChkptReport(const SCheckpointReport *pReport, int64_t report
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
|
static void doAddReportStreamTask(SArray *pList, int64_t reportedChkptId, const SCheckpointReport *pReport) {
|
||||||
bool valid = validateChkptReport(pReport, reportChkptId);
|
bool valid = validateChkptReport(pReport, reportedChkptId);
|
||||||
if (!valid) {
|
if (!valid) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -2433,7 +2433,7 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
|
||||||
mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
|
mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
|
||||||
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
||||||
} else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it
|
} else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it
|
||||||
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
|
mInfo("s-task:0x%x expired checkpoint-report info in checkpoint-report list update from %" PRId64 "->%" PRId64,
|
||||||
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
||||||
|
|
||||||
// update the checkpoint report info
|
// update the checkpoint report info
|
||||||
|
@ -2465,7 +2465,8 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
|
||||||
mError("failed to put into task list, taskId:0x%x", pReport->taskId);
|
mError("failed to put into task list, taskId:0x%x", pReport->taskId);
|
||||||
} else {
|
} else {
|
||||||
int32_t size = taosArrayGetSize(pList);
|
int32_t size = taosArrayGetSize(pList);
|
||||||
mDebug("stream:0x%" PRIx64 " %d tasks has send checkpoint-report", pReport->streamId, size);
|
mDebug("stream:0x%" PRIx64 " taskId:0x%x checkpoint-report recv, %d tasks has send checkpoint-report",
|
||||||
|
pReport->streamId, pReport->taskId, size);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2491,7 +2492,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
||||||
" checkpointVer:%" PRId64 " transId:%d",
|
" checkpointVer:%" PRId64 " transId:%d",
|
||||||
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
|
req.nodeId, req.taskId, req.checkpointId, req.checkpointVer, req.transId);
|
||||||
|
|
||||||
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
|
// register to the stream task done map, if all tasks has sent these kinds of message, start the checkpoint trans.
|
||||||
streamMutexLock(&execInfo.lock);
|
streamMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
@ -2500,7 +2501,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
||||||
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
|
mWarn("failed to find the stream:0x%" PRIx64 ", not handle checkpoint-report, try to acquire in buf", req.streamId);
|
||||||
|
|
||||||
// not in meta-store yet, try to acquire the task in exec buffer
|
// not in meta-store yet, try to acquire the task in exec buffer
|
||||||
// the checkpoint req arrives too soon before the completion of the create stream trans.
|
// the checkpoint req arrives too soon before the completion of the creation of stream trans.
|
||||||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||||
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
void *p = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
if (p == NULL) {
|
if (p == NULL) {
|
||||||
|
@ -2533,7 +2534,7 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t total = taosArrayGetSize(pInfo->pTaskList);
|
int32_t total = taosArrayGetSize(pInfo->pTaskList);
|
||||||
if (total == numOfTasks) { // all tasks has send the reqs
|
if (total == numOfTasks) { // all tasks have sent the reqs
|
||||||
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
|
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
|
||||||
" will be issued soon",
|
" will be issued soon",
|
||||||
req.streamId, pStream->name, total, req.checkpointId);
|
req.streamId, pStream->name, total, req.checkpointId);
|
||||||
|
|
|
@ -658,6 +658,65 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t allTasksSendChkptReport(SChkptReportInfo* pReportInfo, int32_t numOfTasks, const char* pName) {
|
||||||
|
int64_t checkpointId = -1;
|
||||||
|
int32_t transId = -1;
|
||||||
|
int32_t taskId = -1;
|
||||||
|
|
||||||
|
int32_t existed = (int32_t)taosArrayGetSize(pReportInfo->pTaskList);
|
||||||
|
if (existed != numOfTasks) {
|
||||||
|
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pReportInfo->streamId, pName,
|
||||||
|
existed, numOfTasks, numOfTasks - existed);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// acquire current active checkpointId, and do cross-check checkpointId info in exec.pTaskList
|
||||||
|
for(int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
STaskChkptInfo *pInfo = taosArrayGet(pReportInfo->pTaskList, i);
|
||||||
|
if (pInfo == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (checkpointId == -1) {
|
||||||
|
checkpointId = pInfo->checkpointId;
|
||||||
|
transId = pInfo->transId;
|
||||||
|
taskId = pInfo->taskId;
|
||||||
|
} else if (checkpointId != pInfo->checkpointId) {
|
||||||
|
mError("stream:0x%" PRIx64
|
||||||
|
" checkpointId in checkpoint-report list are not identical, type 1 taskId:0x%x checkpointId:%" PRId64
|
||||||
|
", type 2 taskId:0x%x checkpointId:%" PRId64,
|
||||||
|
pReportInfo->streamId, taskId, checkpointId, pInfo->taskId, pInfo->checkpointId);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check for the correct checkpointId for current task info in STaskChkptInfo
|
||||||
|
STaskChkptInfo *p = taosArrayGet(pReportInfo->pTaskList, 0);
|
||||||
|
STaskId id = {.streamId = p->streamId, .taskId = p->taskId};
|
||||||
|
STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
|
||||||
|
|
||||||
|
// cross-check failed, there must be something unknown wrong
|
||||||
|
SStreamTransInfo *pTransInfo = taosHashGet(execInfo.transMgmt.pDBTrans, &id.streamId, sizeof(id.streamId));
|
||||||
|
if (pTransInfo == NULL) {
|
||||||
|
mError("stream:0x%" PRIx64" no active exists for checkpoint transId:%d, clear checkpoint-report list", id.streamId);
|
||||||
|
taosArrayClear(pReportInfo->pTaskList);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pTransInfo->transId != transId) {
|
||||||
|
mError("stream:0x%" PRIx64
|
||||||
|
" checkpoint-report list info are expired, clear and retry, active transId:%d trans in list:%d",
|
||||||
|
id.streamId, pTransInfo->transId, transId);
|
||||||
|
taosArrayClear(pReportInfo->pTaskList);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", id.streamId,
|
||||||
|
pName, numOfTasks);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -668,6 +727,7 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("start to scan checkpoint report info");
|
mDebug("start to scan checkpoint report info");
|
||||||
|
|
||||||
streamMutexLock(&execInfo.lock);
|
streamMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
|
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
|
||||||
|
@ -693,30 +753,27 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t total = mndGetNumOfStreamTasks(pStream);
|
int32_t total = mndGetNumOfStreamTasks(pStream);
|
||||||
int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);
|
int32_t ret = allTasksSendChkptReport(px, total, pStream->name);
|
||||||
|
if (ret == 0) {
|
||||||
if (total == existed) {
|
|
||||||
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
|
|
||||||
pStream->uid, pStream->name, total);
|
|
||||||
|
|
||||||
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
|
code = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
|
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
|
||||||
taosArrayClear(px->pTaskList);
|
taosArrayClear(px->pTaskList);
|
||||||
|
mInfo("stream:0x%" PRIx64 " clear checkpoint-report list and update the report checkpointId from:%" PRId64
|
||||||
|
" to %" PRId64,
|
||||||
|
pInfo->streamId, px->reportChkpt, pInfo->checkpointId);
|
||||||
px->reportChkpt = pInfo->checkpointId;
|
px->reportChkpt = pInfo->checkpointId;
|
||||||
mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId);
|
|
||||||
} else {
|
} else {
|
||||||
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
|
mDebug("stream:0x%" PRIx64 " not launch chkpt-info update trans, due to checkpoint not finished yet",
|
||||||
pInfo->streamId);
|
pInfo->streamId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
|
mDebug("stream:0x%" PRIx64 " active checkpoint trans not finished yet, wait", pInfo->streamId);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
mDebug("stream:0x%" PRIx64 " %s %d/%d tasks send checkpoint-report, %d not send", pInfo->streamId, pStream->name,
|
|
||||||
existed, total, total - existed);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
@ -743,6 +800,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
|
||||||
taosArrayDestroy(pDropped);
|
taosArrayDestroy(pDropped);
|
||||||
|
|
||||||
|
mDebug("end to scan checkpoint report info")
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue