refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-01-16 15:40:40 +08:00
parent 9a4b7a6d82
commit 8ede6d87ce
5 changed files with 23 additions and 18 deletions

View File

@ -689,9 +689,9 @@ typedef struct STaskStatusEntry {
int64_t verStart; // start version in WAL, only valid for source task int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
int64_t activeCheckpointId; // current active checkpoint id int64_t checkpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id int32_t chkpointTransId; // checkpoint trans id
bool checkpointFailed; // denote if the checkpoint is failed or not int8_t checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter; int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB double inputQUsed; // in MiB

View File

@ -3026,11 +3026,11 @@ static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
} }
int32_t mndProcessStreamHb(SRpcMsg *pReq) { int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SMnode * pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
bool checkpointFailed = false; bool checkpointFailed = false;
int64_t activeCheckpointId = 0; int64_t checkpointId = 0;
int64_t streamId = 0; int64_t streamId = 0;
int32_t transId = 0; int32_t transId = 0;
@ -3092,14 +3092,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) { if (p->checkpointId != 0) {
if (activeCheckpointId != 0) { if (checkpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId); ASSERT(checkpointId == p->checkpointId);
} else { } else {
activeCheckpointId = p->activeCheckpointId; checkpointId = p->checkpointId;
} }
if (p->checkpointFailed) { if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " failed, transId:%d, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
checkpointFailed = p->checkpointFailed; checkpointFailed = p->checkpointFailed;
streamId = p->id.streamId; streamId = p->id.streamId;
transId = p->chkpointTransId; transId = p->chkpointTransId;
@ -3121,17 +3124,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans // current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal // kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) { if (checkpointFailed && checkpointId != 0) {
bool allReady = true; bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p); taosArrayDestroy(p);
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId); mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", checkpointId);
mndResetStatusFromCheckpoint(pMnode, streamId, transId); mndResetStatusFromCheckpoint(pMnode, streamId, transId);
} else { } else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks"); mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
} }
} }
@ -3145,6 +3148,7 @@ void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param; SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName); taosMemoryFreeClear(pEntry->pName);
} }
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void * pIter = NULL; void * pIter = NULL;
SSdb * pSdb = pMnode->pSdb; SSdb * pSdb = pMnode->pSdb;

View File

@ -360,6 +360,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
} }
int32_t getChkpMeta(char* id, char* path, SArray* list) { int32_t getChkpMeta(char* id, char* path, SArray* list) {

View File

@ -962,7 +962,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
} }
@ -1001,8 +1001,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
entry.id.taskId = taskId; entry.id.taskId = taskId;
@ -1115,8 +1115,8 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
} }
if ((*pTask)->chkInfo.checkpointingId != 0) { if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0;
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId; entry.chkpointTransId = (*pTask)->chkInfo.transId;
if (entry.checkpointFailed) { if (entry.checkpointFailed) {
@ -1375,7 +1375,6 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask); streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else { } else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
} }

View File

@ -810,7 +810,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->verEnd = pSrc->verEnd; pDst->verEnd = pSrc->verEnd;
pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkQuota = pSrc->sinkQuota;
pDst->sinkDataSize = pSrc->sinkDataSize; pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->checkpointId = pSrc->checkpointId;
pDst->checkpointFailed = pSrc->checkpointFailed; pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId; pDst->chkpointTransId = pSrc->chkpointTransId;
} }