Merge pull request #27163 from taosdata/fix/3_liaohj

fix(stream): remove invalid assert.
This commit is contained in:
Haojun Liao 2024-08-13 10:02:09 +08:00 committed by GitHub
commit 35f179c228
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 48 additions and 17 deletions

View File

@ -1008,7 +1008,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
tqDebug("s-task:%s re-send checkpoint-trigger to:0x%x, checkpointId:%" PRId64 ", transId:%d", pTask->id.idStr,
(int32_t)pReq->downstreamTaskId, checkpointId, transId);
code = streamTaskSendCheckpointTriggerMsg(pTask, pReq->downstreamTaskId, pReq->downstreamNodeId, &pMsg->info,
TSDB_CODE_SUCCESS);
TSDB_CODE_SUCCESS);
} else { // not send checkpoint-trigger yet, wait
int32_t recv = 0, total = 0;
streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
@ -1050,8 +1050,10 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
return code;
}
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
tqDebug(
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
"checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);
streamMetaReleaseTask(pMeta, pTask);

View File

@ -128,6 +128,28 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
return TSDB_CODE_SUCCESS;
}
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexUnlock(&pTask->lock);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pInfo->lock);
if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexUnlock(&pInfo->lock);
// NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return TSDB_CODE_SUCCESS;
@ -136,10 +158,8 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
void* pBuf = rpcMallocCont(size);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@ -1016,7 +1036,6 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
bool alreadySend = false;
if (pStatus.state != TASK_STATUS__CK) {
return false;
@ -1126,7 +1145,11 @@ int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
int32_t taskId = 0;
int32_t total = streamTaskGetNumOfDownstream(pTask);
bool alreadyRecv = false;
streamMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
@ -1136,11 +1159,16 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
}
if (p->nodeId == vgId) {
ASSERT(p->recved == false);
p->recved = true;
p->recvTs = taosGetTimestampMs();
taskId = p->taskId;
if (p->recved) {
stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
" discard",
pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
alreadyRecv = true;
} else {
p->recved = true;
p->recvTs = taosGetTimestampMs();
taskId = p->taskId;
}
break;
}
}
@ -1148,12 +1176,13 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
streamMutexUnlock(&pInfo->lock);
int32_t total = streamTaskGetNumOfDownstream(pTask);
if (taskId == 0) {
stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
} else {
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
if (!alreadyRecv) {
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
}
}
}

View File

@ -1388,7 +1388,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId);
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) &&
(pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId);
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pMsgInfo->lock);
@ -1449,7 +1450,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (delayDispatch) {
// we only set the dispatch msg info for current checkpoint trans
if (triggerDispatchRsp) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);