fix(stream): add strict check for checkpoint-trigger msg dispatach rsp confirmation flag.
This commit is contained in:
parent
5184e444df
commit
dcc97472d3
|
@ -355,6 +355,8 @@ typedef struct SMetaHbInfo SMetaHbInfo;
|
||||||
typedef struct SDispatchMsgInfo {
|
typedef struct SDispatchMsgInfo {
|
||||||
SStreamDispatchReq* pData; // current dispatch data
|
SStreamDispatchReq* pData; // current dispatch data
|
||||||
int8_t dispatchMsgType;
|
int8_t dispatchMsgType;
|
||||||
|
int64_t checkpointId;// checkpoint id msg
|
||||||
|
int32_t transId; // transId for current checkpoint
|
||||||
int16_t msgType; // dispatch msg type
|
int16_t msgType; // dispatch msg type
|
||||||
int32_t retryCount; // retry send data count
|
int32_t retryCount; // retry send data count
|
||||||
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
|
int64_t startTs; // dispatch start time, record total elapsed time for dispatch
|
||||||
|
|
|
@ -321,6 +321,8 @@ void clearBufferedDispatchMsg(SStreamTask* pTask) {
|
||||||
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
|
destroyDispatchMsg(pMsgInfo->pData, getNumOfDispatchBranch(pTask));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMsgInfo->checkpointId = -1;
|
||||||
|
pMsgInfo->transId = -1;
|
||||||
pMsgInfo->pData = NULL;
|
pMsgInfo->pData = NULL;
|
||||||
pMsgInfo->dispatchMsgType = 0;
|
pMsgInfo->dispatchMsgType = 0;
|
||||||
}
|
}
|
||||||
|
@ -332,6 +334,12 @@ static int32_t doBuildDispatchMsg(SStreamTask* pTask, const SStreamDataBlock* pD
|
||||||
|
|
||||||
pTask->msgInfo.dispatchMsgType = pData->type;
|
pTask->msgInfo.dispatchMsgType = pData->type;
|
||||||
|
|
||||||
|
if (pData->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
||||||
|
SSDataBlock* p = taosArrayGet(pData->blocks, 0);
|
||||||
|
pTask->msgInfo.checkpointId = p->info.version;
|
||||||
|
pTask->msgInfo.transId = p->info.window.ekey;
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
|
SStreamDispatchReq* pReq = taosMemoryCalloc(1, sizeof(SStreamDispatchReq));
|
||||||
|
|
||||||
|
@ -954,7 +962,9 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
|
||||||
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
bool delayDispatch = (pTask->msgInfo.dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
if (delayDispatch) {
|
if (delayDispatch) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
// we only set the dispatch msg info for current checkpoint trans
|
||||||
|
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK && pTask->chkInfo.checkpointingId == pTask->msgInfo.checkpointId) {
|
||||||
|
ASSERT(pTask->chkInfo.transId == pTask->msgInfo.transId);
|
||||||
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
pTask->chkInfo.dispatchCheckpointTrigger = true;
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
Loading…
Reference in New Issue