refactor: do some internal refactor.
This commit is contained in:
parent
40669f4e9c
commit
5bffb0c675
|
@ -785,7 +785,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||||
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts);
|
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);
|
||||||
|
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts);
|
||||||
|
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
||||||
|
|
|
@ -138,6 +138,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (pMeta->startInfo.startAllTasks) {
|
||||||
|
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
pMeta->scanInfo.scanCounter += 1;
|
pMeta->scanInfo.scanCounter += 1;
|
||||||
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
|
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
|
||||||
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
|
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;
|
||||||
|
|
|
@ -1193,13 +1193,12 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
SStreamTask* pTask = NULL;
|
||||||
|
SRestoreCheckpointInfo req = {0};
|
||||||
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
|
||||||
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
int32_t len = pMsg->contLen - sizeof(SMsgHead);
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
SRestoreCheckpointInfo req = {0};
|
|
||||||
|
|
||||||
SDecoder decoder;
|
SDecoder decoder;
|
||||||
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
tDecoderInit(&decoder, (uint8_t*)msg, len);
|
||||||
|
|
||||||
|
@ -1211,7 +1210,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tDecoderClear(&decoder);
|
tDecoderClear(&decoder);
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
|
||||||
if (pTask == NULL || (code != 0)) {
|
if (pTask == NULL || (code != 0)) {
|
||||||
tqError(
|
tqError(
|
||||||
|
@ -1238,9 +1236,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
|
||||||
|
|
||||||
if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) {
|
SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
|
||||||
|
if (pConsenInfo->consenChkptTransId >= req.transId) {
|
||||||
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
|
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
|
||||||
pTask->status.consenChkptInfo.consenChkptTransId, req.transId);
|
pConsenInfo->consenChkptTransId, req.transId);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1256,9 +1255,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
pTask->id.idStr, vgId, req.checkpointId, req.transId);
|
pTask->id.idStr, vgId, req.checkpointId, req.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->status.consenChkptInfo.consenChkptTransId = req.transId;
|
streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
|
||||||
pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV;
|
|
||||||
pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs();
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pMeta->role == NODE_ROLE_LEADER) {
|
if (pMeta->role == NODE_ROLE_LEADER) {
|
||||||
|
|
|
@ -615,7 +615,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
pInfo->checkpointVer = pReq->checkpointVer;
|
pInfo->checkpointVer = pReq->checkpointVer;
|
||||||
pInfo->checkpointTime = pReq->checkpointTs;
|
pInfo->checkpointTime = pReq->checkpointTs;
|
||||||
|
|
||||||
if (restored) {
|
if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1371,9 +1371,6 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
|
||||||
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
|
|
||||||
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
ETaskStatus p = streamTaskGetStatus(pTask).state;
|
ETaskStatus p = streamTaskGetStatus(pTask).state;
|
||||||
// if (pInfo->alreadySendChkptId == true) {
|
// if (pInfo->alreadySendChkptId == true) {
|
||||||
|
@ -1384,16 +1381,13 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
// pInfo->alreadySendChkptId = true;
|
// pInfo->alreadySendChkptId = true;
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
|
streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
if (pTask->pBackend != NULL) {
|
if (pTask->pBackend != NULL) {
|
||||||
streamFreeTaskState(pTask, p);
|
streamFreeTaskState(pTask, p);
|
||||||
pTask->pBackend = NULL;
|
pTask->pBackend = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->status = TASK_CONSEN_CHKPT_REQ;
|
|
||||||
pInfo->statusTs = taosGetTimestampMs();
|
|
||||||
stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,10 +197,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts);
|
streamMutexLock(&(*pTask)->lock);
|
||||||
|
entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts);
|
||||||
if (entry.checkpointInfo.consensusChkptId) {
|
if (entry.checkpointInfo.consensusChkptId) {
|
||||||
entry.checkpointInfo.consensusTs = pMsg->ts;
|
entry.checkpointInfo.consensusTs = pMsg->ts;
|
||||||
}
|
}
|
||||||
|
streamMutexUnlock(&(*pTask)->lock);
|
||||||
|
|
||||||
if ((*pTask)->exec.pWalReader != NULL) {
|
if ((*pTask)->exec.pWalReader != NULL) {
|
||||||
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
|
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
|
||||||
|
|
|
@ -444,7 +444,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
|
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
|
||||||
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
|
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
|
||||||
|
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
@ -455,11 +455,13 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
|
||||||
vgId, pConChkptInfo->statusTs);
|
vgId, pConChkptInfo->statusTs);
|
||||||
return 1;
|
return 1;
|
||||||
} else {
|
} else {
|
||||||
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) {
|
int64_t el = (ts - pConChkptInfo->statusTs) / 1000;
|
||||||
|
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
|
||||||
pConChkptInfo->statusTs = ts;
|
pConChkptInfo->statusTs = ts;
|
||||||
|
|
||||||
stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64,
|
stWarn(
|
||||||
pTask->id.idStr, vgId, pConChkptInfo->statusTs);
|
"s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
|
||||||
|
pTask->id.idStr, vgId, el, pConChkptInfo->statusTs);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -467,4 +469,22 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
|
||||||
|
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
|
||||||
|
pInfo->consenChkptTransId = transId;
|
||||||
|
pInfo->status = TASK_CONSEN_CHKPT_RECV;
|
||||||
|
pInfo->statusTs = ts;
|
||||||
|
|
||||||
|
stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
|
||||||
|
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
|
||||||
|
int32_t prevTrans = pInfo->consenChkptTransId;
|
||||||
|
|
||||||
|
pInfo->status = TASK_CONSEN_CHKPT_REQ;
|
||||||
|
pInfo->statusTs = ts;
|
||||||
|
pInfo->consenChkptTransId = 0;
|
||||||
|
|
||||||
|
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue