fix(stream): do scan checkpoint-report in write queue.

This commit is contained in:
Haojun Liao 2024-06-11 14:09:50 +08:00
parent 68aac5dee1
commit 00eb621825
9 changed files with 52 additions and 46 deletions

View File

@ -207,7 +207,7 @@
TD_DEF_MSG_TYPE(TDMT_MND_RESTORE_DNODE, "restore-dnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_PAUSE_STREAM, "pause-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_RESUME_STREAM, "resume-stream", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHECKPOINT_TIMER, "stream-checkpoint-tmr", NULL, NULL) // unused,reserved
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_UPDATE_CHKPT_EVT, "stream-update-chkpt-evt", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_BEGIN_CHECKPOINT, "stream-begin-checkpoint", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_CHKPT_REPORT, "stream-chkpt-report", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_STREAM_NODECHANGE_CHECK, "stream-nodechange-check", NULL, NULL)

View File

@ -95,7 +95,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -967,7 +967,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToWriteQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_CHKPT_REPORT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -130,7 +130,7 @@ int32_t mndStreamSetResetTaskAction(SMnode *pMnode, STrans *pTrans, SStreamO
int32_t mndCreateStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream);
int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndCreateStreamChkptInfoUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SArray *pChkptInfoList);
void scanCheckpointReportInfo(SMnode *pMnode);
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq);
void removeTasksInBuf(SArray *pTaskIds, SStreamExecInfo *pExecInfo);
SStreamTaskIter *createStreamTaskIter(SStreamObj *pStream);

View File

@ -115,10 +115,11 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_VND_STREAM_CHECK_POINT_SOURCE_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_BEGIN_CHECKPOINT, mndProcessStreamCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_REQ_CHKPT, mndProcessStreamReqCheckpoint);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_CHKPT_REPORT, mndProcessCheckpointReport);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_UPDATE_CHKPT_EVT, mndScanCheckpointReportInfo);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_REPORT_CHECKPOINT, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_HEARTBEAT, mndProcessStreamHb);
mndSetMsgHandle(pMnode, TDMT_MND_STREAM_NODECHANGE_CHECK, mndProcessNodeCheckReq);
mndSetMsgHandle(pMnode, TDMT_MND_PAUSE_STREAM, mndProcessPauseStreamReq);
@ -1181,7 +1182,8 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
int32_t numOfCheckpointTrans = 0;
if ((code = mndCheckTaskAndNodeStatus(pMnode)) != 0) {
return code;
terrno = TSDB_CODE_STREAM_TASK_IVLD_STATUS;
return -1;
}
SArray* pList = taosArrayInit(4, sizeof(SCheckpointInterval));

View File

@ -22,6 +22,8 @@ typedef struct SFailedCheckpointInfo {
int32_t transId;
} SFailedCheckpointInfo;
static void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode);
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
for (int32_t j = 0; j < numOfNodes; ++j) {
@ -326,7 +328,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
mndDropOrphanTasks(pMnode, pOrphanTasks);
}
scanCheckpointReportInfo(pMnode);
mndStreamStartUpdateCheckpointInfo(pMnode);
taosThreadMutexUnlock(&execInfo.lock);
tCleanupStreamHbMsg(&req);
@ -346,3 +348,12 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
return TSDB_CODE_SUCCESS;
}
void mndStreamStartUpdateCheckpointInfo(SMnode *pMnode) { // here reuse the doCheckpointmsg
SMStreamDoCheckpointMsg *pMsg = rpcMallocCont(sizeof(SMStreamDoCheckpointMsg));
if (pMsg != NULL) {
int32_t size = sizeof(SMStreamDoCheckpointMsg);
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_UPDATE_CHKPT_EVT, .pCont = pMsg, .contLen = size};
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
}
}

View File

@ -754,8 +754,11 @@ int32_t mndStreamSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamObj
return 0;
}
void scanCheckpointReportInfo(SMnode* pMnode) {
void* pIter = NULL;
int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
void *pIter = NULL;
SArray *pDropped = taosArrayInit(4, sizeof(int64_t));
mDebug("start to scan checkpoint report info");
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
@ -764,8 +767,8 @@ void scanCheckpointReportInfo(SMnode* pMnode) {
STaskChkptInfo* pInfo = taosArrayGet(pList, 0);
SStreamObj* pStream = mndGetStreamObj(pMnode, pInfo->streamId);
if (pStream == NULL) {
mError("failed to acquire stream:0x%"PRIx64" remove it from checkpoint-report list", pInfo->streamId);
taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId));
mDebug("failed to acquire stream:0x%" PRIx64 " remove it from checkpoint-report list", pInfo->streamId);
taosArrayPush(pDropped, &pInfo->streamId);
continue;
}
@ -780,14 +783,13 @@ void scanCheckpointReportInfo(SMnode* pMnode) {
if (!conflict) {
int32_t code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
taosHashRemove(execInfo.pChkptStreams, &pInfo->streamId, sizeof(pInfo->streamId));
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId, numOfStreams);
taosArrayPush(pDropped, &pInfo->streamId);
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d in checkpoint procedure", pInfo->streamId);
} else {
mDebug("stream:0x%" PRIx64 " not launch chkpt update trans, due to checkpoint not finished yet",
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
pInfo->streamId);
}
break;
} else {
mDebug("stream:0x%"PRIx64" active checkpoint trans not finished yet, wait", pInfo->streamId);
}
@ -799,4 +801,16 @@ void scanCheckpointReportInfo(SMnode* pMnode) {
sdbRelease(pMnode->pSdb, pStream);
}
if (taosArrayGetSize(pDropped) > 0) {
for (int32_t i = 0; i < taosArrayGetSize(pDropped); ++i) {
int64_t streamId = *(int64_t *)taosArrayGet(pDropped, i);
taosHashRemove(execInfo.pChkptStreams, &streamId, sizeof(streamId));
}
int32_t numOfStreams = taosHashGetSize(execInfo.pChkptStreams);
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", numOfStreams);
}
taosArrayDestroy(pDropped);
return TSDB_CODE_SUCCESS;
}

View File

@ -40,26 +40,8 @@ static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId);
bool streamTaskIsAllUpstreamSendTrigger(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
int32_t numOfUpstreams = taosArrayGetSize(pTask->upstreamInfo.pList);
bool allSend = true;
taosThreadMutexLock(&pActiveInfo->lock);
int32_t numOfRecv = taosArrayGetSize(pActiveInfo->pReadyMsgList);
if (numOfRecv < numOfUpstreams) {
stDebug("s-task:%s received checkpoint-trigger block, idx:%d, %d upstream tasks not send yet, total:%d",
pTask->id.idStr, pTask->info.selfChildId, (numOfUpstreams - numOfRecv), numOfUpstreams);
allSend = false;
}
taosThreadMutexUnlock(&pActiveInfo->lock);
return allSend;
}
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) {
int32_t transId) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;

View File

@ -615,7 +615,7 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
* appropriate batch of blocks should be handled in 5 to 10 sec.
*/
static int32_t doStreamExecTask(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
const char* id = pTask->id.idStr;
// merge multiple input data if possible in the input queue.
stDebug("s-task:%s start to extract data block from inputQ", id);
@ -699,20 +699,15 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
if (type != STREAM_INPUT__CHECKPOINT) {
doStreamTaskExecImpl(pTask, pInput);
}
streamFreeQitem(pInput);
// todo other thread may change the status
streamFreeQitem(pInput);
} else { // todo other thread may change the status
// do nothing after sync executor state to storage backend, untill the vnode-level checkpoint is completed.
if (type == STREAM_INPUT__CHECKPOINT) {
// todo add lock
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) {
stDebug("s-task:%s checkpoint block received, set status:%s", id, pState->name);
streamTaskBuildCheckpoint(pTask);
} else {
// todo refactor
} else { // todo refactor
int32_t code = 0;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
code = streamTaskSendCheckpointSourceRsp(pTask);
@ -727,6 +722,8 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
}
}
taosThreadMutexUnlock(&pTask->lock);
streamFreeQitem(pInput);
return 0;
}
}