Merge pull request #26340 from taosdata/fix/check_end_pos

fix(stream): reset the status before re-send data.
This commit is contained in:
Haojun Liao 2024-07-01 11:13:36 +08:00 committed by GitHub
commit b07d2b4adc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 63 additions and 45 deletions

View File

@ -171,7 +171,11 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
SStreamTask* pTask = *ppTask; SStreamTask* pTask = *ppTask;
const char* idstr = pTask->id.idStr; const char* idstr = pTask->id.idStr;
if ((pMeta->updateInfo.transId != req.transId) && (pMeta->updateInfo.transId != -1)) { if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived.
streamMetaInitUpdateTaskList(pMeta, req.transId);
}
if (pMeta->updateInfo.transId != req.transId) {
if (req.transId < pMeta->updateInfo.transId) { if (req.transId < pMeta->updateInfo.transId) {
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId, tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
pMeta->updateInfo.transId, req.transId); pMeta->updateInfo.transId, req.transId);
@ -937,7 +941,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
} }
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SCheckpointTriggerRsp* pRsp = pMsg->pCont; SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pRsp->streamId, pRsp->taskId);
if (pTask == NULL) { if (pTask == NULL) {

View File

@ -25,15 +25,12 @@ static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId); int32_t transId, int32_t srcTaskId);
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList); static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
static void checkpointTriggerMonitorFn(void* param, void* tmrId); static void checkpointTriggerMonitorFn(void* param, void* tmrId);
static SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId);
SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
int32_t transId) { int32_t transId, int32_t srcTaskId) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock)); SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) { if (pChkpoint == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
@ -41,6 +38,10 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
} }
pChkpoint->type = checkpointType; pChkpoint->type = checkpointType;
if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
pChkpoint->srcTaskId = srcTaskId;
ASSERT(srcTaskId != 0);
}
SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock)); SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
if (pBlock == NULL) { if (pBlock == NULL) {
@ -64,8 +65,9 @@ SStreamDataBlock* createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpoint
return pChkpoint; return pChkpoint;
} }
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId) { int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId); int32_t srcTaskId) {
SStreamDataBlock* pCheckpoint = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId);
if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) { if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
@ -90,7 +92,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task // 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// and this is the last item in the inputQ. // and this is the last item in the inputQ.
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId); return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
} }
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) { int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
@ -102,15 +104,16 @@ int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTri
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
pRsp->upstreamTaskId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId, int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
SRpcHandleInfo* pRpcInfo, int32_t code) { SRpcHandleInfo* pRpcInfo, int32_t code) {
int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp); int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
void* pBuf = rpcMallocCont(size);
void* pBuf = rpcMallocCont(size);
SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead)); SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId); ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
@ -118,6 +121,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
pRsp->streamId = pTask->id.streamId; pRsp->streamId = pTask->id.streamId;
pRsp->upstreamTaskId = pTask->id.taskId; pRsp->upstreamTaskId = pTask->id.taskId;
pRsp->taskId = dstTaskId; pRsp->taskId = dstTaskId;
pRsp->rspCode = code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId; pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
@ -127,9 +131,7 @@ int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId
pRsp->transId = -1; pRsp->transId = -1;
} }
pRsp->rspCode = code; SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
SRpcMsg rspMsg = {.code = 0, .pCont = pRsp, .contLen = size, .info = *pRpcInfo};
tmsgSendRsp(&rspMsg); tmsgSendRsp(&rspMsg);
return 0; return 0;
} }
@ -267,7 +269,7 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId); stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointTriggerBlock(pBlock, pTask); continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info } else { // only one task exists, no need to dispatch downstream info
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
} }
} else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -364,9 +366,8 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
taosThreadMutexUnlock(&pInfo->lock); taosThreadMutexUnlock(&pInfo->lock);
if (notReady == 0) { if (notReady == 0) {
stDebug("s-task:%s all downstream task(s) have completed build checkpoint, start to do checkpoint for current task", stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
id); appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId);
} }
return 0; return 0;
@ -707,11 +708,10 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
} }
} else { // clear the checkpoint info if failed } else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock); taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false); streamTaskSetFailedCheckpointId(pTask); // set failed checkpoint id before clear the checkpoint info
taosThreadMutexUnlock(&pTask->lock); taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId); stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
} }

View File

