fix(stream): async load and start stream tasks.
This commit is contained in:
parent
d214dd5cdd
commit
3dfffe9113
|
@ -58,6 +58,10 @@ extern "C" {
|
|||
#define STREAM_EXEC_T_STOP_ALL_TASKS (-5)
|
||||
#define STREAM_EXEC_T_RESUME_TASK (-6)
|
||||
#define STREAM_EXEC_T_ADD_FAILED_TASK (-7)
|
||||
// the load and start stream task should be executed after snode has started successfully, since the load of stream
|
||||
// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources.
|
||||
#define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8)
|
||||
#define STREAM_EXEC_T_LOAD_ALL_TASKS (-9)
|
||||
|
||||
typedef struct SStreamTask SStreamTask;
|
||||
typedef struct SStreamQueue SStreamQueue;
|
||||
|
|
|
@ -130,6 +130,7 @@ void startRsync() {
|
|||
uError("[rsync] build checkpoint backup dir failed, dir:%s,"ERRNO_ERR_FORMAT, tsCheckpointBackupDir, ERRNO_ERR_DATA);
|
||||
return;
|
||||
}
|
||||
|
||||
removeEmptyDir();
|
||||
|
||||
char confDir[PATH_MAX] = {0};
|
||||
|
@ -186,18 +187,20 @@ int32_t uploadRsync(const char* id, const char* path) {
|
|||
if (code != 0) {
|
||||
uError("[rsync] s-task:%s upload checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
|
||||
tsSnodeAddress, code, ERRNO_ERR_DATA);
|
||||
return -1;
|
||||
}
|
||||
|
||||
} else {
|
||||
int64_t el = (taosGetTimestampMs() - st);
|
||||
uDebug("[rsync] s-task:%s upload checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
|
||||
tsSnodeAddress, el);
|
||||
return 0;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t downloadRsync(const char* id, const char* path) {
|
||||
int64_t st = taosGetTimestampMs();
|
||||
uDebug("[rsync] %s start to sync data from remote to local:%s", id, path);
|
||||
int32_t MAX_RETRY = 60;
|
||||
int32_t times = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
#ifdef WINDOWS
|
||||
char pathTransform[PATH_MAX] = {0};
|
||||
|
@ -205,7 +208,7 @@ int32_t downloadRsync(const char* id, const char* path) {
|
|||
#endif
|
||||
|
||||
char command[PATH_MAX] = {0};
|
||||
snprintf(command, PATH_MAX, "rsync -av --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
|
||||
snprintf(command, PATH_MAX, "rsync -av --debug=all --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/ %s",
|
||||
tsSnodeAddress, id,
|
||||
#ifdef WINDOWS
|
||||
pathTransform
|
||||
|
@ -214,13 +217,20 @@ int32_t downloadRsync(const char* id, const char* path) {
|
|||
#endif
|
||||
);
|
||||
|
||||
int32_t code = execCommand(command);
|
||||
uDebug("[rsync] %s start to sync data from remote to local:%s, %s", id, path, command);
|
||||
|
||||
int32_t el = taosGetTimestampMs() - st;
|
||||
if (code != 0) {
|
||||
uError("[rsync] %s download checkpoint data:%s failed, code:%d," ERRNO_ERR_FORMAT, id, path, code, ERRNO_ERR_DATA);
|
||||
while(times++ < MAX_RETRY) {
|
||||
|
||||
code = execCommand(command);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, code:%d," ERRNO_ERR_FORMAT, id, path, code,
|
||||
ERRNO_ERR_DATA);
|
||||
taosSsleep(1);
|
||||
} else {
|
||||
int32_t el = taosGetTimestampMs() - st;
|
||||
uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -86,6 +86,9 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
stopRsync();
|
||||
startRsync();
|
||||
|
||||
pSnode->msgCb = pOption->msgCb;
|
||||
pSnode->pMeta = streamMetaOpen(path, pSnode, (FTaskExpand *)sndExpandTask, SNODE_HANDLE, taosGetTimestampMs(), tqStartTaskCompleteCallback);
|
||||
if (pSnode->pMeta == NULL) {
|
||||
|
@ -93,11 +96,6 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
|
|||
goto FAIL;
|
||||
}
|
||||
|
||||
streamMetaLoadAllTasks(pSnode->pMeta);
|
||||
|
||||
stopRsync();
|
||||
startRsync();
|
||||
|
||||
return pSnode;
|
||||
|
||||
FAIL:
|
||||
|
@ -106,8 +104,7 @@ FAIL:
|
|||
}
|
||||
|
||||
int32_t sndInit(SSnode *pSnode) {
|
||||
streamMetaResetTaskStatus(pSnode->pMeta);
|
||||
streamMetaStartAllTasks(pSnode->pMeta);
|
||||
streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -92,8 +92,6 @@ int32_t tqInitialize(STQ* pTq) {
|
|||
return -1;
|
||||
}
|
||||
|
||||
streamMetaLoadAllTasks(pTq->pStreamMeta);
|
||||
|
||||
if (tqMetaTransform(pTq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
@ -800,6 +798,11 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
|
|||
}
|
||||
|
||||
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
|
||||
if (!pTq->pVnode->restored) {
|
||||
tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
|
||||
vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
|
||||
}
|
||||
|
|
|
@ -178,6 +178,7 @@ int32_t tqMetaRestoreCheckInfo(STQ* pTq) {
|
|||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
tdbFree(pKey);
|
||||
tdbFree(pVal);
|
||||
|
@ -514,35 +515,6 @@ int32_t tqMetaTransform(STQ* pTq) {
|
|||
return code;
|
||||
}
|
||||
|
||||
//int32_t tqMetaRestoreHandle(STQ* pTq) {
|
||||
// int code = 0;
|
||||
// TBC* pCur = NULL;
|
||||
// if (tdbTbcOpen(pTq->pExecStore, &pCur, NULL) < 0) {
|
||||
// return -1;
|
||||
// }
|
||||
//
|
||||
// void* pKey = NULL;
|
||||
// int kLen = 0;
|
||||
// void* pVal = NULL;
|
||||
// int vLen = 0;
|
||||
//
|
||||
// tdbTbcMoveToFirst(pCur);
|
||||
//
|
||||
// while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) {
|
||||
// STqHandle handle = {0};
|
||||
// code = restoreHandle(pTq, pVal, vLen, &handle);
|
||||
// if (code < 0) {
|
||||
// tqDestroyTqHandle(&handle);
|
||||
// break;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
// tdbFree(pKey);
|
||||
// tdbFree(pVal);
|
||||
// tdbTbcClose(pCur);
|
||||
// return code;
|
||||
//}
|
||||
|
||||
int32_t tqMetaGetHandle(STQ* pTq, const char* key) {
|
||||
void* pVal = NULL;
|
||||
int vLen = 0;
|
||||
|
|
|
@ -158,10 +158,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
|||
STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
|
||||
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||
if (ppTask == NULL || *ppTask == NULL) {
|
||||
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped", vgId, req.taskId);
|
||||
tqError("vgId:%d failed to acquire task:0x%x when handling update task epset, it may have been dropped", vgId, req.taskId);
|
||||
rsp.code = TSDB_CODE_SUCCESS;
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
||||
taosArrayDestroy(req.pNodeList);
|
||||
return rsp.code;
|
||||
}
|
||||
|
@ -739,6 +738,13 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
|
|||
} else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) {
|
||||
int32_t code = streamMetaAddFailedTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) {
|
||||
streamMetaLoadAllTasks(pMeta);
|
||||
int32_t code = streamMetaStartAllTasks(pMeta);
|
||||
return code;
|
||||
} else if (type == STREAM_EXEC_T_LOAD_ALL_TASKS) {
|
||||
streamMetaLoadAllTasks(pMeta);
|
||||
return 0;
|
||||
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
|
||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
|
||||
|
||||
|
|
|
@ -576,24 +576,22 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
|||
if (tsDisableStream) {
|
||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
|
||||
} else {
|
||||
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
|
||||
int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
|
||||
if (numOfTasks > 0) {
|
||||
vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", pVnode->config.vgId);
|
||||
if (pMeta->startInfo.startAllTasks == 1) {
|
||||
pMeta->startInfo.restartCount += 1;
|
||||
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
|
||||
pMeta->startInfo.restartCount);
|
||||
} else {
|
||||
pMeta->startInfo.startAllTasks = 1;
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
|
||||
|
||||
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||
vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId);
|
||||
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS);
|
||||
}
|
||||
|
||||
streamMetaWUnLock(pMeta);
|
||||
|
|
|
@ -490,7 +490,7 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi
|
|||
if (taosIsDir(defaultPath)) {
|
||||
taosRemoveDir(defaultPath);
|
||||
taosMkDir(defaultPath);
|
||||
stInfo("clear task backend dir:%s, done", defaultPath);
|
||||
stInfo("clear local backend dir:%s, done", defaultPath);
|
||||
}
|
||||
|
||||
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
|
||||
|
|
|
@ -30,7 +30,7 @@ typedef struct {
|
|||
|
||||
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
|
||||
static int32_t deleteCheckpointFile(const char* id, const char* name);
|
||||
static int32_t streamTaskBackupCheckpoint(const char* id, const char* path);
|
||||
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path);
|
||||
static int32_t deleteCheckpoint(const char* id);
|
||||
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
|
||||
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
|
||||
|
@ -321,7 +321,7 @@ static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* l
|
|||
|
||||
int32_t code = downloadCheckpointDataByName(id, "META", file);
|
||||
if (code != 0) {
|
||||
stDebug("chkp failed to download meta file:%s", file);
|
||||
stDebug("%s chkp failed to download meta file:%s", id, file);
|
||||
taosMemoryFree(file);
|
||||
return code;
|
||||
}
|
||||
|
@ -379,7 +379,7 @@ int32_t uploadCheckpointData(void* param) {
|
|||
}
|
||||
|
||||
if (code == TSDB_CODE_SUCCESS) {
|
||||
code = streamTaskBackupCheckpoint(pParam->taskId, path);
|
||||
code = streamTaskUploadCheckpoint(pParam->taskId, path);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
stError("s-task:%s failed to upload checkpoint data:%s, checkpointId:%" PRId64, taskStr, path, pParam->chkpId);
|
||||
} else {
|
||||
|
@ -562,9 +562,9 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
|
|||
}
|
||||
}
|
||||
|
||||
int32_t streamTaskBackupCheckpoint(const char* id, const char* path) {
|
||||
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
|
||||
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
|
||||
stError("streamTaskBackupCheckpoint parameters invalid");
|
||||
stError("invalid parameters in upload checkpoint, %s", id);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
@ -580,7 +580,7 @@ int32_t streamTaskBackupCheckpoint(const char* id, const char* path) {
|
|||
// fileName: CURRENT
|
||||
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
|
||||
if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
|
||||
stError("uploadCheckpointByName parameters invalid");
|
||||
stError("down load checkpoint data parameters invalid");
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
|
@ -52,7 +52,11 @@ int32_t streamTaskSchedTask(SMsgCb* pMsgCb, int32_t vgId, int64_t streamId, int3
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (streamId != 0) {
|
||||
stDebug("vgId:%d create msg to start stream task:0x%x, exec type:%d", vgId, taskId, execType);
|
||||
} else {
|
||||
stDebug("vgId:%d create msg to exec, type:%d", vgId, execType);
|
||||
}
|
||||
|
||||
pRunReq->head.vgId = vgId;
|
||||
pRunReq->streamId = streamId;
|
||||
|
|
Loading…
Reference in New Issue