fix(stream): fix race condition.
This commit is contained in:
parent
e49409a11e
commit
46b17aa9f1
|
@ -37,10 +37,7 @@ void sndEnqueueStreamDispatch(SSnode *pSnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
SStreamTask *pTask = streamMetaAcquireTask(pSnode->pMeta, req.streamId, req.taskId);
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
SRpcMsg rsp = {
|
SRpcMsg rsp = { .info = pMsg->info, .code = 0 };
|
||||||
.info = pMsg->info,
|
|
||||||
.code = 0,
|
|
||||||
};
|
|
||||||
streamProcessDispatchMsg(pTask, &req, &rsp);
|
streamProcessDispatchMsg(pTask, &req, &rsp);
|
||||||
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
streamMetaReleaseTask(pSnode->pMeta, pTask);
|
||||||
rpcFreeCont(pMsg->pCont);
|
rpcFreeCont(pMsg->pCont);
|
||||||
|
|
|
@ -213,38 +213,41 @@ int32_t streamTaskEnqueueRetrieve(SStreamTask* pTask, SStreamRetrieveReq* pReq,
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, SRpcMsg* pRsp) {
|
||||||
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64, pTask->id.idStr,
|
|
||||||
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen);
|
|
||||||
int32_t status = 0;
|
int32_t status = 0;
|
||||||
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
stDebug("s-task:%s receive dispatch msg from taskId:0x%x(vgId:%d), msgLen:%" PRId64 ", msgId:%d", id,
|
||||||
|
pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->totalLen, pReq->msgId);
|
||||||
|
|
||||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, pReq->upstreamTaskId);
|
||||||
ASSERT(pInfo != NULL);
|
ASSERT(pInfo != NULL);
|
||||||
|
|
||||||
if (!pTask->pMeta->leader) {
|
if (!pTask->pMeta->leader) {
|
||||||
stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", pTask->id.idStr);
|
stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", id);
|
||||||
}
|
status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
|
} else {
|
||||||
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
|
||||||
if (pReq->stage > pInfo->stage) {
|
if (pReq->stage > pInfo->stage) {
|
||||||
|
// upstream task has restarted/leader-follower switch/transferred to other dnodes
|
||||||
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
|
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
|
||||||
", current:%" PRId64 " dispatch msg rejected",
|
", current:%" PRId64 " dispatch msg rejected",
|
||||||
pTask->id.idStr, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
|
id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
|
||||||
status = TASK_INPUT_STATUS__BLOCKED;
|
status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
} else {
|
} else {
|
||||||
if (!pInfo->dataAllowed) {
|
if (!pInfo->dataAllowed) {
|
||||||
stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", pTask->id.idStr,
|
stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId);
|
||||||
pReq->upstreamTaskId);
|
|
||||||
status = TASK_INPUT_STATUS__BLOCKED;
|
status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
} else {
|
} else {
|
||||||
// This task has received the checkpoint req from the upstream task, from which all the messages should be blocked
|
// This task has received the checkpoint req from the upstream task, from which all the messages should be
|
||||||
|
// blocked
|
||||||
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
if (pReq->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
streamTaskCloseUpstreamInput(pTask, pReq->upstreamTaskId);
|
||||||
stDebug("s-task:%s close inputQ for upstream:0x%x", pTask->id.idStr, pReq->upstreamTaskId);
|
stDebug("s-task:%s close inputQ for upstream:0x%x, msgId:%d", id, pReq->upstreamTaskId, pReq->msgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
status = streamTaskAppendInputBlocks(pTask, pReq);
|
status = streamTaskAppendInputBlocks(pTask, pReq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// disable the data from upstream tasks
|
// disable the data from upstream tasks
|
||||||
int8_t st = pTask->status.taskStatus;
|
int8_t st = pTask->status.taskStatus;
|
||||||
|
@ -256,7 +259,7 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
|
||||||
// do send response with the input status
|
// do send response with the input status
|
||||||
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
|
int32_t code = buildDispatchRsp(pTask, pReq, status, &pRsp->pCont);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", pTask->id.idStr, pReq->msgId, tstrerror(code));
|
stError("s-task:%s failed to build dispatch rsp, msgId:%d, code:%s", id, pReq->msgId, tstrerror(code));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1074,27 +1074,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t leftRsp = 0;
|
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt);
|
|
||||||
|
|
||||||
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
|
||||||
ASSERT(leftRsp >= 0);
|
|
||||||
|
|
||||||
if (leftRsp > 0) {
|
|
||||||
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp",
|
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
|
|
||||||
} else {
|
|
||||||
stDebug(
|
|
||||||
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp",
|
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
|
|
||||||
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
// dispatch message failed: network error, or node not available.
|
// dispatch message failed: network error, or node not available.
|
||||||
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set
|
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set
|
||||||
|
@ -1112,14 +1091,19 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // code == 0
|
} else { // code == 0
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
|
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
// block the input of current task, to push pressure to upstream
|
// block the input of current task, to push pressure to upstream
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
|
stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
|
||||||
}
|
}
|
||||||
|
@ -1139,6 +1123,25 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t leftRsp = 0;
|
||||||
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt);
|
||||||
|
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
|
||||||
|
ASSERT(leftRsp >= 0);
|
||||||
|
|
||||||
|
if (leftRsp > 0) {
|
||||||
|
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp",
|
||||||
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
|
||||||
|
} else {
|
||||||
|
stDebug(
|
||||||
|
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp",
|
||||||
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
|
||||||
|
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
|
||||||
|
}
|
||||||
|
|
||||||
ASSERT(leftRsp >= 0);
|
ASSERT(leftRsp >= 0);
|
||||||
|
|
||||||
// all msg rsp already, continue
|
// all msg rsp already, continue
|
||||||
|
|
Loading…
Reference in New Issue