fix(stream): initialize the check-rsp monitor before sending the check msg, to avoid a race condition.

This commit is contained in:
Haojun Liao 2024-04-22 17:18:01 +08:00
parent 97c1f88b22
commit cf7ffb82e0
2 changed files with 31 additions and 22 deletions

View File

@ -186,6 +186,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
// serialize streamProcessScanHistoryFinishRsp // serialize streamProcessScanHistoryFinishRsp
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
req.reqId = tGenIdPI64(); req.reqId = tGenIdPI64();
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId; req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId; req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
@ -199,8 +201,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
streamTaskStartMonitorCheckRsp(pTask);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
streamTaskStartMonitorCheckRsp(pTask);
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo); int32_t numOfVgs = taosArrayGetSize(vgInfo);
@ -219,8 +222,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }
streamTaskStartMonitorCheckRsp(pTask);
} else { // for sink task, set it ready directly. } else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);

View File

@ -534,7 +534,8 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t)); pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
TdThreadMutexAttr attr = {0}; TdThreadMutexAttr attr = {0};
int code = taosThreadMutexAttrInit(&attr);
int code = taosThreadMutexAttrInit(&attr);
if (code != 0) { if (code != 0) {
stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code)); stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
return code; return code;
@ -563,6 +564,14 @@ int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, i
streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr); streamTaskInitTokenBucket(pOutputInfo->pTokenBucket, 35, 35, tsSinkDataRate, pTask->id.idStr);
pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset)); pOutputInfo->pDownstreamUpdateList = taosArrayInit(4, sizeof(SDownstreamTaskEpset));
if (pOutputInfo->pDownstreamUpdateList == NULL) { if (pOutputInfo->pDownstreamUpdateList == NULL) {
stError("s-task:%s failed to prepare downstreamUpdateList, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY;
}
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
if (pTask->taskCheckInfo.pList == NULL) {
stError("s-task:%s failed to prepare taskCheckInfo list, code:%s", pTask->id.idStr,
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
@ -943,11 +952,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
} }
static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
if (pInfo->pList == NULL) { taosArrayClear(pInfo->pList);
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
} else {
taosArrayClear(pInfo->pList);
}
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
pInfo->notReadyTasks = 1; pInfo->notReadyTasks = 1;
@ -1089,9 +1094,12 @@ static void rspMonitorFn(void* param, void* tmrId) {
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs; int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state; ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0; int32_t numOfReady = 0;
int32_t numOfFault = 0; int32_t numOfFault = 0;
const char* id = pTask->id.idStr; int32_t numOfNotRsp = 0;
int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check downstream rsp check", id); stDebug("s-task:%s start to do check downstream rsp check", id);
@ -1147,7 +1155,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId); taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION } else { // el < CHECK_NOT_RSP_DURATION
// do nothing and continue waiting for their rsps numOfNotRsp += 1; // do nothing and continue waiting for their rsp
} }
} else { } else {
taosArrayPush(pNotReadyList, &p->taskId); taosArrayPush(pNotReadyList, &p->taskId);
@ -1158,17 +1166,17 @@ static void rspMonitorFn(void* param, void* tmrId) {
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
} }
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore // fault tasks detected, not try anymore
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
(numOfFault > 0)) { if ((numOfNotRsp == 0) && (numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", "detected, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
@ -1182,9 +1190,9 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (pInfo->stopCheckProcess == 1) { if (pInfo->stopCheckProcess == 1) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notRsp:%d, notReady:%d, "
"timeout:%d, ready:%d ref:%d", "fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); id, pStat->name, vgId, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
streamTaskCompleteCheckRsp(pInfo, id); streamTaskCompleteCheckRsp(pInfo, id);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
@ -1241,8 +1249,8 @@ static void rspMonitorFn(void* param, void* tmrId) {
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, stDebug("s-task:%s continue checking rsp in 300ms, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", id,
numOfFault, numOfTimeout, numOfReady); numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList); taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList); taosArrayDestroy(pTimeoutList);