Merge pull request #26619 from taosdata/fix/3_liaohj

fix(stream): adjust the time to free task backend.
This commit is contained in:
Haojun Liao 2024-08-09 09:09:21 +08:00 committed by GitHub
commit 6fc8c7bd4e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
24 changed files with 526 additions and 247 deletions

View File

@ -13,8 +13,8 @@ extern "C" {
void stopRsync(); void stopRsync();
int32_t startRsync(); int32_t startRsync();
int32_t uploadByRsync(const char* id, const char* path); int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId);
int32_t downloadRsync(const char* id, const char* path); int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId);
int32_t deleteRsync(const char* id); int32_t deleteRsync(const char* id);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -164,6 +164,7 @@ int32_t tDecodeStreamTaskCheckpointReq(SDecoder* pDecoder, SStreamTaskCheckpoint
typedef struct SStreamHbMsg { typedef struct SStreamHbMsg {
int32_t vgId; int32_t vgId;
int32_t msgId; int32_t msgId;
int64_t ts;
int32_t numOfTasks; int32_t numOfTasks;
SArray* pTaskStatus; // SArray<STaskStatusEntry> SArray* pTaskStatus; // SArray<STaskStatusEntry>
SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes. SArray* pUpdateNodes; // SArray<int32_t>, needs update the epsets in stream tasks for those nodes.

View File

@ -163,7 +163,7 @@ int32_t startRsync() {
return code; return code;
} }
int32_t uploadByRsync(const char* id, const char* path) { int32_t uploadByRsync(const char* id, const char* path, int64_t checkpointId) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
@ -203,12 +203,12 @@ int32_t uploadByRsync(const char* id, const char* path) {
// prepare the data directory // prepare the data directory
int32_t code = execCommand(command); int32_t code = execCommand(command);
if (code != 0) { if (code != 0) {
uError("[rsync] s-task:%s prepare checkpoint data in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path, uError("[rsync] s-task:%s prepare checkpoint dir in %s to %s failed, code:%d," ERRNO_ERR_FORMAT, id, path,
tsSnodeAddress, code, ERRNO_ERR_DATA); tsSnodeAddress, code, ERRNO_ERR_DATA);
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(errno);
} else { } else {
int64_t el = (taosGetTimestampMs() - st); int64_t el = (taosGetTimestampMs() - st);
uDebug("[rsync] s-task:%s prepare checkpoint data in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path, uDebug("[rsync] s-task:%s prepare checkpoint dir in:%s to %s successfully, elapsed time:%" PRId64 "ms", id, path,
tsSnodeAddress, el); tsSnodeAddress, el);
} }
@ -222,7 +222,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
#endif #endif
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ " "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s/ "
"rsync://%s/checkpoint/%s/data/", "rsync://%s/checkpoint/%s/%" PRId64 "/",
tsLogDir, tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
@ -230,11 +230,11 @@ int32_t uploadByRsync(const char* id, const char* path) {
path path
#endif #endif
, ,
tsSnodeAddress, id); tsSnodeAddress, id, checkpointId);
} else { } else {
snprintf(command, PATH_MAX, snprintf(command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s " "rsync -av --debug=all --log-file=%s/rsynclog --delete --timeout=10 --bwlimit=100000 %s "
"rsync://%s/checkpoint/%s/data/", "rsync://%s/checkpoint/%s/%" PRId64 "/",
tsLogDir, tsLogDir,
#ifdef WINDOWS #ifdef WINDOWS
pathTransform pathTransform
@ -242,7 +242,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
path path
#endif #endif
, ,
tsSnodeAddress, id); tsSnodeAddress, id, checkpointId);
} }
code = execCommand(command); code = execCommand(command);
@ -260,7 +260,7 @@ int32_t uploadByRsync(const char* id, const char* path) {
} }
// abort from retry if quit // abort from retry if quit
int32_t downloadRsync(const char* id, const char* path) { int32_t downloadByRsync(const char* id, const char* path, int64_t checkpointId) {
int64_t st = taosGetTimestampMs(); int64_t st = taosGetTimestampMs();
int32_t MAX_RETRY = 10; int32_t MAX_RETRY = 10;
int32_t times = 0; int32_t times = 0;
@ -272,6 +272,38 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif #endif
char command[PATH_MAX] = {0}; char command[PATH_MAX] = {0};
snprintf(
command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/%" PRId64
"/ %s",
tsLogDir, tsSnodeAddress, id, checkpointId,
#ifdef WINDOWS
pathTransform
#else
path
#endif
);
uDebug("[rsync] %s start to sync data from remote to:%s, cmd:%s", id, path, command);
code = execCommand(command);
if (code != TSDB_CODE_SUCCESS) {
uError("[rsync] %s download checkpointId:%" PRId64
" data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
id, checkpointId, path, times, code, ERRNO_ERR_DATA);
} else {
int32_t el = taosGetTimestampMs() - st;
uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
path, el);
}
if (code != TSDB_CODE_SUCCESS) { // if failed, try to load it from data directory
#ifdef WINDOWS
memset(pathTransform, 0, PATH_MAX);
changeDirFromWindowsToLinux(path, pathTransform);
#endif
memset(command, 0, PATH_MAX);
snprintf( snprintf(
command, PATH_MAX, command, PATH_MAX,
"rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s", "rsync -av --debug=all --log-file=%s/rsynclog --timeout=10 --bwlimit=100000 rsync://%s/checkpoint/%s/data/ %s",
@ -283,19 +315,18 @@ int32_t downloadRsync(const char* id, const char* path) {
#endif #endif
); );
uDebug("[rsync] %s start to sync data from remote to:%s, %s", id, path, command); uDebug("[rsync] %s start to sync data from remote data dir to:%s, cmd:%s", id, path, command);
while (times++ < MAX_RETRY) {
code = execCommand(command); code = execCommand(command);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
uError("[rsync] %s download checkpoint data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT, id, uError("[rsync] %s download checkpointId:%" PRId64
path, times, code, ERRNO_ERR_DATA); " data:%s failed, retry after 1sec, times:%d, code:%d," ERRNO_ERR_FORMAT,
taosSsleep(1); id, checkpointId, path, times, code, ERRNO_ERR_DATA);
code = TAOS_SYSTEM_ERROR(errno); code = TAOS_SYSTEM_ERROR(code);
} else { } else {
int32_t el = taosGetTimestampMs() - st; int32_t el = taosGetTimestampMs() - st;
uDebug("[rsync] %s download checkpoint data:%s successfully, elapsed time:%dms", id, path, el); uDebug("[rsync] %s download checkpointId:%" PRId64 " data:%s successfully, elapsed time:%dms", id, checkpointId,
break; path, el);
} }
} }
return code; return code;

View File

@ -109,7 +109,7 @@ static void dmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) {
int32_t svrVer = 0; int32_t svrVer = 0;
(void)taosVersionStrToInt(version, &svrVer); (void)taosVersionStrToInt(version, &svrVer);
if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) { if ((code = taosCheckVersionCompatible(pRpc->info.cliVer, svrVer, 3)) != 0) {
dError("Version not compatible, cli ver: %d, svr ver: %d", pRpc->info.cliVer, svrVer); dError("Version not compatible, cli ver: %d, svr ver: %d, ip:0x%x", pRpc->info.cliVer, svrVer, pRpc->info.conn.clientIp);
goto _OVER; goto _OVER;
} }

View File

