fix(stream): expand stream tasks are divided into two phase, the first is to build stream task and then expand stream task before start stream tasks.

This commit is contained in:
Haojun Liao 2024-05-14 14:34:31 +08:00
parent fb248b2682
commit 6898eba6ed
17 changed files with 122 additions and 115 deletions

View File

@ -40,7 +40,7 @@ int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);
int32_t tqStreamTaskProcessUpdateCheckpointReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode);
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta);
void tqSetRestoreVersionInfo(SStreamTask* pTask);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -410,7 +410,7 @@ typedef struct SStateStore {
void (*streamFileStateClear)(struct SStreamFileState* pFileState);
bool (*needClearDiskBuff)(struct SStreamFileState* pFileState);
SStreamState* (*streamStateOpen)(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
SStreamState* (*streamStateOpen)(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void (*streamStateClose)(SStreamState* pState, bool remove);
int32_t (*streamStateBegin)(SStreamState* pState);
int32_t (*streamStateCommit)(SStreamState* pState);

View File

@ -150,6 +150,7 @@ typedef struct SBackendCfWrapper {
int64_t backendId;
char idstr[64];
} SBackendCfWrapper;
typedef struct STdbState {
SBackendCfWrapper *pBackendCfWrapper;
int64_t backendCfWrapperId;

View File

@ -29,7 +29,7 @@ extern "C" {
#include "storageapi.h"
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages);
void streamStateClose(SStreamState* pState, bool remove);
int32_t streamStateBegin(SStreamState* pState);
int32_t streamStateCommit(SStreamState* pState);

View File

@ -61,7 +61,6 @@ extern "C" {
// the load and start stream task should be executed after snode has started successfully, since the load of stream
// tasks may incur the download of checkpoint data from remote, which may consume significant network and CPU resources.
#define STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS (-8)
#define STREAM_EXEC_T_LOAD_ALL_TASKS (-9)
typedef struct SStreamTask SStreamTask;
typedef struct SStreamQueue SStreamQueue;
@ -156,8 +155,6 @@ typedef enum EStreamTaskEvent {
TASK_EVENT_DROPPING = 0xA,
} EStreamTaskEvent;
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef void FTbSink(SStreamTask* pTask, void* vnode, void* data);
typedef void FSmaSink(void* vnode, int64_t smaId, const SArray* data);
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask, int64_t ver);
@ -491,7 +488,6 @@ typedef struct SStreamMeta {
int32_t vgId;
int64_t stage;
int32_t role;
bool taskLoadFlag;
bool closeFlag;
bool sendMsgBeforeClosing; // send hb to mnode before close all tasks when switch to follower.
STaskStartInfo startInfo;
@ -522,6 +518,9 @@ typedef struct STaskUpdateEntry {
int32_t transId;
} STaskUpdateEntry;
typedef int32_t (*__state_trans_user_fn)(SStreamTask*, void* param);
typedef int32_t (*__stream_task_expand_fn)(struct SStreamTask* pTask);
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset, bool fillHistory, int64_t triggerParam,
SArray* pTaskList, bool hasFillhistory, int8_t subtableWithoutMd5);
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask);
@ -675,7 +674,7 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
void streamTaskCloseUpstreamInput(SStreamTask* pTask, int32_t taskId);
void streamTaskOpenAllUpstreamInput(SStreamTask* pTask);
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key);
bool streamTaskIsSinkTask(const SStreamTask* pTask);
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
@ -723,9 +722,9 @@ void streamMetaResetStartInfo(STaskStartInfo* pMeta);
SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta);
void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader);
void streamMetaLoadAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn fn);
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn fn);
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
// timer

View File

@ -217,7 +217,7 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif
);
uDebug("[rsync] %s start to sync data from remote to local:%s, %s", id, path, command);
uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command);
while(times++ < MAX_RETRY) {

View File

@ -45,15 +45,10 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t nextProcessVer
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pBackend = NULL;
streamTaskOpenAllUpstreamInput(pTask);
code = tqExpandStreamTask(pTask, pSnode->pMeta, NULL);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
streamTaskResetUpstreamStageInfo(pTask);
streamSetupScheduleTrigger(pTask);
@ -96,6 +91,7 @@ SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) {
goto FAIL;
}
streamMetaLoadAllTasks(pSnode->pMeta);
return pSnode;
FAIL:
@ -104,7 +100,7 @@ FAIL:
}
int32_t sndInit(SSnode *pSnode) {
streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
streamTaskSchedTask(&pSnode->msgCb, pSnode->pMeta->vgId, 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
return 0;
}

View File

@ -92,6 +92,8 @@ int32_t tqInitialize(STQ* pTq) {
return -1;
}
streamMetaLoadAllTasks(pTq->pStreamMeta);
if (tqMetaTransform(pTq) < 0) {
return -1;
}
@ -715,17 +717,18 @@ static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); }
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
int32_t vgId = TD_VID(pTq->pVnode);
tqDebug("s-task:0x%x start to expand task", pTask->id.taskId);
tqDebug("s-task:0x%x start to build task", pTask->id.taskId);
int32_t code = streamTaskInit(pTask, pTq->pStreamMeta, &pTq->pVnode->msgCb, nextProcessVer);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
code = tqExpandStreamTask(pTask, pTq->pStreamMeta, pTq->pVnode);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
pTask->pBackend = NULL;
// code = tqExpandStreamTask(pTask, pTq->pStreamMeta);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
// sink
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
@ -768,7 +771,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
const char* pNext = streamTaskGetStatusStr(pTask->status.taskStatus);
if (pTask->info.fillHistory) {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
tqInfo("vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, cur-status:%s, next-status:%s fill-history:%d, related stream task:0x%x "
"trigger:%" PRId64 " ms, inputVer:%" PRId64,
@ -777,7 +780,7 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) {
(int32_t)pTask->streamTaskId.taskId, pTask->info.triggerParam, nextProcessVer);
} else {
tqInfo(
"vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
"vgId:%d build stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, cur-status:%s next-status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64
" ms, inputVer:%" PRId64,
@ -798,10 +801,10 @@ int32_t tqProcessTaskCheckRsp(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) {
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode));
return TSDB_CODE_SUCCESS;
}
// if (!pTq->pVnode->restored) {
// tqDebug("vgId:%d not restored, ignore the stream task deploy msg", TD_VID(pTq->pVnode));
// return TSDB_CODE_SUCCESS;
// }
return tqStreamTaskProcessDeployReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, sversion, msg, msgLen,
vnodeIsRoleLeader(pTq->pVnode), pTq->pVnode->restored);
@ -1016,12 +1019,12 @@ int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t vgId = TD_VID(pTq->pVnode);
SVUpdateCheckpointInfoReq* pReq = (SVUpdateCheckpointInfoReq*)msg;
if (!pTq->pVnode->restored) {
tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64
", transId:%d s-task:0x%x ignore it",
vgId, pReq->checkpointId, pReq->transId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
// if (!pTq->pVnode->restored) {
// tqDebug("vgId:%d update-checkpoint-info msg received during restoring, checkpointId:%" PRId64
// ", transId:%d s-task:0x%x ignore it",
// vgId, pReq->checkpointId, pReq->transId, pReq->taskId);
// return TSDB_CODE_SUCCESS;
// }
return tqStreamTaskProcessUpdateCheckpointReq(pTq->pStreamMeta, msg, msgLen);
}

View File

@ -1089,7 +1089,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
if ((pTask->info.taskLevel == TASK_LEVEL__SOURCE) && (pTask->exec.pExecutor != NULL)) {
int32_t code = qUpdateTableListForStreamScanner(pTask->exec.pExecutor, tbUidList, isAdd);
if (code != 0) {
tqError("vgId:%d, s-task:%s update qualified table error for stream task", vgId, pTask->id.idStr);

View File

@ -39,9 +39,13 @@ static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) {
pTask->id.streamId = pId->streamId;
}
int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode) {
int32_t vgId = pMeta->vgId;
STaskId taskId = {0};
int32_t tqExpandStreamTask(SStreamTask* pTask) {
SStreamMeta* pMeta = pTask->pMeta;
int32_t vgId = pMeta->vgId;
STaskId taskId = {0};
int64_t st = taosGetTimestampMs();
tqDebug("s-task:%s vgId:%d start to expand stream task", pTask->id.idStr, vgId);
if (pTask->info.fillHistory) {
taskId = replaceStreamTaskId(pTask);
@ -67,7 +71,7 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
};
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
handle.vnode = pVnode;
handle.vnode = ((STQ*)pMeta->ahandle)->pVnode;
handle.initTqReader = 1;
} else if (pTask->info.taskLevel == TASK_LEVEL__AGG) {
handle.numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList);
@ -84,6 +88,9 @@ int32_t tqExpandStreamTask(SStreamTask* pTask, SStreamMeta* pMeta, void* pVnode)
qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId);
}
double el = (taosGetTimestampMs() - st) / 1000.0;
tqDebug("s-task:%s vgId:%d expand stream task completed, elapsed time:%.2fsec", pTask->id.idStr, vgId, el);
return TSDB_CODE_SUCCESS;
}
@ -706,7 +713,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta);
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
} else {
streamMetaResetStartInfo(&pMeta->startInfo);
streamMetaWUnLock(pMeta);
@ -724,10 +731,10 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t vgId = pMeta->vgId;
if (type == STREAM_EXEC_T_START_ONE_TASK) {
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId);
streamMetaStartOneTask(pMeta, pReq->streamId, pReq->taskId, tqExpandStreamTask);
return 0;
} else if (type == STREAM_EXEC_T_START_ALL_TASKS) {
streamMetaStartAllTasks(pMeta);
streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
return 0;
} else if (type == STREAM_EXEC_T_RESTART_ALL_TASKS) {
restartStreamTasks(pMeta, isLeader);
@ -740,11 +747,8 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
return code;
} else if (type == STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS) {
streamMetaLoadAllTasks(pMeta);
int32_t code = streamMetaStartAllTasks(pMeta);
int32_t code = streamMetaStartAllTasks(pMeta, tqExpandStreamTask);
return code;
} else if (type == STREAM_EXEC_T_LOAD_ALL_TASKS) {
streamMetaLoadAllTasks(pMeta);
return 0;
} else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);

View File

@ -576,37 +576,22 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
if (tsDisableStream) {
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else {
vInfo("vgId:%d sync restore finished, start to load and launch stream task(s)", vgId);
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", vgId);
if (pMeta->startInfo.startAllTasks == 1) {
pMeta->startInfo.restartCount += 1;
vDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
pMeta->startInfo.restartCount);
} else {
pMeta->startInfo.startAllTasks = 1;
bool loadTaskInfo = pMeta->taskLoadFlag;
pMeta->taskLoadFlag = true;
streamMetaWUnLock(pMeta);
if (loadTaskInfo) {
tqInfo("vgId:%d stream task already loaded, start them", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
} else {
tqInfo("vgId:%d start load and launch stream task(s)", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_AND_START_ALL_TASKS);
}
tqInfo("vgId:%d stream task already loaded, start them", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_START_ALL_TASKS);
return;
}
}
} else {
if (!pMeta->taskLoadFlag) {
pMeta->taskLoadFlag = true;
vInfo("vgId:%d, sync restore finished, load stream tasks, not start tasks since not leader", vgId);
streamTaskSchedTask(&pVnode->msgCb, TD_VID(pVnode), 0, 0, STREAM_EXEC_T_LOAD_ALL_TASKS);
} else {
vInfo("vgId:%d, sync restore finished, not load stream tasks since already loaded for follower");
}
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
}
streamMetaWUnLock(pMeta);

View File

@ -141,7 +141,7 @@ SListNode* streamBackendAddCompare(void* backend, void* arg);
void streamBackendDelCompare(void* backend, void* arg);
int32_t streamStateCvtDataFormat(char* path, char* key, void* cfInst);
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId);
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId);
void taskDbDestroy(void* pBackend, bool flush);
void taskDbDestroy2(void* pBackend);
int32_t taskDbDoCheckpoint(void* arg, int64_t chkpId);

View File

