fix(stream): remove invalid assert.

This commit is contained in:
Haojun Liao 2024-08-12 16:56:43 +08:00
parent 600d1b2c64
commit 3701ded767
3 changed files with 48 additions and 17 deletions

View File

@ -1050,7 +1050,9 @@ int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg)
return code;
}
tqDebug("s-task:%s recv re-send checkpoint-trigger msg from upstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
tqDebug(
"s-task:%s recv re-send checkpoint-trigger msg from through retrieve/rsp channel, upstream:0x%x, "
"checkpointId:%" PRId64 ", transId:%d",
pTask->id.idStr, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
code = streamTaskProcessCheckpointTriggerRsp(pTask, pRsp);

View File

@ -128,6 +128,28 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
return TSDB_CODE_SUCCESS;
}
streamMutexLock(&pTask->lock);
SStreamTaskState status = streamTaskGetStatus(pTask);
if (status.state != TASK_STATUS__CK) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexUnlock(&pTask->lock);
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
streamMutexLock(&pInfo->lock);
if (pInfo->activeId != pRsp->checkpointId || pInfo->transId != pRsp->transId) {
stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", pTask->id.idStr, status.name);
streamMutexUnlock(&pInfo->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
streamMutexUnlock(&pInfo->lock);
// NOTE: duplicated checkpoint-trigger msgs are not checked here; that check is done by the functions that follow.
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return TSDB_CODE_SUCCESS;
@ -136,10 +158,8 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
if (pBuf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return terrno;
}
@ -1016,7 +1036,6 @@ bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId)
const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
SStreamTaskState pStatus = streamTaskGetStatus(pTask);
bool alreadySend = false;
if (pStatus.state != TASK_STATUS__CK) {
return false;
@ -1126,7 +1145,11 @@ int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int64_t now = taosGetTimestampMs();
int32_t taskId = 0;
int32_t total = streamTaskGetNumOfDownstream(pTask);
bool alreadyRecv = false;
streamMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
@ -1136,11 +1159,16 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
}
if (p->nodeId == vgId) {
ASSERT(p->recved == false);
if (p->recved) {
stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
" discard",
pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
alreadyRecv = true;
} else {
p->recved = true;
p->recvTs = taosGetTimestampMs();
taskId = p->taskId;
}
break;
}
}
@ -1148,14 +1176,15 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
streamMutexUnlock(&pInfo->lock);
int32_t total = streamTaskGetNumOfDownstream(pTask);
if (taskId == 0) {
stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
} else {
if (!alreadyRecv) {
stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
}
}
}
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
int32_t code = 0;

View File

@ -1388,7 +1388,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId);
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) &&
(pTask->chkInfo.pActiveInfo->transId != pMsgInfo->transId);
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pMsgInfo->lock);
@ -1449,7 +1450,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (delayDispatch) {
// we only set the dispatch msg info for current checkpoint trans
if (triggerDispatchRsp) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);