@ -57,6 +57,12 @@ typedef struct SStreamTaskResetMsg {
int32_t transId; int32_t transId;
} SStreamTaskResetMsg; } SStreamTaskResetMsg;
typedef struct SChkptReportInfo {
SArray* pTaskList;
int64_t reportChkpt;
int64_t streamId;
} SChkptReportInfo;
typedef struct SStreamExecInfo { typedef struct SStreamExecInfo {
bool initTaskList; bool initTaskList;
SArray *pNodeList; SArray *pNodeList;
@ -66,7 +72,7 @@ typedef struct SStreamExecInfo {
SArray *pTaskList; SArray *pTaskList;
TdThreadMutex lock; TdThreadMutex lock;
SHashObj *pTransferStateStreams; SHashObj *pTransferStateStreams;
SHashObj *pChkptStreams; SHashObj *pChkptStreams; // use to update the checkpoint info, if all tasks send the checkpoint-report msgs
SHashObj *pStreamConsensus; SHashObj *pStreamConsensus;
SArray *pKilledChkptTrans; // SArray<SStreamTaskResetMsg> SArray *pKilledChkptTrans; // SArray<SStreamTaskResetMsg>
} SStreamExecInfo; } SStreamExecInfo;
@ -79,6 +85,8 @@ typedef struct SNodeEntry {
bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot. bool stageUpdated; // the stage has been updated due to the leader/follower change or node reboot.
SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes. SEpSet epset; // compare the epset to identify the vgroup tranferring between different dnodes.
int64_t hbTimestamp; // second int64_t hbTimestamp; // second
int32_t lastHbMsgId; // latest hb msgId
int64_t lastHbMsgTs;
} SNodeEntry; } SNodeEntry;
typedef struct { typedef struct {
@ -151,6 +159,8 @@ int32_t mndGetConsensusInfo(SHashObj *pHash, int64_t streamId, int32_t numOfTask
void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo); void mndAddConsensusTasks(SCheckpointConsensusInfo *pInfo, const SRestoreCheckpointInfo *pRestoreInfo);
void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo); void mndClearConsensusRspEntry(SCheckpointConsensusInfo *pInfo);
int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId); int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId);
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId);
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId);
int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows); int32_t setStreamAttrInResBlock(SStreamObj *pStream, SSDataBlock *pBlock, int32_t numOfRows);
int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows); int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDataBlock *pBlock, int32_t numOfRows);

View File

@ -2139,7 +2139,7 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
break; break;
} }
SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; SNodeEntry entry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
epsetAssign(&entry.epset, &pTask->info.epSet); epsetAssign(&entry.epset, &pTask->info.epSet);
(void)taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry)); (void)taosHashPut(pHash, &entry.nodeId, sizeof(entry.nodeId), &entry, sizeof(entry));
} }
@ -2319,7 +2319,7 @@ void saveTaskAndNodeInfoIntoBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode)
} }
if (!exist) { if (!exist) {
SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId}; SNodeEntry nodeEntry = {.hbTimestamp = -1, .nodeId = pTask->info.nodeId, .lastHbMsgId = -1};
epsetAssign(&nodeEntry.epset, &pTask->info.epSet); epsetAssign(&nodeEntry.epset, &pTask->info.epSet);
void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry); void* px = taosArrayPush(pExecNode->pNodeList, &nodeEntry);
@ -2454,8 +2454,45 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
return 0; return 0;
} }
static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pReport) { // valid the info according to the HbMsg
bool existed = false; static bool validateChkptReport(const SCheckpointReport *pReport, int64_t reportChkptId) {
STaskId id = {.streamId = pReport->streamId, .taskId = pReport->taskId};
STaskStatusEntry *pTaskEntry = taosHashGet(execInfo.pTaskMap, &id, sizeof(id));
if (pTaskEntry == NULL) {
mError("invalid checkpoint-report msg from task:0x%x, discard", pReport->taskId);
return false;
}
if (pTaskEntry->checkpointInfo.latestId >= pReport->checkpointId) {
mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " saved checkpointId:%" PRId64 " discard",
pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
return false;
}
// now the task in checkpoint procedure
if ((pTaskEntry->checkpointInfo.activeId != 0) && (pTaskEntry->checkpointInfo.activeId > pReport->checkpointId)) {
mError("s-task:0x%x invalid checkpoint-report msg, checkpointId:%" PRId64 " active checkpointId:%" PRId64
" discard",
pReport->taskId, pReport->checkpointId, pTaskEntry->checkpointInfo.activeId);
return false;
}
if (reportChkptId >= pReport->checkpointId) {
mError("s-task:0x%x expired checkpoint-report msg, checkpointId:%" PRId64 " already update checkpointId:%" PRId64
" discard",
pReport->taskId, pReport->checkpointId, reportChkptId);
return false;
}
return true;
}
static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SCheckpointReport *pReport) {
bool valid = validateChkptReport(pReport, reportChkptId);
if (!valid) {
return;
}
for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
STaskChkptInfo *p = taosArrayGet(pList, i); STaskChkptInfo *p = taosArrayGet(pList, i);
if (p == NULL) { if (p == NULL) {
@ -2463,12 +2500,21 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor
} }
if (p->taskId == pReport->taskId) { if (p->taskId == pReport->taskId) {
existed = true; if (p->checkpointId > pReport->checkpointId) {
break; mError("s-task:0x%x invalid checkpoint-report msg, existed:%" PRId64 " req checkpointId:%" PRId64 ", discard",
pReport->taskId, p->checkpointId, pReport->checkpointId);
} else if (p->checkpointId < pReport->checkpointId) { // expired checkpoint-report msg, update it
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
pReport->taskId, p->checkpointId, pReport->checkpointId);
memcpy(p, pReport, sizeof(STaskChkptInfo));
} else {
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
}
return;
} }
} }
if (!existed) {
STaskChkptInfo info = { STaskChkptInfo info = {
.streamId = pReport->streamId, .streamId = pReport->streamId,
.taskId = pReport->taskId, .taskId = pReport->taskId,
@ -2480,10 +2526,12 @@ static void doAddReportStreamTask(SArray* pList, const SCheckpointReport* pRepor
.nodeId = pReport->nodeId, .nodeId = pReport->nodeId,
}; };
void* p = taosArrayPush(pList, &info); void *p = taosArrayPush(pList, &info);
if (p == NULL) { if (p == NULL) {
mError("failed to put into task list, taskId:0x%x", pReport->taskId); mError("failed to put into task list, taskId:0x%x", pReport->taskId);
} } else {
int32_t size = taosArrayGetSize(pList);
mDebug("stream:0x%"PRIx64" %d tasks has send checkpoint-report", pReport->streamId, size);
} }
} }
@ -2530,23 +2578,23 @@ int32_t mndProcessCheckpointReport(SRpcMsg *pReq) {
int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream);
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); SChkptReportInfo *pInfo = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
if (pReqTaskList == NULL) { if (pInfo == NULL) {
SArray *pList = taosArrayInit(4, sizeof(STaskChkptInfo)); SChkptReportInfo info = {.pTaskList = taosArrayInit(4, sizeof(STaskChkptInfo)), .streamId = req.streamId};
if (pList != NULL) { if (info.pTaskList != NULL) {
doAddReportStreamTask(pList, &req); doAddReportStreamTask(info.pTaskList, info.reportChkpt, &req);
code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &pList, POINTER_BYTES); code = taosHashPut(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId), &info, sizeof(info));
if (code) { if (code) {
mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId); mError("stream:0x%" PRIx64 " failed to put into checkpoint stream", req.streamId);
} }
pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId)); pInfo = (SChkptReportInfo *)taosHashGet(execInfo.pChkptStreams, &req.streamId, sizeof(req.streamId));
} }
} else { } else {
doAddReportStreamTask(*pReqTaskList, &req); doAddReportStreamTask(pInfo->pTaskList, pInfo->reportChkpt, &req);
} }
int32_t total = taosArrayGetSize(*pReqTaskList); int32_t total = taosArrayGetSize(pInfo->pTaskList);
if (total == numOfTasks) { // all tasks has send the reqs if (total == numOfTasks) { // all tasks has send the reqs
mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64 mInfo("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, checkpoint meta-info for checkpointId:%" PRId64
" will be issued soon", " will be issued soon",

View File

@ -211,6 +211,10 @@ int32_t mndProcessResetStatusReq(SRpcMsg *pReq) {
SStreamTaskResetMsg* pMsg = pReq->pCont; SStreamTaskResetMsg* pMsg = pReq->pCont;
mndKillTransImpl(pMnode, pMsg->transId, ""); mndKillTransImpl(pMnode, pMsg->transId, "");
streamMutexLock(&execInfo.lock);
(void) mndResetChkptReportInfo(execInfo.pChkptStreams, pMsg->streamId);
streamMutexUnlock(&execInfo.lock);
code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream); code = mndGetStreamObj(pMnode, pMsg->streamId, &pStream);
if (pStream == NULL || code != 0) { if (pStream == NULL || code != 0) {
code = TSDB_CODE_STREAM_TASK_NOT_EXIST; code = TSDB_CODE_STREAM_TASK_NOT_EXIST;
@ -333,7 +337,8 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, msgId:%d", req.vgId, req.numOfTasks, req.msgId); mDebug("receive stream-meta hb from vgId:%d, active numOfTasks:%d, HbMsgId:%d, HbMsgTs:%" PRId64, req.vgId,
req.numOfTasks, req.msgId, req.ts);
pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo)); pFailedChkpt = taosArrayInit(4, sizeof(SFailedCheckpointInfo));
pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask)); pOrphanTasks = taosArrayInit(4, sizeof(SOrphanTask));
@ -356,6 +361,31 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
TAOS_RETURN(TSDB_CODE_INVALID_MSG); TAOS_RETURN(TSDB_CODE_INVALID_MSG);
} }
for(int32_t i = 0; i < taosArrayGetSize(execInfo.pNodeList); ++i) {
SNodeEntry* pEntry = taosArrayGet(execInfo.pNodeList, i);
if (pEntry == NULL) {
continue;
}
if (pEntry->nodeId != req.vgId) {
continue;
}
if ((pEntry->lastHbMsgId == req.msgId) && (pEntry->lastHbMsgTs == req.ts)) {
mError("vgId:%d HbMsgId:%d already handled, bh msg discard", pEntry->nodeId, req.msgId);
terrno = TSDB_CODE_INVALID_MSG;
doSendHbMsgRsp(terrno, &pReq->info, req.vgId, req.msgId);
streamMutexUnlock(&execInfo.lock);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return terrno;
} else {
pEntry->lastHbMsgId = req.msgId;
pEntry->lastHbMsgTs = req.ts;
}
}
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes); int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
if (numOfUpdated > 0) { if (numOfUpdated > 0) {
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId); mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
@ -393,6 +423,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
code = mndGetStreamObj(pMnode, p->id.streamId, &pStream); code = mndGetStreamObj(pMnode, p->id.streamId, &pStream);
if (code) { if (code) {
mError("stream obj not exist, failed to handle consensus checkpoint-info req, code:%s", tstrerror(code));
continue; continue;
} }
@ -426,7 +457,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
addIntoCheckpointList(pFailedChkpt, &info); addIntoCheckpointList(pFailedChkpt, &info);
// remove failed trans from pChkptStreams // remove failed trans from pChkptStreams
code = taosHashRemove(execInfo.pChkptStreams, &p->id.streamId, sizeof(p->id.streamId)); code = mndResetChkptReportInfo(execInfo.pChkptStreams, p->id.streamId);
if (code) { if (code) {
mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId); mError("failed to remove stream:0x%"PRIx64" in checkpoint stream list", p->id.streamId);
} }
@ -484,14 +515,14 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
} }
if (pMnode != NULL) { // make sure that the unit test case can work if (pMnode != NULL) { // make sure that the unit test case can work
mndStreamSendUpdateChkptInfoMsg(pMnode); code = mndStreamSendUpdateChkptInfoMsg(pMnode);
} }
streamMutexUnlock(&execInfo.lock); streamMutexUnlock(&execInfo.lock);
doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId); doSendHbMsgRsp(TSDB_CODE_SUCCESS, &pReq->info, req.vgId, req.msgId);
cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks); cleanupAfterProcessHbMsg(&req, pFailedChkpt, pOrphanTasks);
return code; return code;
} }

