Merge pull request #24488 from taosdata/fix/3_liaohj

fix(stream): fix some errors and do some internal refactor.
This commit is contained in:
Haojun Liao 2024-01-17 14:25:32 +08:00 committed by GitHub
commit fc16d26da2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 110 additions and 67 deletions

View File

@ -32,8 +32,9 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve
bool isLeader, bool restored); bool isLeader, bool restored);
int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks); int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg); int32_t tqStreamTaskProcessTaskPauseReq(SStreamMeta* pMeta, char* pMsg);
int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode); int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* pMsg, bool fromVnode);

View File

@ -690,9 +690,9 @@ typedef struct STaskStatusEntry {
int64_t verStart; // start version in WAL, only valid for source task int64_t verStart; // start version in WAL, only valid for source task
int64_t verEnd; // end version in WAL, only valid for source task int64_t verEnd; // end version in WAL, only valid for source task
int64_t processedVer; // only valid for source task int64_t processedVer; // only valid for source task
int64_t activeCheckpointId; // current active checkpoint id int64_t checkpointId; // current active checkpoint id
int32_t chkpointTransId; // checkpoint trans id int32_t chkpointTransId; // checkpoint trans id
bool checkpointFailed; // denote if the checkpoint is failed or not int8_t checkpointFailed; // denote if the checkpoint is failed or not
bool inputQChanging; // inputQ is changing or not bool inputQChanging; // inputQ is changing or not
int64_t inputQUnchangeCounter; int64_t inputQUnchangeCounter;
double inputQUsed; // in MiB double inputQUsed; // in MiB
@ -875,6 +875,8 @@ void streamMetaStartHb(SStreamMeta* pMeta);
bool streamMetaTaskInTimer(SStreamMeta* pMeta); bool streamMetaTaskInTimer(SStreamMeta* pMeta);
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs, int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
int64_t endTs, bool ready); int64_t endTs, bool ready);
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta);
void streamMetaRLock(SStreamMeta* pMeta); void streamMetaRLock(SStreamMeta* pMeta);
void streamMetaRUnLock(SStreamMeta* pMeta); void streamMetaRUnLock(SStreamMeta* pMeta);
void streamMetaWLock(SStreamMeta* pMeta); void streamMetaWLock(SStreamMeta* pMeta);

View File

@ -2374,7 +2374,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbRelease(pSdb, pStream); sdbRelease(pSdb, pStream);
if (conflict) { if (conflict) {
mWarn("nodeUpdate trans in progress, current nodeUpdate ignored"); mError("nodeUpdate conflict with other trans, current nodeUpdate ignored");
sdbCancelFetch(pSdb, pIter); sdbCancelFetch(pSdb, pIter);
return -1; return -1;
} }
@ -2587,14 +2587,22 @@ int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot) {
// kill all trans in the dst DB // kill all trans in the dst DB
static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) { static void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
mDebug("start to clear checkpoints in all Dbs");
void *pIter = NULL; void *pIter = NULL;
while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) { while ((pIter = taosHashIterate(pChangeInfo->pDBMap, pIter)) != NULL) {
char *pDb = (char *)pIter; char *pDb = (char *)pIter;
size_t len = 0; size_t len = 0;
void *pKey = taosHashGetKey(pDb, &len); void *pKey = taosHashGetKey(pDb, &len);
char *p = strndup(pKey, len);
mDebug("clear checkpoint trans in Db:%s", p);
doKillCheckpointTrans(pMnode, pKey, len); doKillCheckpointTrans(pMnode, pKey, len);
taosMemoryFree(p);
} }
mDebug("complete clear checkpoints in Dbs");
} }
// this function runs by only one thread, so it is not multi-thread safe // this function runs by only one thread, so it is not multi-thread safe
@ -2649,7 +2657,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
execInfo.pNodeList = pNodeSnapshot; execInfo.pNodeList = pNodeSnapshot;
execInfo.ts = ts; execInfo.ts = ts;
} else { } else {
mDebug("unexpect code during create nodeUpdate trans, code:%s", tstrerror(code)); mError("unexpected code during create nodeUpdate trans, code:%s", tstrerror(code));
taosArrayDestroy(pNodeSnapshot); taosArrayDestroy(pNodeSnapshot);
} }
} else { } else {
@ -2844,6 +2852,8 @@ void killTransImpl(SMnode *pMnode, int32_t transId, const char *pDbName) {
mInfo("kill active transId:%d in Db:%s", transId, pDbName); mInfo("kill active transId:%d in Db:%s", transId, pDbName);
mndKillTrans(pMnode, pTrans); mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans); mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire trans in Db:%s, transId:%d", pDbName, transId);
} }
} }
@ -2869,15 +2879,7 @@ int32_t doKillCheckpointTrans(SMnode *pMnode, const char *pDBName, size_t len) {
static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) { static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int64_t streamId, int32_t transId) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
killTransImpl(pMnode, transId, "");
STrans *pTrans = mndAcquireTrans(pMnode, transId);
if (pTrans != NULL) {
mInfo("kill checkpoint transId:%d to reset task status", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
} else {
mError("failed to acquire checkpoint trans:%d", transId);
}
SStreamObj *pStream = mndGetStreamObj(pMnode, streamId); SStreamObj *pStream = mndGetStreamObj(pMnode, streamId);
if (pStream == NULL) { if (pStream == NULL) {
@ -3033,7 +3035,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamHbMsg req = {0}; SStreamHbMsg req = {0};
bool checkpointFailed = false; bool checkpointFailed = false;
int64_t activeCheckpointId = 0; int64_t checkpointId = 0;
int64_t streamId = 0; int64_t streamId = 0;
int32_t transId = 0; int32_t transId = 0;
@ -3095,14 +3097,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
streamTaskStatusCopy(pTaskEntry, p); streamTaskStatusCopy(pTaskEntry, p);
if (p->activeCheckpointId != 0) { if (p->checkpointId != 0) {
if (activeCheckpointId != 0) { if (checkpointId != 0) {
ASSERT(activeCheckpointId == p->activeCheckpointId); ASSERT(checkpointId == p->checkpointId);
} else { } else {
activeCheckpointId = p->activeCheckpointId; checkpointId = p->checkpointId;
} }
if (p->checkpointFailed) { if (p->checkpointFailed) {
mError("stream task:0x%" PRIx64 " checkpointId:%" PRIx64 " transId:%d failed, kill it", p->id.taskId,
p->checkpointId, p->chkpointTransId);
checkpointFailed = p->checkpointFailed; checkpointFailed = p->checkpointFailed;
streamId = p->id.streamId; streamId = p->id.streamId;
transId = p->chkpointTransId; transId = p->chkpointTransId;
@ -3124,17 +3129,17 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
// current checkpoint is failed, rollback from the checkpoint trans // current checkpoint is failed, rollback from the checkpoint trans
// kill the checkpoint trans and then set all tasks status to be normal // kill the checkpoint trans and then set all tasks status to be normal
if (checkpointFailed && activeCheckpointId != 0) { if (checkpointFailed && checkpointId != 0) {
bool allReady = true; bool allReady = true;
SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady); SArray *p = mndTakeVgroupSnapshot(pMnode, &allReady);
taosArrayDestroy(p); taosArrayDestroy(p);
if (allReady || snodeChanged) { if (allReady || snodeChanged) {
// if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal // if the execInfo.activeCheckpoint == 0, the checkpoint is restoring from wal
mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", activeCheckpointId); mInfo("checkpointId:%" PRId64 " failed, issue task-reset trans to reset all tasks status", checkpointId);
mndResetStatusFromCheckpoint(pMnode, streamId, transId); mndResetStatusFromCheckpoint(pMnode, streamId, transId);
} else { } else {
mInfo("not all vgroups are ready, wait for next HB from stream tasks"); mInfo("not all vgroups are ready, wait for next HB from stream tasks to reset the task status");
} }
} }
@ -3148,6 +3153,7 @@ void freeCheckpointCandEntry(void *param) {
SCheckpointCandEntry *pEntry = param; SCheckpointCandEntry *pEntry = param;
taosMemoryFreeClear(pEntry->pName); taosMemoryFreeClear(pEntry->pName);
} }
SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) { SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
void *pIter = NULL; void *pIter = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;

View File

@ -89,7 +89,7 @@ bool mndStreamTransConflictCheck(SMnode* pMnode, int64_t streamUid, const char*
} }
if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) { if (strcmp(tInfo.name, MND_STREAM_CHECKPOINT_NAME) == 0) {
if (strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) { if ((strcmp(pTransName, MND_STREAM_DROP_NAME) != 0) && (strcmp(pTransName, MND_STREAM_TASK_RESET_NAME) != 0)) {
mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid, mWarn("conflict with other transId:%d streamUid:0x%" PRIx64 ", trans:%s", tInfo.transId, tInfo.streamUid,
tInfo.name); tInfo.name);
terrno = TSDB_CODE_MND_TRANS_CONFLICT; terrno = TSDB_CODE_MND_TRANS_CONFLICT;

View File

@ -145,8 +145,7 @@ FAIL:
} }
int32_t sndInit(SSnode *pSnode) { int32_t sndInit(SSnode *pSnode) {
int32_t numOfTasks = 0; streamMetaResetTaskStatus(pSnode->pMeta);
tqStreamTaskResetStatus(pSnode->pMeta, &numOfTasks);
streamMetaStartAllTasks(pSnode->pMeta); streamMetaStartAllTasks(pSnode->pMeta);
return 0; return 0;
} }

View File

@ -175,11 +175,11 @@ int32_t tqStopStreamTasksAsync(STQ* pTq) {
SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq)); SStreamTaskRunReq* pRunReq = rpcMallocCont(sizeof(SStreamTaskRunReq));
if (pRunReq == NULL) { if (pRunReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
tqError("vgId:%d failed to create msg to stop tasks, code:%s", vgId, terrstr()); tqError("vgId:%d failed to create msg to stop tasks async, code:%s", vgId, terrstr());
return -1; return -1;
} }
tqDebug("vgId:%d create msg to stop tasks", vgId); tqDebug("vgId:%d create msg to stop all tasks async", vgId);
pRunReq->head.vgId = vgId; pRunReq->head.vgId = vgId;
pRunReq->streamId = 0; pRunReq->streamId = 0;

View File

@ -103,8 +103,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
STaskId id = {.streamId = req.streamId, .taskId = req.taskId}; STaskId id = {.streamId = req.streamId, .taskId = req.taskId};
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (ppTask == NULL || *ppTask == NULL) { if (ppTask == NULL || *ppTask == NULL) {
tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped already", vgId, tqError("vgId:%d failed to acquire task:0x%x when handling update, it may have been dropped", vgId, req.taskId);
req.taskId);
rsp.code = TSDB_CODE_SUCCESS; rsp.code = TSDB_CODE_SUCCESS;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
@ -124,6 +123,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId); tqDebug("s-task:%s recv trans to update nodeEp from mnode, transId:%d", idstr, req.transId);
} }
// duplicate update epset msg received, discard this redundant message
STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId}; STaskUpdateEntry entry = {.streamId = req.streamId, .taskId = req.taskId, .transId = req.transId};
void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry)); void* pReqTask = taosHashGet(pMeta->updateInfo.pTasks, &entry, sizeof(STaskUpdateEntry));
@ -135,7 +135,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
return rsp.code; return rsp.code;
} }
// the following two functions should not be executed within the scope of meta lock to avoid deadlock
streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
streamTaskResetStatus(pTask); streamTaskResetStatus(pTask);
@ -159,11 +158,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
streamMetaSaveTask(pMeta, *ppHTask); streamMetaSaveTask(pMeta, *ppHTask);
} }
#if 0
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
#endif
} else { } else {
tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId); tqDebug("s-task:%s vgId:%d not save since restore not finish", idstr, vgId);
} }
@ -171,7 +165,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId); tqDebug("s-task:%s vgId:%d start to stop task after save task", idstr, vgId);
streamTaskStop(pTask); streamTaskStop(pTask);
// keep the already handled info // keep the already updated info
taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0); taosHashPut(pMeta->updateInfo.pTasks, &entry, sizeof(entry), NULL, 0);
if (ppHTask != NULL) { if (ppHTask != NULL) {
@ -200,6 +194,10 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
updateTasks, (numOfTasks - updateTasks)); updateTasks, (numOfTasks - updateTasks));
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
} else { } else {
if (streamMetaCommit(pMeta) < 0) {
// persist to disk
}
if (!restored) { if (!restored) {
tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId); tqDebug("vgId:%d vnode restore not completed, not start the tasks, clear the start after nodeUpdate flag", vgId);
pMeta->startInfo.tasksWillRestart = 0; pMeta->startInfo.tasksWillRestart = 0;
@ -686,16 +684,16 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen
return 0; return 0;
} }
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta, int32_t* numOfTasks) { int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
*numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, *numOfTasks); tqDebug("vgId:%d reset all %d stream task(s) status to be uninit", vgId, numOfTasks);
if (*numOfTasks == 0) { if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
for (int32_t i = 0; i < (*numOfTasks); ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId}; STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
@ -754,8 +752,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
} }
if (isLeader && !tsDisableStream) { if (isLeader && !tsDisableStream) {
int32_t numOfTasks = 0; streamMetaResetTaskStatus(pMeta);
tqStreamTaskResetStatus(pMeta, &numOfTasks);
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
streamMetaStartAllTasks(pMeta); streamMetaStartAllTasks(pMeta);
@ -965,8 +962,7 @@ static int32_t tqProcessTaskResumeImpl(void* handle, SStreamTask* pTask, int64_t
} else if (status == TASK_STATUS__UNINIT) { } else if (status == TASK_STATUS__UNINIT) {
// todo: fill-history task init ? // todo: fill-history task init ?
if (pTask->info.fillHistory == 0) { if (pTask->info.fillHistory == 0) {
EStreamTaskEvent event = /*HAS_RELATED_FILLHISTORY_TASK(pTask) ? TASK_EVENT_INIT_STREAM_SCANHIST : */TASK_EVENT_INIT; streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
streamTaskHandleEvent(pTask->status.pSM, event);
} }
} }
@ -990,4 +986,8 @@ int32_t tqStreamTaskProcessTaskResumeReq(void* handle, int64_t sversion, char* m
} }
return code; return code;
}
int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta) {
return taosArrayGetSize(pMeta->pTaskList);
} }

View File

@ -570,9 +570,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId); vInfo("vgId:%d, sync restore finished, not launch stream tasks, since stream tasks are disabled", vgId);
} else { } else {
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId); vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
int32_t numOfTasks = 0; int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
tqStreamTaskResetStatus(pMeta, &numOfTasks);
if (numOfTasks > 0) { if (numOfTasks > 0) {
if (pMeta->startInfo.taskStarting == 1) { if (pMeta->startInfo.taskStarting == 1) {
pMeta->startInfo.restartCount += 1; pMeta->startInfo.restartCount += 1;
@ -580,6 +578,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
pMeta->startInfo.restartCount); pMeta->startInfo.restartCount);
} else { } else {
pMeta->startInfo.taskStarting = 1; pMeta->startInfo.taskStarting = 1;
streamMetaWUnLock(pMeta); streamMetaWUnLock(pMeta);
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false); tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
return; return;

View File

@ -360,6 +360,8 @@ int32_t streamSaveTaskCheckpointInfo(SStreamTask* p, int64_t checkpointId) {
void streamTaskSetCheckpointFailedId(SStreamTask* pTask) { void streamTaskSetCheckpointFailedId(SStreamTask* pTask) {
pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId; pTask->chkInfo.failedId = pTask->chkInfo.checkpointingId;
stDebug("s-task:%s mark the checkpointId:%" PRId64 " (transId:%d) failed", pTask->id.idStr,
pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
} }
int32_t getChkpMeta(char* id, char* path, SArray* list) { int32_t getChkpMeta(char* id, char* path, SArray* list) {

View File

@ -960,7 +960,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1; if (tEncodeI64(pEncoder, ps->processedVer) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1; if (tEncodeI64(pEncoder, ps->verStart) < 0) return -1;
if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1; if (tEncodeI64(pEncoder, ps->verEnd) < 0) return -1;
if (tEncodeI64(pEncoder, ps->activeCheckpointId) < 0) return -1; if (tEncodeI64(pEncoder, ps->checkpointId) < 0) return -1;
if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1; if (tEncodeI8(pEncoder, ps->checkpointFailed) < 0) return -1;
if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1; if (tEncodeI32(pEncoder, ps->chkpointTransId) < 0) return -1;
} }
@ -999,8 +999,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1; if (tDecodeI64(pDecoder, &entry.processedVer) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verStart) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1; if (tDecodeI64(pDecoder, &entry.verEnd) < 0) return -1;
if (tDecodeI64(pDecoder, &entry.activeCheckpointId) < 0) return -1; if (tDecodeI64(pDecoder, &entry.checkpointId) < 0) return -1;
if (tDecodeI8(pDecoder, (int8_t*)&entry.checkpointFailed) < 0) return -1; if (tDecodeI8(pDecoder, &entry.checkpointFailed) < 0) return -1;
if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1; if (tDecodeI32(pDecoder, &entry.chkpointTransId) < 0) return -1;
entry.id.taskId = taskId; entry.id.taskId = taskId;
@ -1113,8 +1113,8 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
} }
if ((*pTask)->chkInfo.checkpointingId != 0) { if ((*pTask)->chkInfo.checkpointingId != 0) {
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId); entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId)? 1:0;
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId; entry.checkpointId = (*pTask)->chkInfo.checkpointingId;
entry.chkpointTransId = (*pTask)->chkInfo.transId; entry.chkpointTransId = (*pTask)->chkInfo.transId;
if (entry.checkpointFailed) { if (entry.checkpointFailed) {
@ -1373,7 +1373,6 @@ SArray* streamMetaSendMsgBeforeCloseTasks(SStreamMeta* pMeta) {
SStreamTaskState* pState = streamTaskGetStatus(pTask); SStreamTaskState* pState = streamTaskGetStatus(pTask);
if (pState->state == TASK_STATUS__CK) { if (pState->state == TASK_STATUS__CK) {
streamTaskSetCheckpointFailedId(pTask); streamTaskSetCheckpointFailedId(pTask);
stDebug("s-task:%s mark the checkpoint:%"PRId64" failed", pTask->id.idStr, pTask->chkInfo.checkpointingId);
} else { } else {
stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name); stDebug("s-task:%s status:%s not reset the checkpoint", pTask->id.idStr, pState->name);
} }
@ -1411,31 +1410,43 @@ void streamMetaUpdateStageRole(SStreamMeta* pMeta, int64_t stage, bool isLeader)
} }
} }
static SArray* prepareBeforeStartTasks(SStreamMeta* pMeta) {
streamMetaWLock(pMeta);
SArray* pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaResetTaskStatus(pMeta);
streamMetaWUnLock(pMeta);
return pTaskList;
}
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) { int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
int32_t vgId = pMeta->vgId; int32_t vgId = pMeta->vgId;
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList); int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks); stInfo("vgId:%d start to check all %d stream task(s) downstream status", vgId, numOfTasks);
if (numOfTasks == 0) { if (numOfTasks == 0) {
stInfo("vgId:%d start tasks completed", pMeta->vgId); stInfo("vgId:%d start tasks completed", pMeta->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
SArray* pTaskList = NULL;
streamMetaWLock(pMeta);
pTaskList = taosArrayDup(pMeta->pTaskList, NULL);
taosHashClear(pMeta->startInfo.pReadyTaskSet);
taosHashClear(pMeta->startInfo.pFailedTaskSet);
pMeta->startInfo.startTs = taosGetTimestampMs();
streamMetaWUnLock(pMeta);
// broadcast the check downstream tasks msg
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
SArray* pTaskList = prepareBeforeStartTasks(pMeta);
numOfTasks = taosArrayGetSize(pTaskList);
// broadcast the check downstream tasks msg
for (int32_t i = 0; i < numOfTasks; ++i) { for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i); SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
// todo: may be we should find the related fill-history task and set it failed.
// todo: use hashTable instead
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId); SStreamTask* pTask = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId);
if (pTask == NULL) { if (pTask == NULL) {
stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId); stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
@ -1443,8 +1454,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
continue; continue;
} }
// todo: may be we should find the related fill-history task and set it failed.
// fill-history task can only be launched by related stream tasks. // fill-history task can only be launched by related stream tasks.
STaskExecStatisInfo* pInfo = &pTask->execInfo; STaskExecStatisInfo* pInfo = &pTask->execInfo;
if (pTask->info.fillHistory == 1) { if (pTask->info.fillHistory == 1) {
@ -1491,10 +1500,13 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
int32_t num = taosArrayGetSize(pMeta->pTaskList); int32_t num = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num); stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);
if (num == 0) { if (num == 0) {
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int64_t st = taosGetTimestampMs();
// send hb msg to mnode before closing all tasks. // send hb msg to mnode before closing all tasks.
SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta); SArray* pTaskList = streamMetaSendMsgBeforeCloseTasks(pMeta);
int32_t numOfTasks = taosArrayGetSize(pTaskList); int32_t numOfTasks = taosArrayGetSize(pTaskList);
@ -1512,6 +1524,9 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
taosArrayDestroy(pTaskList); taosArrayDestroy(pTaskList);
double el = (taosGetTimestampMs() - st) / 1000.0;
stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);
return 0; return 0;
} }
@ -1638,4 +1653,23 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
}
int32_t streamMetaResetTaskStatus(SStreamMeta* pMeta) {
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
stDebug("vgId:%d reset all %d stream task(s) status to be uninit", pMeta->vgId, numOfTasks);
if (numOfTasks == 0) {
return TSDB_CODE_SUCCESS;
}
for (int32_t i = 0; i < numOfTasks; ++i) {
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
STaskId id = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
SStreamTask** pTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
streamTaskResetStatus(*pTask);
}
return 0;
} }

View File

@ -810,7 +810,7 @@ void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc)
pDst->verEnd = pSrc->verEnd; pDst->verEnd = pSrc->verEnd;
pDst->sinkQuota = pSrc->sinkQuota; pDst->sinkQuota = pSrc->sinkQuota;
pDst->sinkDataSize = pSrc->sinkDataSize; pDst->sinkDataSize = pSrc->sinkDataSize;
pDst->activeCheckpointId = pSrc->activeCheckpointId; pDst->checkpointId = pSrc->checkpointId;
pDst->checkpointFailed = pSrc->checkpointFailed; pDst->checkpointFailed = pSrc->checkpointFailed;
pDst->chkpointTransId = pSrc->chkpointTransId; pDst->chkpointTransId = pSrc->chkpointTransId;
} }

View File

@ -676,7 +676,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak, int64_
// heartbeat timeout // heartbeat timeout
if (syncNodeHeartbeatReplyTimeout(pSyncNode)) { if (syncNodeHeartbeatReplyTimeout(pSyncNode)) {
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY; terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
sNError(pSyncNode, "failed to sync propose since hearbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64, sNError(pSyncNode, "failed to sync propose since heartbeat timeout, type:%s, last:%" PRId64 ", cmt:%" PRId64,
TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex); TMSG_INFO(pMsg->msgType), syncNodeGetLastIndex(pSyncNode), pSyncNode->commitIndex);
return -1; return -1;
} }