Merge pull request #27539 from taosdata/fix/3_liaohj

refactor: do some internal refactor.
This commit is contained in:
Haojun Liao 2024-08-29 18:38:05 +08:00 committed by GitHub
commit 705d778d95
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 82 additions and 63 deletions

View File

@ -785,7 +785,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta); bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts); int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts);
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts);
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts);
// timer // timer
int32_t streamTimerGetInstance(tmr_h* pTmr); int32_t streamTimerGetInstance(tmr_h* pTmr);

View File

@ -138,6 +138,12 @@ int32_t tqScanWalAsync(STQ* pTq, bool ckPause) {
return 0; return 0;
} }
if (pMeta->startInfo.startAllTasks) {
tqTrace("vgId:%d in restart procedure, not scan wal", vgId);
streamMetaWUnLock(pMeta);
return 0;
}
pMeta->scanInfo.scanCounter += 1; pMeta->scanInfo.scanCounter += 1;
if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) { if (pMeta->scanInfo.scanCounter > MAX_REPEAT_SCAN_THRESHOLD) {
pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD; pMeta->scanInfo.scanCounter = MAX_REPEAT_SCAN_THRESHOLD;

View File

@ -1193,13 +1193,12 @@ int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t code = 0; int32_t code = 0;
SStreamTask* pTask = NULL;
SRestoreCheckpointInfo req = {0};
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead); int32_t len = pMsg->contLen - sizeof(SMsgHead);
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SRestoreCheckpointInfo req = {0};
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len); tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1211,7 +1210,6 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
tDecoderClear(&decoder); tDecoderClear(&decoder);
SStreamTask* pTask = NULL;
code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask); code = streamMetaAcquireTask(pMeta, req.streamId, req.taskId, &pTask);
if (pTask == NULL || (code != 0)) { if (pTask == NULL || (code != 0)) {
tqError( tqError(
@ -1238,9 +1236,10 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId); ASSERT(pTask->chkInfo.checkpointId >= req.checkpointId);
if (pTask->status.consenChkptInfo.consenChkptTransId >= req.transId) { SConsenChkptInfo* pConsenInfo = &pTask->status.consenChkptInfo;
if (pConsenInfo->consenChkptTransId >= req.transId) {
tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId, tqDebug("s-task:%s vgId:%d latest consensus transId:%d, expired consensus trans:%d, discard", pTask->id.idStr, vgId,
pTask->status.consenChkptInfo.consenChkptTransId, req.transId); pConsenInfo->consenChkptTransId, req.transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1256,9 +1255,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
pTask->id.idStr, vgId, req.checkpointId, req.transId); pTask->id.idStr, vgId, req.checkpointId, req.transId);
} }
pTask->status.consenChkptInfo.consenChkptTransId = req.transId; streamTaskSetConsenChkptIdRecv(pTask, req.transId, now);
pTask->status.consenChkptInfo.status = TASK_CONSEN_CHKPT_RECV;
pTask->status.consenChkptInfo.statusTs = taosGetTimestampMs();
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {

View File

@ -615,7 +615,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs; pInfo->checkpointTime = pReq->checkpointTs;
if (restored) { if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} }
} }
@ -1371,9 +1371,6 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
} }
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) { int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr;
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ETaskStatus p = streamTaskGetStatus(pTask).state; ETaskStatus p = streamTaskGetStatus(pTask).state;
// if (pInfo->alreadySendChkptId == true) { // if (pInfo->alreadySendChkptId == true) {
@ -1384,16 +1381,13 @@ int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
// pInfo->alreadySendChkptId = true; // pInfo->alreadySendChkptId = true;
// } // }
// //
streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pTask->pBackend != NULL) { if (pTask->pBackend != NULL) {
streamFreeTaskState(pTask, p); streamFreeTaskState(pTask, p);
pTask->pBackend = NULL; pTask->pBackend = NULL;
} }
pInfo->status = TASK_CONSEN_CHKPT_REQ;
pInfo->statusTs = taosGetTimestampMs();
stDebug("s-task:%s set the require consensus-checkpointId flag, ts:%" PRId64, id, pInfo->statusTs);
return 0; return 0;
} }

View File

