fix(stream): fix race condition for dispatch msg.
This commit is contained in:
parent
38ee5dc472
commit
5057749790
|
@ -1162,10 +1162,10 @@ void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
|
|||
static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId, int32_t downstreamNodeId) {
|
||||
stDebug("s-task:%s destroy dispatch msg:%p", pTask->id.idStr, pTask->msgInfo.pData);
|
||||
|
||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
|
||||
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
|
||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||
|
||||
clearBufferedDispatchMsg(pTask);
|
||||
|
||||
// put data into inputQ of current task is also allowed
|
||||
if (pTask->inputq.status == TASK_INPUT_STATUS__BLOCKED) {
|
||||
|
@ -1189,13 +1189,24 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, const char* id) {
|
||||
static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) {
|
||||
int32_t numOfRsp = 0;
|
||||
bool alreadySet = false;
|
||||
bool updated = false;
|
||||
bool allRsp = false;
|
||||
*pNotRsp = 0;
|
||||
|
||||
taosThreadMutexLock(&pMsgInfo->lock);
|
||||
for (int32_t j = 0; j < taosArrayGetSize(pMsgInfo->pSendInfo); ++j) {
|
||||
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
|
||||
|
||||
for(int32_t i = 0; i < numOfDispatchBranch; ++i) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
|
||||
if (pEntry->rspTs != -1) {
|
||||
numOfRsp += 1;
|
||||
}
|
||||
}
|
||||
|
||||
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
|
||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
||||
if (pEntry->nodeId == vgId) {
|
||||
ASSERT(!alreadySet);
|
||||
|
@ -1203,18 +1214,20 @@ static int32_t setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int3
|
|||
pEntry->status = code;
|
||||
alreadySet = true;
|
||||
updated = true;
|
||||
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d", id, now, code, j);
|
||||
}
|
||||
|
||||
if (pEntry->rspTs != -1) {
|
||||
numOfRsp += 1;
|
||||
|
||||
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j,
|
||||
numOfRsp, numOfDispatchBranch);
|
||||
}
|
||||
}
|
||||
|
||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||
ASSERT(updated);
|
||||
*pNotRsp = numOfDispatchBranch - numOfRsp;
|
||||
allRsp = (numOfRsp == numOfDispatchBranch);
|
||||
|
||||
return numOfRsp;
|
||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||
|
||||
ASSERT(updated);
|
||||
return allRsp;
|
||||
}
|
||||
|
||||
bool isDispatchRspTimeout(SDispatchEntry* pEntry, int64_t now) {
|
||||
|
@ -1240,7 +1253,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
int32_t vgId = pTask->pMeta->vgId;
|
||||
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
|
||||
int64_t now = taosGetTimestampMs();
|
||||
int32_t totalRsp = 0;
|
||||
bool allRsp = false;
|
||||
int32_t notRsp = 0;
|
||||
|
||||
taosThreadMutexLock(&pMsgInfo->lock);
|
||||
int32_t msgId = pMsgInfo->msgId;
|
||||
|
@ -1269,18 +1283,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
|
||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id);
|
||||
} else {
|
||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, id);
|
||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, ¬Rsp, id);
|
||||
}
|
||||
|
||||
} else { // code == 0
|
||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
||||
// block the input of current task, to push pressure to upstream
|
||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, id);
|
||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, ¬Rsp, id);
|
||||
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
|
||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
} else {
|
||||
|
@ -1292,7 +1306,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||
}
|
||||
|
||||
totalRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, id);
|
||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, ¬Rsp, id);
|
||||
|
||||
{
|
||||
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||
|
@ -1317,13 +1331,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
}
|
||||
}
|
||||
|
||||
int32_t notRsp = taosArrayGetSize(pMsgInfo->pSendInfo) - totalRsp;
|
||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
if (notRsp > 0) {
|
||||
if (!allRsp) {
|
||||
stDebug(
|
||||
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, "
|
||||
"waiting "
|
||||
"for %d rsp",
|
||||
"waiting for %d rsp",
|
||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), notRsp);
|
||||
} else {
|
||||
stDebug(
|
||||
|
@ -1337,7 +1349,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
}
|
||||
|
||||
// all msg rsp already, continue
|
||||
if (notRsp == 0) {
|
||||
if (allRsp) {
|
||||
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
|
||||
|
||||
// we need to re-try send dispatch msg to downstream tasks
|
||||
|
|
Loading…
Reference in New Issue