View File

@ -129,6 +129,8 @@ int32_t mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady, SArray **pList) {
goto _err; goto _err;
} }
*allReady = true;
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) { if (pIter == NULL) {
@ -540,8 +542,7 @@ static int32_t doSetDropActionFromId(SMnode *pMnode, STrans *pTrans, SOrphanTask
} }
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, code = setTransAction(pTrans, pReq, sizeof(SVDropStreamTaskReq), TDMT_STREAM_TASK_DROP, &epset, 0, TSDB_CODE_VND_INVALID_VGROUP_ID);
TSDB_CODE_VND_INVALID_VGROUP_ID);
if (code != 0) { if (code != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
return code; return code;
@ -812,9 +813,13 @@ void removeExpiredNodeInfo(const SArray *pNodeSnapshot) {
} }
if (pEntry->nodeId == p->nodeId) { if (pEntry->nodeId == p->nodeId) {
p->hbTimestamp = pEntry->hbTimestamp;
void* px = taosArrayPush(pValidList, p); void* px = taosArrayPush(pValidList, p);
if (px == NULL) { if (px == NULL) {
mError("failed to put node into list, nodeId:%d", p->nodeId); mError("failed to put node into list, nodeId:%d", p->nodeId);
} else {
mDebug("vgId:%d ts:%" PRId64 " HbMsgId:%d is valid", p->nodeId, p->hbTimestamp, p->lastHbMsgId);
} }
break; break;
} }
@ -899,8 +904,9 @@ void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode) {
ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList)); ASSERT(taosHashGetSize(pExecNode->pTaskMap) == taosArrayGetSize(pExecNode->pTaskList));
// 2. remove stream entry in consensus hash table // 2. remove stream entry in consensus hash table and checkpoint-report hash table
(void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid); (void) mndClearConsensusCheckpointId(execInfo.pStreamConsensus, pStream->uid);
(void) mndClearChkptReportInfo(execInfo.pChkptStreams, pStream->uid);
streamMutexUnlock(&pExecNode->lock); streamMutexUnlock(&pExecNode->lock);
destroyStreamTaskIter(pIter); destroyStreamTaskIter(pIter);
@ -968,9 +974,8 @@ int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot) {
static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) { static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq)); SVUpdateCheckpointInfoReq *pReq = taosMemoryCalloc(1, sizeof(SVUpdateCheckpointInfoReq));
if (pReq == NULL) { if (pReq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq), mError("failed to malloc in reset stream, size:%" PRIzu ", code:%s", sizeof(SVUpdateCheckpointInfoReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY)); tstrerror(terrno));
return terrno; return terrno;
} }
@ -978,12 +983,14 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas
pReq->taskId = pTask->id.taskId; pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId; pReq->streamId = pTask->id.streamId;
SArray **pReqTaskList = (SArray **)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId)); SChkptReportInfo *pStreamItem = (SChkptReportInfo*)taosHashGet(execInfo.pChkptStreams, &pTask->id.streamId, sizeof(pTask->id.streamId));
ASSERT(pReqTaskList); if (pStreamItem == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t size = taosArrayGetSize(*pReqTaskList); int32_t size = taosArrayGetSize(pStreamItem->pTaskList);
for(int32_t i = 0; i < size; ++i) { for(int32_t i = 0; i < size; ++i) {
STaskChkptInfo* pInfo = taosArrayGet(*pReqTaskList, i); STaskChkptInfo* pInfo = taosArrayGet(pStreamItem->pTaskList, i);
if (pInfo == NULL) { if (pInfo == NULL) {
continue; continue;
} }
@ -1058,11 +1065,12 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
} }
mDebug("start to scan checkpoint report info"); mDebug("start to scan checkpoint report info");
streamMutexLock(&execInfo.lock);
while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.pChkptStreams, pIter)) != NULL) {
SArray *pList = *(SArray **)pIter; SChkptReportInfo* px = (SChkptReportInfo *)pIter;
STaskChkptInfo *pInfo = taosArrayGet(pList, 0); STaskChkptInfo *pInfo = taosArrayGet(px->pTaskList, 0);
if (pInfo == NULL) { if (pInfo == NULL) {
continue; continue;
} }
@ -1075,12 +1083,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
if (p == NULL) { if (p == NULL) {
mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId); mError("failed to put stream into drop list:0x%" PRIx64, pInfo->streamId);
} }
continue; continue;
} }
int32_t total = mndGetNumOfStreamTasks(pStream); int32_t total = mndGetNumOfStreamTasks(pStream);
int32_t existed = (int32_t)taosArrayGetSize(pList); int32_t existed = (int32_t)taosArrayGetSize(px->pTaskList);
if (total == existed) { if (total == existed) {
mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info", mDebug("stream:0x%" PRIx64 " %s all %d tasks send checkpoint-report, start to update checkpoint-info",
@ -1088,14 +1095,11 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false); bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHKPT_UPDATE_NAME, false);
if (!conflict) { if (!conflict) {
code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, pList); code = mndCreateStreamChkptInfoUpdateTrans(pMnode, pStream, px->pTaskList);
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) { // remove this entry
void* p = taosArrayPush(pDropped, &pInfo->streamId); taosArrayClear(px->pTaskList);
if (p == NULL) { px->reportChkpt = pInfo->checkpointId;
mError("failed to remove stream:0x%" PRIx64, pInfo->streamId); mDebug("stream:0x%" PRIx64 " clear checkpoint-report list", pInfo->streamId);
} else {
mDebug("stream:0x%" PRIx64 " removed", pInfo->streamId);
}
} else { } else {
mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet", mDebug("stream:0x%" PRIx64 " not launch chkpt-meta update trans, due to checkpoint not finished yet",
pInfo->streamId); pInfo->streamId);
@ -1130,6 +1134,8 @@ int32_t mndScanCheckpointReportInfo(SRpcMsg *pReq) {
mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams); mDebug("drop %d stream(s) in checkpoint-report list, remain:%d", size, numOfStreams);
} }
streamMutexUnlock(&execInfo.lock);
taosArrayDestroy(pDropped); taosArrayDestroy(pDropped);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1314,7 +1320,7 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
int32_t code = 0; int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash); int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) { if (numOfStreams == 0) {
return TSDB_CODE_SUCCESS; return code;
} }
code = taosHashRemove(pHash, &streamId, sizeof(streamId)); code = taosHashRemove(pHash, &streamId, sizeof(streamId));
@ -1327,6 +1333,35 @@ int64_t mndClearConsensusCheckpointId(SHashObj* pHash, int64_t streamId) {
return code; return code;
} }
int64_t mndClearChkptReportInfo(SHashObj* pHash, int64_t streamId) {
int32_t code = 0;
int32_t numOfStreams = taosHashGetSize(pHash);
if (numOfStreams == 0) {
return code;
}
code = taosHashRemove(pHash, &streamId, sizeof(streamId));
if (code == 0) {
mDebug("drop stream:0x%" PRIx64 " in chkpt-report list, remain:%d", streamId, numOfStreams);
} else {
mError("failed to remove stream:0x%"PRIx64" in chkpt-report list, remain:%d", streamId, numOfStreams);
}
return code;
}
int32_t mndResetChkptReportInfo(SHashObj* pHash, int64_t streamId) {
SChkptReportInfo* pInfo = taosHashGet(pHash, &streamId, sizeof(streamId));
if (pInfo != NULL) {
taosArrayClear(pInfo->pTaskList);
mDebug("stream:0x%" PRIx64 " checkpoint-report list cleared, prev report checkpointId:%" PRId64, streamId,
pInfo->reportChkpt);
return 0;
}
return TSDB_CODE_MND_STREAM_NOT_EXIST;
}
static void mndShowStreamStatus(char *dst, SStreamObj *pStream) { static void mndShowStreamStatus(char *dst, SStreamObj *pStream) {
int8_t status = atomic_load_8(&pStream->status); int8_t status = atomic_load_8(&pStream->status);
if (status == STREAM_STATUS__NORMAL) { if (status == STREAM_STATUS__NORMAL) {

View File

@ -362,7 +362,7 @@ int32_t streamTaskSnapReaderClose(SStreamTaskReader* pReader);
int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData); int32_t streamTaskSnapRead(SStreamTaskReader* pReader, uint8_t** ppData);
int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter); int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTaskWriter** ppWriter);
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback); int32_t streamTaskSnapWriterClose(SStreamTaskWriter* ppWriter, int8_t rollback, int8_t loadTask);
int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData); int32_t streamTaskSnapWrite(SStreamTaskWriter* pWriter, uint8_t* pData, uint32_t nData);
int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader); int32_t streamStateSnapReaderOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamStateReader** ppReader);

View File

@ -192,7 +192,7 @@ int32_t streamTaskSnapWriterOpen(STQ* pTq, int64_t sver, int64_t ever, SStreamTa
return code; return code;
} }
int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) { int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback, int8_t loadTask) {
int32_t code = 0; int32_t code = 0;
STQ* pTq = pWriter->pTq; STQ* pTq = pWriter->pTq;
@ -213,6 +213,10 @@ int32_t streamTaskSnapWriterClose(SStreamTaskWriter* pWriter, int8_t rollback) {
} }
streamMetaWUnLock(pTq->pStreamMeta); streamMetaWUnLock(pTq->pStreamMeta);
taosMemoryFree(pWriter); taosMemoryFree(pWriter);
if (loadTask == 1) {
streamMetaLoadAllTasks(pTq->pStreamMeta);
}
return code; return code;
_err: _err:

View File

@ -563,7 +563,7 @@ int32_t tqStreamTaskProcessCheckpointReadyMsg(SStreamMeta* pMeta, SRpcMsg* pMsg)
pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId); pTask->id.idStr, req.downstreamTaskId, req.downstreamNodeId);
} }
code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamTaskId, req.downstreamNodeId); code = streamProcessCheckpointReadyMsg(pTask, req.checkpointId, req.downstreamNodeId, req.downstreamTaskId);
streamMetaReleaseTask(pMeta, pTask); streamMetaReleaseTask(pMeta, pTask);
if (code) { if (code) {
return code; return code;
@ -996,7 +996,13 @@ int32_t tqStreamTaskProcessRetrieveTriggerReq(SStreamMeta* pMeta, SRpcMsg* pMsg)
int64_t checkpointId = 0; int64_t checkpointId = 0;
streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId); streamTaskGetActiveCheckpointInfo(pTask, &transId, &checkpointId);
ASSERT(checkpointId == pReq->checkpointId); if (checkpointId != pReq->checkpointId) {
tqError("s-task:%s invalid checkpoint-trigger retrieve msg from 0x%" PRIx64 ", current checkpointId:%" PRId64
" req:%" PRId64,
pTask->id.idStr, pReq->downstreamTaskId, checkpointId, pReq->checkpointId);
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_INVALID_MSG;
}
if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) { if (streamTaskAlreadySendTrigger(pTask, pReq->downstreamNodeId)) {
// re-send the lost checkpoint-trigger msg to downstream task // re-send the lost checkpoint-trigger msg to downstream task

View File

@ -722,7 +722,8 @@ int32_t vnodeSnapWriterClose(SVSnapWriter *pWriter, int8_t rollback, SSnapshot *
} }
if (pWriter->pStreamTaskWriter) { if (pWriter->pStreamTaskWriter) {
code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback); code = streamTaskSnapWriterClose(pWriter->pStreamTaskWriter, rollback, pWriter->pStreamStateWriter == NULL ? 1 : 0);
if (code) goto _exit; if (code) goto _exit;
} }

View File

@ -48,6 +48,13 @@ extern "C" {
#define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0) #define stTrace(...) do { if (stDebugFlag & DEBUG_TRACE) { taosPrintLog("STM ", DEBUG_TRACE, stDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on // clang-format on
typedef struct SStreamTmrInfo {
int32_t activeCounter; // make sure only launch one checkpoint trigger check tmr
tmr_h tmrHandle;
int64_t launchChkptId;
int8_t isActive;
} SStreamTmrInfo;
struct SActiveCheckpointInfo { struct SActiveCheckpointInfo {
TdThreadMutex lock; TdThreadMutex lock;
int32_t transId; int32_t transId;
@ -59,12 +66,12 @@ struct SActiveCheckpointInfo {
SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*> SArray* pReadyMsgList; // SArray<STaskCheckpointReadyInfo*>
int8_t allUpstreamTriggerRecv; int8_t allUpstreamTriggerRecv;
SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo> SArray* pCheckpointReadyRecvList; // SArray<STaskDownstreamReadyInfo>
int32_t checkCounter; SStreamTmrInfo chkptTriggerMsgTmr;
tmr_h pChkptTriggerTmr; SStreamTmrInfo chkptReadyMsgTmr;
int32_t sendReadyCheckCounter;
tmr_h pSendReadyMsgTmr;
}; };
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask);
typedef struct { typedef struct {
int8_t type; int8_t type;
SSDataBlock* pBlock; SSDataBlock* pBlock;
@ -222,7 +229,7 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta);
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType(); ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType();
int32_t streamTaskDownloadCheckpointData(const char* id, char* path); int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId);
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask); int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask);
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask); int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask);

View File

