fix(stream): fix race condition.
commit a3eea6fe1c
parent 737138856c
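Judging from the diff below, the retry logic that doMonitorDispatchData used to run inline is factored into a new helper, sendFailedDispatchData, which reads the shared pMsgInfo->msgId and pMsgInfo->pData fields only while holding pMsgInfo->lock. As a minimal, self-contained sketch of that locking pattern, the snippet here uses a plain pthread mutex and hypothetical stand-in names (MsgInfo, retryUnderLock) rather than the real stream types, so it only illustrates the idea, not the actual implementation:

#include <pthread.h>
#include <stdint.h>

/* Stand-ins for the fields the diff touches: msgId and pData are shared
 * between the dispatch path and the retry timer, so they should only be
 * read while holding the same lock that writers take. */
typedef struct {
  pthread_mutex_t lock;   /* plays the role of pMsgInfo->lock  */
  int32_t         msgId;  /* plays the role of pMsgInfo->msgId */
  void*           pData;  /* plays the role of pMsgInfo->pData */
} MsgInfo;

/* Retry path: snapshot the shared fields under the lock, perform the resend
 * while still holding it, then release. Without the lock, a concurrent
 * dispatch could swap pData/msgId mid-retry (the kind of race this commit
 * appears to address). */
static int32_t retryUnderLock(MsgInfo* info) {
  int32_t code = 0;
  pthread_mutex_lock(&info->lock);
  int32_t msgId = info->msgId;
  void*   req = info->pData;
  /* ... resend `req` for `msgId` here, as sendFailedDispatchData does ... */
  (void)msgId;
  (void)req;
  pthread_mutex_unlock(&info->lock);
  return code;
}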
@@ -527,6 +527,76 @@ static void cleanupInMonitor(int32_t taskId, int64_t taskRefId, void* param) {
  streamTaskFreeRefId(param);
}

static int32_t sendFailedDispatchData(SStreamTask* pTask, int64_t now) {
  int32_t           code = 0;
  const char*       id = pTask->id.idStr;
  SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;

  streamMutexLock(&pMsgInfo->lock);

  int32_t             msgId = pMsgInfo->msgId;
  SStreamDispatchReq* pReq = pTask->msgInfo.pData;

  if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id, pTask->info.selfChildId,
            msgId);

    int32_t numOfRetry = 0;
    for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
      SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
      if (pEntry == NULL) {
        continue;
      }

      if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
        continue;
      }

      // downstream not rsp yet beyond threshold that is 10s
      if (isDispatchRspTimeout(pEntry, now)) {  // not respond yet beyonds 30s, re-send data
        doSendFailedDispatch(pTask, pEntry, now, "timeout");
        numOfRetry += 1;
        continue;
      }

      // downstream inputQ is closed
      if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
        doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
        numOfRetry += 1;
        continue;
      }

      // handle other errors
      if (pEntry->status != TSDB_CODE_SUCCESS) {
        doSendFailedDispatch(pTask, pEntry, now, "downstream error");
        numOfRetry += 1;
      }
    }

    stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfRetry,
            msgId);
  } else {
    int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
    SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
    int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;

    int32_t         s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
    SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
    if (pEntry != NULL) {
      setResendInfo(pEntry, now);
      code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);

      stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
              pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
    } else {
      stError("s-task:%s invalid index 0, size:%d", id, s);
    }
  }

  streamMutexUnlock(&pMsgInfo->lock);
  return code;
}

static void doMonitorDispatchData(void* param, void* tmrId) {
  int32_t code = 0;
  int64_t now = taosGetTimestampMs();
@@ -590,65 +660,7 @@ static void doMonitorDispatchData(void* param, void* tmrId) {
    return;
  }

  {
    SStreamDispatchReq* pReq = pTask->msgInfo.pData;

    if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      stDebug("s-task:%s (child taskId:%d) retry shuffle-dispatch to down streams, msgId:%d", id,
              pTask->info.selfChildId, msgId);

      int32_t numOfRetry = 0;
      for (int32_t i = 0; i < taosArrayGetSize(pTask->msgInfo.pSendInfo); ++i) {
        SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, i);
        if (pEntry == NULL) {
          continue;
        }

        if (pEntry->status == TSDB_CODE_SUCCESS && pEntry->rspTs > 0) {
          continue;
        }

        // downstream not rsp yet beyond threshold that is 10s
        if (isDispatchRspTimeout(pEntry, now)) {  // not respond yet beyonds 30s, re-send data
          doSendFailedDispatch(pTask, pEntry, now, "timeout");
          numOfRetry += 1;
          continue;
        }

        // downstream inputQ is closed
        if (pEntry->status == TASK_INPUT_STATUS__BLOCKED) {
          doSendFailedDispatch(pTask, pEntry, now, "downstream inputQ blocked");
          numOfRetry += 1;
          continue;
        }

        // handle other errors
        if (pEntry->status != TSDB_CODE_SUCCESS) {
          doSendFailedDispatch(pTask, pEntry, now, "downstream error");
          numOfRetry += 1;
        }
      }

      stDebug("s-task:%s complete retry shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr,
              numOfRetry, msgId);
    } else {
      int32_t dstVgId = pTask->outputInfo.fixedDispatcher.nodeId;
      SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
      int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;

      int32_t         s = taosArrayGetSize(pTask->msgInfo.pSendInfo);
      SDispatchEntry* pEntry = taosArrayGet(pTask->msgInfo.pSendInfo, 0);
      if (pEntry != NULL) {
        setResendInfo(pEntry, now);
        code = doSendDispatchMsg(pTask, pReq, dstVgId, pEpSet);

        stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d, code:%s", id,
                pTask->info.selfChildId, 1, downstreamTaskId, dstVgId, msgId, tstrerror(code));
      } else {
        stError("s-task:%s invalid index 0, size:%d", id, s);
      }
    }
  }
  code = sendFailedDispatchData(pTask, now);

  if (streamTaskShouldStop(pTask)) {
    stDebug("s-task:%s should stop, abort from timer", pTask->id.idStr);
@@ -880,7 +892,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {

  code = sendDispatchMsg(pTask, pTask->msgInfo.pData);

  // todo: secure the timerActive and start timer in after lock pTask->lock
  // todo: start timer in after lock pTask->lock
  streamMutexLock(&pTask->lock);
  bool shouldStop = streamTaskShouldStop(pTask);
  streamMutexUnlock(&pTask->lock);
@@ -890,7 +902,6 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
  } else {
    streamMutexLock(&pTask->msgInfo.lock);
    if (pTask->msgInfo.inMonitor == 0) {
      // int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
      stDebug("s-task:%s start dispatch monitor tmr in %dms, dispatch code:%s", id, DISPATCH_RETRY_INTERVAL_MS,
              tstrerror(code));
      streamStartMonitorDispatchData(pTask, DISPATCH_RETRY_INTERVAL_MS);
@@ -1873,8 +1884,10 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
      status = streamTaskAppendInputBlocks(pTask, pReq);
      pInfo->lastMsgId = pReq->msgId;
    } else {
      stWarn("s-task:%s duplicate msgId:%d from upstream:0x%x, from vgId:%d already recv msgId:%" PRId64, id,
             pReq->msgId, pReq->upstreamTaskId, pReq->srcVgId, pInfo->lastMsgId);
      stWarn(
          "s-task:%s duplicate msgId:%d from upstream:0x%x discard and return succ, from vgId:%d already recv "
          "msgId:%" PRId64,
          id, pReq->msgId, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->lastMsgId);
      status = TASK_INPUT_STATUS__NORMAL;  // still return success
    }
  }

@@ -281,7 +281,6 @@ void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo,
  SStreamMeta*      pMeta = pTask->pMeta;
  SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;

  // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
  int32_t code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);

  if (code) {
@@ -300,7 +299,6 @@ void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, i
  SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;

  if (streamTaskShouldStop(pTask)) {  // record the failure
    // int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
    stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
            pInfo->hTaskId.taskId);