refactor: do some internal refactor and set the pActiveInfo for rsma.

This commit is contained in:
Haojun Liao 2024-05-29 00:08:29 +08:00
parent 8d54d45054
commit d1d868f239
14 changed files with 104 additions and 39 deletions

View File

@ -37,6 +37,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);

View File

@ -181,6 +181,14 @@ typedef struct SRetrieveChkptTriggerReq {
int64_t downstreamTaskId;
} SRetrieveChkptTriggerReq;
typedef struct SCheckpointTriggerRsp {
int64_t streamId;
int64_t checkpointId;
int32_t upstreamTaskId;
int32_t taskId;
int32_t transId;
} SCheckpointTriggerRsp;
typedef struct {
SMsgHead head;
int64_t streamId;

View File

@ -678,8 +678,7 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeI
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal);
void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask);
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId,
SEpSet* pEpset);
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo);
int32_t streamQueueGetNumOfItems(const SStreamQueue* pQueue);
@ -753,6 +752,7 @@ tmr_h streamTimerGetInstance();
// checkpoint
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp);
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask);
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask);
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg);
@ -764,6 +764,7 @@ int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRp
int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId,
SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
// stream task state machine, and event handling
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);

View File

@ -136,6 +136,10 @@ int32_t sndProcessStreamMsg(SSnode *pSnode, SRpcMsg *pMsg) {
return tqStreamProcessReqCheckpointRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqStreamProcessCheckpointReadyRsp(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqStreamTaskProcessRetrieveTriggerReq(pSnode->pMeta, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqStreamTaskProcessRetrieveTriggerRsp(pSnode->pMeta, pMsg);
default:
sndError("invalid snode msg:%d", pMsg->msgType);
ASSERT(0);

View File

@ -254,7 +254,8 @@ int tqScanWalAsync(STQ* pTq, bool ckPause);
int32_t tqStopStreamTasksAsync(STQ* pTq);
int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp);
int32_t tqProcessTaskCheckpointReadyMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg);

View File

@ -298,7 +298,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pStreamTask->chkInfo.checkpointId = streamMetaGetLatestCheckpointId(pStreamTask->pMeta);
tdRSmaTaskInit(pStreamTask->pMeta, pItem, &pStreamTask->id);
pStreamTask->status.pSM = streamCreateStateMachine(pStreamTask);
pStreamTask->chkInfo.pActiveInfo = streamTaskCreateActiveChkptInfo();
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;

View File

@ -1240,10 +1240,14 @@ int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskRetrieveTriggerMsg(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRetrieveTriggerReq(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessRetrieveTriggerReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskRetrieveTriggerRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamTaskProcessRetrieveTriggerRsp(pTq->pStreamMeta, pMsg);
}
// this function is needed, do not try to remove it.
int32_t tqProcessStreamHbRsp(STQ* pTq, SRpcMsg* pMsg) {
return tqStreamProcessStreamHbRsp(pTq->pStreamMeta, pMsg);

View File

@ -291,8 +291,7 @@ bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems
}
} else {
walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
tqError("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id,
pTask->chkInfo.nextProcessVer);
tqTrace("s-task:%s append input queue failed, code:too many items, ver:%" PRId64, id, pTask->chkInfo.nextProcessVer);
break;
}
}

View File

@ -887,9 +887,9 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task
SEpSet* pEpset = streamTaskGetDownstreamEpInfo(pTask, pReq->downstreamTaskId);
streamTaskSendCheckpointTriggerMsg(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->downstreamTaskId,
pReq->downstreamNodeId, pEpset);
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, &pMsg->info);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@ -914,6 +914,25 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId);
if (pTask == NULL) {
tqError(
"vgId:%d process retrieve checkpoint-trigger, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pRsp->taskId);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%"PRId64", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg;

View File

@ -843,7 +843,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TASK_CHECKPOINT_READY:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER:
return tqProcessTaskRetrieveTriggerMsg(pVnode->pTq, pMsg);
return tqProcessTaskRetrieveTriggerReq(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_HEARTBEAT_RSP:
return tqProcessStreamHbRsp(pVnode->pTq, pMsg);
case TDMT_MND_STREAM_REQ_CHKPT_RSP:
@ -851,7 +851,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
case TDMT_STREAM_TASK_CHECKPOINT_READY_RSP:
return tqProcessTaskCheckpointReadyRsp(pVnode->pTq, pMsg);
case TDMT_STREAM_RETRIEVE_TRIGGER_RSP:
return tqProcessTaskCheckpointReadyMsg(pVnode->pTq, pMsg);
return tqProcessTaskRetrieveTriggerRsp(pVnode->pTq, pMsg);
case TDMT_VND_GET_STREAM_PROGRESS:
return tqStreamProgressRetrieveReq(pVnode->pTq, pMsg);
default:

View File

@ -246,7 +246,7 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
}
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t len;

View File

@ -118,23 +118,40 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t checkpointType, int32_t dstTaskId, int32_t vgId,
SEpSet* pEpset) {
SStreamDataBlock* pChkpoint = createChkptTriggerBlock(pTask, checkpointType);
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
ASSERT(pTask->info.taskLevel != TASK_LEVEL__SOURCE);
pChkpoint->srcTaskId = pTask->id.taskId;
pChkpoint->srcVgId = pTask->pMeta->vgId;
int32_t code = streamTaskBuildAndSendTriggerMsg(pTask, pChkpoint, dstTaskId, vgId, pEpset);
if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s build and send checkpoint-trigger dispatch msg succ, stage:%" PRId64, pTask->id.idStr,
pTask->pMeta->stage);
} else {
// todo handle send data failure
stError("s-task:%s failed to build and send trigger msg", pTask->id.idStr);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
if (pInfo->transId != pRsp->transId || pInfo->activeId != pRsp->checkpointId) {
// todo handle error
return -1;
}
return code;
taosThreadMutexLock(&pTask->lock);
SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state != TASK_STATUS__CK) {
// todo handle error
taosThreadMutexUnlock(&pTask->lock);
return -1;
}
taosThreadMutexUnlock(&pTask->lock);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, SRpcHandleInfo* pRpcInfo) {
SCheckpointTriggerRsp* pRsp = rpcMallocCont(sizeof(SCheckpointTriggerRsp));
pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId;
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = sizeof(SCheckpointTriggerRsp), .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
}
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
@ -617,8 +634,15 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
int32_t vgId = pTask->pMeta->vgId;
const char* pId = pTask->id.idStr;
int32_t size = taosArrayGetSize(pNotSendList);
int32_t numOfUpstream = streamTaskGetNumOfUpstream(pTask);
stDebug("s-task:%s start to send trigger-retrieve msg to %d upstream(s)", pId, size);
if (size <= 0) {
stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
return code;
}
stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
numOfUpstream);
for (int32_t i = 0; i < size; i++) {
SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
@ -643,21 +667,23 @@ int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));
code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
stDebug("s-task:%s vgId:%d send retrieve msg to 0x%x checkpointId:%" PRId64, pId, vgId, pUpstreamTask->taskId,
pReq->checkpointId);
stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId, vgId,
pUpstreamTask->taskId, pUpstreamTask->nodeId, pReq->checkpointId);
}
return TSDB_CODE_SUCCESS;
}
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
int64_t now = taosGetTimestampMs();
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
if (pStatus->state != TASK_STATUS__CK) {
return false;
}
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
taosThreadMutexLock(&pInfo->lock);
if (!pInfo->dispatchTrigger) {
taosThreadMutexUnlock(&pInfo->lock);
@ -671,12 +697,15 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
}
// has send trigger msg to downstream node,
double before = (now - pSendInfo->sendTs) / 1000.0;
if (pSendInfo->recved) {
stWarn("s-task:%s checkpoint-trigger msg send at:%"PRId64" and recv confirmed, checkpointId:%"PRId64 ", transId:%d",
pTask->id.idStr, pSendInfo->sendTs, pInfo->activeId, pInfo->transId);
stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
"(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
id, pSendInfo->sendTs, before, pSendInfo->taskId, pInfo->activeId, pInfo->transId);
} else {
stWarn("s-task:%s checkpoint-trigger send at:%"PRId64", checkpointId:%"PRId64", transId:%d", pTask->id.idStr,
pSendInfo->sendTs, pInfo->activeId, pInfo->transId);
stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
", transId:%d",
id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
}
taosThreadMutexUnlock(&pInfo->lock);

View File

@ -480,7 +480,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
void streamRetryDispatchData(SStreamTask* pTask, int64_t waitDuration) {
pTask->msgInfo.retryCount++;
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
stTrace("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d, retryTimes:%d", pTask->id.idStr,
waitDuration, pTask->execInfo.dispatch, pTask->msgInfo.retryCount);
if (pTask->msgInfo.pTimer != NULL) {

View File

@ -25,7 +25,6 @@ static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdated);
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet, bool* pUpdate);
static void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo);
static SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
int32_t childId = taosArrayGetSize(pArray);