fix(stream): update the checkpoint info for followers, and some internal refactor.

This commit is contained in:
Haojun Liao 2024-05-11 09:37:02 +08:00
parent 6721ec3cd8
commit 7f93ec2c53
22 changed files with 272 additions and 206 deletions

View File

@ -3440,23 +3440,24 @@ int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamR
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
int8_t igNotExists;
} SMRecoverStreamReq;
typedef struct {
int8_t reserved;
} SMRecoverStreamRsp;
typedef struct {
int64_t recoverObjUid;
int32_t taskId;
int32_t hasCheckPoint;
} SMVStreamGatherInfoReq;
// int32_t tSerializeSMRecoverStreamReq(void* buf, int32_t bufLen, const SMRecoverStreamReq* pReq);
// int32_t tDeserializeSMRecoverStreamReq(void* buf, int32_t bufLen, SMRecoverStreamReq* pReq);
typedef struct SVUpdateCheckpointInfoReq {
SMsgHead head;
int64_t streamId;
int32_t taskId;
int64_t checkpointId;
int64_t checkpointVer;
int64_t checkpointTs;
int32_t transId;
int8_t dropRelHTask;
int64_t hStreamId;
int64_t hTaskId;
} SVUpdateCheckpointInfoReq;
typedef struct {
int64_t leftForVer;

View File

@ -310,7 +310,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DROP, "stream-task-drop", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_RUN, "stream-task-run", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_DISPATCH, "stream-task-dispatch", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_UNUSED1, "stream-unused1", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_UPDATE_CHKPT, "stream-update-chkptinfo", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_RETRIEVE, "stream-retrieve", NULL, NULL) //1035 1036
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_CHECKPOINT_READY, "stream-checkpoint-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_TASK_REPORT_CHECKPOINT, "stream-report-checkpoint", NULL, NULL)
@ -321,6 +321,7 @@
TD_DEF_MSG_TYPE(TDMT_STREAM_MAX_MSG, "stream-max", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_CREATE, "stream-create", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_STREAM_DROP, "stream-drop", NULL, NULL)
TD_CLOSE_MSG_SEG(TDMT_END_STREAM_MSG)
TD_NEW_MSG_SEG(TDMT_MON_MSG) //5 << 8

View File

@ -38,6 +38,7 @@ int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
void tqSetRestoreVersionInfo(SStreamTask* pTask);

View File

@ -736,6 +736,9 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamTaskBuildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int32_t setCode);
int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId,
SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask);
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq);
// stream task state machine, and event handling
SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);

View File

@ -208,11 +208,11 @@ int32_t downloadRsync(const char* id, const char* path) {
int32_t code = execCommand(command);
if (code != 0) {
uError("[rsync] get failed code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
uError("[rsync] download checkpoint data failed, code:%d," ERRNO_ERR_FORMAT, code, ERRNO_ERR_DATA);
return -1;
}
uDebug("[rsync] down data:%s successful", id);
uDebug("[rsync] download checkpoint data:%s successfully", id);
return 0;
}

View File

@ -78,6 +78,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DROP, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_RUN, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_DISPATCH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_RETRIEVE_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;

View File

@ -967,6 +967,8 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT_RSP, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_GET_STREAM_PROGRESS, vmPutMsgToStreamQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_UPDATE_CHKPT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIRM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;

View File

@ -35,23 +35,6 @@ struct SSnode {
SMsgCb msgCb;
};
#if 0
typedef struct {
SHashObj* pHash; // taskId -> SStreamTask
} SStreamMeta;
SStreamMeta* sndMetaNew();
void sndMetaDelete(SStreamMeta* pMeta);
int32_t sndMetaDeployTask(SStreamMeta* pMeta, SStreamTask* pTask);
SStreamTask* sndMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
int32_t sndDropTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndStopTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
int32_t sndResumeTaskOfStream(SStreamMeta* pMeta, int64_t streamId);
#endif
void initStreamStateAPI(SStorageAPI* pAPI);
#ifdef __cplusplus

View File

@ -168,6 +168,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessTaskPauseReq(pSnode->pMeta, pMsg->pCont);
case TDMT_STREAM_TASK_RESUME:
return tqStreamTaskProcessTaskResumeReq(pSnode->pMeta, pMsg->info.conn.applyIndex, pMsg->pCont, false);
case TDMT_STREAM_TASK_UPDATE_CHKPT:
return tqStreamTaskProcessUpdateCheckpointReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
default:
ASSERT(0);
}