@ -447,7 +447,7 @@ int32_t rebuildFromRemoteChkp_rsync(const char* key, char* checkpointPath, int64
cleanDir(defaultPath, key); cleanDir(defaultPath, key);
stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath); stDebug("clear local default dir before downloading checkpoint data:%s succ", defaultPath);
code = streamTaskDownloadCheckpointData(key, checkpointPath); code = streamTaskDownloadCheckpointData(key, checkpointPath, checkpointId);
if (code != 0) { if (code != 0) {
stError("failed to download checkpoint data:%s", key); stError("failed to download checkpoint data:%s", key);
return code; return code;
@ -482,7 +482,7 @@ int32_t rebuildDataFromS3(char* chkpPath, int64_t chkpId) {
int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) { int32_t rebuildFromRemoteChkp_s3(const char* key, char* chkpPath, int64_t chkpId, char* defaultPath) {
int8_t rename = 0; int8_t rename = 0;
int32_t code = streamTaskDownloadCheckpointData(key, chkpPath); int32_t code = streamTaskDownloadCheckpointData(key, chkpPath, chkpId);
if (code != 0) { if (code != 0) {
return code; return code;
} }
@ -683,7 +683,7 @@ static int32_t rebuildFromLocalCheckpoint(const char* pTaskIdStr, const char* ch
defaultPath); defaultPath);
} }
} else { } else {
code = TSDB_CODE_FAILED; code = terrno;
stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath); stError("%s no valid data for checkpointId:%" PRId64 " in %s", pTaskIdStr, checkpointId, checkpointPath);
} }
@ -763,7 +763,7 @@ int32_t restoreCheckpointData(const char* path, const char* key, int64_t chkptId
} }
if (code != 0) { if (code != 0) {
stError("failed to start stream backend at %s, restart from default defaultPath:%s, reason:%s", checkpointPath, stError("failed to start stream backend at %s, restart from defaultPath:%s, reason:%s", checkpointPath,
defaultPath, tstrerror(code)); defaultPath, tstrerror(code));
code = 0; // reset the error code code = 0; // reset the error code
} }

View File

@ -20,7 +20,7 @@
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
static int32_t deleteCheckpointFile(const char* id, const char* name); static int32_t deleteCheckpointFile(const char* id, const char* name);
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path); static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
static int32_t deleteCheckpoint(const char* id); static int32_t deleteCheckpoint(const char* id);
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName); static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask); static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
@ -297,14 +297,26 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
return code; return code;
} }
// if previous launched timer not started yet, not start a new timer
// todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
// a new checkpoint-trigger timer right now.
// And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
// procedure to be stucked.
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start checkpoint-trigger monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask); streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pChkptTriggerTmr == NULL) { if (pTmrInfo->tmrHandle == NULL) {
pActiveInfo->pChkptTriggerTmr = taosTmrStart(checkpointTriggerMonitorFn, 100, pTask, streamTimer); pTmrInfo->tmrHandle = taosTmrStart(checkpointTriggerMonitorFn, 200, pTask, streamTimer);
} else { } else {
streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
}
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else { // already launched, do nothing
stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
} }
} }
@ -349,7 +361,6 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
(void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet (void)streamTaskBuildCheckpoint(pTask); // todo: not handle error yet
} else { // source & agg tasks need to forward the checkpoint msg downwards } else { // source & agg tasks need to forward the checkpoint msg downwards
stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num); stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock); flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
// Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
@ -364,8 +375,8 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task // only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream, static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId, int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
const char* id, int32_t* pNotReady, int32_t* pTransId) { const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
bool received = false; *alreadyRecv = false;
int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList); int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
for (int32_t i = 0; i < size; ++i) { for (int32_t i = 0; i < size; ++i) {
STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i); STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
@ -374,12 +385,12 @@ static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t
} }
if (p->downstreamTaskId == downstreamTaskId) { if (p->downstreamTaskId == downstreamTaskId) {
received = true; (*alreadyRecv) = true;
break; break;
} }
} }
if (received) { if (*alreadyRecv) {
stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id, stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)), downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
numOfDownstream); numOfDownstream);
@ -415,6 +426,7 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
int32_t code = 0; int32_t code = 0;
int32_t notReady = 0; int32_t notReady = 0;
int32_t transId = 0; int32_t transId = 0;
bool alreadyHandled = false;
// 1. not in checkpoint status now // 1. not in checkpoint status now
SStreamTaskState pStat = streamTaskGetStatus(pTask); SStreamTaskState pStat = streamTaskGetStatus(pTask);
@ -433,13 +445,18 @@ int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId
streamMutexLock(&pInfo->lock); streamMutexLock(&pInfo->lock);
code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady, code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady,
&transId); &transId, &alreadyHandled);
streamMutexUnlock(&pInfo->lock); streamMutexUnlock(&pInfo->lock);
if ((notReady == 0) && (code == 0)) { if (alreadyHandled) {
stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
id, checkpointId, downstreamTaskId);
} else {
if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id); stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
(void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1); (void)appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
} }
}
return code; return code;
} }
@ -508,8 +525,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
if (pReq->checkpointId <= pInfo->checkpointId) { if (pReq->checkpointId <= pInfo->checkpointId) {
stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " checkpointVer:%" PRId64 stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
" no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64
" transId:%d ignored", " transId:%d ignored",
id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer, id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
pReq->transId); pReq->transId);
@ -550,14 +567,13 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer); pInfo->processedVer <= pReq->checkpointVer);
// update only it is in checkpoint status.
if (pStatus.state == TASK_STATUS__CK) {
pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs; pInfo->checkpointTime = pReq->checkpointTs;
streamTaskClearCheckInfo(pTask, true); streamTaskClearCheckInfo(pTask, true);
if (pStatus.state == TASK_STATUS__CK) {
// todo handle error
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
} else { } else {
stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name); stDebug("s-task:0x%x vgId:%d not handle checkpoint-done event, status:%s", pReq->taskId, vgId, pStatus.name);
@ -670,7 +686,7 @@ int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t d
} }
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
code = streamTaskUploadCheckpoint(idStr, path); code = streamTaskUploadCheckpoint(idStr, path, checkpointId);
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId); stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
} else { } else {
@ -810,6 +826,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
@ -820,24 +837,24 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check the status every 100ms // check the status every 100ms
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
} }
if (++pActiveInfo->checkCounter < 100) { if (++pTmrInfo->activeCounter < 50) {
streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return; return;
} }
pActiveInfo->checkCounter = 0; pTmrInfo->activeCounter = 0;
stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now); stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) { if (pState.state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); stDebug("s-task:%s vgId:%d not in checkpoint status, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
@ -847,7 +864,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// checkpoint-trigger recv flag is set, quit // checkpoint-trigger recv flag is set, quit
if (pActiveInfo->allUpstreamTriggerRecv) { if (pActiveInfo->allUpstreamTriggerRecv) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId, stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger, ref:%d", id, vgId,
ref); ref);
@ -867,6 +884,31 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno)); stDebug("s-task:%s start to triggerMonitor, reason:%s", id, tstrerror(terrno));
streamMutexUnlock(&pActiveInfo->lock); streamMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
return;
}
if ((pTmrInfo->launchChkptId != pActiveInfo->activeId) || (pActiveInfo->activeId == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr, ref:%d",
id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
} }
@ -900,9 +942,9 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
// check every 100ms // check every 100ms
if (size > 0) { if (size > 0) {
stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id); stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
streamTmrReset(checkpointTriggerMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pChkptTriggerTmr, vgId, "trigger-recv-monitor"); streamTmrReset(checkpointTriggerMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "trigger-recv-monitor");
} else { } else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref); stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr, ref:%d", id, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
} }
@ -1060,11 +1102,8 @@ void streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
streamMutexUnlock(&pInfo->lock); streamMutexUnlock(&pInfo->lock);
} }
int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) { int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
int32_t num = 0; int32_t num = 0;
streamMutexLock(&pInfo->lock);
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) { for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i); STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
if (p == NULL) { if (p == NULL) {
@ -1075,7 +1114,6 @@ int32_t streamTaskGetNumOfConfirmed(SStreamTask* pTask) {
num++; num++;
} }
} }
streamMutexUnlock(&pInfo->lock);
return num; return num;
} }
@ -1101,9 +1139,9 @@ void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
} }
} }
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
streamMutexUnlock(&pInfo->lock); streamMutexUnlock(&pInfo->lock);
int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pTask);
int32_t total = streamTaskGetNumOfDownstream(pTask); int32_t total = streamTaskGetNumOfDownstream(pTask);
if (taskId == 0) { if (taskId == 0) {
stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId); stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
@ -1198,7 +1236,7 @@ ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
} }
} }
int32_t streamTaskUploadCheckpoint(const char* id, const char* path) { int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
int32_t code = 0; int32_t code = 0;
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("invalid parameters in upload checkpoint, %s", id); stError("invalid parameters in upload checkpoint, %s", id);
@ -1206,7 +1244,7 @@ int32_t streamTaskUploadCheckpoint(const char* id, const char* path) {
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
code = uploadByRsync(id, path); code = uploadByRsync(id, path, checkpointId);
if (code != 0) { if (code != 0) {
return TAOS_SYSTEM_ERROR(errno); return TAOS_SYSTEM_ERROR(errno);
} }
@ -1233,14 +1271,14 @@ int32_t downloadCheckpointDataByName(const char* id, const char* fname, const ch
return 0; return 0;
} }
int32_t streamTaskDownloadCheckpointData(const char* id, char* path) { int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) { if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
stError("down checkpoint data parameters invalid"); stError("down checkpoint data parameters invalid");
return -1; return -1;
} }
if (strlen(tsSnodeAddress) != 0) { if (strlen(tsSnodeAddress) != 0) {
return downloadRsync(id, path); return downloadByRsync(id, path, checkpointId);
} else if (tsS3StreamEnabled) { } else if (tsS3StreamEnabled) {
return s3GetObjectsByPrefix(id, path); return s3GetObjectsByPrefix(id, path);
} }
@ -1281,6 +1319,8 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
ETaskStatus p = streamTaskGetStatus(pTask).state;
if (pTask->status.sendConsensusChkptId == true) { if (pTask->status.sendConsensusChkptId == true) {
stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id); stDebug("s-task:%s already start to consensus-checkpointId, not start again before it completed", id);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
@ -1291,9 +1331,15 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (pTask->pBackend != NULL) {
streamFreeTaskState(pTask, p);
pTask->pBackend = NULL;
}
ASSERT(pTask->pBackend == NULL); ASSERT(pTask->pBackend == NULL);
pTask->status.requireConsensusChkptId = true; pTask->status.requireConsensusChkptId = true;
stDebug("s-task:%s set the require consensus-checkpointId flag", id);
return 0; return 0;
} }

