Merge pull request #27838 from taosdata/fix/3_liaohj

fix(stream): fix the error in the checkpoint-trigger confirm condition.
This commit is contained in:
Haojun Liao 2024-09-12 17:21:12 +08:00 committed by GitHub
commit 63e6a2d433
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 91 additions and 50 deletions

View File

@ -586,6 +586,10 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored) {
int32_t code = 0;
int32_t vgId = pMeta->vgId;
int32_t numOfTasks = 0;
int32_t taskId = -1;
int64_t streamId = -1;
bool added = false;
if (tsDisableStream) {
tqInfo("vgId:%d stream disabled, not deploy stream tasks", vgId);
@ -613,13 +617,12 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
}
// 2.save task, use the latest commit version as the initial start version of stream task.
int32_t taskId = pTask->id.taskId;
int64_t streamId = pTask->id.streamId;
bool added = false;
taskId = pTask->id.taskId;
streamId = pTask->id.streamId;
streamMetaWLock(pMeta);
code = streamMetaRegisterTask(pMeta, sversion, pTask, &added);
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
numOfTasks = streamMetaGetNumOfTasks(pMeta);
streamMetaWUnLock(pMeta);
if (code < 0) {
@ -654,7 +657,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
tqDebug("vgId:%d not leader, not launch stream task s-task:0x%x", vgId, taskId);
}
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store, total:%d", vgId, taskId, numOfTasks);
tFreeStreamTask(pTask);
}

View File

@ -27,7 +27,7 @@ static void streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInf
static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
static void streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id);
static void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id);
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
static int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p);
static void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList);
static void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList);
static int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs,
@ -83,6 +83,7 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
SDataRange* pRange = &pTask->dataRange;
STimeWindow* pWindow = &pRange->window;
const char* idstr = pTask->id.idStr;
int32_t code = 0;
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
@ -102,11 +103,11 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pDispatch->taskId, pDispatch->nodeId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
" window:%" PRId64 "-%" PRId64 "QID:0x%" PRIx64,
" window:%" PRId64 "-%" PRId64 " QID:0x%" PRIx64,
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, pRange->range.minVer,
pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
(void)streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
code = streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId,
&pTask->outputInfo.fixedDispatcher.epSet);
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
@ -128,15 +129,19 @@ void streamTaskSendCheckMsg(SStreamTask* pTask) {
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, pVgInfo->taskId, pVgInfo->vgId, idstr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d,QID:0x%" PRIx64,
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, QID:0x%" PRIx64,
idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
(void)streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
}
} else { // for sink task, set it ready directly.
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId);
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr);
processDownstreamReadyRsp(pTask);
}
if (code) {
stError("s-task:%s failed to send check msg to downstream, code:%s", idstr, tstrerror(code));
}
}
void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) {
@ -243,13 +248,13 @@ int32_t streamTaskProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp*
int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp,
SRpcHandleInfo* pRpcInfo, int32_t taskId) {
SEncoder encoder;
int32_t code;
int32_t code = 0;
int32_t len;
tEncodeSize(tEncodeStreamTaskCheckRsp, pRsp, len, code);
if (code < 0) {
stError("vgId:%d failed to encode task check rsp, s-task:0x%x", pMeta->vgId, taskId);
return -1;
return TSDB_CODE_INVALID_MSG;
}
void* buf = rpcMallocCont(sizeof(SMsgHead) + len);
@ -257,13 +262,14 @@ int32_t streamTaskSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTa
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
tEncoderInit(&encoder, (uint8_t*)abuf, len);
(void)tEncodeStreamTaskCheckRsp(&encoder, pRsp);
code = tEncodeStreamTaskCheckRsp(&encoder, pRsp);
tEncoderClear(&encoder);
SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = sizeof(SMsgHead) + len, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg);
return 0;
code = (code >= 0)? 0:code;
return code;
}
void streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
@ -316,8 +322,11 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
pInfo->pList = NULL;
if (pInfo->checkRspTmr != NULL) {
(void)taosTmrStop(pInfo->checkRspTmr);
bool succ = taosTmrStop(pInfo->checkRspTmr);
pInfo->checkRspTmr = NULL;
if (!succ) {
stError("failed to stop checkrsp tmr"); // todo: add id
}
}
streamMutexDestroy(&pInfo->checkInfoLock);
@ -326,11 +335,17 @@ void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo) {
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
void processDownstreamReadyRsp(SStreamTask* pTask) {
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
(void)streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
int32_t code = streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
if (code) {
stError("s-task:%s failed to set event succ, code:%s", pTask->id.idStr, tstrerror(code));
}
int64_t checkTs = pTask->execInfo.checkTs;
int64_t readyTs = pTask->execInfo.readyTs;
(void)streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
if (code) {
stError("s-task:%s failed to record the downstream task status, code:%s", pTask->id.idStr, tstrerror(code));
}
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
if (!HAS_RELATED_FILLHISTORY_TASK(pTask) || (pTask->info.fillHistory != 0)) {
@ -341,9 +356,9 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
// halt it self for count window stream task until the related fill history task completed.
stDebug("s-task:%s level:%d initial status is %s from mnode, set it to be halt", pTask->id.idStr,
pTask->info.taskLevel, streamTaskGetStatusStr(pTask->status.taskStatus));
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
if (code != 0) {
// todo: handle error
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_HALT);
if (code != 0) { // todo: handle error
stError("s-task:%s failed to handle halt event, code:%s", pTask->id.idStr, tstrerror(code));
}
}
@ -352,7 +367,10 @@ void processDownstreamReadyRsp(SStreamTask* pTask) {
// todo: let's retry
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
stDebug("s-task:%s try to launch related fill-history task", pTask->id.idStr);
(void)streamLaunchFillHistoryTask(pTask);
code = streamLaunchFillHistoryTask(pTask);
if (code) {
stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
}
}
}
@ -517,8 +535,9 @@ void streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId,
streamMutexUnlock(&pInfo->checkInfoLock);
}
void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
int32_t doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
const char* id = pTask->id.idStr;
int32_t code = 0;
SStreamTaskCheckReq req = {
.streamId = pTask->id.streamId,
@ -536,10 +555,10 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher;
setCheckDownstreamReqInfo(&req, p->reqId, pDispatch->taskId, pDispatch->nodeId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d)QID:0x%" PRIx64, id,
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) QID:0x%" PRIx64, id,
pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
(void)streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
code = streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
int32_t numOfVgs = taosArrayGetSize(vgInfo);
@ -554,13 +573,18 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
setCheckDownstreamReqInfo(&req, p->reqId, pVgInfo->taskId, pVgInfo->vgId);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%dQID:0x%" PRIx64,
" re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d QID:0x%" PRIx64,
id, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, p->reqId);
(void)streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
code = streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break;
}
}
}
if (code) {
stError("s-task:%s failed to send check msg to downstream, code:%s", pTask->id.idStr, tstrerror(code));
}
return code;
}
void getCheckRspStatus(STaskCheckInfo* pInfo, int64_t el, int32_t* numOfReady, int32_t* numOfFault,
@ -626,7 +650,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
continue;
}
doSendCheckMsg(pTask, p);
int32_t code = doSendCheckMsg(pTask, p);
}
}
@ -676,7 +700,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) {
if (p != NULL) {
p->rspTs = 0;
p->status = -1;
doSendCheckMsg(pTask, p);
int32_t code = doSendCheckMsg(pTask, p);
}
}
@ -723,7 +747,10 @@ void rspMonitorFn(void* param, void* tmrId) {
// not record the failed of the current task if try to close current vnode
// otherwise, the put of message operation may incur invalid read of message queue.
if (!pMeta->closeFlag) {
(void)addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
if (code) {
stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
}
}
streamMetaReleaseTask(pMeta, pTask);
@ -805,7 +832,11 @@ void rspMonitorFn(void* param, void* tmrId) {
streamTaskCompleteCheckRsp(pInfo, false, id);
streamMutexUnlock(&pInfo->checkInfoLock);
(void)addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
int32_t code = addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId);
if (code) {
stError("s-task:%s failed to create async record start failed task, code:%s", id, tstrerror(code));
}
streamMetaReleaseTask(pMeta, pTask);
taosArrayDestroy(pNotReadyList);

View File

@ -237,7 +237,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
if (pActiveInfo->failedId >= checkpointId) {
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
"discard the checkpoint-trigger block",
" discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
streamMutexUnlock(&pTask->lock);

View File

@ -193,13 +193,13 @@ int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock)
// no need to do anything if failed
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet) {
void* buf = NULL;
int32_t code = -1;
int32_t code = 0;
SRpcMsg msg = {0};
int32_t tlen;
tEncodeSize(tEncodeStreamTaskCheckReq, pReq, tlen, code);
if (code < 0) {
return -1;
return code;
}
buf = rpcMallocCont(sizeof(SMsgHead) + tlen);
@ -217,8 +217,8 @@ int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq,
tEncoderClear(&encoder);
return code;
}
tEncoderClear(&encoder);
tEncoderClear(&encoder);
initRpcMsg(&msg, TDMT_VND_STREAM_TASK_CHECK, buf, tlen + sizeof(SMsgHead));
stDebug("s-task:%s (level:%d) send check msg to s-task:0x%" PRIx64 ":0x%x (vgId:%d)", pTask->id.idStr,
pTask->info.taskLevel, pReq->streamId, pReq->downstreamTaskId, nodeId);
@ -1477,20 +1477,27 @@ int32_t getFailedDispatchInfo(SDispatchMsgInfo* pMsgInfo, int64_t now) {
}
int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, int32_t code) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int64_t now = taosGetTimestampMs();
bool allRsp = false;
int32_t notRsp = 0;
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int64_t now = taosGetTimestampMs();
bool allRsp = false;
int32_t notRsp = 0;
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t tmpCheckpointId = -1;
int32_t tmpTranId = -1;
const char* pStatus = NULL;
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) &&
(pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId);
SStreamTaskState s = streamTaskGetStatus(pTask);
triggerDispatchRsp = (s.state == TASK_STATUS__CK) && (pInfo->activeId == pMsgInfo->checkpointId) &&
(pInfo->transId == pMsgInfo->transId);
tmpCheckpointId = pInfo->activeId;
tmpTranId = pInfo->transId;
pStatus = s.name;
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pMsgInfo->lock);
@ -1498,8 +1505,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
vgId);
stError("s-task:%s vgId:%d is follower or just re-launched, not handle the dispatch rsp, discard it", id, vgId);
streamMutexUnlock(&pMsgInfo->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
@ -1557,8 +1563,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
streamTaskSetTriggerDispatchConfirmed(pTask, pRsp->downstreamNodeId);
} else {
stWarn("s-task:%s checkpoint-trigger msg rsp for checkpointId:%" PRId64
" transId:%d discard, since expired",
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
" transId:%d discard, current status:%s, active checkpointId:%" PRId64
" active transId:%d, since expired",
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId, pStatus, tmpCheckpointId, tmpTranId);
}
}
}