View File

@ -75,8 +75,10 @@ void initStateStoreAPI(SStateStore* pStore) {
pStore->streamStateCountGetKeyByRange = streamStateCountGetKeyByRange;
pStore->streamStateSessionAllocWinBuffByNextPosition = streamStateSessionAllocWinBuffByNextPosition;
pStore->streamStateCountWinAddIfNotExist = streamStateCountWinAddIfNotExist;
pStore->streamStateCountWinAdd = streamStateCountWinAdd;
//void initStreamStateAPI(SStorageAPI* pAPI) {
// initStateStoreAPI(&pAPI->stateStore);
// initFunctionStateStore(&pAPI->functionStore);
//}
pStore->updateInfoInit = updateInfoInit;
pStore->updateInfoFillBlockData = updateInfoFillBlockData;

View File

@ -295,6 +295,7 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg);
int32_t tqStreamProgressRetrieveReq(STQ* pTq, SRpcMsg* pMsg);
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen);
// sma
int32_t smaInit();

View File

@ -828,7 +828,7 @@ static void doStartFillhistoryStep2(SStreamTask* pTask, SStreamTask* pStreamTask
streamSetParamForStreamScannerStep2(pTask, pStep2Range, pWindow);
int64_t dstVer =pStep2Range->minVer;
int64_t dstVer = pStep2Range->minVer;
pTask->chkInfo.nextProcessVer = dstVer;
walReaderSetSkipToVersion(pTask->exec.pWalReader, dstVer);
@ -1009,6 +1009,20 @@ int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen);
}
int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64
", transId:%d s-task:0x%x ignore it",
vgId, pReq->checkpointId, pReq->transId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
}
int32_t tqProcessTaskPauseReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
return tqStreamTaskProcessTaskPauseReq(pTq->pStreamMeta, msg);
}

View File

@ -65,6 +65,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
.fillHistory = pTask->info.fillHistory,
.winRange = pTask->dataRange.window,
};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
handle.vnode = pVnode;
handle.initTqReader = 1;
@ -577,22 +578,23 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
return code;
}
void tqStreamRmTaskBackend(SStreamMeta* pMeta, STaskId* id) {
static void tqStreamRemoveTaskBackend(SStreamMeta* pMeta, const STaskId* pId) {
char taskKey[128] = {0};
sprintf(taskKey, "0x%" PRIx64 "-0x%x", id->streamId, (int32_t)id->taskId);
sprintf(taskKey, "0x%" PRIx64 "-0x%x", pId->streamId, (int32_t)pId->taskId);
char* path = taosMemoryCalloc(1, strlen(pMeta->path) + 128);
sprintf(path, "%s%s%s", pMeta->path, TD_DIRSEP, taskKey);
taosRemoveDir(path);
tqInfo("vgId:%d drop stream task:0x%x file:%s", pMeta->vgId, (int32_t)pId->taskId, path);
taosMemoryFree(path);
// do nothing
}
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
int32_t vgId = pMeta->vgId;
STaskId hTaskId = {0};
int32_t vgId = pMeta->vgId;
STaskId hTaskId = {0};
tqDebug("vgId:%d receive msg to drop s-task:0x%x", vgId, pReq->taskId);
streamMetaWLock(pMeta);
@ -634,10 +636,32 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
streamMetaWUnLock(pMeta);
tqStreamRmTaskBackend(pMeta, &id);
tqStreamRemoveTaskBackend(pMeta, &id);
return 0;
}
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
int32_t vgId = pMeta->vgId;
tqDebug("vgId:%d receive msg to update-checkpoint-info for s-task:0x%x", vgId, pReq->taskId);
streamMetaWLock(pMeta);
STaskId id = {.streamId = pReq->streamId, .taskId = pReq->taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask != NULL && (*ppTask) != NULL) {
streamTaskUpdateTaskCheckpointInfo(*ppTask, pReq);
} else { // failed to get the task.
tqError("vgId:%d failed to locate the s-task:0x%x to update the checkpoint info, it may have been dropped already",
vgId, pReq->taskId);
}
streamMetaWUnLock(pMeta);
return TSDB_CODE_SUCCESS;
}
static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
int32_t vgId = pMeta->vgId;
int32_t code = 0;
@ -927,9 +951,10 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* msg, bool fromVnode) {
SVResumeStreamTaskReq* pReq = (SVResumeStreamTaskReq*)msg;
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
SStreamMeta* pMeta = fromVnode ? ((STQ*)handle)->pStreamMeta : handle;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
int32_t code = tqProcessTaskResumeImpl(handle, pTask, sversion, pReq->igUntreated, fromVnode);
if (code != 0) {
return code;
}

View File

@ -625,6 +625,11 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
goto _err;
}
} break;
case TDMT_STREAM_TASK_UPDATE_CHKPT: {
if (tqProcessTaskUpdateCheckpointReq(pVnode->pTq, pMsg->pCont, pMsg->contLen) < 0) {
goto _err;
}
} break;
case TDMT_STREAM_TASK_PAUSE: {
if (pVnode->restored && vnodeIsLeader(pVnode) &&
tqProcessTaskPauseReq(pVnode->pTq, ver, pMsg->pCont, pMsg->contLen) < 0) {

View File

@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId);
void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);

View File

@ -113,7 +113,7 @@ void destroyDispatchMsg(SStreamDispatchReq* pReq, int32_t numOfVgroups);
int32_t getNumOfDispatchBranch(SStreamTask* pTask);
void clearBufferedDispatchMsg(SStreamTask* pTask);
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock);
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg);
SStreamDataBlock* createStreamBlockFromResults(SStreamQueueItem* pItem, SStreamTask* pTask, int64_t resultSize,
SArray* pRes);
@ -122,7 +122,6 @@ void destroyStreamDataBlock(SStreamDataBlock* pBlock);
int32_t streamRetrieveReqToData(const SStreamRetrieveReq* pReq, SStreamDataBlock* pData);
int32_t streamBroadcastToUpTasks(SStreamTask* pTask, const SSDataBlock* pBlock);
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId);
int32_t streamSendCheckMsg(SStreamTask* pTask, const SStreamTaskCheckReq* pReq, int32_t nodeId, SEpSet* pEpSet);
int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t srcTaskId, int32_t index, int64_t checkpointId);

View File