View File

@ -823,28 +823,29 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SStreamTask* pTask = param; SStreamTask* pTask = param;
int32_t vgId = pTask->pMeta->vgId; int32_t vgId = pTask->pMeta->vgId;
const char* id = pTask->id.idStr; const char* id = pTask->id.idStr;
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
// check the status every 100ms // check the status every 100ms
if (streamTaskShouldStop(pTask)) { if (streamTaskShouldStop(pTask)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref); stDebug("s-task:%s vgId:%d status:stop, quit from monitor checkpoint-trigger, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
return; return;
} }
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; if (++pTmrInfo->activeCounter < 50) {
if (++pActiveInfo->sendReadyCheckCounter < 100) { streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor");
streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor");
return; return;
} }
pActiveInfo->sendReadyCheckCounter = 0; pTmrInfo->activeCounter = 0;
stDebug("s-task:%s in sending checkpoint-ready msg monitor timer", id); stDebug("s-task:%s in sending checkpoint-ready msg monitor tmr", id);
streamMutexLock(&pTask->lock); streamMutexLock(&pTask->lock);
SStreamTaskState pState = streamTaskGetStatus(pTask); SStreamTaskState pState = streamTaskGetStatus(pTask);
if (pState.state != TASK_STATUS__CK) { if (pState.state != TASK_STATUS__CK) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId, stDebug("s-task:%s vgId:%d status:%s not in checkpoint, quit from monitor checkpoint-ready send, ref:%d", id, vgId,
pState.name, ref); pState.name, ref);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
@ -858,10 +859,21 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
SArray* pList = pActiveInfo->pReadyMsgList; SArray* pList = pActiveInfo->pReadyMsgList;
int32_t num = taosArrayGetSize(pList); int32_t num = taosArrayGetSize(pList);
// active checkpoint info is cleared for now if (pTmrInfo->launchChkptId != pActiveInfo->activeId) {
if ((pActiveInfo->activeId == 0) && (pActiveInfo->transId == 0) && (num == 0) && (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock); streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d ready-msg send tmr launched by previous checkpoint procedure, checkpointId:%" PRId64
", quit, ref:%d",
id, vgId, pTmrInfo->launchChkptId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask);
return;
}
// active checkpoint info is cleared for now
if ((pActiveInfo->activeId == 0) || (pActiveInfo->transId == 0) || (num == 0) || (pTask->chkInfo.startTs == 0)) {
streamMutexUnlock(&pActiveInfo->lock);
int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref); stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from readyMsg send tmr, ref:%d", id, vgId, ref);
streamMetaReleaseTask(pTask->pMeta, pTask); streamMetaReleaseTask(pTask->pMeta, pTask);
@ -923,10 +935,10 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
} }
} }
streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor"); streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor");
streamMutexUnlock(&pActiveInfo->lock); streamMutexUnlock(&pActiveInfo->lock);
} else { } else {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = streamCleanBeforeQuitTmr(pTmrInfo, pTask);
stDebug( stDebug(
"s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg " "s-task:%s vgId:%d recv of checkpoint-ready msg confirmed by all upstream task(s), clear checkpoint-ready msg "
"and quit from timer, ref:%d", "and quit from timer, ref:%d",
@ -975,22 +987,32 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
} }
} }
streamMutexUnlock(&pActiveInfo->lock);
stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num); stDebug("s-task:%s level:%d checkpoint-ready msg sent to all %d upstreams", id, pTask->info.taskLevel, num);
// start to check if checkpoint ready msg has successfully received by upstream tasks. // start to check if checkpoint ready msg has successfully received by upstream tasks.
if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) { if (pTask->info.taskLevel == TASK_LEVEL__SINK || pTask->info.taskLevel == TASK_LEVEL__AGG) {
SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptReadyMsgTmr;
int8_t old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
if (old == 0) {
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref); stDebug("s-task:%s start checkpoint-ready monitor in 10s, ref:%d ", pTask->id.idStr, ref);
streamMetaAcquireOneTask(pTask); streamMetaAcquireOneTask(pTask);
if (pActiveInfo->pSendReadyMsgTmr == NULL) { if (pTmrInfo->tmrHandle == NULL) {
pActiveInfo->pSendReadyMsgTmr = taosTmrStart(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer); pTmrInfo->tmrHandle = taosTmrStart(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer);
} else { } else {
streamTmrReset(checkpointReadyMsgSendMonitorFn, 100, pTask, streamTimer, &pActiveInfo->pSendReadyMsgTmr, vgId, "chkpt-ready-monitor"); streamTmrReset(checkpointReadyMsgSendMonitorFn, 200, pTask, streamTimer, &pTmrInfo->tmrHandle, vgId, "chkpt-ready-monitor");
}
// mark the timer monitor checkpointId
pTmrInfo->launchChkptId = pActiveInfo->activeId;
} else {
stError("s-task:%s previous checkpoint-ready monitor tmr is set, not start new one", pTask->id.idStr);
} }
} }
streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1267,17 +1289,18 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
} }
} }
static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp, const char* id) { static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t code, int64_t now, int32_t* pNotRsp,
int32_t* pFailed, const char* id) {
int32_t numOfRsp = 0; int32_t numOfRsp = 0;
bool alreadySet = false; int32_t numOfFailed = 0;
bool updated = false;
bool allRsp = false;
*pNotRsp = 0;
streamMutexLock(&pMsgInfo->lock); bool allRsp = false;
int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo); int32_t numOfDispatchBranch = taosArrayGetSize(pMsgInfo->pSendInfo);
for(int32_t i = 0; i < numOfDispatchBranch; ++i) { *pNotRsp = 0;
*pFailed = 0;
for (int32_t i = 0; i < numOfDispatchBranch; ++i) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i); SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
if (pEntry == NULL) { if (pEntry == NULL) {
continue; continue;
@ -1295,24 +1318,34 @@ static bool setDispatchRspInfo(SDispatchMsgInfo* pMsgInfo, int32_t vgId, int32_t
} }
if (pEntry->nodeId == vgId) { if (pEntry->nodeId == vgId) {
ASSERT(!alreadySet); if (pEntry->rspTs != -1) {
stDebug("s-task:%s dispatch rsp has already recved at:%" PRId64 ", ignore this rsp, msgId:%d", id,
pEntry->rspTs, pMsgInfo->msgId);
allRsp = false;
} else {
pEntry->rspTs = now; pEntry->rspTs = now;
pEntry->status = code; pEntry->status = code;
alreadySet = true;
updated = true;
numOfRsp += 1; numOfRsp += 1;
allRsp = (numOfRsp == numOfDispatchBranch);
stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j, stDebug("s-task:%s record the rsp recv, ts:%" PRId64 " code:%d, idx:%d, total recv:%d/%d", id, now, code, j,
numOfRsp, numOfDispatchBranch); numOfRsp, numOfDispatchBranch);
} }
break;
}
} }
// this code may be error code.
for (int32_t i = 0; i < numOfDispatchBranch; ++i) {
SDispatchEntry* pEntry = taosArrayGet(pMsgInfo->pSendInfo, i);
if (pEntry->status != TSDB_CODE_SUCCESS || isDispatchRspTimeout(pEntry, now)) {
numOfFailed += 1;
}
}
*pFailed = numOfFailed;
*pNotRsp = numOfDispatchBranch - numOfRsp; *pNotRsp = numOfDispatchBranch - numOfRsp;
allRsp = (numOfRsp == numOfDispatchBranch);
streamMutexUnlock(&pMsgInfo->lock);
ASSERT(updated);
return allRsp; return allRsp;
} }
@ -1345,15 +1378,23 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int64_t now = taosGetTimestampMs(); int64_t now = taosGetTimestampMs();
bool allRsp = false; bool allRsp = false;
int32_t notRsp = 0; int32_t notRsp = 0;
int32_t numOfFailed = 0;
bool triggerDispatchRsp = false;
// we only set the dispatch msg info for current checkpoint trans
streamMutexLock(&pTask->lock);
triggerDispatchRsp = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) &&
(pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId);
streamMutexUnlock(&pTask->lock);
streamMutexLock(&pMsgInfo->lock); streamMutexLock(&pMsgInfo->lock);
int32_t msgId = pMsgInfo->msgId; int32_t msgId = pMsgInfo->msgId;
streamMutexUnlock(&pMsgInfo->lock);
// follower not handle the dispatch rsp // follower not handle the dispatch rsp
if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) { if ((pTask->pMeta->role == NODE_ROLE_FOLLOWER) || (pTask->status.downstreamReady != 1)) {
stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id, stError("s-task:%s vgId:%d is follower or task just re-launched, not handle the dispatch rsp, discard it", id,
vgId); vgId);
streamMutexUnlock(&pMsgInfo->lock);
return TSDB_CODE_STREAM_TASK_NOT_EXIST; return TSDB_CODE_STREAM_TASK_NOT_EXIST;
} }
@ -1362,6 +1403,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64 stError("s-task:%s vgId:%d not expect rsp, expected: msgId:%d, stage:%" PRId64 " actual msgId:%d, stage:%" PRId64
" discard it", " discard it",
id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage); id, vgId, msgId, pTask->pMeta->stage, pRsp->msgId, pRsp->stage);
streamMutexUnlock(&pMsgInfo->lock);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
@ -1373,18 +1415,18 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore if (code == TSDB_CODE_STREAM_TASK_NOT_EXIST) { // destination task does not exist, not retry anymore
stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already", stError("s-task:%s failed to dispatch msg to task:0x%x(vgId:%d), msgId:%d no retry, since task destroyed already",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId, msgId);
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, id); allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, &numOfFailed, id);
} else { } else {
stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId, stError("s-task:%s failed to dispatch msgId:%d to task:0x%x(vgId:%d), code:%s, add to retry list", id, msgId,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code)); pRsp->downstreamTaskId, pRsp->downstreamNodeId, tstrerror(code));
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, &notRsp, id); allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, code, now, &notRsp, &numOfFailed, id);
} }
} else { // code == 0 } else { // code == 0
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED; pTask->inputq.status = TASK_INPUT_STATUS__BLOCKED;
// block the input of current task, to push pressure to upstream // block the input of current task, to push pressure to upstream
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, &notRsp, id); allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, pRsp->inputStatus, now, &notRsp, &numOfFailed, id);
stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id, stTrace("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for retry dispatch", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId); pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else { } else {
@ -1396,15 +1438,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} }
allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, id); allRsp = setDispatchRspInfo(pMsgInfo, pRsp->downstreamNodeId, TSDB_CODE_SUCCESS, now, &notRsp, &numOfFailed, id);
{ {
bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER); bool delayDispatch = (pMsgInfo->dispatchMsgType == STREAM_INPUT__CHECKPOINT_TRIGGER);
if (delayDispatch) { if (delayDispatch) {
streamMutexLock(&pTask->lock);
// we only set the dispatch msg info for current checkpoint trans // we only set the dispatch msg info for current checkpoint trans
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK && if (triggerDispatchRsp) {
pTask->chkInfo.pActiveInfo->activeId == pMsgInfo->checkpointId) {
ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId); ASSERT(pTask->chkInfo.pActiveInfo->transId == pMsgInfo->transId);
stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed", stDebug("s-task:%s checkpoint-trigger msg to 0x%x rsp for checkpointId:%" PRId64 " transId:%d confirmed",
pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId); pTask->id.idStr, pRsp->downstreamTaskId, pMsgInfo->checkpointId, pMsgInfo->transId);
@ -1415,12 +1455,13 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
" transId:%d discard, since expired", " transId:%d discard, since expired",
pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId); pTask->id.idStr, pMsgInfo->checkpointId, pMsgInfo->transId);
} }
streamMutexUnlock(&pTask->lock);
} }
} }
} }
} }
streamMutexUnlock(&pMsgInfo->lock);
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
if (!allRsp) { if (!allRsp) {
stDebug( stDebug(
@ -1439,16 +1480,12 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
} }
// all msg rsp already, continue // all msg rsp already, continue
if (allRsp) {
ASSERT(pTask->outputq.status == TASK_OUTPUT_STATUS__WAIT);
// we need to re-try send dispatch msg to downstream tasks // we need to re-try send dispatch msg to downstream tasks
int32_t numOfFailed = getFailedDispatchInfo(pMsgInfo, now); if (allRsp && (numOfFailed == 0)) {
if (numOfFailed == 0) { // this message has been sent successfully, let's try next one.
// trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state // trans-state msg has been sent to downstream successfully. let's transfer the fill-history task state
if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) { if (pMsgInfo->dispatchMsgType == STREAM_INPUT__TRANS_STATE) {
stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", stDebug("s-task:%s dispatch trans-state msgId:%d to downstream successfully, start to prepare transfer state", id,
id, msgId); msgId);
ASSERT(pTask->info.fillHistory == 1); ASSERT(pTask->info.fillHistory == 1);
code = streamTransferStatePrepare(pTask); code = streamTransferStatePrepare(pTask);
@ -1460,10 +1497,10 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
// now ready for next data output // now ready for next data output
atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputq.status, TASK_OUTPUT_STATUS__NORMAL);
} else { } else {
// this message has been sent successfully, let's try next one.
code = handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId); code = handleDispatchSuccessRsp(pTask, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} }
} }
}
return code; return code;
} }

View File

@ -142,11 +142,12 @@ int32_t streamMetaSendHbHelper(SStreamMeta* pMeta) {
} }
SStreamHbMsg* pMsg = &pInfo->hbMsg; SStreamHbMsg* pMsg = &pInfo->hbMsg;
stDebug("vgId:%d build stream hbMsg, leader:%d msgId:%d", pMeta->vgId, (pMeta->role == NODE_ROLE_LEADER),
pMeta->pHbInfo->hbCount);
pMsg->vgId = pMeta->vgId; pMsg->vgId = pMeta->vgId;
pMsg->msgId = pMeta->pHbInfo->hbCount; pMsg->msgId = pMeta->pHbInfo->hbCount;
pMsg->ts = taosGetTimestampMs();
stDebug("vgId:%d build stream hbMsg, leader:%d HbMsgId:%d, HbMsgTs:%" PRId64, pMeta->vgId,
(pMeta->role == NODE_ROLE_LEADER), pMsg->msgId, pMsg->ts);
pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry)); pMsg->pTaskStatus = taosArrayInit(numOfTasks, sizeof(STaskStatusEntry));
pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t)); pMsg->pUpdateNodes = taosArrayInit(numOfTasks, sizeof(int32_t));
@ -292,7 +293,7 @@ void streamMetaHbToMnode(void* param, void* tmrId) {
streamMetaRLock(pMeta); streamMetaRLock(pMeta);
code = streamMetaSendHbHelper(pMeta); code = streamMetaSendHbHelper(pMeta);
if (code) { if (code) {
stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, strerror(code)); stError("vgId:%d failed to send hmMsg to mnode, try again in 5s, code:%s", pMeta->vgId, tstrerror(code));
} }
streamMetaRUnLock(pMeta); streamMetaRUnLock(pMeta);

View File

@ -318,7 +318,19 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, SStreamTask* pTask, const char* key)
pBackend->pTask = pTask; pBackend->pTask = pTask;
pBackend->pMeta = pMeta; pBackend->pMeta = pMeta;
if (processVer != -1) pTask->chkInfo.processedVer = processVer; if (processVer != -1) {
if (pTask->chkInfo.processedVer != processVer) {
stWarn("s-task:%s vgId:%d update checkpointVer:%" PRId64 "->%" PRId64 " for checkpointId:%" PRId64,
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, processVer, pTask->chkInfo.checkpointId);
pTask->chkInfo.processedVer = processVer;
pTask->chkInfo.checkpointVer = processVer;
pTask->chkInfo.nextProcessVer = processVer + 1;
} else {
stInfo("s-task:%s vgId:%d processedVer:%" PRId64
" in task meta equals to data in checkpoint data for checkpointId:%" PRId64,
pTask->id.idStr, pTask->pMeta->vgId, pTask->chkInfo.processedVer, pTask->chkInfo.checkpointId);
}
}
code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*)); code = taosHashPut(pMeta->pTaskDbUnique, key, strlen(key), &pBackend, sizeof(void*));
if (code) { if (code) {
@ -1407,7 +1419,6 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
} }
// negotiate the consensus checkpoint id for current task // negotiate the consensus checkpoint id for current task
ASSERT(pTask->pBackend == NULL);
code = streamTaskSendRestoreChkptMsg(pTask); code = streamTaskSendRestoreChkptMsg(pTask);
// this task may has no checkpoint, but others tasks may generate checkpoint already? // this task may has no checkpoint, but others tasks may generate checkpoint already?

View File

@ -398,6 +398,7 @@ int32_t tEncodeStreamHbMsg(SEncoder* pEncoder, const SStreamHbMsg* pReq) {
} }
if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1; if (tEncodeI32(pEncoder, pReq->msgId) < 0) return -1;
if (tEncodeI64(pEncoder, pReq->ts) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return pEncoder->pos; return pEncoder->pos;
} }
@ -470,6 +471,7 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
} }
if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->msgId) < 0) return -1;
if (tDecodeI64(pDecoder, &pReq->ts) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;

View File

@ -1140,14 +1140,16 @@ void streamTaskDestroyActiveChkptInfo(SActiveCheckpointInfo* pInfo) {
taosArrayDestroy(pInfo->pCheckpointReadyRecvList); taosArrayDestroy(pInfo->pCheckpointReadyRecvList);
pInfo->pCheckpointReadyRecvList = NULL; pInfo->pCheckpointReadyRecvList = NULL;
if (pInfo->pChkptTriggerTmr != NULL) { SStreamTmrInfo* pTriggerTmr = &pInfo->chkptTriggerMsgTmr;
(void) taosTmrStop(pInfo->pChkptTriggerTmr); if (pTriggerTmr->tmrHandle != NULL) {
pInfo->pChkptTriggerTmr = NULL; (void) taosTmrStop(pTriggerTmr->tmrHandle);
pTriggerTmr->tmrHandle = NULL;
} }
if (pInfo->pSendReadyMsgTmr != NULL) { SStreamTmrInfo* pReadyTmr = &pInfo->chkptReadyMsgTmr;
(void) taosTmrStop(pInfo->pSendReadyMsgTmr); if (pReadyTmr->tmrHandle != NULL) {
pInfo->pSendReadyMsgTmr = NULL; (void) taosTmrStop(pReadyTmr->tmrHandle);
pReadyTmr->tmrHandle = NULL;
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);

View File

@ -96,12 +96,6 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
} }
} }
static int32_t stopTaskSuccFn(SStreamTask* pTask) {
SStreamTaskSM* pSM = pTask->status.pSM;
streamFreeTaskState(pTask,pSM->current.state == TASK_STATUS__DROPPING ? 1 : 0);
return TSDB_CODE_SUCCESS;
}
int32_t streamTaskInitStatus(SStreamTask* pTask) { int32_t streamTaskInitStatus(SStreamTask* pTask) {
pTask->execInfo.checkTs = taosGetTimestampMs(); pTask->execInfo.checkTs = taosGetTimestampMs();
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
@ -698,21 +692,21 @@ void doInitStateTransferTable(void) {
// resume is completed by restore status of state-machine // resume is completed by restore status of state-machine
// stop related event // stop related event
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__DROPPING, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__UNINIT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__STOP, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__SCAN_HISTORY, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__PAUSE, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, stopTaskSuccFn, NULL); trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__STOP, TASK_EVENT_STOP, NULL, NULL, NULL);
CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans)); CHECK_RET_VAL(taosArrayPush(streamTaskSMTrans, &trans));
// dropping related event // dropping related event

View File

@ -50,3 +50,13 @@ void streamTmrReset(TAOS_TMR_CALLBACK fp, int32_t mseconds, void* param, void* h
// stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg); // stError("vgId:%d failed to reset tmr: %s, try again", vgId, pMsg);
// } // }
} }
int32_t streamCleanBeforeQuitTmr(SStreamTmrInfo* pInfo, SStreamTask* pTask) {
pInfo->activeCounter = 0;
pInfo->launchChkptId = 0;
atomic_store_8(&pInfo->isActive, 0);
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
ASSERT(ref >= 0);
return ref;
}

View File

@ -76,6 +76,8 @@ system sh/stop_dnodes.sh
system sh/exec.sh -n dnode1 -s start system sh/exec.sh -n dnode1 -s start
run tsim/stream/checkTaskStatus.sim
sql insert into t1 values(1648791213002,3,2,3,1.1); sql insert into t1 values(1648791213002,3,2,3,1.1);
$loop_count = 0 $loop_count = 0

View File

@ -30,14 +30,14 @@ class TDTestCase:
tdSql.execute("CREATE STREAM stream_device_alarm2 TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm2 tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\ tdSql.execute("CREATE STREAM stream_device_alarm2 TRIGGER AT_ONCE DELETE_MARK 30d INTO st_device_alarm2 tags(factory_id varchar(20), device_code varchar(80), var_name varchar(200))\
as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, 1 state_flag from st_variable_data\ as select _wstart start_time, last(load_time) end_time, first(var_value) var_value, 1 state_flag from st_variable_data\
PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)") PARTITION BY tbname tname, factory_id, device_code, var_name STATE_WINDOW(case when lower(var_value)=lower(trigger_value) then '1' else '0' end)")
time.sleep(2) time.sleep(5)
def insert_data(self): def insert_data(self):
try: try:
tdSql.execute("insert into aaa values('2024-07-15 14:00:00', '2024-07-15 14:00:00', 'a8')", queryTimes=5, show=True) tdSql.execute("insert into aaa values('2024-07-15 14:00:00', '2024-07-15 14:00:00', 'a8')", queryTimes=5, show=True)
time.sleep(0.01) time.sleep(0.01)
tdSql.execute("insert into aaa values('2024-07-15 14:10:00', '2024-07-15 14:10:00', 'a9')", queryTimes=5, show=True) tdSql.execute("insert into aaa values('2024-07-15 14:10:00', '2024-07-15 14:10:00', 'a9')", queryTimes=5, show=True)
time.sleep(1) time.sleep(5)
except Exception as error: except Exception as error:
tdLog.exit(f"insert data failed {error}") tdLog.exit(f"insert data failed {error}")