Merge pull request #25389 from taosdata/fix/3_liaohj
fix(stream): reset checkpoint info after receiving task-reset info.
This commit is contained in:
commit
62260bff79
|
@ -384,8 +384,8 @@ typedef struct SSinkRecorder {
|
||||||
|
|
||||||
typedef struct STaskExecStatisInfo {
|
typedef struct STaskExecStatisInfo {
|
||||||
int64_t created;
|
int64_t created;
|
||||||
int64_t init;
|
int64_t checkTs;
|
||||||
int64_t start;
|
int64_t readyTs;
|
||||||
int64_t startCheckpointId;
|
int64_t startCheckpointId;
|
||||||
int64_t startCheckpointVer;
|
int64_t startCheckpointVer;
|
||||||
|
|
||||||
|
@ -432,6 +432,22 @@ typedef struct SUpstreamInfo {
|
||||||
int32_t numOfClosed;
|
int32_t numOfClosed;
|
||||||
} SUpstreamInfo;
|
} SUpstreamInfo;
|
||||||
|
|
||||||
|
typedef struct SDownstreamStatusInfo {
|
||||||
|
int64_t reqId;
|
||||||
|
int32_t taskId;
|
||||||
|
int64_t rspTs;
|
||||||
|
int32_t status;
|
||||||
|
} SDownstreamStatusInfo;
|
||||||
|
|
||||||
|
typedef struct STaskCheckInfo {
|
||||||
|
SArray* pList;
|
||||||
|
int64_t startTs;
|
||||||
|
int32_t notReadyTasks;
|
||||||
|
int32_t inCheckProcess;
|
||||||
|
tmr_h checkRspTmr;
|
||||||
|
TdThreadMutex checkInfoLock;
|
||||||
|
} STaskCheckInfo;
|
||||||
|
|
||||||
struct SStreamTask {
|
struct SStreamTask {
|
||||||
int64_t ver;
|
int64_t ver;
|
||||||
SStreamTaskId id;
|
SStreamTaskId id;
|
||||||
|
@ -455,14 +471,12 @@ struct SStreamTask {
|
||||||
SStreamState* pState; // state backend
|
SStreamState* pState; // state backend
|
||||||
SArray* pRspMsgList;
|
SArray* pRspMsgList;
|
||||||
SUpstreamInfo upstreamInfo;
|
SUpstreamInfo upstreamInfo;
|
||||||
|
STaskCheckInfo taskCheckInfo;
|
||||||
|
|
||||||
// the followings attributes don't be serialized
|
// the followings attributes don't be serialized
|
||||||
SScanhistorySchedInfo schedHistoryInfo;
|
SScanhistorySchedInfo schedHistoryInfo;
|
||||||
|
|
||||||
int32_t notReadyTasks;
|
|
||||||
int32_t numOfWaitingUpstream;
|
int32_t numOfWaitingUpstream;
|
||||||
int64_t checkReqId;
|
|
||||||
SArray* checkReqIds; // shuffle
|
|
||||||
int32_t refCnt;
|
int32_t refCnt;
|
||||||
int32_t transferStateAlignCnt;
|
int32_t transferStateAlignCnt;
|
||||||
struct SStreamMeta* pMeta;
|
struct SStreamMeta* pMeta;
|
||||||
|
@ -478,7 +492,7 @@ typedef struct STaskStartInfo {
|
||||||
int64_t startTs;
|
int64_t startTs;
|
||||||
int64_t readyTs;
|
int64_t readyTs;
|
||||||
int32_t tasksWillRestart;
|
int32_t tasksWillRestart;
|
||||||
int32_t taskStarting; // restart flag, sentinel to guard the restart procedure.
|
int32_t startAllTasks; // restart flag, sentinel to guard the restart procedure.
|
||||||
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
SHashObj* pReadyTaskSet; // tasks that are all ready for running stream processing
|
||||||
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
SHashObj* pFailedTaskSet; // tasks that are done the check downstream process, may be successful or failed
|
||||||
int64_t elapsedTime;
|
int64_t elapsedTime;
|
||||||
|
@ -821,8 +835,6 @@ void streamTaskPause(SStreamMeta* pMeta, SStreamTask* pTask);
|
||||||
void streamTaskResume(SStreamTask* pTask);
|
void streamTaskResume(SStreamTask* pTask);
|
||||||
int32_t streamTaskStop(SStreamTask* pTask);
|
int32_t streamTaskStop(SStreamTask* pTask);
|
||||||
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask);
|
||||||
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
|
||||||
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
|
||||||
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
|
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask);
|
||||||
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
int32_t streamTaskReleaseState(SStreamTask* pTask);
|
||||||
int32_t streamTaskReloadState(SStreamTask* pTask);
|
int32_t streamTaskReloadState(SStreamTask* pTask);
|
||||||
|
@ -832,6 +844,15 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key);
|
||||||
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
bool streamTaskIsSinkTask(const SStreamTask* pTask);
|
||||||
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
int32_t streamTaskSendCheckpointReq(SStreamTask* pTask);
|
||||||
|
|
||||||
|
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs);
|
||||||
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id);
|
||||||
|
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||||
|
int32_t* pNotReady, const char* id);
|
||||||
|
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo);
|
||||||
|
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id);
|
||||||
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask);
|
||||||
|
|
||||||
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask);
|
||||||
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc);
|
||||||
|
|
||||||
|
|
|
@ -657,7 +657,9 @@ _OVER:
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
|
// 1. stream number check
|
||||||
|
// 2. target stable can not be target table of other existed streams.
|
||||||
|
static int32_t doStreamCheck(SMnode *pMnode, SStreamObj *pStreamObj) {
|
||||||
int32_t numOfStream = 0;
|
int32_t numOfStream = 0;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -670,14 +672,16 @@ static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { /
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
|
||||||
if (numOfStream > MND_STREAM_MAX_NUM) {
|
if (numOfStream > MND_STREAM_MAX_NUM) {
|
||||||
mError("too many streams, no more than %d for each database", MND_STREAM_MAX_NUM);
|
mError("too many streams, no more than %d for each database, failed to create stream:%s", MND_STREAM_MAX_NUM,
|
||||||
|
pStreamObj->name);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
terrno = TSDB_CODE_MND_TOO_MANY_STREAMS;
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
if (pStream->targetStbUid == pStreamObj->targetStbUid) {
|
||||||
mError("Cannot write the same stable as other stream:%s", pStream->name);
|
mError("Cannot write the same stable as other stream:%s, failed to create stream:%s", pStream->name,
|
||||||
|
pStreamObj->name);
|
||||||
sdbCancelFetch(pMnode->pSdb, pIter);
|
sdbCancelFetch(pMnode->pSdb, pIter);
|
||||||
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
terrno = TSDB_CODE_MND_INVALID_TARGET_TABLE;
|
||||||
return terrno;
|
return terrno;
|
||||||
|
@ -742,7 +746,7 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (checkForNumOfStreams(pMnode, &streamObj) < 0) {
|
if (doStreamCheck(pMnode, &streamObj) < 0) {
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -978,7 +982,6 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
int64_t ts = taosGetTimestampMs();
|
int64_t ts = taosGetTimestampMs();
|
||||||
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||||
// mWarn("checkpoint interval less than the threshold, ignore it");
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1396,6 +1399,15 @@ int32_t mndGetNumOfStreams(SMnode *pMnode, char *dbName, int32_t *pNumOfStreams)
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void int64ToHexStr(int64_t id, char *pBuf, int32_t bufLen) {
|
||||||
|
memset(pBuf, 0, bufLen);
|
||||||
|
pBuf[2] = '0';
|
||||||
|
pBuf[3] = 'x';
|
||||||
|
|
||||||
|
int32_t len = tintToHex(id, &pBuf[4]);
|
||||||
|
varDataSetLen(pBuf, len + 2);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
@ -1420,19 +1432,14 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
||||||
|
|
||||||
// stream id
|
// stream id
|
||||||
char buf[128] = {0};
|
char buf[128] = {0};
|
||||||
int32_t len = tintToHex(pStream->uid, &buf[4]);
|
int64ToHexStr(pStream->uid, buf, tListLen(buf));
|
||||||
buf[2] = '0';
|
|
||||||
buf[3] = 'x';
|
|
||||||
varDataSetLen(buf, len + 2);
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
|
|
||||||
// related fill-history stream id
|
// related fill-history stream id
|
||||||
memset(buf, 0, tListLen(buf));
|
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pStream->hTaskUid != 0) {
|
if (pStream->hTaskUid != 0) {
|
||||||
len = tintToHex(pStream->hTaskUid, &buf[4]);
|
int64ToHexStr(pStream->hTaskUid, buf, tListLen(buf));
|
||||||
varDataSetLen(buf, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, false);
|
colDataSetVal(pColInfo, numOfRows, buf, false);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, numOfRows, buf, true);
|
colDataSetVal(pColInfo, numOfRows, buf, true);
|
||||||
|
@ -1532,10 +1539,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
|
|
||||||
char idstr[128] = {0};
|
char idstr[128] = {0};
|
||||||
int32_t len = tintToHex(pTask->id.taskId, &idstr[4]);
|
int64ToHexStr(pTask->id.taskId, idstr, tListLen(idstr));
|
||||||
idstr[2] = '0';
|
|
||||||
idstr[3] = 'x';
|
|
||||||
varDataSetLen(idstr, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
|
|
||||||
// node type
|
// node type
|
||||||
|
@ -1651,11 +1655,7 @@ static int32_t setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SS
|
||||||
// history_task_id
|
// history_task_id
|
||||||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||||
if (pe->hTaskId != 0) {
|
if (pe->hTaskId != 0) {
|
||||||
memset(idstr, 0, tListLen(idstr));
|
int64ToHexStr(pe->hTaskId, idstr, tListLen(idstr));
|
||||||
len = tintToHex(pe->hTaskId, &idstr[4]);
|
|
||||||
idstr[2] = '0';
|
|
||||||
idstr[3] = 'x';
|
|
||||||
varDataSetLen(idstr, len + 2);
|
|
||||||
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
colDataSetVal(pColInfo, numOfRows, idstr, false);
|
||||||
} else {
|
} else {
|
||||||
colDataSetVal(pColInfo, numOfRows, 0, true);
|
colDataSetVal(pColInfo, numOfRows, 0, true);
|
||||||
|
@ -2029,7 +2029,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// no need to build the trans to handle the vgroup upddate
|
// no need to build the trans to handle the vgroup update
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,7 +258,7 @@ int32_t mndProcessStreamHb(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
int32_t numOfUpdated = taosArrayGetSize(req.pUpdateNodes);
|
||||||
if (numOfUpdated > 0) {
|
if (numOfUpdated > 0) {
|
||||||
mDebug("%d stream node(s) need updated from report of hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
mDebug("%d stream node(s) need updated from hbMsg(vgId:%d)", numOfUpdated, req.vgId);
|
||||||
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
setNodeEpsetExpiredFlag(req.pUpdateNodes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1102,7 +1102,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d checkpoint-source msg received during restoring, s-task:0x%x ignore it", vgId, req.taskId);
|
tqDebug("vgId:%d checkpoint-source msg received during restoring, checkpointId:%" PRId64
|
||||||
|
", transId:%d s-task:0x%x ignore it",
|
||||||
|
vgId, req.checkpointId, req.transId, req.taskId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
@ -1111,7 +1113,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed", vgId, req.taskId);
|
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. checkpointId:%" PRId64
|
||||||
|
" transId:%d it may have been destroyed",
|
||||||
|
vgId, req.taskId, req.checkpointId, req.transId);
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
|
||||||
tmsgSendRsp(&rsp); // error occurs
|
tmsgSendRsp(&rsp); // error occurs
|
||||||
|
@ -1123,7 +1127,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
pTask->chkInfo.checkpointingId = req.checkpointId;
|
pTask->chkInfo.checkpointingId = req.checkpointId;
|
||||||
pTask->chkInfo.transId = req.transId;
|
pTask->chkInfo.transId = req.transId;
|
||||||
|
|
||||||
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpointId:%" PRId64
|
||||||
", transId:%d set it failed",
|
", transId:%d set it failed",
|
||||||
pTask->id.idStr, req.checkpointId, req.transId);
|
pTask->id.idStr, req.checkpointId, req.transId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
@ -1140,7 +1144,7 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
if (req.mndTrigger == 1) {
|
if (req.mndTrigger == 1) {
|
||||||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure",
|
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpointId:%" PRId64 ", set it failure",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -309,11 +309,10 @@ int32_t doBuildAndSendSubmitMsg(SVnode* pVnode, SStreamTask* pTask, SSubmitReq2*
|
||||||
|
|
||||||
pRec->numOfSubmit += 1;
|
pRec->numOfSubmit += 1;
|
||||||
if ((pRec->numOfSubmit % 1000) == 0) {
|
if ((pRec->numOfSubmit % 1000) == 0) {
|
||||||
double el = (taosGetTimestampMs() - pTask->execInfo.start) / 1000.0;
|
double el = (taosGetTimestampMs() - pTask->execInfo.readyTs) / 1000.0;
|
||||||
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
|
tqInfo("s-task:%s vgId:%d write %" PRId64 " blocks (%" PRId64 " rows) in %" PRId64
|
||||||
" submit into dst table, %.2fMiB duration:%.2f Sec.",
|
" submit into dst table, %.2fMiB duration:%.2f Sec.",
|
||||||
pTask->id.idStr, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize),
|
id, vgId, pRec->numOfBlocks, pRec->numOfRows, pRec->numOfSubmit, SIZE_IN_MiB(pRec->dataSize), el);
|
||||||
el);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -200,6 +200,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
|
|
||||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
streamTaskResetStatus(pTask);
|
streamTaskResetStatus(pTask);
|
||||||
|
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
|
||||||
SStreamTask** ppHTask = NULL;
|
SStreamTask** ppHTask = NULL;
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
@ -213,6 +214,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
||||||
|
streamTaskResetStatus(*ppHTask);
|
||||||
|
streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -455,8 +458,8 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
|
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void setParam(SStreamTask* pTask, int64_t* initTs, bool* hasHTask, STaskId* pId) {
|
static void setParam(SStreamTask* pTask, int64_t* startCheckTs, bool* hasHTask, STaskId* pId) {
|
||||||
*initTs = pTask->execInfo.init;
|
*startCheckTs = pTask->execInfo.checkTs;
|
||||||
|
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
*hasHTask = true;
|
*hasHTask = true;
|
||||||
|
@ -525,6 +528,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
if (pTask == NULL) {
|
if (pTask == NULL) {
|
||||||
streamMetaRLock(pMeta);
|
streamMetaRLock(pMeta);
|
||||||
|
|
||||||
|
// let's try to find this task in hashmap
|
||||||
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
SStreamTask** ppTask = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
|
||||||
if (ppTask != NULL) {
|
if (ppTask != NULL) {
|
||||||
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
setParam(*ppTask, &initTs, &hasHistoryTask, &fId);
|
||||||
|
@ -533,7 +537,7 @@ int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLe
|
||||||
if (hasHistoryTask) {
|
if (hasHistoryTask) {
|
||||||
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
streamMetaAddTaskLaunchResult(pMeta, fId.streamId, fId.taskId, initTs, now, false);
|
||||||
}
|
}
|
||||||
} else {
|
} else { // not exist even in the hash map of meta, forget it
|
||||||
streamMetaRUnLock(pMeta);
|
streamMetaRUnLock(pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -762,7 +766,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
int64_t st = taosGetTimestampMs();
|
int64_t st = taosGetTimestampMs();
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
if (pMeta->startInfo.taskStarting == 1) {
|
if (pMeta->startInfo.startAllTasks == 1) {
|
||||||
pMeta->startInfo.restartCount += 1;
|
pMeta->startInfo.restartCount += 1;
|
||||||
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
|
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
|
||||||
pMeta->startInfo.restartCount);
|
pMeta->startInfo.restartCount);
|
||||||
|
@ -770,7 +774,7 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->startInfo.taskStarting = 1;
|
pMeta->startInfo.startAllTasks = 1;
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
@ -886,7 +890,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
bool scanWal = false;
|
bool scanWal = false;
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
if (pStartInfo->taskStarting == 1) {
|
if (pStartInfo->startAllTasks == 1) {
|
||||||
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
|
tqDebug("vgId:%d already in start tasks procedure in other thread, restartCounter:%d, do nothing", vgId,
|
||||||
pMeta->startInfo.restartCount);
|
pMeta->startInfo.restartCount);
|
||||||
} else { // not in starting procedure
|
} else { // not in starting procedure
|
||||||
|
@ -936,13 +940,20 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
|
||||||
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
||||||
|
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
|
|
||||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||||
if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) {
|
SStreamTaskState *pState = streamTaskGetStatus(pTask);
|
||||||
|
if (pState->state == TASK_STATUS__CK) {
|
||||||
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
|
tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d",
|
||||||
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
|
pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId);
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
|
||||||
streamTaskSetStatusReady(pTask);
|
streamTaskSetStatusReady(pTask);
|
||||||
|
} else if (pState->state == TASK_STATUS__UNINIT) {
|
||||||
|
tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr);
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
/*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
|
@ -4501,6 +4501,10 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// open reader failure may cause the flag still to be READER_STATUS_SUSPEND, which may cause suspend reader failure.
|
||||||
|
// So we need to set it A.S.A.P
|
||||||
|
pReader->flag = READER_STATUS_NORMAL;
|
||||||
|
|
||||||
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
if (pReader->type == TIMEWINDOW_RANGE_CONTAINED) {
|
||||||
code = doOpenReaderImpl(pReader);
|
code = doOpenReaderImpl(pReader);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
@ -4531,7 +4535,6 @@ int32_t tsdbReaderResume2(STsdbReader* pReader) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pReader->flag = READER_STATUS_NORMAL;
|
|
||||||
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
|
tsdbDebug("reader: %p resumed uid %" PRIu64 ", numOfTable:%" PRId32 ", in this query %s", pReader,
|
||||||
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
|
pBlockScanInfo ? (*pBlockScanInfo)->uid : 0, numOfTables, pReader->idStr);
|
||||||
return code;
|
return code;
|
||||||
|
|
|
@ -579,12 +579,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
|
vInfo("vgId:%d sync restore finished, start to launch stream task(s)", pVnode->config.vgId);
|
||||||
int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
|
int32_t numOfTasks = tqStreamTasksGetTotalNum(pMeta);
|
||||||
if (numOfTasks > 0) {
|
if (numOfTasks > 0) {
|
||||||
if (pMeta->startInfo.taskStarting == 1) {
|
if (pMeta->startInfo.startAllTasks == 1) {
|
||||||
pMeta->startInfo.restartCount += 1;
|
pMeta->startInfo.restartCount += 1;
|
||||||
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
|
tqDebug("vgId:%d in start tasks procedure, inc restartCounter by 1, remaining restart:%d", vgId,
|
||||||
pMeta->startInfo.restartCount);
|
pMeta->startInfo.restartCount);
|
||||||
} else {
|
} else {
|
||||||
pMeta->startInfo.taskStarting = 1;
|
pMeta->startInfo.startAllTasks = 1;
|
||||||
|
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
|
tqStreamTaskStartAsync(pMeta, &pVnode->msgCb, false);
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define CHECK_DOWNSTREAM_INTERVAL 100
|
#define CHECK_RSP_INTERVAL 300
|
||||||
#define LAUNCH_HTASK_INTERVAL 100
|
#define LAUNCH_HTASK_INTERVAL 100
|
||||||
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
|
#define WAIT_FOR_MINIMAL_INTERVAL 100.00
|
||||||
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
#define MAX_RETRY_LAUNCH_HISTORY_TASK 40
|
||||||
|
|
|
@ -3704,7 +3704,7 @@ int32_t streamStatePutBatch(SStreamState* pState, const char* cfKeyName, rocksdb
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
ginitDict[i].toStrFunc((void*)key, tbuf);
|
ginitDict[i].toStrFunc((void*)key, tbuf);
|
||||||
stDebug("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
|
stTrace("streamState str: %s succ to write to %s_%s, len: %d", tbuf, wrapper->idstr, ginitDict[i].key, vlen);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -3729,7 +3729,7 @@ int32_t streamStatePutBatchOptimize(SStreamState* pState, int32_t cfIdx, rocksdb
|
||||||
{
|
{
|
||||||
char tbuf[256] = {0};
|
char tbuf[256] = {0};
|
||||||
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
ginitDict[cfIdx].toStrFunc((void*)key, tbuf);
|
||||||
stDebug("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
stTrace("streamState str: %s succ to write to %s_%s", tbuf, wrapper->idstr, ginitDict[cfIdx].key);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
|
@ -396,6 +396,7 @@ int32_t getChkpMeta(char* id, char* path, SArray* list) {
|
||||||
taosMemoryFree(file);
|
taosMemoryFree(file);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t doUploadChkp(void* param) {
|
int32_t doUploadChkp(void* param) {
|
||||||
SAsyncUploadArg* arg = param;
|
SAsyncUploadArg* arg = param;
|
||||||
char* path = NULL;
|
char* path = NULL;
|
||||||
|
@ -436,6 +437,7 @@ int32_t doUploadChkp(void* param) {
|
||||||
taosMemoryFree(arg);
|
taosMemoryFree(arg);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
int32_t streamTaskUploadChkp(SStreamTask* pTask, int64_t chkpId, char* taskId) {
|
||||||
// async upload
|
// async upload
|
||||||
UPLOAD_TYPE type = getUploadType();
|
UPLOAD_TYPE type = getUploadType();
|
||||||
|
|
|
@ -1119,7 +1119,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
.stage = pMeta->stage,
|
.stage = pMeta->stage,
|
||||||
|
|
||||||
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
.inputQUsed = SIZE_IN_MiB(streamQueueGetItemSize((*pTask)->inputq.queue)),
|
||||||
.startTime = (*pTask)->execInfo.start,
|
.startTime = (*pTask)->execInfo.readyTs,
|
||||||
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
|
.checkpointInfo.latestId = (*pTask)->chkInfo.checkpointId,
|
||||||
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
|
.checkpointInfo.latestVer = (*pTask)->chkInfo.checkpointVer,
|
||||||
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
|
.checkpointInfo.latestTime = (*pTask)->chkInfo.checkpointTime,
|
||||||
|
@ -1141,7 +1141,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
entry.checkpointInfo.activeTransId = (*pTask)->chkInfo.transId;
|
||||||
|
|
||||||
if (entry.checkpointInfo.failed) {
|
if (entry.checkpointInfo.failed) {
|
||||||
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
stInfo("s-task:%s set kill checkpoint trans in hb, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1329,7 +1329,7 @@ void streamMetaResetStartInfo(STaskStartInfo* pStartInfo) {
|
||||||
pStartInfo->readyTs = 0;
|
pStartInfo->readyTs = 0;
|
||||||
|
|
||||||
// reset the sentinel flag value to be 0
|
// reset the sentinel flag value to be 0
|
||||||
pStartInfo->taskStarting = 0;
|
pStartInfo->startAllTasks = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta) {
|
void streamMetaRLock(SStreamMeta* pMeta) {
|
||||||
|
@ -1496,7 +1496,7 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
streamLaunchFillHistoryTask(pTask);
|
streamLaunchFillHistoryTask(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, true);
|
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, true);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1506,10 +1506,10 @@ int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
code = ret;
|
code = ret;
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->init, pInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1601,10 +1601,10 @@ int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t tas
|
||||||
if (ret != TSDB_CODE_SUCCESS) {
|
if (ret != TSDB_CODE_SUCCESS) {
|
||||||
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->init, pInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||||
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->init, pInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, pInfo->checkTs, pInfo->readyTs, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1617,7 +1617,7 @@ static void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ)
|
||||||
void* pIter = NULL;
|
void* pIter = NULL;
|
||||||
size_t keyLen = 0;
|
size_t keyLen = 0;
|
||||||
|
|
||||||
stInfo("vgId:%d %d tasks check-downstream completed %s", vgId, taosHashGetSize(pTaskSet),
|
stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet),
|
||||||
succ ? "success" : "failed");
|
succ ? "success" : "failed");
|
||||||
|
|
||||||
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
while ((pIter = taosHashIterate(pTaskSet, pIter)) != NULL) {
|
||||||
|
@ -1641,7 +1641,7 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
|
|
||||||
if (pStartInfo->taskStarting != 1) {
|
if (pStartInfo->startAllTasks != 1) {
|
||||||
int64_t el = endTs - startTs;
|
int64_t el = endTs - startTs;
|
||||||
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
qDebug("vgId:%d not start all task(s), not record status, s-task:0x%x launch succ:%d elapsed time:%" PRId64 "ms",
|
||||||
pMeta->vgId, taskId, ready, el);
|
pMeta->vgId, taskId, ready, el);
|
||||||
|
|
|
@ -56,8 +56,8 @@ int32_t streamTaskSetReady(SStreamTask* pTask) {
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
pTask->status.downstreamReady = 1;
|
pTask->status.downstreamReady = 1;
|
||||||
|
|
||||||
pTask->execInfo.start = taosGetTimestampMs();
|
pTask->execInfo.readyTs = taosGetTimestampMs();
|
||||||
int64_t el = (pTask->execInfo.start - pTask->execInfo.init);
|
int64_t el = (pTask->execInfo.readyTs - pTask->execInfo.checkTs);
|
||||||
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
|
||||||
pTask->id.idStr, numOfDowns, el, p->name);
|
pTask->id.idStr, numOfDowns, el, p->name);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -83,7 +83,7 @@ int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void doReExecScanhistory(void* param, void* tmrId) {
|
static void doExecScanhistoryInFuture(void* param, void* tmrId) {
|
||||||
SStreamTask* pTask = param;
|
SStreamTask* pTask = param;
|
||||||
pTask->schedHistoryInfo.numOfTicks -= 1;
|
pTask->schedHistoryInfo.numOfTicks -= 1;
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ static void doReExecScanhistory(void* param, void* tmrId) {
|
||||||
// release the task.
|
// release the task.
|
||||||
streamMetaReleaseTask(pTask->pMeta, pTask);
|
streamMetaReleaseTask(pTask->pMeta, pTask);
|
||||||
} else {
|
} else {
|
||||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
|
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -131,9 +131,9 @@ int32_t streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration)
|
||||||
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
|
stDebug("s-task:%s scan-history resumed in %.2fs, ref:%d", pTask->id.idStr, numOfTicks * 0.1, ref);
|
||||||
|
|
||||||
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
if (pTask->schedHistoryInfo.pTimer == NULL) {
|
||||||
pTask->schedHistoryInfo.pTimer = taosTmrStart(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
|
pTask->schedHistoryInfo.pTimer = taosTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer);
|
||||||
} else {
|
} else {
|
||||||
taosTmrReset(doReExecScanhistory, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
|
taosTmrReset(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTask, streamTimer, &pTask->schedHistoryInfo.pTimer);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -184,12 +184,20 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
|
|
||||||
ASSERT(pTask->status.downstreamReady == 0);
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs());
|
||||||
|
|
||||||
// serialize streamProcessScanHistoryFinishRsp
|
// serialize streamProcessScanHistoryFinishRsp
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
req.downstreamNodeId = pTask->outputInfo.fixedDispatcher.nodeId;
|
||||||
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
req.downstreamTaskId = pTask->outputInfo.fixedDispatcher.taskId;
|
||||||
pTask->checkReqId = req.reqId;
|
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check single downstream task:0x%x(vgId:%d) ver:%" PRId64 "-%" PRId64
|
||||||
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
|
" window:%" PRId64 "-%" PRId64 " req:0x%" PRIx64,
|
||||||
|
@ -197,95 +205,36 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
|
||||||
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey, req.reqId);
|
||||||
|
|
||||||
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
||||||
|
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
pTask->notReadyTasks = numOfVgs;
|
|
||||||
if (pTask->checkReqIds == NULL) {
|
|
||||||
pTask->checkReqIds = taosArrayInit(numOfVgs, sizeof(int64_t));
|
|
||||||
} else {
|
|
||||||
taosArrayClear(pTask->checkReqIds);
|
|
||||||
}
|
|
||||||
|
|
||||||
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
stDebug("s-task:%s check %d downstream tasks, ver:%" PRId64 "-%" PRId64 " window:%" PRId64 "-%" PRId64,
|
||||||
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
pTask->id.idStr, numOfVgs, pRange->range.minVer, pRange->range.maxVer, pWindow->skey, pWindow->ekey);
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
req.reqId = tGenIdPI64();
|
req.reqId = tGenIdPI64();
|
||||||
taosArrayPush(pTask->checkReqIds, &req.reqId);
|
|
||||||
req.downstreamNodeId = pVgInfo->vgId;
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
req.downstreamTaskId = pVgInfo->taskId;
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
||||||
|
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
|
||||||
|
|
||||||
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
|
||||||
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
||||||
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
|
streamTaskStartMonitorCheckRsp(pTask);
|
||||||
|
} else { // for sink task, set it ready directly.
|
||||||
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId);
|
||||||
|
streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
doProcessDownstreamReadyRsp(pTask);
|
doProcessDownstreamReadyRsp(pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static STaskRecheckInfo* createRecheckInfo(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
|
||||||
STaskRecheckInfo* pInfo = taosMemoryCalloc(1, sizeof(STaskRecheckInfo));
|
|
||||||
if (pInfo == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
pInfo->pTask = pTask;
|
|
||||||
pInfo->req = (SStreamTaskCheckReq){
|
|
||||||
.reqId = pRsp->reqId,
|
|
||||||
.streamId = pRsp->streamId,
|
|
||||||
.upstreamTaskId = pRsp->upstreamTaskId,
|
|
||||||
.upstreamNodeId = pRsp->upstreamNodeId,
|
|
||||||
.downstreamTaskId = pRsp->downstreamTaskId,
|
|
||||||
.downstreamNodeId = pRsp->downstreamNodeId,
|
|
||||||
.childId = pRsp->childId,
|
|
||||||
.stage = pTask->pMeta->stage,
|
|
||||||
};
|
|
||||||
|
|
||||||
return pInfo;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void destroyRecheckInfo(STaskRecheckInfo* pInfo) {
|
|
||||||
if (pInfo != NULL) {
|
|
||||||
taosTmrStop(pInfo->checkTimer);
|
|
||||||
pInfo->checkTimer = NULL;
|
|
||||||
taosMemoryFree(pInfo);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static void recheckDownstreamTasks(void* param, void* tmrId) {
|
|
||||||
STaskRecheckInfo* pInfo = param;
|
|
||||||
SStreamTask* pTask = pInfo->pTask;
|
|
||||||
|
|
||||||
SStreamTaskCheckReq* pReq = &pInfo->req;
|
|
||||||
|
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
|
|
||||||
stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
|
||||||
pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
|
|
||||||
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pTask->outputInfo.fixedDispatcher.epSet);
|
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
|
||||||
SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos;
|
|
||||||
|
|
||||||
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
|
||||||
for (int32_t i = 0; i < numOfVgs; i++) {
|
|
||||||
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
|
||||||
if (pVgInfo->taskId == pReq->downstreamTaskId) {
|
|
||||||
stDebug("s-task:%s (vgId:%d) check downstream task:0x%x (vgId:%d) stage:%" PRId64 " (recheck)", pTask->id.idStr,
|
|
||||||
pTask->info.nodeId, pReq->downstreamTaskId, pReq->downstreamNodeId, pReq->stage);
|
|
||||||
streamSendCheckMsg(pTask, pReq, pReq->downstreamNodeId, &pVgInfo->epSet);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
destroyRecheckInfo(pInfo);
|
|
||||||
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
|
||||||
stDebug("s-task:%s complete send check in timer, ref:%d", pTask->id.idStr, ref);
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_t vgId, int64_t stage,
|
||||||
int64_t* oldStage) {
|
int64_t* oldStage) {
|
||||||
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
SStreamChildEpInfo* pInfo = streamTaskGetUpstreamTaskEpInfo(pTask, upstreamTaskId);
|
||||||
|
@ -391,9 +340,9 @@ void doProcessDownstreamReadyRsp(SStreamTask* pTask) {
|
||||||
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
EStreamTaskEvent event = (pTask->info.fillHistory == 0) ? TASK_EVENT_INIT : TASK_EVENT_INIT_SCANHIST;
|
||||||
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
streamTaskOnHandleEventSuccess(pTask->status.pSM, event, NULL, NULL);
|
||||||
|
|
||||||
int64_t initTs = pTask->execInfo.init;
|
int64_t checkTs = pTask->execInfo.checkTs;
|
||||||
int64_t startTs = pTask->execInfo.start;
|
int64_t readyTs = pTask->execInfo.readyTs;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, initTs, startTs, true);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, checkTs, readyTs, true);
|
||||||
|
|
||||||
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
if (pTask->status.taskStatus == TASK_STATUS__HALT) {
|
||||||
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask) && (pTask->info.fillHistory == 0));
|
||||||
|
@ -439,7 +388,12 @@ static void addIntoNodeUpdateList(SStreamTask* pTask, int32_t nodeId) {
|
||||||
|
|
||||||
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) {
|
||||||
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
|
||||||
|
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
int32_t total = streamTaskGetNumOfDownstream(pTask);
|
||||||
|
int32_t left = -1;
|
||||||
|
|
||||||
if (streamTaskShouldStop(pTask)) {
|
if (streamTaskShouldStop(pTask)) {
|
||||||
stDebug("s-task:%s should stop, do not do check downstream again", id);
|
stDebug("s-task:%s should stop, do not do check downstream again", id);
|
||||||
|
@ -447,47 +401,21 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
if (pRsp->status == TASK_DOWNSTREAM_READY) {
|
||||||
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
bool found = false;
|
|
||||||
int32_t numOfReqs = taosArrayGetSize(pTask->checkReqIds);
|
|
||||||
for (int32_t i = 0; i < numOfReqs; i++) {
|
|
||||||
int64_t reqId = *(int64_t*)taosArrayGet(pTask->checkReqIds, i);
|
|
||||||
if (reqId == pRsp->reqId) {
|
|
||||||
found = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!found) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t left = atomic_sub_fetch_32(&pTask->notReadyTasks, 1);
|
|
||||||
ASSERT(left >= 0);
|
|
||||||
|
|
||||||
if (left == 0) {
|
if (left == 0) {
|
||||||
pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);;
|
doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag
|
||||||
|
streamTaskCompleteCheck(pInfo, id);
|
||||||
doProcessDownstreamReadyRsp(pTask);
|
|
||||||
} else {
|
} else {
|
||||||
int32_t total = taosArrayGetSize(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
|
||||||
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
ASSERT(pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH);
|
|
||||||
if (pRsp->reqId != pTask->checkReqId) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
doProcessDownstreamReadyRsp(pTask);
|
|
||||||
}
|
|
||||||
} else { // not ready, wait for 100ms and retry
|
} else { // not ready, wait for 100ms and retry
|
||||||
|
streamTaskUpdateCheckInfo(pInfo, pRsp->downstreamTaskId, pRsp->status, now, pRsp->reqId, &left, id);
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE || pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
if (pRsp->status == TASK_UPSTREAM_NEW_STAGE) {
|
||||||
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
stError("s-task:%s vgId:%d self vnode-transfer/leader-change/restart detected, old stage:%" PRId64
|
||||||
", current stage:%" PRId64
|
", current stage:%" PRId64 ", not check wait for downstream task nodeUpdate, and all tasks restart",
|
||||||
", not check wait for downstream task nodeUpdate, and all tasks restart",
|
|
||||||
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
|
id, pRsp->upstreamNodeId, pRsp->oldStage, pTask->pMeta->stage);
|
||||||
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
|
addIntoNodeUpdateList(pTask, pRsp->upstreamNodeId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -498,8 +426,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
addIntoNodeUpdateList(pTask, pRsp->downstreamNodeId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t startTs = pTask->execInfo.init;
|
int32_t startTs = pTask->execInfo.checkTs;
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, now, false);
|
||||||
|
|
||||||
// automatically set the related fill-history task to be failed.
|
// automatically set the related fill-history task to be failed.
|
||||||
|
@ -507,13 +434,11 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
|
||||||
STaskId* pId = &pTask->hTaskInfo.id;
|
STaskId* pId = &pTask->hTaskInfo.id;
|
||||||
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false);
|
||||||
}
|
}
|
||||||
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
|
||||||
STaskRecheckInfo* pInfo = createRecheckInfo(pTask, pRsp);
|
|
||||||
|
|
||||||
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
} else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms
|
||||||
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%" PRId64 ", retry in 100ms, ref:%d ", id,
|
ASSERT(left > 0);
|
||||||
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage, ref);
|
stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id,
|
||||||
pInfo->checkTimer = taosTmrStart(recheckDownstreamTasks, CHECK_DOWNSTREAM_INTERVAL, pInfo, streamTimer);
|
pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,13 +528,13 @@ static void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask)
|
||||||
SDataRange* pRange = &pHTask->dataRange;
|
SDataRange* pRange = &pHTask->dataRange;
|
||||||
|
|
||||||
// the query version range should be limited to the already processed data
|
// the query version range should be limited to the already processed data
|
||||||
pHTask->execInfo.init = taosGetTimestampMs();
|
pHTask->execInfo.checkTs = taosGetTimestampMs();
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
||||||
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
|
stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
|
||||||
" verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
|
" verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
|
||||||
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
pTask->id.idStr, pHTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
||||||
pRange->range.maxVer, pHTask->execInfo.init);
|
pRange->range.maxVer, pHTask->execInfo.checkTs);
|
||||||
} else {
|
} else {
|
||||||
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
|
stDebug("s-task:%s no fill-history condition for non-source task:%s", pTask->id.idStr, pHTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
@ -767,8 +692,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
||||||
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId);
|
SLaunchHTaskInfo* pInfo = createHTaskLaunchInfo(pMeta, pTask->id.streamId, pTask->id.taskId, hStreamId, hTaskId);
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
|
stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
|
||||||
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -785,7 +709,7 @@ static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask) {
|
||||||
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
|
stError("s-task:%s failed to start timer, related fill-history task not launched, ref:%d", idStr, ref);
|
||||||
|
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -816,7 +740,7 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
|
stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", idStr, hStreamId, hTaskId,
|
||||||
pStatus->name);
|
pStatus->name);
|
||||||
|
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||||
return -1; // todo set the correct error code
|
return -1; // todo set the correct error code
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -831,11 +755,11 @@ int32_t streamLaunchFillHistoryTask(SStreamTask* pTask) {
|
||||||
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
|
SStreamTask* pHisTask = streamMetaAcquireTask(pMeta, hStreamId, hTaskId);
|
||||||
if (pHisTask == NULL) {
|
if (pHisTask == NULL) {
|
||||||
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
|
stDebug("s-task:%s failed acquire and start fill-history task, it may have been dropped/stopped", idStr);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, false);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
|
||||||
} else {
|
} else {
|
||||||
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
|
if (pHisTask->status.downstreamReady == 1) { // it's ready now, do nothing
|
||||||
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
|
stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHisTask->id.idStr);
|
||||||
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->init, pExecInfo->start, true);
|
streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, true);
|
||||||
} else { // exist, but not ready, continue check downstream task status
|
} else { // exist, but not ready, continue check downstream task status
|
||||||
checkFillhistoryTaskStatus(pTask, pHisTask);
|
checkFillhistoryTaskStatus(pTask, pHisTask);
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,7 +21,11 @@
|
||||||
#include "ttimer.h"
|
#include "ttimer.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
|
||||||
|
#define CHECK_NOT_RSP_DURATION 10*1000 // 10 sec
|
||||||
|
|
||||||
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
static void streamTaskDestroyUpstreamInfo(SUpstreamInfo* pUpstreamInfo);
|
||||||
|
static void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
|
static void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet);
|
||||||
|
|
||||||
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
|
||||||
int32_t childId = taosArrayGetSize(pArray);
|
int32_t childId = taosArrayGetSize(pArray);
|
||||||
|
@ -113,6 +117,9 @@ SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, SEpSet* pEpset,
|
||||||
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
pTask->inputq.status = TASK_INPUT_STATUS__NORMAL;
|
||||||
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
pTask->outputq.status = TASK_OUTPUT_STATUS__NORMAL;
|
||||||
|
|
||||||
|
pTask->taskCheckInfo.pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
||||||
|
taosThreadMutexInit(&pTask->taskCheckInfo.checkInfoLock, NULL);
|
||||||
|
|
||||||
if (fillHistory) {
|
if (fillHistory) {
|
||||||
ASSERT(hasFillhistory);
|
ASSERT(hasFillhistory);
|
||||||
}
|
}
|
||||||
|
@ -365,8 +372,9 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
|
||||||
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
|
||||||
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
|
" nextProcessVer:%" PRId64 ", checkpointCount:%d",
|
||||||
taskId, pStatis->created, pStatis->init, pStatis->start, pStatis->updateCount, pStatis->latestUpdateTs,
|
taskId, pStatis->created, pStatis->checkTs, pStatis->readyTs, pStatis->updateCount,
|
||||||
pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer, pStatis->checkpoint);
|
pStatis->latestUpdateTs, pCkInfo->checkpointId, pCkInfo->checkpointVer, pCkInfo->nextProcessVer,
|
||||||
|
pStatis->checkpoint);
|
||||||
|
|
||||||
// remove the ref by timer
|
// remove the ref by timer
|
||||||
while (pTask->status.timerActive > 0) {
|
while (pTask->status.timerActive > 0) {
|
||||||
|
@ -423,9 +431,10 @@ void tFreeStreamTask(SStreamTask* pTask) {
|
||||||
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
tSimpleHashCleanup(pTask->outputInfo.tbSink.pTblInfo);
|
||||||
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
} else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
taosArrayDestroy(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
streamTaskCleanCheckInfo(&pTask->taskCheckInfo);
|
||||||
|
|
||||||
if (pTask->pState) {
|
if (pTask->pState) {
|
||||||
stDebug("s-task:0x%x start to free task state", taskId);
|
stDebug("s-task:0x%x start to free task state", taskId);
|
||||||
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
|
streamStateClose(pTask->pState, status1 == TASK_STATUS__DROPPING);
|
||||||
|
@ -932,3 +941,321 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) {
|
||||||
|
if (pInfo->pList == NULL) {
|
||||||
|
pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo));
|
||||||
|
} else {
|
||||||
|
taosArrayClear(pInfo->pList);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
pInfo->notReadyTasks = 1;
|
||||||
|
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
pInfo->notReadyTasks = taosArrayGetSize(pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos);
|
||||||
|
ASSERT(pInfo->notReadyTasks == pOutputInfo->shuffleDispatcher.dbInfo.vgNum);
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->startTs = startTs;
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id) {
|
||||||
|
SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .reqId = reqId, .rspTs = 0};
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
stDebug("s-task:%s check info to task:0x%x already sent", id, taskId);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pInfo->pList, &info);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId,
|
||||||
|
int32_t* pNotReady, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
ASSERT(reqId == p->reqId);
|
||||||
|
p->status = status;
|
||||||
|
p->rspTs = rspTs;
|
||||||
|
|
||||||
|
// count down one, since it is ready now
|
||||||
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
*pNotReady = atomic_sub_fetch_32(&pInfo->notReadyTasks, 1);
|
||||||
|
} else {
|
||||||
|
*pNotReady = pInfo->notReadyTasks;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
stError("s-task:%s unexpected check rsp msg, downstream task:0x%x, reqId:%"PRIx64, id, taskId, reqId);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
if (pInfo->inCheckProcess == 0) {
|
||||||
|
pInfo->inCheckProcess = 1;
|
||||||
|
} else {
|
||||||
|
ASSERT(pInfo->startTs > 0);
|
||||||
|
stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
stDebug("s-task:%s set the in check procedure flag", id);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
if (!pInfo->inCheckProcess) {
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t el = taosGetTimestampMs() - pInfo->startTs;
|
||||||
|
stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el);
|
||||||
|
|
||||||
|
pInfo->startTs = 0;
|
||||||
|
pInfo->inCheckProcess = 0;
|
||||||
|
pInfo->notReadyTasks = 0;
|
||||||
|
taosArrayClear(pInfo->pList);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
|
||||||
|
SStreamTaskCheckReq req = {
|
||||||
|
.streamId = pTask->id.streamId,
|
||||||
|
.upstreamTaskId = pTask->id.taskId,
|
||||||
|
.upstreamNodeId = pTask->info.nodeId,
|
||||||
|
.childId = pTask->info.selfChildId,
|
||||||
|
.stage = pTask->pMeta->stage,
|
||||||
|
};
|
||||||
|
|
||||||
|
STaskOutputInfo* pOutputInfo = &pTask->outputInfo;
|
||||||
|
if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) {
|
||||||
|
req.reqId = p->reqId;
|
||||||
|
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
|
||||||
|
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64,
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
|
||||||
|
|
||||||
|
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
|
||||||
|
} else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||||
|
SArray* vgInfo = pOutputInfo->shuffleDispatcher.dbInfo.pVgroupInfos;
|
||||||
|
int32_t numOfVgs = taosArrayGetSize(vgInfo);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfVgs; i++) {
|
||||||
|
SVgroupInfo* pVgInfo = taosArrayGet(vgInfo, i);
|
||||||
|
|
||||||
|
if (p->taskId == pVgInfo->taskId) {
|
||||||
|
req.reqId = p->reqId;
|
||||||
|
req.downstreamNodeId = pVgInfo->vgId;
|
||||||
|
req.downstreamTaskId = pVgInfo->taskId;
|
||||||
|
|
||||||
|
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x (vgId:%d) (shuffle), idx:%d",
|
||||||
|
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i);
|
||||||
|
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void rspMonitorFn(void* param, void* tmrId) {
|
||||||
|
SStreamTask* pTask = param;
|
||||||
|
SStreamTaskState* pStat = streamTaskGetStatus(pTask);
|
||||||
|
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
|
||||||
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
int64_t el = now - pInfo->startTs;
|
||||||
|
ETaskStatus state = pStat->state;
|
||||||
|
|
||||||
|
int32_t numOfReady = 0;
|
||||||
|
int32_t numOfFault = 0;
|
||||||
|
|
||||||
|
stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr);
|
||||||
|
|
||||||
|
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref);
|
||||||
|
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pInfo->checkInfoLock);
|
||||||
|
if (pInfo->notReadyTasks == 0) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr,
|
||||||
|
pStat->name, vgId, ref);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pNotReadyList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
|
||||||
|
|
||||||
|
if (pStat->state == TASK_STATUS__UNINIT) {
|
||||||
|
for (int32_t i = 0; i < taosArrayGetSize(pInfo->pList); ++i) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, i);
|
||||||
|
if (p->status == TASK_DOWNSTREAM_READY) {
|
||||||
|
numOfReady += 1;
|
||||||
|
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
|
||||||
|
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr,
|
||||||
|
p->taskId);
|
||||||
|
numOfFault += 1;
|
||||||
|
} else { // TASK_DOWNSTREAM_NOT_READY
|
||||||
|
if (p->rspTs == 0) { // not response yet
|
||||||
|
ASSERT(p->status == -1);
|
||||||
|
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
|
||||||
|
taosArrayPush(pTimeoutList, &p->taskId);
|
||||||
|
} else { // el < CHECK_NOT_RSP_DURATION
|
||||||
|
// do nothing and continue waiting for their rsps
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
taosArrayPush(pNotReadyList, &p->taskId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else { // unexpected status
|
||||||
|
stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
|
||||||
|
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
|
||||||
|
|
||||||
|
// fault tasks detected, not try anymore
|
||||||
|
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug(
|
||||||
|
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
|
||||||
|
"detected, ref:%d",
|
||||||
|
pTask->id.idStr, pStat->name, vgId, ref);
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
|
||||||
|
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
// checking of downstream tasks has been stopped by other threads
|
||||||
|
if (pInfo->inCheckProcess == 0) {
|
||||||
|
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug(
|
||||||
|
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
|
||||||
|
"timeout:%d, ready:%d ref:%d",
|
||||||
|
pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
|
||||||
|
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
|
||||||
|
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
|
||||||
|
streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false);
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfNotReady > 0) { // check to make sure not in recheck timer
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
// reset the info, and send the check msg to failure downstream again
|
||||||
|
for (int32_t i = 0; i < numOfNotReady; ++i) {
|
||||||
|
int32_t taskId = *(int32_t*)taosArrayGet(pNotReadyList, i);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
p->rspTs = 0;
|
||||||
|
p->status = -1;
|
||||||
|
doSendCheckMsg(pTask, p);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (numOfTimeout > 0) {
|
||||||
|
pInfo->startTs = now;
|
||||||
|
ASSERT(pTask->status.downstreamReady == 0);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTimeout; ++i) {
|
||||||
|
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(pInfo->pList); ++j) {
|
||||||
|
SDownstreamStatusInfo* p = taosArrayGet(pInfo->pList, j);
|
||||||
|
if (p->taskId == taskId) {
|
||||||
|
ASSERT(p->status == -1 && p->rspTs == 0);
|
||||||
|
doSendCheckMsg(pTask, p);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr,
|
||||||
|
numOfTimeout, now);
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexUnlock(&pInfo->checkInfoLock);
|
||||||
|
|
||||||
|
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
|
||||||
|
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr,
|
||||||
|
numOfNotReady, numOfFault, numOfTimeout, numOfReady);
|
||||||
|
|
||||||
|
taosArrayDestroy(pNotReadyList);
|
||||||
|
taosArrayDestroy(pTimeoutList);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) {
|
||||||
|
ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL);
|
||||||
|
|
||||||
|
int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1);
|
||||||
|
stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref);
|
||||||
|
pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) {
|
||||||
|
ASSERT(pInfo->inCheckProcess == 0);
|
||||||
|
|
||||||
|
pInfo->pList = taosArrayDestroy(pInfo->pList);
|
||||||
|
if (pInfo->checkRspTmr != NULL) {
|
||||||
|
/*bool ret = */ taosTmrStop(pInfo->checkRspTmr);
|
||||||
|
pInfo->checkRspTmr = NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexDestroy(&pInfo->checkInfoLock);
|
||||||
|
}
|
|
@ -80,10 +80,10 @@ static int32_t attachWaitedEvent(SStreamTask* pTask, SFutureHandleEventInfo* pEv
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
int32_t streamTaskInitStatus(SStreamTask* pTask) {
|
||||||
pTask->execInfo.init = taosGetTimestampMs();
|
pTask->execInfo.checkTs = taosGetTimestampMs();
|
||||||
|
|
||||||
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
|
stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr,
|
||||||
pTask->execInfo.init);
|
pTask->execInfo.checkTs);
|
||||||
|
|
||||||
streamTaskCheckDownstream(pTask);
|
streamTaskCheckDownstream(pTask);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue