Merge pull request #24098 from taosdata/fix/3_liaohj
fix(stream): send checkpoint complete msg to mnode when dropping it.
This commit is contained in:
commit
ef0cab18cb
|
@ -622,9 +622,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: send the checkpoint complete msg if it is in checkpoint procedure.
|
|
||||||
|
|
||||||
|
|
||||||
// drop the stream task now
|
// drop the stream task now
|
||||||
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId);
|
||||||
|
|
||||||
|
|
|
@ -720,14 +720,21 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
// this function is only invoked by source task, and send rsp to mnode
|
// this function is only invoked by source task, and send rsp to mnode
|
||||||
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE && taosArrayGetSize(pTask->pReadyMsgList) == 1);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0);
|
|
||||||
|
|
||||||
tmsgSendRsp(&pInfo->msg);
|
ASSERT(pTask->info.taskLevel == TASK_LEVEL__SOURCE);
|
||||||
|
|
||||||
taosArrayClear(pTask->pReadyMsgList);
|
if (taosArrayGetSize(pTask->pReadyMsgList) == 1) {
|
||||||
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, 0);
|
||||||
|
tmsgSendRsp(&pInfo->msg);
|
||||||
|
|
||||||
|
taosArrayClear(pTask->pReadyMsgList);
|
||||||
|
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s level:%d already send rsp to mnode", pTask->id.idStr, pTask->info.taskLevel);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -302,17 +302,25 @@ static void freeUpstreamItem(void* p) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeStreamTask(SStreamTask* pTask) {
|
void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
int32_t taskId = pTask->id.taskId;
|
char* p = NULL;
|
||||||
|
int32_t taskId = pTask->id.taskId;
|
||||||
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
STaskExecStatisInfo* pStatis = &pTask->execInfo;
|
||||||
stDebug("start to free s-task:0x%x, %p, state:%p", taskId, pTask, pTask->pState);
|
|
||||||
|
|
||||||
|
ETaskStatus status1 = TASK_STATUS__UNINIT;
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
if (pTask->status.pSM != NULL) {
|
||||||
|
status1 = streamTaskGetStatus(pTask, &p);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
stDebug("start to free s-task:0x%x, %p, state:%s", taskId, pTask, p);
|
||||||
|
|
||||||
|
SCheckpointInfo* pCkInfo = &pTask->chkInfo;
|
||||||
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
||||||
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
||||||
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
|
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
|
||||||
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
|
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
|
||||||
pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
|
pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
|
||||||
pStatis->checkpoint);
|
|
||||||
|
|
||||||
// remove the ref by timer
|
// remove the ref by timer
|
||||||
while (pTask->status.timerActive > 0) {
|
while (pTask->status.timerActive > 0) {
|
||||||
|
@ -335,7 +343,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
pTask->msgInfo.pTimer = NULL;
|
pTask->msgInfo.pTimer = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t status = atomic_load_8((int8_t*)&(pTask->status.taskStatus));
|
|
||||||
if (pTask->inputq.queue) {
|
if (pTask->inputq.queue) {
|
||||||
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
streamQueueClose(pTask->inputq.queue, pTask->id.taskId);
|
||||||
}
|
}
|
||||||
|
@ -377,9 +384,8 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
if (pTask->pState) {
|
if (pTask->pState) {
|
||||||
stDebug("s-task:0x%x start to free task state", taskId);
|
stDebug("s-task:0x%x start to free task state", taskId);
|
||||||
streamStateClose(pTask->pState, status == TASK_STATUS__DROPPING);
|
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
|
||||||
taskDbRemoveRef(pTask->pBackend);
|
taskDbRemoveRef(pTask->pBackend);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pTask->id.idStr != NULL) {
|
if (pTask->id.idStr != NULL) {
|
||||||
|
@ -396,7 +402,6 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
|
pTask->status.pSM = streamDestroyStateMachine(pTask->status.pSM);
|
||||||
|
|
||||||
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
|
streamTaskDestroyUpstreamInfo(&pTask->upstreamInfo);
|
||||||
|
|
||||||
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
|
pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
|
||||||
|
|
|
@ -56,6 +56,7 @@ static int32_t streamTaskInitStatus(SStreamTask* pTask);
|
||||||
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
static int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask);
|
||||||
static int32_t initStateTransferTable();
|
static int32_t initStateTransferTable();
|
||||||
static void doInitStateTransferTable(void);
|
static void doInitStateTransferTable(void);
|
||||||
|
static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
static STaskStateTrans createStateTransform(ETaskStatus current, ETaskStatus next, EStreamTaskEvent event,
|
||||||
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
__state_trans_fn fn, __state_trans_succ_fn succFn,
|
||||||
|
@ -87,6 +88,13 @@ static int32_t streamTaskDoCheckpoint(SStreamTask* pTask) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask) {
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
|
streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
int32_t streamTaskKeepCurrentVerInWal(SStreamTask* pTask) {
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
|
||||||
|
|
||||||
|
@ -551,7 +559,7 @@ void doInitStateTransferTable(void) {
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, streamTaskSendTransSuccessMsg, NULL, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__STREAM_SCAN_HISTORY, TASK_STATUS__DROPPING, TASK_EVENT_DROPPING, NULL, NULL, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
Loading…
Reference in New Issue