@ -42,7 +42,7 @@ void destroyRocksdbCfInst(RocksdbCfInst* inst);
int32_t getCfIdx(const char* cfName);
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath);
int32_t backendCopyFiles(char* src, char* dst);
static int32_t backendCopyFiles(const char* src, const char* dst);
void destroyCompactFilteFactory(void* arg);
void destroyCompactFilte(void* arg);
@ -234,12 +234,14 @@ int32_t rebuildDirFromCheckpoint(const char* path, int64_t chkpId, char** dst) {
tstrerror(TAOS_SYSTEM_ERROR(errno)), state);
taosMkDir(state);
}
taosMemoryFree(chkp);
}
*dst = state;
*dst = state;
return 0;
}
int32_t remoteChkp_readMetaData(char* path, SArray* list) {
char* metaPath = taosMemoryCalloc(1, strlen(path));
sprintf(metaPath, "%s%s%s", path, TD_DIRSEP, "META");
@ -323,7 +325,7 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return complete == 1 ? 0 : -1;
}
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
// impl later
int32_t code = 0;
if (taosIsDir(chkpPath)) {
@ -336,10 +338,14 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkpPath, int64_t chkpId, c
code = streamTaskDownloadCheckpointData(key, chkpPath);
if (code != 0) {
stError("failed to download checkpoint data:%s", key);
return code;
}
stDebug("download backup checkpoint data into:%s, checkpointId:%" PRId64 ", %s", chkpPath, checkpointId, key);
code = backendCopyFiles(chkpPath, defaultPath);
return code;
}
@ -378,13 +384,16 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
return code;
}
int32_t rebuildFromRemoteChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t rebuildFromRemoteCheckpoint(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, chkpId, defaultPath);
return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath);
} else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, chkpId, defaultPath);
return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath);
} else {
stError("%s not remote backup checkpoint data for:%"PRId64, key, checkpointId);
}
return -1;
}
@ -403,7 +412,7 @@ int32_t copyFiles_hardlink(char* src, char* dst, int8_t type) {
return taosLinkFile(src, dst);
}
int32_t backendFileCopyFilesImpl(char* src, char* dst) {
int32_t backendFileCopyFilesImpl(const char* src, const char* dst) {
const char* current = "CURRENT";
size_t currLen = strlen(current);
@ -412,20 +421,26 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) {
int32_t dLen = strlen(dst);
char* srcName = taosMemoryCalloc(1, sLen + 64);
char* dstName = taosMemoryCalloc(1, dLen + 64);
// copy file to dst
// copy file to dst
TdDirPtr pDir = taosOpenDir(src);
if (pDir == NULL) {
taosMemoryFree(srcName);
taosMemoryFree(dstName);
code = TAOS_SYSTEM_ERROR(errno);
errno = 0;
return -1;
return code;
}
errno = 0;
TdDirEntryPtr de = NULL;
while ((de = taosReadDir(pDir)) != NULL) {
char* name = taosGetDirEntryName(de);
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) {
continue;
}
sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
@ -433,18 +448,21 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) {
if (strncmp(name, current, strlen(name) <= currLen ? strlen(name) : currLen) == 0) {
code = copyFiles_create(srcName, dstName, 0);
if (code != 0) {
stError("failed to copy file, detail: %s to %s reason: %s", srcName, dstName,
tstrerror(TAOS_SYSTEM_ERROR(code)));
code = TAOS_SYSTEM_ERROR(code);
stError("failed to copy file, detail: %s to %s reason:%s", srcName, dstName, tstrerror(code));
goto _ERROR;
}
} else {
code = copyFiles_hardlink(srcName, dstName, 0);
if (code != 0) {
stError("failed to hard line file, detail: %s to %s, reason: %s", srcName, dstName,
tstrerror(TAOS_SYSTEM_ERROR(code)));
code = TAOS_SYSTEM_ERROR(code);
stError("failed to hardlink file, detail:%s to %s, reason:%s", srcName, dstName, tstrerror(code));
goto _ERROR;
} else {
stDebug("succ hard link file:%s to %s", srcName, dstName);
}
}
memset(srcName, 0, sLen + 64);
memset(dstName, 0, dLen + 64);
}
@ -453,76 +471,46 @@ int32_t backendFileCopyFilesImpl(char* src, char* dst) {
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
errno = 0;
return 0;
return code;
_ERROR:
taosMemoryFreeClear(srcName);
taosMemoryFreeClear(dstName);
taosCloseDir(&pDir);
errno = 0;
return -1;
return code;
}
int32_t backendCopyFiles(char* src, char* dst) {
int32_t backendCopyFiles(const char* src, const char* dst) {
return backendFileCopyFilesImpl(src, dst);
// // opt later, just hard link
// int32_t sLen = strlen(src);
// int32_t dLen = strlen(dst);
// char* srcName = taosMemoryCalloc(1, sLen + 64);
// char* dstName = taosMemoryCalloc(1, dLen + 64);
// TdDirPtr pDir = taosOpenDir(src);
// if (pDir == NULL) {
// taosMemoryFree(srcName);
// taosMemoryFree(dstName);
// return -1;
// }
// TdDirEntryPtr de = NULL;
// while ((de = taosReadDir(pDir)) != NULL) {
// char* name = taosGetDirEntryName(de);
// if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
// sprintf(srcName, "%s%s%s", src, TD_DIRSEP, name);
// sprintf(dstName, "%s%s%s", dst, TD_DIRSEP, name);
// // if (!taosDirEntryIsDir(de)) {
// // // code = taosCopyFile(srcName, dstName);
// // if (code == -1) {
// // goto _err;
// // }
// // }
// return backendFileCopyFilesImpl(src, dst);
// memset(srcName, 0, sLen + 64);
// memset(dstName, 0, dLen + 64);
// }
// _err:
// taosMemoryFreeClear(srcName);
// taosMemoryFreeClear(dstName);
// taosCloseDir(&pDir);
// return code >= 0 ? 0 : -1;
// return 0;
}
int32_t rebuildFromLocalChkp(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkpId, const char* defaultPath) {
int32_t code = 0;
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stInfo("succ to clear stream backend %s", defaultPath);
stInfo("clear task backend path:%s, done", defaultPath);
}
if (taosIsDir(chkpPath) && isValidCheckpoint(chkpPath)) {
code = backendCopyFiles(chkpPath, defaultPath);
if (code != 0) {
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
code = backendCopyFiles(checkpointPath, defaultPath);
if (code != TSDB_CODE_SUCCESS) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stError("failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
code = 0;
stError("%s failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s",
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
code = TSDB_CODE_SUCCESS;
} else {
stInfo("start to restart stream backend at checkpoint path: %s", chkpPath);
stInfo("%s start to restart stream backend at checkpoint path: %s", pTaskIdStr, checkpointPath);
}
} else {
code = TSDB_CODE_FAILED;
stError("%s not valid checkpoint path/data in:%s", pTaskIdStr, checkpointPath);
}
return code;
@ -533,7 +521,7 @@ int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char*
return code;
}
int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char** dbPrefixPath, char** dbPath) {
int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
// impl later
int32_t code = 0;
@ -551,29 +539,33 @@ int32_t rebuildDirFromChkp2(const char* path, char* key, int64_t chkpId, char**
taosMulMkDir(defaultPath);
}
char* chkpPath = taosMemoryCalloc(1, strlen(path) + 256);
if (chkpId != 0) {
sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkpId);
code = rebuildFromLocalChkp(key, chkpPath, chkpId, defaultPath);
stDebug("prepare local dir:%s, checkpointId:%d, key:%s succ", defaultPath, chkptId, key);
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
if (chkptId != 0) {
sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
if (code != 0) {
code = rebuildFromRemoteChkp(key, chkpPath, chkpId, defaultPath);
code = rebuildFromRemoteCheckpoint(key, chkptPath, chkptId, defaultPath);
}
if (code != 0) {
stInfo("failed to start stream backend at %s, reason: %s, restart from default defaultPath dir:%s", chkpPath,
tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath,
tstrerror(code), defaultPath);
code = taosMkDir(defaultPath);
}
} else {
sprintf(chkpPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
(int64_t)-1);
code = rebuildFromLocalChkp(key, chkpPath, -1, defaultPath);
code = rebuildFromLocalCheckpoint(key, chkptPath, -1, defaultPath);
if (code != 0) {
code = taosMkDir(defaultPath);
}
}
taosMemoryFree(chkpPath);
taosMemoryFree(chkptPath);
*dbPath = defaultPath;
*dbPrefixPath = prefixPath;
@ -1055,6 +1047,7 @@ _ERROR:
rocksdb_checkpoint_object_destroy(cp);
return code;
}
int32_t chkpPreFlushDb(rocksdb_t* db, rocksdb_column_family_handle_t** cf, int32_t nCf) {
if (nCf == 0) return 0;
int code = 0;
@ -1098,6 +1091,7 @@ int32_t chkpPreBuildDir(char* path, int64_t chkpId, char** chkpDir, char** chkpI
return 0;
}
int32_t taskDbBuildSnap(void* arg, SArray* pSnap) {
SStreamMeta* pMeta = arg;
@ -1997,11 +1991,11 @@ _EXIT:
return NULL;
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkpId) {
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId) {
char* statePath = NULL;
char* dbPath = NULL;
if (rebuildDirFromChkp2(path, key, chkpId, &statePath, &dbPath) != 0) {
if (rebuildDirFormCheckpoint(path, key, chkptId, &statePath, &dbPath) != 0) {
return NULL;
}

View File

@ -33,8 +33,11 @@ static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType);
static int32_t streamAlignCheckpoint(SStreamTask* pTask);
static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t streamAlignCheckpoint(SStreamTask* pTask) {
int32_t num = taosArrayGetSize(pTask->upstreamInfo.pList);
int64_t old = atomic_val_compare_exchange_32(&pTask->chkInfo.downstreamAlignNum, 0, num);
if (old == 0) {
@ -44,7 +47,7 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask) {
return atomic_sub_fetch_32(&pTask->chkInfo.downstreamAlignNum, 1);
}
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType) {
SStreamDataBlock* pChkpoint = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock));
if (pChkpoint == NULL) {
return TSDB_CODE_OUT_OF_MEMORY;
@ -94,7 +97,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
}
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
pBlock->srcTaskId = pTask->id.taskId;
pBlock->srcVgId = pTask->pMeta->vgId;
@ -110,7 +113,7 @@ static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStream
return code;
}
int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
int64_t checkpointId = pDataBlock->info.version;
int32_t transId = pDataBlock->info.window.skey;
@ -118,7 +121,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pTask->pMeta->vgId;
stDebug("s-task:%s vgId:%d start to handle the checkpoint block, checkpointId:%" PRId64 " ver:%" PRId64
stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
", transId:%d current checkpointingId:%" PRId64,
id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);
@ -141,7 +144,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
int8_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
continueDispatchCheckpointBlock(pBlock, pTask);
continueDispatchCheckpointTriggerBlock(pBlock, pTask);
} else { // only one task exists, no need to dispatch downstream info
atomic_add_fetch_32(&pTask->chkInfo.numOfNotReady, 1);
streamProcessCheckpointReadyMsg(pTask);
@ -182,7 +185,7 @@ int32_t streamProcessCheckpointBlock(SStreamTask* pTask, SStreamDataBlock* pBloc
// Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
// already. And then, dispatch check point msg to all downstream tasks
code = continueDispatchCheckpointBlock(pBlock, pTask);
code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
}
}
@ -227,75 +230,83 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
}
}
int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
SStreamMeta* pMeta = p->pMeta;
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpointInfoReq* pReq) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
const char* id = p->id.idStr;
int32_t code = 0;
SCheckpointInfo* pCKInfo = &p->chkInfo;
const char* id = pTask->id.idStr;
SCheckpointInfo* pInfo = &pTask->chkInfo;
// fill-history task, rsma task, and sink task will not generate the checkpoint
if ((p->info.fillHistory == 1) || (p->info.taskLevel > TASK_LEVEL__SINK)) {
return code;
}
taosThreadMutexLock(&pTask->lock);
taosThreadMutexLock(&p->lock);
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
SStreamTaskState* pStatus = streamTaskGetStatus(p);
ETaskStatus prevStatus = pStatus->state;
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64
" checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
if (pStatus->state == TASK_STATUS__CK) {
ASSERT(pCKInfo->checkpointId <= pCKInfo->checkpointingId && pCKInfo->checkpointingId == checkpointId &&
pCKInfo->checkpointVer <= pCKInfo->processedVer);
// in the
if (pStatus->state != TASK_STATUS__DROPPING) {
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer);
pCKInfo->checkpointId = pCKInfo->checkpointingId;
pCKInfo->checkpointVer = pCKInfo->processedVer;
pCKInfo->checkpointTime = pCKInfo->startTs;
pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(p, false);
taosThreadMutexUnlock(&p->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(p->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
// todo handle error
if (pStatus->state == TASK_STATUS__CK) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus->name);
}
} else {
stDebug("s-task:%s vgId:%d status:%s not keep the checkpoint metaInfo, checkpoint:%" PRId64 " failed", id, vgId,
pStatus->name, pCKInfo->checkpointingId);
taosThreadMutexUnlock(&p->lock);
stDebug("s-task:0x%x vgId:%d status:%s not update checkpoint info, checkpointId:%" PRId64 "->%" PRId64 " failed",
pReq->taskId, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
}
if (pReq->dropRelHTask) {
stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
pReq->taskId, vgId, pReq->hTaskId);
CLEAR_RELATED_FILLHISTORY_TASK(pTask);
}
stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask)->name);
pTask->status.taskStatus = TASK_STATUS__READY;
code = streamMetaSaveTask(pMeta, pTask);
if (code != TSDB_CODE_SUCCESS) {
stDebug("s-task:%s vgId:%d handle event:checkpoint-done failed", id, vgId);
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, pReq->checkpointId, terrstr());
return code;
}
stDebug("vgId:%d s-task:%s level:%d open upstream inputQ, save status after checkpoint, checkpointId:%" PRId64
", Ver(saved):%" PRId64 " currentVer:%" PRId64 ", status: ready, prev:%s",
vgId, id, p->info.taskLevel, checkpointId, pCKInfo->checkpointVer, pCKInfo->nextProcessVer,
streamTaskGetStatusStr(prevStatus));
taosThreadMutexUnlock(&pTask->lock);
streamMetaWUnLock(pMeta);
// save the task if not sink task
if (p->info.taskLevel <= TASK_LEVEL__SINK) {
streamMetaWLock(pMeta);
// drop task should not in the meta-lock, and drop the related fill-history task now
if (pReq->dropRelHTask) {
streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
code = streamMetaSaveTask(pMeta, p);
if (code != TSDB_CODE_SUCCESS) {
streamMetaWUnLock(pMeta);
stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
vgId, checkpointId, terrstr());
return code;
}
code = streamMetaCommit(pMeta);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to commit stream meta after do checkpoint, checkpointId:%" PRId64 ", since %s",
id, vgId, checkpointId, terrstr());
}
streamMetaWUnLock(pMeta);
// commit the update
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks);
}
return code;
streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
return TSDB_CODE_SUCCESS;
}
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
@ -334,6 +345,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
}
}
}
taosCloseFile(&pFile);
taosRemoveFile(file);
taosMemoryFree(file);
@ -448,7 +460,9 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
// clear the checkpoint info, and commit the newest checkpoint info if all works are done successfully
if (code == TSDB_CODE_SUCCESS) {
code = streamSaveTaskCheckpointInfo(pTask, ckId);
STaskId* pHTaskId = &pTask->hTaskInfo.id;
code = streamBuildAndSendCheckpointUpdateMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id, pHTaskId, &pTask->chkInfo,
dropRelHTask);
if (code == TSDB_CODE_SUCCESS) {
code = streamTaskRemoteBackupCheckpoint(pTask, ckId, (char*)id);
if (code != TSDB_CODE_SUCCESS) {
@ -459,22 +473,6 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
}
}
if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) {
// transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint
// free it and remove fill-history task from disk meta-store
taosThreadMutexLock(&pTask->lock);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &hTaskId, 1);
} else {
stWarn("s-task:%s related fill-history task:0x%x is erased", id, (int32_t)pTask->hTaskInfo.id.taskId);
}
taosThreadMutexUnlock(&pTask->lock);
}
// clear the checkpoint info if failed
if (code != TSDB_CODE_SUCCESS) {
taosThreadMutexLock(&pTask->lock);
@ -488,7 +486,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
double el = (taosGetTimestampMs() - startTs) / 1000.0;
stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2f Sec, %s ", id,
pTask->pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
(code == TSDB_CODE_SUCCESS) ? "succ" : "failed");
return code;
@ -586,7 +584,7 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("streamTaskDownloadCheckpointData parameters invalid");
stError("down checkpoint data parameters invalid");
return -1;
}

