fix(stream): fix race condition in handling dispatch rsp.
This commit is contained in:
parent
0cca12ab52
commit
aaf67a42eb
|
@ -799,10 +799,15 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
// send msg to retrieve checkpoint trigger msg
|
// send msg to retrieve checkpoint trigger msg
|
||||||
SArray* pList = pTask->upstreamInfo.pList;
|
SArray* pList = pTask->upstreamInfo.pList;
|
||||||
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
|
ASSERT(pTask->info.taskLevel > TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
|
||||||
if (pNotSendList == NULL) {
|
if (pNotSendList == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
|
stError("s-task:%s quit tmr function due to out of memory", id);
|
||||||
|
taosThreadMutexUnlock(&pActiveInfo->lock);
|
||||||
|
|
||||||
|
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
|
||||||
|
taosTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -967,18 +972,14 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
|
||||||
taosThreadMutexUnlock(&pInfo->lock);
|
taosThreadMutexUnlock(&pInfo->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
|
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
|
||||||
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
|
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
taosThreadMutexLock(&pInfo->lock);
|
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
|
||||||
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
|
||||||
if (p->recved) {
|
if (p->recved) {
|
||||||
num++;
|
num++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pInfo->lock);
|
|
||||||
return num;
|
return num;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1000,9 +1001,9 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
|
||||||
taosThreadMutexUnlock(&pInfo->lock);
|
taosThreadMutexUnlock(&pInfo->lock);
|
||||||
|
|
||||||
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
|
|
||||||
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||||
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
|
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
|
||||||
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
|
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
|
||||||
|
|
|
@ -501,7 +501,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
|
stDebug("s-task:%s not in dispatch procedure, abort from timer, ref:%d", pTask->id.idStr, ref);
|
||||||
|
|
||||||
pTask->msgInfo.inMonitor = 0;
|
pTask->msgInfo.inMonitor = 0; // set not in dispatch monitor
|
||||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1211,44 +1211,51 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) {
|
static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp,
|
||||||
|
int32_t* pFailed, const char* id) {
|
||||||
int32_t numOfRsp = 0;
|
int32_t numOfRsp = 0;
|
||||||
bool alreadySet = false;
|
int32_t numOfFailed = 0;
|
||||||
bool updated = false;
|
|
||||||
bool allRsp = false;
|
|
||||||
*pNotRsp = 0;
|
|
||||||
|
|
||||||
taosThreadMutexLock(&pMsgInfo->lock);
|
bool allRsp = false;
|
||||||
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
|
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
|
||||||
|
|
||||||
|
*pNotRsp = 0;
|
||||||
|
*pFailed = 0;
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfDispatchBranch; ++i) {
|
for (int32_t i = 0; i < numOfDispatchBranch; ++i) {
|
||||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
|
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
|
||||||
if (pEntry->rspTs != -1) {
|
if (pEntry->rspTs != -1) {
|
||||||
numOfRsp += 1;
|
numOfRsp += 1;
|
||||||
|
} else {
|
||||||
|
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
|
||||||
|
numOfFailed += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
|
for (int32_t j = 0; j < numOfDispatchBranch; ++j) {
|
||||||
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, j);
|
||||||
if (pEntry->nodeId == vgId) {
|
if (pEntry->nodeId == vgId) {
|
||||||
ASSERT(!alreadySet);
|
if (pEntry->rspTs != -1) {
|
||||||
|
stDebug("s-task:%s dispatch rsp has already recved at:%" PRId64 ", ignore this rsp, msgId:%d", id,
|
||||||
|
pEntry->rspTs, pMsgInfo->msgId);
|
||||||
|
allRsp = false;
|
||||||
|
} else {
|
||||||
pEntry->rspTs = now;
|
pEntry->rspTs = now;
|
||||||
pEntry->status = code;
|
pEntry->status = code;
|
||||||
alreadySet = true;
|
|
||||||
updated = true;
|
|
||||||
numOfRsp += 1;
|
numOfRsp += 1;
|
||||||
|
allRsp = (numOfRsp == numOfDispatchBranch);
|
||||||
|
|
||||||
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j,
|
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j,
|
||||||
numOfRsp, numOfDispatchBranch);
|
numOfRsp, numOfDispatchBranch);
|
||||||
}
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
*pFailed = numOfFailed;
|
||||||
*pNotRsp = numOfDispatchBranch - numOfRsp;
|
*pNotRsp = numOfDispatchBranch - numOfRsp;
|
||||||
allRsp = (numOfRsp == numOfDispatchBranch);
|
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
|
||||||
|
|
||||||
ASSERT(updated);
|
|
||||||
return allRsp;
|
return allRsp;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1277,15 +1284,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
bool allRsp = false;
|
bool allRsp = false;
|
||||||
int32_t notRsp = 0;
|
int32_t notRsp = 0;
|
||||||
|
int32_t numOfFailed = 0;
|
||||||
|
bool triggerDispatchRsp = false;
|
||||||
|
|
||||||
|
// we only set the dispatch msg info for current checkpoint trans
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
triggerDispatchRsp = (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) &&
|
||||||
|
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId);
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
taosThreadMutexLock(&pMsgInfo->lock);
|
taosThreadMutexLock(&pMsgInfo->lock);
|
||||||
int32_t msgId = pMsgInfo->msgId;
|
int32_t msgId = pMsgInfo->msgId;
|
||||||
taosThreadMutexUnlock(&pMsgInfo->lock);
|
|
||||||
|
|
||||||
// follower not handle the dispatch rsp
|
// follower not handle the dispatch rsp
|
||||||
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
|
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
|
||||||
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
|
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
|
||||||
vgId);
|
vgId);
|
||||||
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1294,6 +1309,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64
|
stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64
|
||||||
" discard it",
|
" discard it",
|
||||||
id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage);
|
id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage);
|
||||||
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
return TSDB_CODE_INVALID_MSG;
|
return TSDB_CODE_INVALID_MSG;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1305,18 +1321,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
|
||||||
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
|
||||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
|
||||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, &numOfFailed, id);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
|
||||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, &notRsp, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, &notRsp, &numOfFailed, id);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else { // code == 0
|
} else { // code == 0
|
||||||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||||
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
|
||||||
// block the input of current task, to push pressure to upstream
|
// block the input of current task, to push pressure to upstream
|
||||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, &notRsp, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, &notRsp, &numOfFailed, id);
|
||||||
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
|
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -1328,15 +1344,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, id);
|
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, &numOfFailed, id);
|
||||||
|
|
||||||
{
|
{
|
||||||
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
if (delayDispatch) {
|
if (delayDispatch) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
|
||||||
// we only set the dispatch msg info for current checkpoint trans
|
// we only set the dispatch msg info for current checkpoint trans
|
||||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK &&
|
if (triggerDispatchRsp) {
|
||||||
pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
|
|
||||||
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
|
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
|
||||||
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
|
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
|
||||||
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
|
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
|
||||||
|
@ -1347,12 +1361,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
" transId:%d discard, since expired",
|
" transId:%d discard, since expired",
|
||||||
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
|
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pMsgInfo->lock);
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
if (!allRsp) {
|
if (!allRsp) {
|
||||||
stDebug(
|
stDebug(
|
||||||
|
@ -1371,16 +1386,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
}
|
}
|
||||||
|
|
||||||
// all msg rsp already, continue
|
// all msg rsp already, continue
|
||||||
if (allRsp) {
|
|
||||||
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
|
|
||||||
|
|
||||||
// we need to re-try send dispatch msg to downstream tasks
|
// we need to re-try send dispatch msg to downstream tasks
|
||||||
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now);
|
if (allRsp && (numOfFailed == 0)) {
|
||||||
if (numOfFailed == 0) { // this message has been sent successfully, let's try next one.
|
|
||||||
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
|
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
|
||||||
if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
|
||||||
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state",
|
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id,
|
||||||
id, msgId);
|
msgId);
|
||||||
ASSERT(pTask->info.fillHistory == 1);
|
ASSERT(pTask->info.fillHistory == 1);
|
||||||
|
|
||||||
code = streamTransferStatePrepare(pTask);
|
code = streamTransferStatePrepare(pTask);
|
||||||
|
@ -1392,10 +1403,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
||||||
// now ready for next data output
|
// now ready for next data output
|
||||||
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
|
||||||
} else {
|
} else {
|
||||||
|
// this message has been sent successfully, let's try next one.
|
||||||
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue