fix(stream): clear the msgId if send success, and handle the race condition problem.

This commit is contained in:
Haojun Liao 2024-06-28 17:03:14 +08:00
parent 2345f6cf7f
commit 1ffec769b8
1 changed files with 25 additions and 16 deletions

View File

@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->contLen = contLen;
}
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId;
@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
}
taosThreadMutexLock(&pMsgInfo->lock);
pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1;
pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0;
taosThreadMutexLock(&pMsgInfo->lock);
clearDispatchInfo(pMsgInfo);
taosArrayClear(pTask->msgInfo.pSendInfo);
taosThreadMutexUnlock(&pMsgInfo->lock);
}
@ -643,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
return 0;
}
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@ -699,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1;
taosThreadMutexLock(&pTask->msgInfo.lock);
initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
taosThreadMutexUnlock(&pTask->msgInfo.lock);
int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) {
@ -1222,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int64_t now = taosGetTimestampMs();
int32_t totalRsp = 0;
taosThreadMutexLock(&pMsgInfo->lock);
int32_t msgId = pMsgInfo->msgId;
taosThreadMutexUnlock(&pMsgInfo->lock);
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,