Merge pull request #27286 from taosdata/fix/3_liaohj
refactor: do some internal refactor.
This commit is contained in:
commit
cf645f4cdf
|
@ -750,6 +750,9 @@ void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
bool streamMetaTaskInTimer(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
|
||||||
int64_t endTs, bool ready);
|
int64_t endTs, bool ready);
|
||||||
|
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo);
|
||||||
|
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo);
|
||||||
|
|
||||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
|
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
|
||||||
|
@ -770,7 +773,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId);
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta);
|
||||||
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask);
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask);
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
int32_t streamTimerGetInstance(tmr_h* pTmr);
|
||||||
|
|
|
@ -26,12 +26,13 @@ static int32_t mndStreamSendUpdateChkptInfoMsg(SMnode *pMnode);
|
||||||
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
|
static int32_t mndSendDropOrphanTasksMsg(SMnode *pMnode, SArray *pList);
|
||||||
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
|
static int32_t mndSendResetFromCheckpointMsg(SMnode *pMnode, int64_t streamId, int32_t transId);
|
||||||
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
|
static void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage);
|
||||||
static void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo);
|
static void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo);
|
||||||
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
static int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList);
|
||||||
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
|
static int32_t suspendAllStreams(SMnode *pMnode, SRpcHandleInfo *info);
|
||||||
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
|
static bool validateHbMsg(const SArray *pNodeList, int32_t vgId);
|
||||||
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
|
static void cleanupAfterProcessHbMsg(SStreamHbMsg *pReq, SArray *pFailedChkptList, SArray *pOrphanTasks);
|
||||||
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
|
static void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_t msgId);
|
||||||
|
static void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks);
|
||||||
|
|
||||||
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
|
void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
|
||||||
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
int32_t numOfNodes = taosArrayGetSize(execInfo.pNodeList);
|
||||||
|
@ -52,7 +53,7 @@ void updateStageInfo(STaskStatusEntry *pTaskEntry, int64_t stage) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void addIntoCheckpointList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
|
void addIntoFailedChkptList(SArray *pList, const SFailedCheckpointInfo *pInfo) {
|
||||||
int32_t num = taosArrayGetSize(pList);
|
int32_t num = taosArrayGetSize(pList);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
SFailedCheckpointInfo *p = taosArrayGet(pList, i);
|
SFailedCheckpointInfo *p = taosArrayGet(pList, i);
|
||||||
|
@ -401,13 +402,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &p->id, sizeof(p->id));
|
||||||
if (pTaskEntry == NULL) {
|
if (pTaskEntry == NULL) {
|
||||||
mError("s-task:0x%" PRIx64 " not found in mnode task list, added into orphan task list", p->id.taskId);
|
checkforOrphanTask(pMnode, p, pOrphanTasks);
|
||||||
|
|
||||||
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
|
|
||||||
void* px = taosArrayPush(pOrphanTasks, &oTask);
|
|
||||||
if (px == NULL) {
|
|
||||||
mError("failed to put task into list, taskId:0x%" PRIx64, p->id.taskId);
|
|
||||||
}
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -423,7 +418,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
|
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
|
||||||
if (code) {
|
if (code) {
|
||||||
mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code));
|
mError("stream:0x%" PRIx64 " not exist, failed to handle consensus checkpoint-info req for task:0x%x, code:%s",
|
||||||
|
p->id.streamId, (int32_t)p->id.taskId, tstrerror(code));
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -434,7 +430,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
mndAddConsensusTasks(pInfo, &cp);
|
mndAddConsensusTasks(pInfo, &cp);
|
||||||
} else {
|
} else {
|
||||||
mError("failed to get consensus checkpoint-info");
|
mError("failed to get consensus checkpoint-info for stream:0x%" PRIx64, p->id.streamId);
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
@ -454,7 +450,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
SFailedCheckpointInfo info = {
|
SFailedCheckpointInfo info = {
|
||||||
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
|
.transId = pChkInfo->activeTransId, .checkpointId = pChkInfo->activeId, .streamUid = p->id.streamId};
|
||||||
addIntoCheckpointList(pFailedChkpt, &info);
|
addIntoFailedChkptList(pFailedChkpt, &info);
|
||||||
|
|
||||||
// remove failed trans from pChkptStreams
|
// remove failed trans from pChkptStreams
|
||||||
code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
|
code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
|
||||||
|
@ -516,6 +512,9 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
if (pMnode != NULL) { // make sure that the unit test case can work
|
if (pMnode != NULL) { // make sure that the unit test case can work
|
||||||
code = mndStreamSendUpdateChkptInfoMsg(pMnode);
|
code = mndStreamSendUpdateChkptInfoMsg(pMnode);
|
||||||
|
if (code) {
|
||||||
|
mError("failed to send update checkpointInfo msg, code:%s, try next time", tstrerror(code));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexUnlock(&execInfo.lock);
|
streamMutexUnlock(&execInfo.lock);
|
||||||
|
@ -554,3 +553,26 @@ void doSendHbMsgRsp(int32_t code, SRpcHandleInfo *pRpcInfo, int32_t vgId, int32_
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
pRpcInfo->handle = NULL; // disable auto rsp
|
pRpcInfo->handle = NULL; // disable auto rsp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void checkforOrphanTask(SMnode* pMnode, STaskStatusEntry* p, SArray* pOrphanTasks) {
|
||||||
|
SStreamObj *pStream = NULL;
|
||||||
|
|
||||||
|
int32_t code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
|
||||||
|
if (code) {
|
||||||
|
mError("stream:0x%" PRIx64 " not exists, s-task:0x%" PRIx64 " not found in task list, add into orphan list",
|
||||||
|
p->id.streamId, p->id.taskId);
|
||||||
|
|
||||||
|
SOrphanTask oTask = {.streamId = p->id.streamId, .taskId = p->id.taskId, .nodeId = p->nodeId};
|
||||||
|
void *px = taosArrayPush(pOrphanTasks, &oTask);
|
||||||
|
if (px == NULL) {
|
||||||
|
mError("failed to put task into orphan list, taskId:0x%" PRIx64", code:%s", p->id.taskId, tstrerror(terrno));
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (pStream != NULL) {
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
}
|
||||||
|
|
||||||
|
mError("s-task:0x%" PRIx64 " not found in task list but exists in mnode meta, data inconsistent, not drop yet",
|
||||||
|
p->id.taskId);
|
||||||
|
}
|
||||||
|
}
|
|
@ -131,27 +131,6 @@ int32_t tqStreamStartOneTaskAsync(SStreamMeta* pMeta, SMsgCb* cb, int64_t stream
|
||||||
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
|
return streamTaskSchedTask(cb, vgId, streamId, taskId, STREAM_EXEC_T_START_ONE_TASK);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqStreamTaskRestoreCheckpoint(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
if (numOfTasks == 0) {
|
|
||||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
tqDebug("vgId:%d restore task:0x%" PRIx64 "-0x%x checkpointId", vgId, streamId, taskId);
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
tqError("failed to acquire task:0x%x when trying to restore checkpointId", taskId);
|
|
||||||
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
|
|
||||||
}
|
|
||||||
|
|
||||||
code = streamTaskSendRestoreChkptMsg(pTask);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// this is to process request from transaction, always return true.
|
// this is to process request from transaction, always return true.
|
||||||
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pMsg, bool restored) {
|
||||||
int32_t vgId = pMeta->vgId;
|
int32_t vgId = pMeta->vgId;
|
||||||
|
|
|
@ -828,16 +828,19 @@ static int32_t findNextValidRow(SLDataIter *pIter, const char *idStr) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
int32_t tLDataIterNextRow(SLDataIter *pIter, const char *idStr, bool* hasNext) {
|
||||||
int32_t step = pIter->backward ? -1 : 1;
|
int32_t step = pIter->backward ? -1 : 1;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t iBlockL = pIter->iSttBlk;
|
int32_t iBlockL = pIter->iSttBlk;
|
||||||
SBlockData *pBlockData = NULL;
|
SBlockData *pBlockData = NULL;
|
||||||
|
int32_t lino = 0;
|
||||||
|
|
||||||
|
*hasNext = false;
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
// no qualified last file block in current file, no need to fetch row
|
// no qualified last file block in current file, no need to fetch row
|
||||||
if (pIter->pSttBlk == NULL) {
|
if (pIter->pSttBlk == NULL) {
|
||||||
return false;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = loadLastBlock(pIter, idStr, &pBlockData);
|
code = loadLastBlock(pIter, idStr, &pBlockData);
|
||||||
|
@ -850,9 +853,7 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
while (1) {
|
while (1) {
|
||||||
bool skipBlock = false;
|
bool skipBlock = false;
|
||||||
code = findNextValidRow(pIter, idStr);
|
code = findNextValidRow(pIter, idStr);
|
||||||
if (code) {
|
TSDB_CHECK_CODE(code, lino, _exit);
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pIter->pBlockLoadInfo->checkRemainingRow) {
|
if (pIter->pBlockLoadInfo->checkRemainingRow) {
|
||||||
skipBlock = true;
|
skipBlock = true;
|
||||||
|
@ -902,7 +903,8 @@ bool tLDataIterNextRow(SLDataIter *pIter, const char *idStr) {
|
||||||
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
pIter->rInfo.row = tsdbRowFromBlockData(pBlockData, pIter->iRow);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
return (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
|
*hasNext = (code == TSDB_CODE_SUCCESS) && (pIter->pSttBlk != NULL) && (pBlockData != NULL);
|
||||||
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
// SMergeTree =================================================
|
// SMergeTree =================================================
|
||||||
|
@ -1005,7 +1007,12 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
||||||
goto _end;
|
goto _end;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
|
bool hasVal = NULL;
|
||||||
|
code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
|
||||||
|
if (code) {
|
||||||
|
goto _end;
|
||||||
|
}
|
||||||
|
|
||||||
if (hasVal) {
|
if (hasVal) {
|
||||||
tMergeTreeAddIter(pMTree, pIter);
|
tMergeTreeAddIter(pMTree, pIter);
|
||||||
|
|
||||||
|
@ -1018,7 +1025,6 @@ int32_t tMergeTreeOpen2(SMergeTree *pMTree, SMergeTreeConf *pConf, SSttDataInfoF
|
||||||
pSttDataInfo->numOfRows += numOfRows;
|
pSttDataInfo->numOfRows += numOfRows;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
TAOS_CHECK_GOTO(terrno, NULL, _end);
|
|
||||||
if (!pMTree->ignoreEarlierTs) {
|
if (!pMTree->ignoreEarlierTs) {
|
||||||
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
pMTree->ignoreEarlierTs = pIter->ignoreEarlierTs;
|
||||||
}
|
}
|
||||||
|
@ -1100,8 +1106,9 @@ bool tMergeTreeNext(SMergeTree *pMTree) {
|
||||||
if (pMTree->pIter) {
|
if (pMTree->pIter) {
|
||||||
SLDataIter *pIter = pMTree->pIter;
|
SLDataIter *pIter = pMTree->pIter;
|
||||||
|
|
||||||
bool hasVal = tLDataIterNextRow(pIter, pMTree->idStr);
|
bool hasVal = false;
|
||||||
if (!hasVal) {
|
int32_t code = tLDataIterNextRow(pIter, pMTree->idStr, &hasVal);
|
||||||
|
if (!hasVal || (code != 0)) {
|
||||||
pMTree->pIter = NULL;
|
pMTree->pIter = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1354,7 +1354,7 @@ int32_t deleteCheckpointFile(const char* id, const char* name) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
|
|
|
@ -200,7 +200,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
|
||||||
if ((*pTask)->status.requireConsensusChkptId) {
|
if ((*pTask)->status.requireConsensusChkptId) {
|
||||||
entry.checkpointInfo.consensusChkptId = 1;
|
entry.checkpointInfo.consensusChkptId = 1;
|
||||||
(*pTask)->status.requireConsensusChkptId = false;
|
(*pTask)->status.requireConsensusChkptId = false;
|
||||||
stDebug("s-task:%s vgId:%d set the require consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
|
stDebug("s-task:%s vgId:%d set requiring consensus-checkpointId in hbMsg", (*pTask)->id.idStr, pMeta->vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->exec.pWalReader != NULL) {
|
if ((*pTask)->exec.pWalReader != NULL) {
|
||||||
|
|
|
@ -37,12 +37,6 @@ typedef struct {
|
||||||
SHashObj* pTable;
|
SHashObj* pTable;
|
||||||
} SMetaRefMgt;
|
} SMetaRefMgt;
|
||||||
|
|
||||||
typedef struct STaskInitTs {
|
|
||||||
int64_t start;
|
|
||||||
int64_t end;
|
|
||||||
bool success;
|
|
||||||
} STaskInitTs;
|
|
||||||
|
|
||||||
SMetaRefMgt gMetaRefMgt;
|
SMetaRefMgt gMetaRefMgt;
|
||||||
|
|
||||||
int32_t metaRefMgtInit();
|
int32_t metaRefMgtInit();
|
||||||
|
@ -405,15 +399,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->startInfo.pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
code = streamMetaInitStartInfo(&pMeta->startInfo);
|
||||||
if (pMeta->startInfo.pReadyTaskSet == NULL) {
|
if (code) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMeta->startInfo.pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
|
|
||||||
if (pMeta->startInfo.pFailedTaskSet == NULL) {
|
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -609,8 +596,8 @@ void streamMetaCloseImpl(void* arg) {
|
||||||
taosHashCleanup(pMeta->pTasksMap);
|
taosHashCleanup(pMeta->pTasksMap);
|
||||||
taosHashCleanup(pMeta->pTaskDbUnique);
|
taosHashCleanup(pMeta->pTaskDbUnique);
|
||||||
taosHashCleanup(pMeta->updateInfo.pTasks);
|
taosHashCleanup(pMeta->updateInfo.pTasks);
|
||||||
taosHashCleanup(pMeta->startInfo.pReadyTaskSet);
|
|
||||||
taosHashCleanup(pMeta->startInfo.pFailedTaskSet);
|
streamMetaClearStartInfo(&pMeta->startInfo);
|
||||||
|
|
||||||
destroyMetaHbInfo(pMeta->pHbInfo);
|
destroyMetaHbInfo(pMeta->pHbInfo);
|
||||||
pMeta->pHbInfo = NULL;
|
pMeta->pHbInfo = NULL;
|
||||||
|
@ -1191,18 +1178,6 @@ void streamMetaStartHb(SStreamMeta* pMeta) {
|
||||||
streamMetaHbToMnode(pRid, NULL);
|
streamMetaHbToMnode(pRid, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
|
|
||||||
taosHashClear(pStartInfo->pReadyTaskSet);
|
|
||||||
taosHashClear(pStartInfo->pFailedTaskSet);
|
|
||||||
pStartInfo->tasksWillRestart = 0;
|
|
||||||
pStartInfo->readyTs = 0;
|
|
||||||
pStartInfo->elapsedTime = 0;
|
|
||||||
|
|
||||||
// reset the sentinel flag value to be 0
|
|
||||||
pStartInfo->startAllTasks = 0;
|
|
||||||
stDebug("vgId:%d clear start-all-task info", vgId);
|
|
||||||
}
|
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||||
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
// stTrace("vgId:%d meta-rlock", pMeta->vgId);
|
||||||
(void)taosThreadRwlockRdlock(&pMeta->lock);
|
(void)taosThreadRwlockRdlock(&pMeta->lock);
|
||||||
|
@ -1302,185 +1277,6 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
|
|
||||||
streamMetaWLock(pMeta);
|
|
||||||
|
|
||||||
if (pMeta->closeFlag) {
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
|
|
||||||
return TSDB_CODE_FAILED;
|
|
||||||
}
|
|
||||||
|
|
||||||
*pList = taosArrayDup(pMeta->pTaskList, NULL);
|
|
||||||
if (*pList == NULL) {
|
|
||||||
return terrno;
|
|
||||||
}
|
|
||||||
|
|
||||||
taosHashClear(pMeta->startInfo.pReadyTaskSet);
|
|
||||||
taosHashClear(pMeta->startInfo.pFailedTaskSet);
|
|
||||||
pMeta->startInfo.startTs = now;
|
|
||||||
|
|
||||||
int32_t code = streamMetaResetTaskStatus(pMeta);
|
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
// restore the checkpoint id by negotiating the latest consensus checkpoint id
|
|
||||||
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
|
|
||||||
|
|
||||||
if (numOfTasks == 0) {
|
|
||||||
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
SArray* pTaskList = NULL;
|
|
||||||
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
ASSERT(pTaskList == NULL);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
|
|
||||||
numOfTasks = taosArrayGetSize(pTaskList);
|
|
||||||
|
|
||||||
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
|
||||||
// initialization, when the operation of check downstream tasks status is executed far quickly.
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
|
||||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
|
||||||
code = pMeta->expandTaskFn(pTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
|
||||||
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
|
||||||
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
|
||||||
|
|
||||||
// fill-history task can only be launched by related stream tasks.
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ready now, start the related fill-history task
|
|
||||||
if (pTask->status.downstreamReady == 1) {
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
|
||||||
pTask->id.idStr);
|
|
||||||
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
|
||||||
}
|
|
||||||
|
|
||||||
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
|
|
||||||
true);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
|
||||||
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
|
||||||
code = ret;
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
// negotiate the consensus checkpoint id for current task
|
|
||||||
code = streamTaskSendRestoreChkptMsg(pTask);
|
|
||||||
|
|
||||||
// this task may has no checkpoint, but others tasks may generate checkpoint already?
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
|
||||||
// initialization, when the operation of check downstream tasks status is executed far quickly.
|
|
||||||
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
|
|
||||||
taosArrayDestroy(pTaskList);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
|
|
||||||
streamMetaRLock(pMeta);
|
|
||||||
|
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
|
||||||
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
|
|
||||||
if (num == 0) {
|
|
||||||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
int64_t st = taosGetTimestampMs();
|
|
||||||
|
|
||||||
// send hb msg to mnode before closing all tasks.
|
|
||||||
SArray* pTaskList = NULL;
|
|
||||||
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pTaskList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
|
|
||||||
code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
(void)streamTaskStop(pTask);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
}
|
|
||||||
|
|
||||||
taosArrayDestroy(pTaskList);
|
|
||||||
|
|
||||||
double el = (taosGetTimestampMs() - st) / 1000.0;
|
|
||||||
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
|
|
||||||
|
|
||||||
streamMetaRUnLock(pMeta);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||||
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
int32_t num = taosArrayGetSize(pMeta->pTaskList);
|
||||||
for (int32_t i = 0; i < num; ++i) {
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
@ -1499,196 +1295,6 @@ bool streamMetaAllTasksReady(const SStreamMeta* pMeta) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
|
|
||||||
int32_t code = 0;
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
SStreamTask* pTask = NULL;
|
|
||||||
bool continueExec = true;
|
|
||||||
|
|
||||||
stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);
|
|
||||||
|
|
||||||
code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
|
|
||||||
if (pTask == NULL) {
|
|
||||||
stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
|
|
||||||
(void)streamMetaAddFailedTask(pMeta, streamId, taskId);
|
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// fill-history task can only be launched by related stream tasks.
|
|
||||||
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
// the start all tasks procedure may happen to start the newly deployed stream task, and results in the
|
|
||||||
// concurrently start this task by two threads.
|
|
||||||
streamMutexLock(&pTask->lock);
|
|
||||||
SStreamTaskState status = streamTaskGetStatus(pTask);
|
|
||||||
if (status.state != TASK_STATUS__UNINIT) {
|
|
||||||
stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
|
|
||||||
continueExec = false;
|
|
||||||
} else {
|
|
||||||
continueExec = true;
|
|
||||||
}
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
if (!continueExec) {
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
|
||||||
|
|
||||||
// avoid initialization and destroy running concurrently.
|
|
||||||
streamMutexLock(&pTask->lock);
|
|
||||||
if (pTask->pBackend == NULL) {
|
|
||||||
code = pMeta->expandTaskFn(pTask);
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
streamMutexUnlock(&pTask->lock);
|
|
||||||
}
|
|
||||||
|
|
||||||
// concurrently start task may cause the later started task be failed, and also failed to added into meta result.
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
|
||||||
stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
|
|
||||||
tstrerror(code));
|
|
||||||
|
|
||||||
// do no added into result hashmap if it is failed due to concurrently starting of this stream task.
|
|
||||||
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
|
||||||
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
|
|
||||||
int32_t vgId = pMeta->vgId;
|
|
||||||
void* pIter = NULL;
|
|
||||||
size_t keyLen = 0;
|
|
||||||
|
|
||||||
stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
|
|
||||||
succ ? "success" : "failed");
|
|
||||||
|
|
||||||
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
|
||||||
STaskInitTs* pInfo = pIter;
|
|
||||||
void* key = taosHashGetKey(pIter, &keyLen);
|
|
||||||
|
|
||||||
SStreamTask** pTask1 = taosHashGet(pMeta->pTasksMap, key, sizeof(STaskId));
|
|
||||||
if (pTask1 == NULL) {
|
|
||||||
stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, succ ? "success" : "failed");
|
|
||||||
} else {
|
|
||||||
stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*pTask1)->id.idStr,
|
|
||||||
(*pTask1)->info.taskLevel, vgId, pInfo->start, pInfo->end, pInfo->success ? "success" : "failed");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// check all existed tasks are received rsp
|
|
||||||
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
|
|
||||||
for (int32_t i = 0; i < numOfTotal; ++i) {
|
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
|
||||||
if (pTaskId == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
|
||||||
void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
|
|
||||||
if (px == NULL) {
|
|
||||||
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
|
|
||||||
if (px == NULL) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Record the launch (check-downstream) result of one task and, once every task in the meta has
// reported, log the summary, reset the start info and invoke the completion callback.
//   streamId/taskId - identify the reporting task
//   startTs/endTs   - timestamps stored with the result
//   ready           - true: stored in the ready set, false: stored in the failed set
// Returns 0 when the task is unknown or no start-all procedure is running, the taosHashPut error
// when the result cannot be stored, the completion callback's result when this was the last
// report, and 0 otherwise.
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  const int32_t   vgId = pMeta->vgId;
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  STaskId         id = {.streamId = streamId, .taskId = taskId};

  streamMetaWLock(pMeta);

  SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (ppTask == NULL) {  // task does not exist in current vnode, nothing to record
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // clear the send consensus-checkpointId flag under the task's own lock
  SStreamTask* pTask = *ppTask;
  streamMutexLock(&pTask->lock);
  pTask->status.sendConsensusChkptId = false;
  streamMutexUnlock(&pTask->lock);

  if (pStartInfo->startAllTasks != 1) {
    int64_t el = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, el);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pDst = pStartInfo->pFailedTaskSet;
  if (ready) {
    pDst = pStartInfo->pReadyTaskSet;
  }

  int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
  if (code != 0) {
    if (code == TSDB_CODE_DUP_KEY) {
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, id.taskId);
    } else {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
    }
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);

  if (!allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal)) {
    // still waiting for other tasks
    streamMetaWUnLock(pMeta);
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
    return code;
  }

  pStartInfo->readyTs = taosGetTimestampMs();
  if (pStartInfo->startTs != 0) {
    pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
  } else {
    pStartInfo->elapsedTime = 0;
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
          ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
          pStartInfo->elapsedTime / 1000.0);

  // print the initialization elapsed time and info before the sets are cleared
  displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
  streamMetaResetStartInfo(pStartInfo, vgId);
  streamMetaWUnLock(pMeta);

  // the completion callback is invoked after the meta write lock has been released
  return pStartInfo->completeFn(pMeta);
}
|
|
||||||
|
|
||||||
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
|
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,444 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include "executor.h"
|
||||||
|
#include "streamBackendRocksdb.h"
|
||||||
|
#include "streamInt.h"
|
||||||
|
#include "tmisce.h"
|
||||||
|
#include "tref.h"
|
||||||
|
#include "tsched.h"
|
||||||
|
#include "tstream.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
#include "wal.h"
|
||||||
|
|
||||||
|
// Launch result of one task; stored as the value of the ready/failed result hash sets.
struct STaskInitTs {
  int64_t start;    // timestamp passed as startTs when the result was recorded
  int64_t end;      // timestamp passed as endTs when the result was recorded
  bool    success;  // true when the task was reported as ready
};

typedef struct STaskInitTs STaskInitTs;
|
||||||
|
|
||||||
|
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
|
||||||
|
static bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
|
||||||
|
static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
|
||||||
|
|
||||||
|
// restore the checkpoint id by negotiating the latest consensus checkpoint id
|
||||||
|
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
int32_t vgId = pMeta->vgId;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
|
||||||
|
|
||||||
|
if (numOfTasks == 0) {
|
||||||
|
stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTaskList = NULL;
|
||||||
|
code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
ASSERT(pTaskList == NULL);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
// broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
|
||||||
|
numOfTasks = taosArrayGetSize(pTaskList);
|
||||||
|
|
||||||
|
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
||||||
|
// initialization, when the operation of check downstream tasks status is executed far quickly.
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
SStreamTask* pTask = NULL;
|
||||||
|
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||||
|
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
|
||||||
|
code = pMeta->expandTaskFn(pTask);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
|
||||||
|
streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
|
||||||
|
|
||||||
|
SStreamTask* pTask = NULL;
|
||||||
|
code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
|
||||||
|
if (pTask == NULL) {
|
||||||
|
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
|
||||||
|
(void)streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaskExecStatisInfo* pInfo = &pTask->execInfo;
|
||||||
|
|
||||||
|
// fill-history task can only be launched by related stream tasks.
|
||||||
|
if (pTask->info.fillHistory == 1) {
|
||||||
|
stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// ready now, start the related fill-history task
|
||||||
|
if (pTask->status.downstreamReady == 1) {
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
|
||||||
|
pTask->id.idStr);
|
||||||
|
(void)streamLaunchFillHistoryTask(pTask); // todo: how about retry launch fill-history task?
|
||||||
|
}
|
||||||
|
|
||||||
|
(void)streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
|
||||||
|
true);
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
|
code = ret;
|
||||||
|
|
||||||
|
if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
|
||||||
|
streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
// negotiate the consensus checkpoint id for current task
|
||||||
|
code = streamTaskSendNegotiateChkptIdMsg(pTask);
|
||||||
|
|
||||||
|
// this task may has no checkpoint, but others tasks may generate checkpoint already?
|
||||||
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
// prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
|
||||||
|
// initialization, when the operation of check downstream tasks status is executed far quickly.
|
||||||
|
stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
|
||||||
|
taosArrayDestroy(pTaskList);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Prepare the meta for a start-all-tasks run while holding the meta write lock:
// duplicate the task list into *pList, clear both launch-result sets, stamp the start time and
// reset the task status.
//   pList - out: duplicated task list owned by the caller on success; always NULL on failure
//   now   - timestamp stored as startInfo.startTs
// Returns TSDB_CODE_FAILED when the meta is flagged as closed, terrno when the list cannot be
// duplicated, otherwise the result of streamMetaResetTaskStatus(). The write lock is released on
// every path.
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  *pList = NULL;

  streamMetaWLock(pMeta);

  if (pMeta->closeFlag) {
    streamMetaWUnLock(pMeta);
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(pMeta->pTaskList, NULL);
  if (*pList == NULL) {
    // capture the error before unlocking, and do not leave the write lock held
    int32_t dupCode = terrno;
    streamMetaWUnLock(pMeta);
    return dupCode;
  }

  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.startTs = now;

  int32_t code = streamMetaResetTaskStatus(pMeta);
  streamMetaWUnLock(pMeta);

  if (code != TSDB_CODE_SUCCESS) {
    // never hand a list back together with an error: the caller expects NULL on failure
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return code;
}
|
||||||
|
|
||||||
|
// Bring the start info back to its idle state: empty both launch-result sets and zero the
// ready timestamp, the elapsed time and the restart/start-all flags. startTs is left untouched.
// vgId is only used for the debug log.
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  // forget every recorded launch result
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);

  pStartInfo->elapsedTime = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->tasksWillRestart = 0;

  // reset the sentinel flag value to be 0, after everything else has been cleared
  pStartInfo->startAllTasks = 0;

  stDebug("vgId:%d clear start-all-task info", vgId);
}
|
||||||
|
|
||||||
|
// Record the launch (check-downstream) result of one task and, once every task in the meta has
// reported, log the summary, reset the start info and invoke the completion callback.
//   streamId/taskId - identify the reporting task
//   startTs/endTs   - timestamps stored with the result
//   ready           - true: stored in the ready set, false: stored in the failed set
// Returns 0 when the task is unknown or no start-all procedure is running, the taosHashPut error
// when the result cannot be stored, the completion callback's result when this was the last
// report, and 0 otherwise.
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  const int32_t   vgId = pMeta->vgId;
  STaskStartInfo* pStartInfo = &pMeta->startInfo;
  STaskId         id = {.streamId = streamId, .taskId = taskId};

  streamMetaWLock(pMeta);

  SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
  if (ppTask == NULL) {  // task does not exist in current vnode, nothing to record
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  // clear the send consensus-checkpointId flag under the task's own lock
  SStreamTask* pTask = *ppTask;
  streamMutexLock(&pTask->lock);
  pTask->status.sendConsensusChkptId = false;
  streamMutexUnlock(&pTask->lock);

  if (pStartInfo->startAllTasks != 1) {
    int64_t el = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, el);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pDst = pStartInfo->pFailedTaskSet;
  if (ready) {
    pDst = pStartInfo->pReadyTaskSet;
  }

  int32_t code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
  if (code != 0) {
    if (code == TSDB_CODE_DUP_KEY) {
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, id.taskId);
    } else {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, id.taskId);
    }
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfRecv = taosHashGetSize(pStartInfo->pReadyTaskSet) + taosHashGetSize(pStartInfo->pFailedTaskSet);

  if (!allCheckDownstreamRsp(pMeta, pStartInfo, numOfTotal)) {
    // still waiting for other tasks
    streamMetaWUnLock(pMeta);
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
    return code;
  }

  pStartInfo->readyTs = taosGetTimestampMs();
  if (pStartInfo->startTs != 0) {
    pStartInfo->elapsedTime = pStartInfo->readyTs - pStartInfo->startTs;
  } else {
    pStartInfo->elapsedTime = 0;
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
          ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          vgId, numOfTotal, taskId, ready, pStartInfo->startTs, pStartInfo->readyTs,
          pStartInfo->elapsedTime / 1000.0);

  // print the initialization elapsed time and info before the sets are cleared
  displayStatusInfo(pMeta, pStartInfo->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pStartInfo->pFailedTaskSet, false);
  streamMetaResetStartInfo(pStartInfo, vgId);
  streamMetaWUnLock(pMeta);

  // the completion callback is invoked after the meta write lock has been released
  return pStartInfo->completeFn(pMeta);
}
|
||||||
|
|
||||||
|
// check all existed tasks are received rsp
|
||||||
|
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
|
||||||
|
for (int32_t i = 0; i < numOfTotal; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
if (pTaskId == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
|
||||||
|
void* px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
|
||||||
|
if (px == NULL) {
|
||||||
|
px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
|
||||||
|
if (px == NULL) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log a summary line for one launch-result set, followed by one line per recorded task.
//   pMeta    - meta object; supplies the vgId and the task map used to resolve each key
//   pTaskSet - hash of STaskId -> STaskInitTs holding the results to print
//   succ     - label used for the summary line and for entries whose task is no longer in the map
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  const int32_t vgId = pMeta->vgId;

  stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
         succ ? "success" : "failed");

  for (void* pEntry = taosHashIterate(pTaskSet, NULL); pEntry != NULL; pEntry = taosHashIterate(pTaskSet, pEntry)) {
    size_t       keyLen = 0;
    STaskInitTs* pTs = pEntry;
    void*        pKey = taosHashGetKey(pEntry, &keyLen);

    SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, pKey, sizeof(STaskId));
    if (ppTask != NULL) {
      // the task is still registered: print its own timestamps and its recorded result
      stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", (*ppTask)->id.idStr,
             (*ppTask)->info.taskLevel, vgId, pTs->start, pTs->end, pTs->success ? "success" : "failed");
      continue;
    }

    // the result outlived the task; only the id stored in the key is available
    stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)pKey)->taskId, succ ? "success" : "failed");
  }
}
|
||||||
|
|
||||||
|
// Create the two hash sets (ready / failed) that collect task launch results.
// Returns 0 on success, or terrno when a hash set cannot be created. On failure neither set is
// left allocated and both pointers are NULL, so the struct is never half-initialised.
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);

  pStartInfo->pFailedTaskSet = NULL;

  pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pReadyTaskSet == NULL) {
    return terrno;
  }

  pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pFailedTaskSet == NULL) {
    // save the error first: releasing the ready set below must not be able to overwrite it
    int32_t code = terrno;

    taosHashCleanup(pStartInfo->pReadyTaskSet);
    pStartInfo->pReadyTaskSet = NULL;
    return code;
  }

  return 0;
}
|
||||||
|
|
||||||
|
// Release both launch-result hash sets and zero every counter/flag of the start info.
// The set pointers are reset to NULL after cleanup so the struct no longer refers to the
// released sets.
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  pStartInfo->pReadyTaskSet = NULL;

  taosHashCleanup(pStartInfo->pFailedTaskSet);
  pStartInfo->pFailedTaskSet = NULL;

  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->startTs = 0;
  pStartInfo->startAllTasks = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->restartCount = 0;
}
|
||||||
|
|
||||||
|
// Start a single stream task identified by (streamId, taskId).
// The task must be in the uninit status; its backend is expanded when missing and the task then
// receives TASK_EVENT_INIT.
// Returns TSDB_CODE_STREAM_TASK_IVLD_STATUS when the task cannot be acquired or is not in the
// uninit status, TSDB_CODE_SUCCESS for a fill-history task (which is not started here), and
// otherwise the status of the backend expansion / init event.
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  const int32_t vgId = pMeta->vgId;
  SStreamTask*  pTask = NULL;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if (pTask == NULL) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    (void)streamMetaAddFailedTask(pMeta, streamId, taskId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // fill-history task can only be launched by related stream tasks.
  STaskExecStatisInfo* pInfo = &pTask->execInfo;
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-histroy task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
  // concurrently start this task by two threads.
  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  bool             isUninit = (status.state == TASK_STATUS__UNINIT);
  if (!isUninit) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
  }
  streamMutexUnlock(&pTask->lock);

  if (!isUninit) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  ASSERT(pTask->status.downstreamReady == 0);

  // avoid initialization and destroy running concurrently.
  streamMutexLock(&pTask->lock);
  bool expanded = (pTask->pBackend == NULL);
  if (expanded) {
    code = pMeta->expandTaskFn(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  // a failed expansion is recorded only after the task lock has been released
  if (expanded && code != TSDB_CODE_SUCCESS) {
    streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
  }

  // concurrently start task may cause the later started task be failed, and also failed to added into meta result.
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:%d, code:%s", pTask->id.idStr, pMeta->vgId, TASK_EVENT_INIT,
              tstrerror(code));

      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
      if (code != TSDB_CODE_STREAM_INVALID_STATETRANS) {
        streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
      }
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
|
||||||
|
|
||||||
|
// Stop every stream task registered in pMeta while holding the meta read lock.
// streamMetaSendMsgBeforeCloseTasks() is called first and supplies the task list that is then
// walked; each task that can be acquired is stopped and released.
// Returns TSDB_CODE_SUCCESS when there is no task, the error of
// streamMetaSendMsgBeforeCloseTasks() when it fails, and 0 otherwise. The read lock is released
// on every path.
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaRLock(pMeta);

  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaRUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  SArray* pTaskList = NULL;
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    // do not leave the read lock held on the error path
    // NOTE(review): assumes the callee does not release the lock itself on failure - confirm
    streamMetaRUnLock(pMeta);
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);

  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    if (pTaskId == NULL) {  // defensive: same guard as allCheckDownstreamRsp()
      continue;
    }

    SStreamTask* pTask = NULL;

    // the meta lock is already held, hence the NoLock variant
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (code != TSDB_CODE_SUCCESS) {
      continue;
    }

    (void)streamTaskStop(pTask);
    streamMetaReleaseTask(pMeta, pTask);
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaRUnLock(pMeta);
  return 0;
}
|
||||||
|
|
||||||
|
|
|
@ -1218,6 +1218,8 @@ const char* streamTaskGetExecType(int32_t type) {
|
||||||
return "resume-task-from-idle";
|
return "resume-task-from-idle";
|
||||||
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
case STREAM_EXEC_T_ADD_FAILED_TASK:
|
||||||
return "record-start-failed-task";
|
return "record-start-failed-task";
|
||||||
|
case 0:
|
||||||
|
return "exec-all-tasks";
|
||||||
default:
|
default:
|
||||||
return "invalid-exec-type";
|
return "invalid-exec-type";
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue