fix(stream): track the checkpoint-ready msg on the upstream tasks.

This commit is contained in:
Haojun Liao 2024-06-01 18:26:45 +08:00
parent 5cbd733cab
commit 2a8270f9c8
6 changed files with 115 additions and 52 deletions

View File

@ -753,7 +753,7 @@ tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);

View File

@ -492,7 +492,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask);
streamProcessCheckpointReadyMsg(pTask, req.downstreamTaskId, req.downstreamNodeId);
streamMetaReleaseTask(pMeta, pTask);
{ // send checkpoint ready rsp

View File

@ -60,9 +60,9 @@ struct SActiveCheckpointInfo {
int64_t failedId;
bool dispatchTrigger;
SArray* pDispatchTriggerList; // SArray<STaskTriggerSendInfo>
SArray* pReadyMsgList; // SArray<SStreamChkptReadyInfo*>
SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*>
int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskCheckpointReadyRecvInfo>
int32_t checkCounter;
tmr_h pCheckTmr;
};
@ -97,14 +97,14 @@ struct STokenBucket {
};
typedef struct {
int32_t upStreamTaskId;
int32_t upstreamTaskId;
SEpSet upstreamNodeEpset;
int32_t nodeId;
SRpcMsg msg;
int64_t recvTs;
int32_t transId;
int64_t checkpointId;
} SStreamChkptReadyInfo;
} STaskCheckpointReadyInfo;
typedef struct {
int64_t sendTs;
@ -114,6 +114,15 @@ typedef struct {
int32_t taskId;
} STaskTriggerSendInfo;
typedef struct {
int64_t streamId;
int64_t recvTs;
int32_t downstreamNodeId;
int32_t downstreamTaskId;
int64_t checkpointId;
int32_t transId;
} STaskCheckpointReadyRecvInfo;
struct SStreamQueue {
STaosQueue* pQueue;
STaosQall* qall;
@ -203,6 +212,9 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask);
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId,
int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);
int32_t streamMetaAsyncExec(SStreamMeta* pMeta, __stream_async_exec_fn_t fn, void* param, int32_t* code);

View File

@ -182,7 +182,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pTask->lock);
if (pTask->chkInfo.checkpointId >= checkpointId) {
if (pTask->chkInfo.checkpointId > checkpointId) {
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
@ -190,6 +190,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return TSDB_CODE_SUCCESS;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId);
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
tmsgSendReq(&info.upstreamNodeEpset, &info.msg);
}
stWarn(
"s-task:%s vgId:%d recv already finished checkpoint msg, send checkpoint-ready to upstream:0x%x to resume the "
"interrupted checkpoint",
id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
if (pActiveInfo->activeId != checkpointId) {
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
@ -210,12 +230,12 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
// check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
SStreamChkptReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p->upStreamTaskId == pBlock->srcTaskId) {
STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
if (p->upstreamTaskId == pBlock->srcTaskId) {
ASSERT(p->checkpointId == checkpointId);
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upStreamTaskId, p->nodeId, p->checkpointId, p->recvTs);
pTask->id.idStr, p->upstreamTaskId, p->nodeId, p->checkpointId, p->recvTs);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
@ -262,8 +282,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1);
streamProcessCheckpointReadyMsg(pTask);
streamProcessCheckpointReadyMsg(pTask, 0, 0);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -307,23 +326,47 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
* All down stream tasks have successfully completed the check point task.
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
*/
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask) {
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
const char* id = pTask->id.idStr;
bool received = false;
int32_t total = streamTaskGetNumOfDownstream(pTask);
taosThreadMutexLock(&pInfo->lock);
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
int32_t notReady = atomic_sub_fetch_32(&pTask->chkInfo.numOfNotReady, 1);
ASSERT(notReady >= 0);
if (notReady == 0) {
stDebug("s-task:%s all downstream tasks have completed the checkpoint, start to do checkpoint for current task",
pTask->id.idStr);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
} else {
int32_t total = streamTaskGetNumOfDownstream(pTask);
stDebug("s-task:%s %d/%d downstream tasks are not ready, wait", pTask->id.idStr, notReady, total);
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
for (int32_t i = 0; i < size; ++i) {
STaskCheckpointReadyRecvInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
if (p->downstreamTaskId == downstreamTaskId) {
received = true;
break;
}
}
if (received) {
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id,
downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total);
} else {
STaskCheckpointReadyRecvInfo info = {.recvTs = taosGetTimestampMs(),
.downstreamTaskId = downstreamTaskId,
.checkpointId = pInfo->activeId,
.transId = pInfo->transId,
.streamId = pTask->id.streamId,
.downstreamNodeId = downstreamNodeId};
taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
}
int32_t notReady = total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task",
id);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
}
taosThreadMutexUnlock(&pInfo->lock);
return 0;
}
@ -685,7 +728,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
bool recved = false;
for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
if (pInfo->nodeId == pReady->nodeId) {
recved = true;
break;

View File

@ -633,11 +633,11 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, i);
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
pInfo->upStreamTaskId);
pInfo->upstreamTaskId);
}
taosArrayClear(pList);
@ -657,7 +657,7 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
if (taosArrayGetSize(pList) == 1) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pList, 0);
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, 0);
tmsgSendRsp(&pInfo->msg);
taosArrayClear(pList);
@ -785,7 +785,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask) {
SStreamChkptReadyInfo info = {
STaskCheckpointReadyInfo info = {
.recvTs = taosGetTimestampMs(), .transId = pReq->transId, .checkpointId = pReq->checkpointId};
streamTaskBuildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, TSDB_CODE_SUCCESS);
@ -797,7 +797,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
if (size > 0) {
ASSERT(size == 1);
SStreamChkptReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady->transId == pReq->transId) {
stWarn("s-task:%s repeatly recv checkpoint source msg from mnode, checkpointId:%" PRId64 ", ignore",
pTask->id.idStr, pReq->checkpointId);
@ -816,24 +816,20 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
return TSDB_CODE_SUCCESS;
}
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId,
int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
int32_t code = 0;
int32_t tlen = 0;
void* buf = NULL;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return TSDB_CODE_SUCCESS;
}
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
SStreamCheckpointReadyMsg req = {0};
req.downstreamNodeId = pTask->pMeta->vgId;
req.downstreamTaskId = pTask->id.taskId;
req.streamId = pTask->id.streamId;
req.checkpointId = checkpointId;
req.childId = pInfo->childId;
req.upstreamNodeId = pInfo->nodeId;
req.upstreamTaskId = pInfo->taskId;
req.childId = childId;
req.upstreamNodeId = upstreamNodeId;
req.upstreamTaskId = upstreamTaskId;
tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
if (code < 0) {
@ -858,20 +854,29 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
ASSERT(req.upstreamTaskId != 0);
SStreamChkptReadyInfo info = {
.upStreamTaskId = pInfo->taskId,
.upstreamNodeEpset = pInfo->epSet,
.nodeId = req.upstreamNodeId,
.recvTs = taosGetTimestampMs(),
.checkpointId = req.checkpointId,
};
pReadyInfo->upstreamTaskId = upstreamTaskId;
pReadyInfo->upstreamNodeEpset = *pEpset;
pReadyInfo->nodeId = req.upstreamNodeId;
pReadyInfo->recvTs = taosGetTimestampMs();
pReadyInfo->checkpointId = req.checkpointId;
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
initRpcMsg(&pReadyInfo->msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
return TSDB_CODE_SUCCESS;
}
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId, int32_t index, int64_t checkpointId) {
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
return TSDB_CODE_SUCCESS;
}
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64
":0x%x (vgId:%d) idx:%d, vgId:%d",
pTask->id.idStr, pTask->info.taskLevel, req.streamId, req.upstreamTaskId, req.upstreamNodeId, index,
req.upstreamNodeId);
"-0x%x (vgId:%d) idx:%d",
pTask->id.idStr, pTask->info.taskLevel, pTask->id.streamId, pInfo->taskId, pInfo->nodeId, index);
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
@ -899,7 +904,7 @@ void streamClearChkptReadyMsg(SStreamTask* pTask) {
}
for (int i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); i++) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i);
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pActiveInfo->pReadyMsgList, i);
rpcFreeCont(pInfo->msg.pCont);
}

View File

@ -984,7 +984,8 @@ SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo() {
taosThreadMutexInit(&pInfo->lock, NULL);
pInfo->pDispatchTriggerList = taosArrayInit(4, sizeof(STaskTriggerSendInfo));
pInfo->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
pInfo->pReadyMsgList = taosArrayInit(4, sizeof(STaskCheckpointReadyInfo));
pInfo->pCheckpointReadyRecvList = taosArrayInit(4, sizeof(STaskCheckpointReadyRecvInfo));
return pInfo;
}
@ -996,6 +997,7 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosThreadMutexDestroy(&pInfo->lock);
pInfo->pDispatchTriggerList = taosArrayDestroy(pInfo->pDispatchTriggerList);
pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList);
pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
if (pInfo->pCheckTmr != NULL) {
taosTmrStop(pInfo->pCheckTmr);
@ -1014,4 +1016,5 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
taosArrayClear(pInfo->pReadyMsgList);
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);
}