fix(stream): refactor re-try dispatch msg for stream tasks.

This commit is contained in:
Haojun Liao 2023-09-22 15:57:55 +08:00
parent 898aea5cfb
commit 619a9f6d08
3 changed files with 122 additions and 116 deletions

View File

@ -297,7 +297,8 @@ typedef struct SDispatchMsgInfo {
int8_t dispatchMsgType;
int16_t msgType; // dispatch msg type
int32_t retryCount; // retry send data count
int64_t blockingTs; // output blocking timestamp
int64_t startTs; // output blocking timestamp
SArray* pRetryList; // current dispatch successfully completed node of downstream
} SDispatchMsgInfo;
typedef struct STaskOutputInfo {

View File

@ -370,29 +370,6 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
}
pTask->msgInfo.pData = pReqs;
// *pDispatchReq = pReqs;
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to %d vgroup(s), msgId:%d", pTask->id.idStr,
// pTask->info.selfChildId, numOfBlocks, numOfVgroups, msgId);
//
// for (int32_t i = 0; i < numOfVgroups; i++) {
// if (pReqs[i].blockNum > 0) {
// SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
// stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
// pTask->info.selfChildId, pReqs[i].blockNum, pVgInfo->vgId);
//
// code = doSendDispatchMsg(pTask, &pReqs[i], pVgInfo->vgId, &pVgInfo->epSet);
// if (code < 0) {
// destroyDispatchMsg(pReqs, numOfVgroups);
// return code;
// }
// }
// }
//
// stDebug("s-task:%s complete shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfVgroups, msgId);
// code = 0;
//
// *pDispatchReq = pReqs;
}
stDebug("s-task:%s build dispatch msg success, msgId:%d", pTask->id.idStr, pTask->taskExecInfo.dispatchCount);
@ -441,6 +418,8 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
static void doRetryDispatchData(void* param, void* tmrId) {
SStreamTask* pTask = param;
const char* id = pTask->id.idStr;
int32_t msgId = pTask->taskExecInfo.dispatchCount;
if (streamTaskShouldStop(&pTask->status)) {
int8_t ref = atomic_sub_fetch_8(&pTask->status.timerActive, 1);
@ -450,11 +429,53 @@ static void doRetryDispatchData(void* param, void* tmrId) {
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
int32_t code = sendDispatchMsg(pTask, pTask->msgInfo.pData);
int32_t code = 0;
{
SStreamDispatchReq *pReq = pTask->msgInfo.pData;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);
stDebug("s-task:%s (child taskId:%d) re-try shuffle-dispatch blocks to %d vgroup(s), msgId:%d",
id, pTask->info.selfChildId, numOfFailed, msgId);
for (int32_t i = 0; i < numOfFailed; i++) {
int32_t vgId = *(int32_t*) taosArrayGet(pTask->msgInfo.pRetryList, i);
for(int32_t j = 0; j < numOfVgroups; ++j) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, j);
if (pVgInfo->vgId == vgId) {
stDebug("s-task:%s (child taskId:%d) shuffle-dispatch blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pReq[j].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pReq[j], pVgInfo->vgId, &pVgInfo->epSet);
if (code < 0) {
break;
}
}
}
}
stDebug("s-task:%s complete re-try shuffle-dispatch blocks to all %d vnodes, msgId:%d", pTask->id.idStr, numOfFailed, msgId);
} else {
int32_t vgId = pTask->fixedEpDispatcher.nodeId;
SEpSet* pEpSet = &pTask->fixedEpDispatcher.epSet;
int32_t downstreamTaskId = pTask->fixedEpDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet);
}
}
if (code != TSDB_CODE_SUCCESS) {
if (!streamTaskShouldStop(&pTask->status)) {
stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
// stDebug("s-task:%s reset the waitRspCnt to be 0 before launch retry dispatch", pTask->id.idStr);
// atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, 0);
if (streamTaskShouldPause(&pTask->status)) {
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS * 10);
} else {
@ -471,7 +492,9 @@ static void doRetryDispatchData(void* param, void* tmrId) {
}
void streamRetryDispatchStreamBlock(SStreamTask* pTask, int64_t waitDuration) {
stWarn("s-task:%s dispatch data in %" PRId64 "ms, in timer", pTask->id.idStr, waitDuration);
stWarn("s-task:%s retry send dispatch data in %" PRId64 "ms, in timer msgId:%d", pTask->id.idStr, waitDuration,
pTask->taskExecInfo.dispatchCount);
if (pTask->launchTaskTimer != NULL) {
taosTmrReset(doRetryDispatchData, waitDuration, pTask, streamEnv.timer, &pTask->launchTaskTimer);
} else {
@ -1011,12 +1034,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
pTask->msgInfo.pData = NULL;
if (pTask->msgInfo.blockingTs != 0) {
int64_t el = taosGetTimestampMs() - pTask->msgInfo.blockingTs;
if (pTask->msgInfo.startTs != 0) {
int64_t now = taosGetTimestampMs();
int64_t el = now - pTask->msgInfo.startTs;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
pTask->msgInfo.blockingTs = 0;
pTask->msgInfo.startTs = now;
// put data into inputQ of current task is also allowed
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
}
@ -1033,12 +1058,28 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
const char* id = pTask->id.idStr;
int32_t msgId = pTask->taskExecInfo.dispatchCount;
int32_t leftRsp = 0;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
if (leftRsp > 0) {
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
} else {
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
}
} else {
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
}
if (code != TSDB_CODE_SUCCESS) {
// dispatch message failed: network error, or node not available.
// in case of the input queue is full, the code will be TSDB_CODE_SUCCESS, the and pRsp->inputStatus will be set
// flag. Here we need to retry dispatch this message to downstream task immediately. handle the case the failure
// happened too fast.
// todo handle the shuffle dispatch failure
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
@ -1048,98 +1089,60 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// stError("s-task:%s checkpoint trigger send failed, continue do checkpoint ready process", id);
// streamProcessCheckpointReadyMsg(pTask);
// }
// we should set the correct finish flag to make sure the shuffle dispatch will be executed completed.
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t left = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
if (left > 0) { // do nothing
stError("s-task:%s add the shuffle dispatch counter to complete the dispatch process", id);
} else {
stError("s-task:%s the last rsp is failed, ignore it and continue, roll-back will discard this msg", id);
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
}
} else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, retry", id, msgId,
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
SStreamDispatchReq* pDispatchMsg = pTask->msgInfo.pData;
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
}
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgroups = taosArrayGetSize(vgInfo);
for(int32_t i = 0; i < numOfVgroups; ++i) {
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
if (pVgInfo->vgId == pRsp->downstreamNodeId) {
stDebug("s-task:%s (child taskId:%d) re-send blocks:%d to vgId:%d", pTask->id.idStr,
pTask->info.selfChildId, pDispatchMsg[i].blockNum, pVgInfo->vgId);
code = doSendDispatchMsg(pTask, &pDispatchMsg[i], pVgInfo->vgId, &pVgInfo->epSet);
}
}
} else {
sendDispatchMsg(pTask, pTask->msgInfo.pData);
} else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream
taosArrayPush(pTask->msgInfo.pRetryList, &pRsp->downstreamNodeId);
stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
}
return TSDB_CODE_SUCCESS;
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
}
// there are other dispatch message not response yet
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
if (leftRsp > 0) {
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d "
"rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
return 0;
} else {
stDebug("s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp", id,
msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
ASSERT(leftRsp >= 0);
// all msg rsp already, continue
if (leftRsp == 0) {
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
// we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = taosArrayGetSize(pTask->msgInfo.pRetryList);
if (numOfFailed > 0) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed);
}
pTask->msgInfo.retryCount++;
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stDebug("s-task:%s failed to dispatch msg to downstream for %d times, code:%s, add timer to retry in %dms, ref:%d",
pTask->id.idStr, pTask->msgInfo.retryCount, tstrerror(terrno), DISPATCH_RETRY_INTERVAL_MS, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else { // this message has been sent successfully, let's try next one.
pTask->msgInfo.retryCount = 0;
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
} else {
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
}
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch transtate msgId:%d to downstream successfully, start to transfer state", id, msgId);
ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStateToStreamTask(pTask);
if (code != TSDB_CODE_SUCCESS) { // todo: do nothing if error happens
}
// now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
return TSDB_CODE_SUCCESS;
}
pTask->msgInfo.retryCount = 0;
ASSERT(pTask->outputInfo.status == TASK_OUTPUT_STATUS__WAIT);
stDebug("s-task:%s output status is set to:%d", id, pTask->outputInfo.status);
// the input queue of the (down stream) task that receive the output data is full,
// so the TASK_INPUT_STATUS_BLOCKED is rsp
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
double el = 0;
if (pTask->msgInfo.blockingTs == 0) {
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
} else {
el = (taosGetTimestampMs() - pTask->msgInfo.blockingTs) / 1000.0;
}
int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);
stError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64
" wait for %dms and retry dispatch data, total wait:%.2fSec ref:%d",
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS, el, ref);
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
} else {
// this message has been sent successfully, let's try next one.
handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId);
}
return 0;

View File

@ -381,6 +381,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
pTask->pUpstreamInfoList = NULL;
}
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
taosMemoryFree(pTask->pTokenBucket);
taosThreadMutexDestroy(&pTask->lock);
taosMemoryFree(pTask);
@ -410,6 +411,7 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->dataRange.range.maxVer = ver;
pTask->dataRange.range.minVer = ver;
pTask->pMsgCb = pMsgCb;
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
if (pTask->pTokenBucket == NULL) {