@ -40,6 +40,20 @@ void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen) {
pMsg->contLen = contLen; pMsg->contLen = contLen;
} }
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId, static int32_t tInitStreamDispatchReq(SStreamDispatchReq* pReq, const SStreamTask* pTask, int32_t vgId,
int32_t numOfBlocks, int64_t dstTaskId, int32_t type) { int32_t numOfBlocks, int64_t dstTaskId, int32_t type) {
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
@ -225,12 +239,15 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask)); destroyDispatchMsg(pMsgInfo->pData, streamTaskGetNumOfDownstream(pTask));
} }
taosThreadMutexLock(&pMsgInfo->lock);
pMsgInfo->checkpointId = -1; pMsgInfo->checkpointId = -1;
pMsgInfo->transId = -1; pMsgInfo->transId = -1;
pMsgInfo->pData = NULL; pMsgInfo->pData = NULL;
pMsgInfo->dispatchMsgType = 0; pMsgInfo->dispatchMsgType = 0;
taosThreadMutexLock(&pMsgInfo->lock); clearDispatchInfo(pMsgInfo);
taosArrayClear(pTask->msgInfo.pSendInfo); taosArrayClear(pTask->msgInfo.pSendInfo);
taosThreadMutexUnlock(&pMsgInfo->lock); taosThreadMutexUnlock(&pMsgInfo->lock);
} }
@ -416,6 +433,7 @@ static void setResendInfo(SDispatchEntry* pEntry, int64_t now) {
pEntry->sendTs = now; pEntry->sendTs = now;
pEntry->rspTs = -1; pEntry->rspTs = -1;
pEntry->retryCount += 1; pEntry->retryCount += 1;
pEntry->status = TSDB_CODE_SUCCESS;
} }
static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) { static void addDispatchEntry(SDispatchMsgInfo* pMsgInfo, int32_t nodeId, int64_t now, bool lock) {
@ -642,20 +660,6 @@ int32_t streamSearchAndAddBlock(SStreamTask* pTask, SStreamDispatchReq* pReqs, S
return 0; return 0;
} }
static void initDispatchInfo(SDispatchMsgInfo* pInfo, int32_t msgId) {
pInfo->startTs = taosGetTimestampMs();
pInfo->rspTs = -1;
pInfo->msgId = msgId;
}
static void clearDispatchInfo(SDispatchMsgInfo* pInfo) {
pInfo->startTs = -1;
pInfo->msgId = -1;
pInfo->rspTs = -1;
}
static void updateDispatchInfo(SDispatchMsgInfo* pInfo, int64_t recvTs) { pInfo->rspTs = recvTs; }
int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH || ASSERT((pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH ||
pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH)); pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH));
@ -698,7 +702,10 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) {
type == STREAM_INPUT__TRANS_STATE); type == STREAM_INPUT__TRANS_STATE);
pTask->execInfo.dispatch += 1; pTask->execInfo.dispatch += 1;
taosThreadMutexLock(&pTask->msgInfo.lock);
initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch); initDispatchInfo(&pTask->msgInfo, pTask->execInfo.dispatch);
taosThreadMutexUnlock(&pTask->msgInfo.lock);
int32_t code = doBuildDispatchMsg(pTask, pBlock); int32_t code = doBuildDispatchMsg(pTask, pBlock);
if (code == 0) { if (code == 0) {
@ -1221,10 +1228,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo; SDispatchMsgInfo* pMsgInfo = &pTask->msgInfo;
int32_t msgId = pMsgInfo->msgId;
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
int32_t totalRsp = 0; int32_t totalRsp = 0;
taosThreadMutexLock(&pMsgInfo->lock);
int32_t msgId = pMsgInfo->msgId;
taosThreadMutexUnlock(&pMsgInfo->lock);
// follower not handle the dispatch rsp // follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,

View File

@ -1053,15 +1053,19 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize); entry.sinkDataSize = SIZE_IN_MiB((*pTask)->execInfo.sink.dataSize);
} }
if ((*pTask)->chkInfo.pActiveInfo->activeId != 0) { SActiveCheckpointInfo* p = (*pTask)->chkInfo.pActiveInfo;
entry.checkpointInfo.failed = if (p->activeId != 0) {
((*pTask)->chkInfo.pActiveInfo->failedId >= (*pTask)->chkInfo.pActiveInfo->activeId) ? 1 : 0; entry.checkpointInfo.failed = (p->failedId >= p->activeId) ? 1 : 0;
entry.checkpointInfo.activeId = (*pTask)->chkInfo.pActiveInfo->activeId; entry.checkpointInfo.activeId = p->activeId;
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.pActiveInfo->transId; entry.checkpointInfo.activeTransId = p->transId;
if (entry.checkpointInfo.failed) { if (entry.checkpointInfo.failed) {
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d, clear the active checkpointInfo",
(*pTask)->chkInfo.pActiveInfo->transId); (*pTask)->id.idStr, p->transId);
taosThreadMutexLock(&(*pTask)->lock);
streamTaskClearCheckInfo((*pTask), true);
taosThreadMutexUnlock(&(*pTask)->lock);
} }
} }

View File

@ -1039,10 +1039,10 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) { void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo) {
pInfo->activeId = 0; // clear the checkpoint id pInfo->activeId = 0; // clear the checkpoint id
pInfo->failedId = 0;
pInfo->transId = 0; pInfo->transId = 0;
pInfo->allUpstreamTriggerRecv = 0; pInfo->allUpstreamTriggerRecv = 0;
pInfo->dispatchTrigger = false; pInfo->dispatchTrigger = false;
pInfo->failedId = 0;
taosArrayClear(pInfo->pDispatchTriggerList); taosArrayClear(pInfo->pDispatchTriggerList);
taosArrayClear(pInfo->pCheckpointReadyRecvList); taosArrayClear(pInfo->pCheckpointReadyRecvList);