@ -164,7 +164,7 @@ typedef enum ECHECKPOINT_BACKUP_TYPE {
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(char* id, char* path);
int32_t streamTaskDownloadCheckpointData(const char* id, char* path);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanhistoryTaskReady(SStreamTask* pTask);

View File

@ -40,7 +40,7 @@ int32_t streamStateOpenBackendCf(void* backend, char* name, char** cfs, int32_t
void destroyRocksdbCfInst(RocksdbCfInst* inst);
int32_t getCfIdx(const char* cfName);
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath);
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath);
static int32_t backendCopyFiles(const char* src, const char* dst);
@ -325,7 +325,7 @@ int32_t remoteChkp_validAndCvtMeta(char* path, SArray* list, int64_t chkpId) {
return complete == 1 ? 0 : -1;
}
int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
int32_t rebuildFromRemoteChkp_rsync(const char* key, char* chkptPath, int64_t checkpointId, char* defaultPath) {
int32_t code = 0;
if (taosIsDir(chkptPath)) {
taosRemoveDir(chkptPath);
@ -335,7 +335,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpoi
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
taosMulMkDir(defaultPath);
stDebug("clear local backend dir:%s succ", defaultPath);
stDebug("clear local default dir before download checkpoint data:%s succ", defaultPath);
}
code = streamTaskDownloadCheckpointData(key, chkptPath);
@ -348,7 +348,7 @@ int32_t rebuildFromRemoteChkp_rsync(char* key, char* chkptPath, int64_t checkpoi
return backendCopyFiles(chkptPath, defaultPath);
}
int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath);
if (code != 0) {
return code;
@ -383,14 +383,14 @@ int32_t rebuildFromRemoteChkp_s3(char* key, char* chkpPath, int64_t chkpId, char
return code;
}
int32_t rebuildFromRemoteCheckpoint(char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
int32_t rebuildFromRemoteCheckpoint(const char* key, char* chkpPath, int64_t checkpointId, char* defaultPath) {
ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
if (type == DATA_UPLOAD_S3) {
return rebuildFromRemoteChkp_s3(key, chkpPath, checkpointId, defaultPath);
} else if (type == DATA_UPLOAD_RSYNC) {
return rebuildFromRemoteChkp_rsync(key, chkpPath, checkpointId, defaultPath);
} else {
stError("%s not remote backup checkpoint data for:%"PRId64, key, checkpointId);
stError("%s not remote backup checkpoint data for:%" PRId64" restore ", key, checkpointId);
}
return -1;
@ -484,24 +484,26 @@ int32_t backendCopyFiles(const char* src, const char* dst) {
return backendFileCopyFilesImpl(src, dst);
}
static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpointPath, int64_t chkptId, const char* defaultPath) {
static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* checkpointPath, int64_t checkpointId,
const char* defaultPath) {
int32_t code = 0;
if (taosIsDir(defaultPath)) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stInfo("clear local backend dir:%s, done", defaultPath);
stInfo("%s clear local backend dir:%s, succ", pTaskIdStr, defaultPath);
}
if (taosIsDir(checkpointPath) && isValidCheckpoint(checkpointPath)) {
code = backendCopyFiles(checkpointPath, defaultPath);
stDebug("%s local checkpoint data existed, checkpointId:%d copy to backend dir", pTaskIdStr, checkpointId);
code = backendCopyFiles(checkpointPath, defaultPath);
if (code != TSDB_CODE_SUCCESS) {
taosRemoveDir(defaultPath);
taosMkDir(defaultPath);
stError("%s failed to restart stream backend from %s, reason: %s, start to restart from empty path: %s",
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)), defaultPath);
stError("%s failed to start stream backend from local %s, reason:%s, try download checkpoint from remote",
pTaskIdStr, checkpointPath, tstrerror(TAOS_SYSTEM_ERROR(errno)));
code = TSDB_CODE_SUCCESS;
} else {
stInfo("%s copy checkpoint data from:%s to:%s succ, try to start stream backend", pTaskIdStr, checkpointPath,
@ -509,18 +511,13 @@ static int32_t rebuildFromLocalCheckpoint(char* pTaskIdStr, const char* checkpoi
}
} else {
code = TSDB_CODE_FAILED;
stError("%s no valid checkpoint data for checkpointId:%" PRId64 " in %s", pTaskIdStr, chkptId, checkpointPath);
stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
}
return code;
}
int32_t rebuildFromlocalDefault(char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int32_t code = 0;
return code;
}
int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId, char** dbPrefixPath, char** dbPath) {
int32_t code = 0;
char* prefixPath = taosMemoryCalloc(1, strlen(path) + 128);
@ -533,13 +530,23 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c
char* defaultPath = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(defaultPath, "%s%s%s", prefixPath, TD_DIRSEP, "state");
if (!taosIsDir(defaultPath)) {
taosMulMkDir(defaultPath);
}
stDebug("local default dir:%s, checkpointId:%" PRId64 ", key:%s succ", defaultPath, chkptId, key);
char* checkpointRoot = taosMemoryCalloc(1, strlen(path) + 256);
sprintf(checkpointRoot, "%s%s%s", prefixPath, TD_DIRSEP, "checkpoints");
if (!taosIsDir(checkpointRoot)) {
taosMulMkDir(checkpointRoot);
}
taosMemoryFree(checkpointRoot);
stDebug("%s check local default:%s, checkpointId:%" PRId64 " succ", key, defaultPath, chkptId);
char* chkptPath = taosMemoryCalloc(1, strlen(path) + 256);
if (chkptId != 0) {
if (chkptId > 0) {
sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint", chkptId);
code = rebuildFromLocalCheckpoint(key, chkptPath, chkptId, defaultPath);
@ -550,16 +557,9 @@ int32_t rebuildDirFormCheckpoint(const char* path, char* key, int64_t chkptId, c
if (code != 0) {
stError("failed to start stream backend at %s, reason: %s, restart from default defaultPath:%s", chkptPath,
tstrerror(code), defaultPath);
code = taosMkDir(defaultPath);
}
} else {
sprintf(chkptPath, "%s%s%s%s%s%" PRId64 "", prefixPath, TD_DIRSEP, "checkpoints", TD_DIRSEP, "checkpoint",
(int64_t)-1);
code = rebuildFromLocalCheckpoint(key, chkptPath, -1, defaultPath);
if (code != 0) {
code = taosMkDir(defaultPath);
}
} else { // no valid checkpoint id
stInfo("%s no valid checkpoint ever generated, no need to copy checkpoint data", key);
}
taosMemoryFree(chkptPath);
@ -646,7 +646,7 @@ void* streamBackendInit(const char* streamPath, int64_t chkpId, int32_t vgId) {
if (cfs != NULL) {
rocksdb_list_column_families_destroy(cfs, nCf);
}
stDebug("succ to init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
stDebug("%s init stream backend at %s, backend:%p, vgId:%d", backendPath, pHandle, vgId);
taosMemoryFreeClear(backendPath);
return (void*)pHandle;
@ -1933,6 +1933,7 @@ int32_t taskDbBuildFullPath(char* path, char* key, char** dbFullPath, char** sta
*stateFullPath = statePath;
return 0;
}
void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
STaskDbWrapper* p = pTaskDb;
taosThreadMutexLock(&p->mutex);
@ -1940,7 +1941,7 @@ void taskDbUpdateChkpId(void* pTaskDb, int64_t chkpId) {
taosThreadMutexUnlock(&p->mutex);
}
STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
STaskDbWrapper* taskDbOpenImpl(const char* key, char* statePath, char* dbPath) {
char* err = NULL;
char** cfNames = NULL;
size_t nCf = 0;
@ -1955,7 +1956,7 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
cfNames = rocksdb_list_column_families(pTaskDb->dbOpt, dbPath, &nCf, &err);
if (nCf == 0) {
stInfo("newly create db, need to restart");
stInfo("%s newly create db, need to restart", key);
// pre create db
pTaskDb->db = rocksdb_open(pTaskDb->pCfOpts[0], dbPath, &err);
if (pTaskDb->db == NULL) goto _EXIT;
@ -1980,21 +1981,21 @@ STaskDbWrapper* taskDbOpenImpl(char* key, char* statePath, char* dbPath) {
cfNames = NULL;
}
stDebug("succ to init stream backend at %s, backend:%p", dbPath, pTaskDb);
stDebug("init s-task backend in:%s, backend:%p, %s", dbPath, pTaskDb, key);
return pTaskDb;
_EXIT:
_EXIT:
taskDbDestroy(pTaskDb, false);
if (err) taosMemoryFree(err);
if (cfNames) rocksdb_list_column_families_destroy(cfNames, nCf);
return NULL;
}
STaskDbWrapper* taskDbOpen(char* path, char* key, int64_t chkptId) {
STaskDbWrapper* taskDbOpen(const char* path, const char* key, int64_t chkptId) {
char* statePath = NULL;
char* dbPath = NULL;
if (rebuildDirFormCheckpoint(path, key, chkptId, &statePath, &dbPath) != 0) {
if (restoreCheckpointData(path, key, chkptId, &statePath, &dbPath) != 0) {
return NULL;
}

View File

@ -239,6 +239,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
taosThreadMutexLock(&pTask->lock);
if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64
" no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer);
taosThreadMutexUnlock(&pTask->lock);
return TSDB_CODE_SUCCESS;
}
SStreamTaskState* pStatus = streamTaskGetStatus(pTask);
stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint info, checkpointId:%" PRId64 "->%" PRId64
@ -246,7 +254,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin
id, vgId, pStatus->name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
pInfo->checkpointTime, pReq->checkpointTs);
// in the
if (pStatus->state != TASK_STATUS__DROPPING) {
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer);
@ -593,7 +600,7 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
return 0;
}
int32_t streamTaskDownloadCheckpointData(char* id, char* path) {
int32_t streamTaskDownloadCheckpointData(const char* id, char* path) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("down checkpoint data parameters invalid");
return -1;

View File

@ -239,9 +239,7 @@ int32_t streamMetaMayCvtDbFormat(SStreamMeta* pMeta) {
return 0;
}
int32_t streamTaskSetDb(SStreamMeta* pMeta, void* arg, char* key) {
SStreamTask* pTask = arg;
int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key) {
int64_t chkpId = pTask->chkInfo.checkpointId;
taosThreadMutexLock(&pMeta->backendMutex);
@ -1358,7 +1356,7 @@ static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64
return TSDB_CODE_SUCCESS;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta, __stream_task_expand_fn expandFn) {
int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId;
int64_t now = taosGetTimestampMs();
@ -1392,8 +1390,17 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue;
}
// fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo;
code = expandFn(pTask);
if (code != TSDB_CODE_SUCCESS) {
stError("s-task:0x%x vgId:%d failed to build stream backend", pTaskId->taskId, vgId);
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
streamMetaReleaseTask(pMeta, pTask);
continue;
}
// fill-history task can only be launched by related stream tasks.
if (pTask->info.fillHistory == 1) {
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
streamMetaReleaseTask(pMeta, pTask);
@ -1481,9 +1488,9 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
return true;
}
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, __stream_task_expand_fn expandFn) {
int32_t vgId = pMeta->vgId;
stInfo("vgId:%d start to task:0x%x by checking downstream status", vgId, taskId);
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, streamId, taskId);
if (pTask == NULL) {
@ -1501,6 +1508,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
ASSERT(pTask->status.downstreamReady == 0);
if (pTask->pBackend == NULL) { // todo handle the error code
int32_t code = expandFn(pTask);
}
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
if (ret != TSDB_CODE_SUCCESS) {
stError("s-task:%s vgId:%d failed to handle event:%d", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT);

View File

@ -98,7 +98,7 @@ int stateKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, int kLen2) {
return winKeyCmprImpl(&pWin1->key, &pWin2->key);
}
SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
SStreamState* streamStateOpen(const char* path, void* pTask, bool specPath, int32_t szPage, int32_t pages) {
SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState));
stDebug("open stream state %p, %s", pState, path);
if (pState == NULL) {
@ -127,7 +127,7 @@ SStreamState* streamStateOpen(char* path, void* pTask, bool specPath, int32_t sz
_hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT);
pState->parNameMap = tSimpleHashInit(1024, hashFn);
stInfo("succ to open state %p on backend %p 0x%" PRIx64 "-%d", pState, pMeta->streamBackend, pState->streamId,
stInfo("open state %p on backend %p 0x%" PRIx64 "-%d succ", pState, pMeta->streamBackend, pState->streamId,
pState->taskId);
return pState;