fix(stream): update the checkpoint info only when the status is normal.

This commit is contained in:
Haojun Liao 2024-06-21 09:23:29 +08:00
parent ece139d921
commit 2cb9644471
10 changed files with 50 additions and 51 deletions

View File

@ -41,7 +41,7 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessRetrieveTriggerRsp(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen);
void tqSetRestoreVersionInfo(SStreamTask* pTask); void tqSetRestoreVersionInfo(SStreamTask* pTask);
int32_t tqExpandStreamTask(SStreamTask* pTask); int32_t tqExpandStreamTask(SStreamTask* pTask);

View File

@ -770,7 +770,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg, int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode); int32_t setCode);
int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask); int32_t streamSendChkptReportMsg(SStreamTask* pTask, SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq); int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq);
SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo(); SActiveCheckpointInfo* streamTaskCreateActiveChkptInfo();
// stream task state machine, and event handling // stream task state machine, and event handling

View File

@ -154,7 +154,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
case TDMT_STREAM_TASK_RESUME: case TDMT_STREAM_TASK_RESUME:
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false); return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_UPDATE_CHKPT: case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, true, pMsg->pCont, pMsg->contLen);
default: default:
ASSERT(0); ASSERT(0);
} }

View File

@ -1013,7 +1013,7 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
} }
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen); return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, pTq->pVnode->restored, msg, msgLen);
} }
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {

View File

@ -640,7 +640,7 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0; return 0;
} }
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, bool restored, char* msg, int32_t msgLen) {
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg; SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
@ -652,13 +652,14 @@ int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, in
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL && (*ppTask) != NULL) { if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq); streamTaskUpdateTaskCheckpointInfo(*ppTask, restored, pReq);
} else { // failed to get the task. } else { // failed to get the task.
tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already", tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
vgId, pReq->taskId); vgId, pReq->taskId);
} }
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
// always return success when handling the requirement issued by mnode during transaction.
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -257,7 +257,8 @@ int32_t bkdMgtGetDelta(SBkdMgt* bm, char* taskId, int64_t chkpId, SArray* list,
int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname); int32_t bkdMgtDumpTo(SBkdMgt* bm, char* taskId, char* dname);
void bkdMgtDestroy(SBkdMgt* bm); void bkdMgtDestroy(SBkdMgt* bm);
int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list); int32_t taskDbGenChkpUploadData(void* arg, void* bkdMgt, int64_t chkpId, int8_t type, char** path, SArray* list,
const char* id);
void* taskAcquireDb(int64_t refId); void* taskAcquireDb(int64_t refId);
void taskReleaseDb(int64_t refId); void taskReleaseDb(int64_t refId);

View File

@ -2181,7 +2181,6 @@ void taskDbDestroy(void* pDb, bool flush) {
void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); } void taskDbDestroy2(void* pDb) { taskDbDestroy(pDb, true); }
int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) { int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char** path) {
int64_t st = taosGetTimestampMs();
int32_t code = -1; int32_t code = -1;
int64_t refId = pDb->refId; int64_t refId = pDb->refId;
@ -2202,7 +2201,7 @@ int32_t taskDbGenChkpUploadData__rsync(STaskDbWrapper* pDb, int64_t chkpId, char
return code; return code;
} }
int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list) { int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64_t chkpId, char** path, SArray* list, const char* idStr) {
int32_t code = 0; int32_t code = 0;
SBkdMgt* p = (SBkdMgt*)bkdChkpMgt; SBkdMgt* p = (SBkdMgt*)bkdChkpMgt;
@ -2210,7 +2209,7 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId); sprintf(temp, "%s%s%s%" PRId64, pDb->path, TD_DIRSEP, "tmp", chkpId);
if (taosDirExist(temp)) { if (taosDirExist(temp)) {
cleanDir(temp, ""); cleanDir(temp, idStr);
} else { } else {
taosMkDir(temp); taosMkDir(temp);
} }
@ -2220,7 +2219,8 @@ int32_t taskDbGenChkpUploadData__s3(STaskDbWrapper* pDb, void* bkdChkpMgt, int64
return code; return code;
} }
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list) {
int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t type, char** path, SArray* list, const char* idStr) {
int32_t code = -1; int32_t code = -1;
STaskDbWrapper* pDb = arg; STaskDbWrapper* pDb = arg;
ECHECKPOINT_BACKUP_TYPE utype = type; ECHECKPOINT_BACKUP_TYPE utype = type;
@ -2229,7 +2229,7 @@ int32_t taskDbGenChkpUploadData(void* arg, void* mgt, int64_t chkpId, int8_t typ
if (utype == DATA_UPLOAD_RSYNC) { if (utype == DATA_UPLOAD_RSYNC) {
code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path); code = taskDbGenChkpUploadData__rsync(pDb, chkpId, path);
} else if (utype == DATA_UPLOAD_S3) { } else if (utype == DATA_UPLOAD_S3) {
code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list); code = taskDbGenChkpUploadData__s3(pDb, mgt, chkpId, path, list, idStr);
} }
taskDbUnRefChkp(pDb, chkpId); taskDbUnRefChkp(pDb, chkpId);
return code; return code;

View File

@ -18,16 +18,6 @@
#include "streamBackendRocksdb.h" #include "streamBackendRocksdb.h"
#include "streamInt.h" #include "streamInt.h"
typedef struct {
ECHECKPOINT_BACKUP_TYPE type;
char* taskId;
int64_t chkpId;
SStreamTask* pTask;
int64_t dbRefId;
void* pMeta;
} SAsyncUploadArg;
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
@ -412,7 +402,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
} }
} }
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) { int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
SStreamMeta* pMeta = pTask->pMeta; SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t code = 0; int32_t code = 0;
@ -446,13 +436,28 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
SStreamTaskState* pStatus = streamTaskGetStatus(pTask); SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64 if ((!restored) && (pStatus->state != TASK_STATUS__CK)) {
stDebug("s-task:0x%x vgId:%d restored:%d status:%s not update checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" failed",
pReq->taskId, vgId, restored, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (!restored) { // during restore procedure, do update checkpoint-info
stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64, " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer, id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs); pInfo->checkpointTime, pReq->checkpointTs);
} else { // not in restore status, must be in checkpoint status
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
}
if (pStatus->state != TASK_STATUS__DROPPING) { ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer); pInfo->processedVer >= pReq->checkpointVer);
pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointVer = pReq->checkpointVer;
@ -460,19 +465,12 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
streamTaskClearCheckInfo(pTask, false); streamTaskClearCheckInfo(pTask, false);
// todo handle error
if (pStatus->state == TASK_STATUS__CK) { if (pStatus->state == TASK_STATUS__CK) {
// todo handle error
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else { } else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name); stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
} }
} else {
stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed",
pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (pReq->dropRelHTask) { if (pReq->dropRelHTask) {
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",

View File

@ -906,9 +906,9 @@ int32_t streamTaskSendCheckpointSourceRsp(SStreamTask* pTask) {
tmsgSendRsp(&pInfo->msg); tmsgSendRsp(&pInfo->msg);
taosArrayClear(pList); taosArrayClear(pList);
stDebug("s-task:%s level:%d source checkpoint completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel); stDebug("s-task:%s level:%d checkpoint-source rsp completed msg sent to mnode", pTask->id.idStr, pTask->info.taskLevel);
} else { } else {
stDebug("s-task:%s level:%d already send rsp checkpoint success to mnode", pTask->id.idStr, pTask->info.taskLevel); stDebug("s-task:%s level:%d already send checkpoint-source rsp success to mnode", pTask->id.idStr, pTask->info.taskLevel);
} }
taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock); taosThreadMutexUnlock(&pTask->chkInfo.pActiveInfo->lock);

View File

@ -231,7 +231,6 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0; return 0;
} else if (compatible == STREAM_STATA_NEED_CONVERT) { } else if (compatible == STREAM_STATA_NEED_CONVERT) {
stInfo("vgId:%d stream state need covert backend format", pMeta->vgId); stInfo("vgId:%d stream state need covert backend format", pMeta->vgId);
return streamMetaCvtDbFormat(pMeta); return streamMetaCvtDbFormat(pMeta);
} else if (compatible == STREAM_STATA_NO_COMPATIBLE) { } else if (compatible == STREAM_STATA_NO_COMPATIBLE) {
stError( stError(