View File

@ -341,7 +341,7 @@ static int32_t sendDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pDispatch
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
code = doSendDispatchMsg(pTask, pDispatchMsg, vgId, pEpSet);
@ -432,7 +432,7 @@ static void doRetryDispatchData(void* param, void* tmrId) {
SEpSet* pEpSet = &pTask->outputInfo.fixedDispatcher.epSet;
int32_t downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), id:%d", id,
stDebug("s-task:%s (child taskId:%d) fix-dispatch %d block(s) to s-task:0x%x (vgId:%d), msgId:%d", id,
pTask->info.selfChildId, 1, downstreamTaskId, vgId, msgId);
code = doSendDispatchMsg(pTask, pReq, vgId, pEpSet);

View File

@ -589,7 +589,7 @@ static int32_t doStreamExecTask(SStreamTask* pTask) {
// dispatch checkpoint msg to all downstream tasks
int32_t type = pInput->type;
if (type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
streamProcessCheckpointBlock(pTask, (SStreamDataBlock*)pInput);
streamProcessCheckpointTriggerBlock(pTask, (SStreamDataBlock*)pInput);
continue;
}

View File

@ -556,19 +556,22 @@ int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask) {
if (pTask->ver < SSTREAM_TASK_SUBTABLE_CHANGED_VER){
pTask->ver = SSTREAM_TASK_VER;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, len);
tEncodeStreamTask(&encoder, pTask);
tEncoderClear(&encoder);
int64_t id[2] = {pTask->id.streamId, pTask->id.taskId};
if (tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn) < 0) {
code = tdbTbUpsert(pMeta->pTaskDb, id, STREAM_TASK_KEY_LEN, buf, len, pMeta->txn);
if (code < 0) {
stError("s-task:%s save to disk failed, code:%s", pTask->id.idStr, tstrerror(terrno));
return -1;
} else {
stDebug("s-task:%s vgId:%d stream task write to meta file", pTask->id.idStr, pTask->pMeta->vgId);
}
taosMemoryFree(buf);
return 0;
return code;
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, STaskId* pTaskId) {

View File

@ -686,16 +686,47 @@ int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskI
pReq->head.vgId = vgId;
pReq->taskId = pTaskId->taskId;
pReq->streamId = pTaskId->streamId;
pReq->resetRelHalt = resetRelHalt;
pReq->resetRelHalt = resetRelHalt; // todo: remove this attribute
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
return code;
} else {
stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
}
stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
return code;
}
int32_t streamBuildAndSendCheckpointUpdateMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId, STaskId* pHTaskId,
SCheckpointInfo* pCheckpointInfo, int8_t dropRelHTask) {
SVUpdateCheckpointInfoReq* pReq = rpcMallocCont(sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = vgId;
pReq->taskId = pTaskId->taskId;
pReq->streamId = pTaskId->streamId;
pReq->dropRelHTask = dropRelHTask;
pReq->hStreamId = pHTaskId->streamId;
pReq->hTaskId = pHTaskId->taskId;
pReq->transId = pCheckpointInfo->transId;
pReq->checkpointId = pCheckpointInfo->checkpointingId;
pReq->checkpointVer = pCheckpointInfo->processedVer;
pReq->checkpointTs = pCheckpointInfo->startTs;
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_UPDATE_CHKPT, .pCont = pReq, .contLen = sizeof(SVUpdateCheckpointInfoReq)};
int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
if (code != TSDB_CODE_SUCCESS) {
stError("vgId:%d task:0x%x failed to send update checkpoint info msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
} else {
stDebug("vgId:%d task:0x%x build and send update checkpoint info msg msg", vgId, pTaskId->taskId);
}
return code;
}