@ -25,7 +25,6 @@ int32_t streamMetaId = 0;
struct SMetaHbInfo { struct SMetaHbInfo {
tmr_h hbTmr; tmr_h hbTmr;
int32_t stopFlag;
int32_t tickCounter; int32_t tickCounter;
int32_t hbCount; int32_t hbCount;
int64_t hbStart; int64_t hbStart;
@ -197,10 +196,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
} }
} }
entry.checkpointInfo.consensusChkptId = streamTaskSetReqConsensusChkptId(*pTask, pMsg->ts); streamMutexLock(&(*pTask)->lock);
entry.checkpointInfo.consensusChkptId = streamTaskCheckIfReqConsenChkptId(*pTask, pMsg->ts);
if (entry.checkpointInfo.consensusChkptId) { if (entry.checkpointInfo.consensusChkptId) {
entry.checkpointInfo.consensusTs = pMsg->ts; entry.checkpointInfo.consensusTs = pMsg->ts;
} }
streamMutexUnlock(&(*pTask)->lock);
if ((*pTask)->exec.pWalReader != NULL) { if ((*pTask)->exec.pWalReader != NULL) {
entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1; entry.processedVer = walReaderGetCurrentVer((*pTask)->exec.pWalReader) - 1;
@ -240,6 +241,8 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
void streamMetaHbToMnode(void* param, void* tmrId) { void streamMetaHbToMnode(void* param, void* tmrId) {
int64_t rid = *(int64_t*)param; int64_t rid = *(int64_t*)param;
int32_t code = 0; int32_t code = 0;
int32_t vgId = 0;
int32_t role = 0;
SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid); SStreamMeta* pMeta = taosAcquireRef(streamMetaId, rid);
if (pMeta == NULL) { if (pMeta == NULL) {
@ -247,29 +250,41 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
return; return;
} }
vgId = pMeta->vgId;
role = pMeta->role;
// need to stop, stop now // need to stop, stop now
if (pMeta->pHbInfo->stopFlag == STREAM_META_WILL_STOP) { // todo refactor: not need this now, use closeFlag in Meta if (pMeta->closeFlag) {
pMeta->pHbInfo->stopFlag = STREAM_META_OK_TO_STOP; pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid); code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stDebug("vgId:%d jump out of meta timer", pMeta->vgId); stDebug("vgId:%d jump out of meta timer", vgId);
} else { } else {
stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); stError("vgId:%d jump out of meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
} }
return; return;
} }
// not leader not send msg // not leader not send msg
if (pMeta->role != NODE_ROLE_LEADER) { if (pMeta->role != NODE_ROLE_LEADER) {
pMeta->pHbInfo->hbStart = 0;
code = taosReleaseRef(streamMetaId, rid); code = taosReleaseRef(streamMetaId, rid);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stInfo("vgId:%d role:%d not leader not send hb to mnode", pMeta->vgId, pMeta->role); stInfo("vgId:%d role:%d not leader not send hb to mnode", vgId, role);
} else { } else {
stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, pMeta->vgId, stError("vgId:%d role:%d not leader not send hb to mnodefailed to release the meta rid:%" PRId64, vgId, role, rid);
pMeta->role, rid); }
return;
} }
pMeta->pHbInfo->hbStart = 0; if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
}
return; return;
} }
@ -278,17 +293,6 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
pMeta->pHbInfo->hbStart = taosGetTimestampMs(); pMeta->pHbInfo->hbStart = taosGetTimestampMs();
} }
if (!waitForEnoughDuration(pMeta->pHbInfo)) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid);
}
return;
}
streamMetaRLock(pMeta); streamMetaRLock(pMeta);
code = streamMetaSendHbHelper(pMeta); code = streamMetaSendHbHelper(pMeta);
if (code) { if (code) {
@ -298,10 +302,10 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId, streamTmrReset(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, param, streamTimer, &pMeta->pHbInfo->hbTmr, pMeta->vgId,
"meta-hb-tmr"); "meta-hb-tmr");
code = taosReleaseRef(streamMetaId, rid);
code = taosReleaseRef(streamMetaId, rid);
if (code) { if (code) {
stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, pMeta->vgId, rid); stError("vgId:%d in meta timer, failed to release the meta rid:%" PRId64, vgId, rid);
} }
} }
@ -314,7 +318,6 @@ int32_t createMetaHbInfo(int64_t* pRid, SMetaHbInfo** pRes) {
pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer); pInfo->hbTmr = taosTmrStart(streamMetaHbToMnode, META_HB_CHECK_INTERVAL, pRid, streamTimer);
pInfo->tickCounter = 0; pInfo->tickCounter = 0;
pInfo->stopFlag = 0;
pInfo->msgSendTs = -1; pInfo->msgSendTs = -1;
pInfo->hbCount = 0; pInfo->hbCount = 0;
@ -338,13 +341,10 @@ void destroyMetaHbInfo(SMetaHbInfo* pInfo) {
void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) { void streamMetaWaitForHbTmrQuit(SStreamMeta* pMeta) {
// wait for the stream meta hb function stopping // wait for the stream meta hb function stopping
if (pMeta->role == NODE_ROLE_LEADER) { if (pMeta->role == NODE_ROLE_LEADER) {
pMeta->pHbInfo->stopFlag = STREAM_META_WILL_STOP; taosMsleep(2 * META_HB_CHECK_INTERVAL);
while (pMeta->pHbInfo->stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId); stDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
} }
} }
}
void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount) { void streamMetaGetHbSendInfo(SMetaHbInfo* pInfo, int64_t* pStartTs, int32_t* pSendCount) {
*pStartTs = 0; *pStartTs = 0;

View File

@ -444,7 +444,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) { int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo; SConsenChkptInfo* pConChkptInfo = &pTask->status.consenChkptInfo;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
@ -455,11 +455,13 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
vgId, pConChkptInfo->statusTs); vgId, pConChkptInfo->statusTs);
return 1; return 1;
} else { } else {
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && (ts - pConChkptInfo->statusTs) > 60 * 1000) { int32_t el = (ts - pConChkptInfo->statusTs) / 1000;
if ((pConChkptInfo->status == TASK_CONSEN_CHKPT_SEND) && el > 60) {
pConChkptInfo->statusTs = ts; pConChkptInfo->statusTs = ts;
stWarn("s-task:%s vgId:%d not recv consensus-chkptId for 60s, set requiring in Hb again, ts:%" PRId64, stWarn(
pTask->id.idStr, vgId, pConChkptInfo->statusTs); "s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
pTask->id.idStr, vgId, el, pConChkptInfo->statusTs);
return 1; return 1;
} }
} }
@ -467,4 +469,22 @@ int32_t streamTaskSetReqConsensusChkptId(SStreamTask* pTask, int64_t ts) {
return 0; return 0;
} }
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
pInfo->consenChkptTransId = transId;
pInfo->status = TASK_CONSEN_CHKPT_RECV;
pInfo->statusTs = ts;
stDebug("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
int32_t prevTrans = pInfo->consenChkptTransId;
pInfo->status = TASK_CONSEN_CHKPT_REQ;
pInfo->statusTs = ts;
pInfo->consenChkptTransId = 0;
stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTrans, ts);
}