fix(stream): add monitor for checkpoint-ready msg recv status.

This commit is contained in:
Haojun Liao 2024-06-03 16:45:37 +08:00
parent 927e2237de
commit 3b3ed1c30c
8 changed files with 273 additions and 100 deletions

View File

@ -271,7 +271,6 @@ typedef struct SCheckpointInfo {
int64_t checkpointTime; // latest checkpoint time
int64_t processedVer;
int64_t nextProcessVer; // current offset in WAL, not serialize it
int32_t numOfNotReady;
SActiveCheckpointInfo* pActiveInfo;
int64_t msgVer;
} SCheckpointInfo;
@ -753,7 +752,8 @@ tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId, int32_t downstreamTaskId);
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
int32_t streamAlignTransferState(SStreamTask* pTask);

View File

@ -1229,6 +1229,15 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
// downstream task has complete the stream task checkpoint procedure, let's start the handle the rsp by execute task
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SRetrieveChkptTriggerReq* pReq = (SRetrieveChkptTriggerReq*) pMsg->pCont;
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqError("vgId:%d not leader, ignore the retrieve checkpoint-trigger msg from 0x%x", vgId,
(int32_t)pReq->downstreamTaskId);
return TSDB_CODE_STREAM_NOT_LEADER;
}
return tqStreamTaskProcessCheckpointReadyMsg(pTq->pStreamMeta, pMsg);
}

View File

@ -19,6 +19,13 @@
typedef struct SMStreamCheckpointReadyRspMsg {
SMsgHead head;
int64_t streamId;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
int32_t downstreamTaskId;
int32_t downstreamNodeId;
int64_t checkpointId;
int32_t transId;
} SMStreamCheckpointReadyRspMsg;
static int32_t doProcessDummyRspMsg(SStreamMeta* pMeta, SRpcMsg* pMsg);
@ -486,21 +493,27 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.upstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, it may have been destroyed already", vgId, req.downstreamTaskId);
return code;
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("vgId:%d s-task:%s received the checkpoint ready msg from task:0x%x (vgId:%d), handle it", vgId,
tqDebug("vgId:%d s-task:%s received the checkpoint-ready msg from task:0x%x (vgId:%d), handle it", vgId,
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask, req.downstreamTaskId, req.downstreamNodeId);
streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId);
streamMetaReleaseTask(pMeta, pTask);
{ // send checkpoint ready rsp
SRpcMsg rsp = {.code = 0, .info = pMsg->info, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
rsp.pCont = rpcMallocCont(rsp.contLen);
SMsgHead* pHead = rsp.pCont;
pHead->vgId = htonl(req.downstreamNodeId);
SMStreamCheckpointReadyRspMsg* pReadyRsp = rpcMallocCont(sizeof(SMStreamCheckpointReadyRspMsg));
pReadyRsp->upstreamTaskId = req.upstreamTaskId;
pReadyRsp->upstreamNodeId = req.upstreamNodeId;
pReadyRsp->downstreamTaskId = req.downstreamTaskId;
pReadyRsp->downstreamNodeId = req.downstreamNodeId;
pReadyRsp->checkpointId = req.checkpointId;
pReadyRsp->streamId = req.streamId;
pReadyRsp->head.vgId = htonl(req.downstreamNodeId);
SRpcMsg rsp = {.code = 0, .info = pMsg->info, .pCont = pReadyRsp, .contLen = sizeof(SMStreamCheckpointReadyRspMsg)};
tmsgSendRsp(&rsp);
pMsg->info.handle = NULL; // disable auto rsp
@ -1066,5 +1079,16 @@ int32_t tqStreamProcessStreamHbRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return d
int32_t tqStreamProcessReqCheckpointRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { return doProcessDummyRspMsg(pMeta, pMsg); }
int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
return doProcessDummyRspMsg(pMeta, pMsg);
SMStreamCheckpointReadyRspMsg* pRsp = pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->downstreamTaskId);
if (pTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling checkpoint-ready msg, it may have been dropped",
pRsp->downstreamNodeId, pRsp->downstreamTaskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
streamTaskProcessCheckpointReadyRsp(pTask, pRsp->upstreamTaskId, pRsp->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}

View File

@ -64,7 +64,9 @@ struct SActiveCheckpointInfo {
int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo>
int32_t checkCounter;
tmr_h pCheckTmr;
tmr_h pChkptTriggerTmr;
int32_t sendReadyCheckCounter;
tmr_h pSendReadyMsgTmr;
};
typedef struct {
@ -99,12 +101,13 @@ struct STokenBucket {
typedef struct {
int32_t upstreamTaskId;
SEpSet upstreamNodeEpset;
int32_t nodeId;
int32_t upstreamNodeId;
int32_t transId;
SRpcMsg msg;
int32_t childId;
SRpcMsg msg; // for mnode checkpoint-source rsp
int64_t checkpointId;
int64_t recvTs;
int32_t sendToUpstream;
int32_t sendCompleted;
} STaskCheckpointReadyInfo;
typedef struct {
@ -213,8 +216,10 @@ int32_t streamTaskDownloadCheckpointData(const char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask);
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId,
int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId);
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId,
int32_t childId, SEpSet* pEpset, int64_t checkpointId);
int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
int64_t checkpointId, SRpcMsg* pMsg);
typedef int32_t (*__stream_async_exec_fn_t)(void* param);

View File

@ -111,7 +111,6 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
pTask->chkInfo.pActiveInfo->transId = pReq->transId;
pTask->chkInfo.pActiveInfo->activeId = pReq->checkpointId;
pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask);
pTask->chkInfo.startTs = taosGetTimestampMs();
pTask->execInfo.checkpoint += 1;
@ -192,12 +191,11 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (pTask->chkInfo.checkpointId == checkpointId) {
{ // send checkpoint-ready msg to upstream
SRpcMsg msg ={0};
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId);
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
tmsgSendReq(&info.upstreamNodeEpset, &info.msg);
initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
tmsgSendReq(&pInfo->epSet, &msg);
}
stWarn(
@ -235,7 +233,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
ASSERT(p->checkpointId == checkpointId);
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->nodeId, p->checkpointId, p->recvTs);
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
@ -267,10 +265,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pCheckTmr == NULL) {
pActiveInfo->pCheckTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
if (pActiveInfo->pChkptTriggerTmr == NULL) {
pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
}
}
@ -282,7 +280,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
streamProcessCheckpointReadyMsg(pTask, 0, 0);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId);
streamFreeQitem((SStreamQueueItem*)pBlock);
}
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -308,11 +306,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
streamTaskBuildCheckpoint(pTask);
} else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
// set the needed checked downstream tasks, only when all downstream tasks do checkpoint complete, this task
// can start local checkpoint procedure
pTask->chkInfo.numOfNotReady = streamTaskGetNumOfDownstream(pTask);
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
@ -326,19 +319,31 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
* All down stream tasks have successfully completed the check point task.
* Current stream task is allowed to start to do checkpoint things in ASYNC model.
*/
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNodeId, int32_t downstreamTaskId) {
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
int32_t downstreamTaskId) {
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE || pTask->info.taskLevel == TASK_LEVEL__AGG);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
const char* id = pTask->id.idStr;
bool received = false;
int32_t total = streamTaskGetNumOfDownstream(pTask);
ASSERT(total > 0);
// only one task in this stream
if (total == 0 && pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pInfo->activeId, pInfo->transId);
taosThreadMutexUnlock(&pInfo->lock);
return 0;
// 1. not in checkpoint status now
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
if (pStat->state != TASK_STATUS__CK) {
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
// 2. expired checkpoint-ready msg
if (pTask->chkInfo.checkpointId > checkpointId) {
// discard it directly
return -1;
}
// invalid checkpoint-ready msg
if (pInfo->activeId != checkpointId) {
return -1;
}
taosThreadMutexLock(&pInfo->lock);
@ -354,7 +359,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo
}
if (received) {
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, %d/%d downstream not ready", id,
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
downstreamTaskId, (int32_t)(total - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), total);
} else {
STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
@ -377,9 +382,38 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int32_t downstreamNo
return 0;
}
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
int32_t numOfConfirmed = 0;
taosThreadMutexLock(&pInfo->lock);
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo->upstreamTaskId == upstreamTaskId && pReadyInfo->checkpointId == checkpointId) {
pReadyInfo->sendCompleted = 1;
stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
pTask->id.idStr, upstreamTaskId, checkpointId, now);
break;
}
}
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pReadyMsgList); ++i) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pInfo->pReadyMsgList, i);
if (pReadyInfo->sendCompleted == 1) {
numOfConfirmed += 1;
}
}
stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
numOfConfirmed, checkpointId);
taosThreadMutexUnlock(&pInfo->lock);
return TSDB_CODE_SUCCESS;
}
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
pTask->chkInfo.startTs = 0; // clear the recorded start time
pTask->chkInfo.numOfNotReady = 0;
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
@ -703,7 +737,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
}
if (++pActiveInfo->checkCounter < 100) {
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
return;
}
@ -736,7 +770,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
bool recved = false;
for(int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
if (pInfo->nodeId == pReady->nodeId) {
if (pInfo->nodeId == pReady->upstreamNodeId) {
recved = true;
break;
}
@ -756,7 +790,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms
if (size > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pCheckTmr);
taosTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);

View File

@ -623,28 +623,163 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
return TSDB_CODE_SUCCESS;
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
SArray* pList = pTask->chkInfo.pActiveInfo->pReadyMsgList;
int32_t initCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamNodeId, int32_t upstreamTaskId, int32_t childId,
int64_t checkpointId, SRpcMsg* pMsg) {
int32_t code = 0;
int32_t tlen = 0;
void* buf = NULL;
taosThreadMutexLock(&pTask->chkInfo.pActiveInfo->lock);
SStreamCheckpointReadyMsg req = {0};
req.downstreamNodeId = pTask->pMeta->vgId;
req.downstreamTaskId = pTask->id.taskId;
req.streamId = pTask->id.streamId;
req.checkpointId = checkpointId;
req.childId = childId;
req.upstreamNodeId = upstreamNodeId;
req.upstreamTaskId = upstreamTaskId;
tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
rpcFreeCont(buf);
return code;
}
tEncoderClear(&encoder);
initRpcMsg(pMsg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
return TSDB_CODE_SUCCESS;
}
static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr;
// check the status every 100ms
if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
if (++pActiveInfo->sendReadyCheckCounter < 100) {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
return;
}
pActiveInfo->sendReadyCheckCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id);
taosThreadMutexLock(&pActiveInfo->lock);
SArray* pList = pActiveInfo->pReadyMsgList;
SArray* pNotRspList = taosArrayInit(4, sizeof(int32_t));
int32_t num = taosArrayGetSize(pList);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
if (pInfo->sendCompleted == 1) {
continue;
}
stDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
taosArrayPush(pNotRspList, &pInfo->upstreamTaskId);
stDebug("s-task:%s vgId:%d level:%d checkpoint-ready rsp from upstream:0x%x not confirmed yet", id, vgId,
pTask->info.taskLevel, pInfo->upstreamTaskId);
}
int32_t checkpointId = pActiveInfo->activeId;
int32_t notRsp = taosArrayGetSize(pNotRspList);
if (notRsp > 0) { // send checkpoint-ready msg again
for (int32_t i = 0; i < taosArrayGetSize(pNotRspList); ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pNotRspList, i);
for (int32_t j = 0; j < num; ++j) {
STaskCheckpointReadyInfo* pReadyInfo = taosArrayGet(pList, j);
if (taskId == pReadyInfo->upstreamTaskId) { // send msg again
SRpcMsg msg = {0};
initCheckpointReadyMsg(pTask, pReadyInfo->upstreamNodeId, pReadyInfo->upstreamTaskId, pReadyInfo->childId,
checkpointId, &msg);
tmsgSendReq(&pReadyInfo->upstreamNodeEpset, &msg);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x again", id, pTask->info.taskLevel,
pReadyInfo->upstreamTaskId);
}
}
}
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
taosThreadMutexUnlock(&pActiveInfo->lock);
} else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), quit from timer and clear "
"checkpoint-ready msg, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pTask);
taosThreadMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
taosArrayDestroy(pNotRspList);
}
// this function is usually invoked by sink/agg task
int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
const char* id = pTask->id.idStr;
SArray* pList = pActiveInfo->pReadyMsgList;
taosThreadMutexLock(&pActiveInfo->lock);
int32_t num = taosArrayGetSize(pList);
ASSERT(taosArrayGetSize(pTask->upstreamInfo.pList) == num);
for (int32_t i = 0; i < num; ++i) {
STaskCheckpointReadyInfo* pInfo = taosArrayGet(pList, i);
SRpcMsg msg = {0};
initCheckpointReadyMsg(pTask, pInfo->upstreamNodeId, pInfo->upstreamTaskId, pInfo->childId, pInfo->checkpointId, &msg);
tmsgSendReq(&pInfo->upstreamNodeEpset, &msg);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to upstream:0x%x", id, pTask->info.taskLevel,
pInfo->upstreamTaskId);
}
taosArrayClear(pList);
taosThreadMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num);
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);
stDebug("s-task:%s level:%d checkpoint ready msg sent to all %d upstreams", pTask->id.idStr, pTask->info.taskLevel,
num);
// start to check if checkpoint ready msg has successfully received by upstream tasks.
pActiveInfo->pSendReadyMsgTmr = NULL;
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pSendReadyMsgTmr == NULL) {
pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer);
} else {
taosTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr);
}
return TSDB_CODE_SUCCESS;
}
@ -816,51 +951,17 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
return TSDB_CODE_SUCCESS;
}
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, SStreamTask* pTask, int32_t upstreamNodeId,
int32_t upstreamTaskId, int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
int32_t code = 0;
int32_t tlen = 0;
void* buf = NULL;
SStreamCheckpointReadyMsg req = {0};
req.downstreamNodeId = pTask->pMeta->vgId;
req.downstreamTaskId = pTask->id.taskId;
req.streamId = pTask->id.streamId;
req.checkpointId = checkpointId;
req.childId = childId;
req.upstreamNodeId = upstreamNodeId;
req.upstreamTaskId = upstreamTaskId;
tEncodeSize(tEncodeStreamCheckpointReadyMsg, &req, tlen, code);
if (code < 0) {
return -1;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
if (buf == NULL) {
return -1;
}
((SMsgHead*)buf)->vgId = htonl(req.upstreamNodeId);
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
SEncoder encoder;
tEncoderInit(&encoder, abuf, tlen);
if ((code = tEncodeStreamCheckpointReadyMsg(&encoder, &req)) < 0) {
rpcFreeCont(buf);
return code;
}
tEncoderClear(&encoder);
ASSERT(req.upstreamTaskId != 0);
int32_t initCheckpointReadyInfo(STaskCheckpointReadyInfo* pReadyInfo, int32_t upstreamNodeId, int32_t upstreamTaskId,
int32_t childId, SEpSet* pEpset, int64_t checkpointId) {
ASSERT(upstreamTaskId != 0);
pReadyInfo->upstreamTaskId = upstreamTaskId;
pReadyInfo->upstreamNodeEpset = *pEpset;
pReadyInfo->nodeId = req.upstreamNodeId;
pReadyInfo->upstreamNodeId = upstreamNodeId;
pReadyInfo->recvTs = taosGetTimestampMs();
pReadyInfo->checkpointId = req.checkpointId;
pReadyInfo->checkpointId = checkpointId;
pReadyInfo->childId = childId;
initRpcMsg(&pReadyInfo->msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
return TSDB_CODE_SUCCESS;
}
@ -872,7 +973,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
SStreamUpstreamEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
STaskCheckpointReadyInfo info = {0};
initCheckpointReadyInfo(&info, pTask, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
initCheckpointReadyInfo(&info, pInfo->nodeId, pInfo->taskId, pInfo->childId, &pInfo->epSet, checkpointId);
stDebug("s-task:%s (level:%d) prepare checkpoint-ready msg to upstream s-task:0x%" PRIx64
"-0x%x (vgId:%d) idx:%d",

View File

@ -1121,6 +1121,7 @@ void metaHbToMnode(void* param, void* tmrId) {
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) {
stError("invalid rid:%" PRId64 " failed to acquired stream-meta", rid);
return;
}
@ -1341,8 +1342,8 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
streamMetaWUnLock(pMeta);
if (isLeader) {
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb", pMeta->vgId,
prevStage, stage, isLeader);
stInfo("vgId:%d update meta stage:%" PRId64 ", prev:%" PRId64 " leader:%d, start to send Hb, rid:%" PRId64,
pMeta->vgId, prevStage, stage, isLeader, pMeta->rid);
streamMetaStartHb(pMeta);
} else {
stInfo("vgId:%d update meta stage:%" PRId64 " prev:%" PRId64 " leader:%d sendMsg beforeClosing:%d", pMeta->vgId,

View File

@ -999,9 +999,9 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
pInfo->pReadyMsgList = taosArrayDestroy(pInfo->pReadyMsgList);
pInfo->pCheckpointReadyRecvList = taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
if (pInfo->pCheckTmr != NULL) {
taosTmrStop(pInfo->pCheckTmr);
pInfo->pCheckTmr = NULL;
if (pInfo->pChkptTriggerTmr != NULL) {
taosTmrStop(pInfo->pChkptTriggerTmr);
pInfo->pChkptTriggerTmr = NULL;
}
taosMemoryFree(pInfo);
@ -1014,7 +1014,6 @@ void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false;
taosArrayClear(pInfo->pReadyMsgList);
taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList);
}