fix(stream): do checkpoint after fill-history task completed.
This commit is contained in:
parent
996e2939a9
commit
d973f66cea
|
@ -803,6 +803,7 @@ SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask);
|
||||||
const char* streamTaskGetStatusStr(ETaskStatus status);
|
const char* streamTaskGetStatusStr(ETaskStatus status);
|
||||||
void streamTaskResetStatus(SStreamTask* pTask);
|
void streamTaskResetStatus(SStreamTask* pTask);
|
||||||
void streamTaskSetStatusReady(SStreamTask* pTask);
|
void streamTaskSetStatusReady(SStreamTask* pTask);
|
||||||
|
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask);
|
||||||
|
|
||||||
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen);
|
||||||
|
|
||||||
|
|
|
@ -223,6 +223,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_UPDATE_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_STREAM_REQ_CHKPT, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_KILL_COMPACT_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
|
@ -61,15 +61,15 @@ static int32_t mndRetrieveStreamTask(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextStreamTask(SMnode *pMnode, void *pIter);
|
||||||
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
|
static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
|
static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq);
|
||||||
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
||||||
int64_t streamId, int32_t taskId, int32_t transId);
|
int64_t streamId, int32_t taskId, int32_t transId, int8_t mndTrigger);
|
||||||
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
static int32_t mndProcessNodeCheck(SRpcMsg *pReq);
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg);
|
||||||
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
static SArray *extractNodeListFromStream(SMnode *pMnode);
|
||||||
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
static SArray *mndTakeVgroupSnapshot(SMnode *pMnode, bool *allReady);
|
||||||
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
static int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq);
|
||||||
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
|
||||||
|
|
||||||
|
static SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId);
|
||||||
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
|
||||||
|
|
||||||
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
|
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, SRpcMsg *pReq, const char *name, const char *pMsg);
|
||||||
|
@ -983,8 +983,9 @@ static int32_t mndProcessStreamCheckpointTmr(SRpcMsg *pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
static int32_t mndBuildStreamCheckpointSourceReq(void **pBuf, int32_t *pLen, int32_t nodeId, int64_t checkpointId,
|
||||||
int64_t streamId, int32_t taskId, int32_t transId) {
|
int64_t streamId, int32_t taskId, int32_t transId,
|
||||||
|
int8_t mndTrigger) {
|
||||||
SStreamCheckpointSourceReq req = {0};
|
SStreamCheckpointSourceReq req = {0};
|
||||||
req.checkpointId = checkpointId;
|
req.checkpointId = checkpointId;
|
||||||
req.nodeId = nodeId;
|
req.nodeId = nodeId;
|
||||||
|
@ -992,7 +993,7 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
req.streamId = streamId; // pTask->id.streamId;
|
req.streamId = streamId; // pTask->id.streamId;
|
||||||
req.taskId = taskId; // pTask->id.taskId;
|
req.taskId = taskId; // pTask->id.taskId;
|
||||||
req.transId = transId;
|
req.transId = transId;
|
||||||
req.mndTrigger = 1;
|
req.mndTrigger = mndTrigger;
|
||||||
|
|
||||||
int32_t code;
|
int32_t code;
|
||||||
int32_t blen;
|
int32_t blen;
|
||||||
|
@ -1028,14 +1029,16 @@ static int32_t mndBuildStreamCheckpointSourceReq2(void **pBuf, int32_t *pLen, in
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId) {
|
static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStream, int64_t checkpointId,
|
||||||
|
int8_t mndTrigger, bool lock) {
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
int64_t timestampMs = taosGetTimestampMs();
|
int64_t ts = taosGetTimestampMs();
|
||||||
if (timestampMs - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000) {
|
if (mndTrigger == 1 && (ts - pStream->checkpointFreq < tsStreamCheckpointInterval * 1000)) {
|
||||||
|
// mWarn("checkpoint interval less than the threshold, ignore it");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, true);
|
bool conflict = mndStreamTransConflictCheck(pMnode, pStream->uid, MND_STREAM_CHECKPOINT_NAME, lock);
|
||||||
if (conflict) {
|
if (conflict) {
|
||||||
mndAddtoCheckpointWaitingList(pStream, checkpointId);
|
mndAddtoCheckpointWaitingList(pStream, checkpointId);
|
||||||
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
|
mWarn("checkpoint conflict with other trans in %s, ignore the checkpoint for stream:%s %" PRIx64, pStream->sourceDb,
|
||||||
|
@ -1081,8 +1084,8 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
|
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
|
||||||
pTask->id.taskId, pTrans->id) < 0) {
|
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
taosWUnLockLatch(&pStream->lock);
|
taosWUnLockLatch(&pStream->lock);
|
||||||
goto _ERR;
|
goto _ERR;
|
||||||
|
@ -1126,80 +1129,6 @@ _ERR:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddStreamCheckpointToTrans(STrans *pTrans, SStreamObj *pStream, SMnode *pMnode, int64_t chkptId) {
|
|
||||||
taosWLockLatch(&pStream->lock);
|
|
||||||
|
|
||||||
int32_t totLevel = taosArrayGetSize(pStream->tasks);
|
|
||||||
for (int32_t i = 0; i < totLevel; i++) {
|
|
||||||
SArray *pLevel = taosArrayGetP(pStream->tasks, i);
|
|
||||||
SStreamTask *pTask = taosArrayGetP(pLevel, 0);
|
|
||||||
|
|
||||||
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
|
|
||||||
int32_t sz = taosArrayGetSize(pLevel);
|
|
||||||
for (int32_t j = 0; j < sz; j++) {
|
|
||||||
pTask = taosArrayGetP(pLevel, j);
|
|
||||||
if (pTask->info.fillHistory == 1) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
/*A(pTask->info.nodeId > 0);*/
|
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
|
||||||
if (pVgObj == NULL) {
|
|
||||||
taosWUnLockLatch(&pStream->lock);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
void *buf;
|
|
||||||
int32_t tlen;
|
|
||||||
if (mndBuildStreamCheckpointSourceReq2(&buf, &tlen, pTask->info.nodeId, chkptId, pTask->id.streamId,
|
|
||||||
pTask->id.taskId, pTrans->id) < 0) {
|
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
|
||||||
taosWUnLockLatch(&pStream->lock);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
STransAction action = {0};
|
|
||||||
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
|
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
|
||||||
|
|
||||||
initTransAction(&action, buf, tlen, TDMT_VND_STREAM_CHECK_POINT_SOURCE, &epset,
|
|
||||||
TSDB_CODE_SYN_PROPOSE_NOT_READY);
|
|
||||||
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(buf);
|
|
||||||
taosWUnLockLatch(&pStream->lock);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pStream->checkpointId = chkptId;
|
|
||||||
pStream->checkpointFreq = taosGetTimestampMs();
|
|
||||||
pStream->currentTick = 0;
|
|
||||||
|
|
||||||
// 3. commit log: stream checkpoint info
|
|
||||||
pStream->version = pStream->version + 1;
|
|
||||||
|
|
||||||
taosWUnLockLatch(&pStream->lock);
|
|
||||||
|
|
||||||
SSdbRaw *pCommitRaw = mndStreamActionEncode(pStream);
|
|
||||||
if (pCommitRaw == NULL) {
|
|
||||||
mError("failed to prepare trans rebalance since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
|
||||||
sdbFreeRaw(pCommitRaw);
|
|
||||||
mError("failed to prepare trans rebalance since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) {
|
|
||||||
sdbFreeRaw(pCommitRaw);
|
|
||||||
mError("failed to prepare trans rebalance since %s", terrstr());
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t initStreamNodeList(SMnode *pMnode) {
|
static int32_t initStreamNodeList(SMnode *pMnode) {
|
||||||
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
|
if (execInfo.pNodeList == NULL || (taosArrayGetSize(execInfo.pNodeList) == 0)) {
|
||||||
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
|
execInfo.pNodeList = taosArrayDestroy(execInfo.pNodeList);
|
||||||
|
@ -1296,9 +1225,10 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// make sure the time interval between two consecutive checkpoint trans is long enough
|
||||||
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
SMStreamDoCheckpointMsg *pMsg = (SMStreamDoCheckpointMsg *)pReq->pCont;
|
||||||
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
while ((pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream)) != NULL) {
|
||||||
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId);
|
code = mndProcessStreamCheckpointTrans(pMnode, pStream, pMsg->checkpointId, 1, true);
|
||||||
sdbRelease(pSdb, pStream);
|
sdbRelease(pSdb, pStream);
|
||||||
if (code == -1) {
|
if (code == -1) {
|
||||||
break;
|
break;
|
||||||
|
@ -1335,7 +1265,7 @@ static int32_t mndProcessStreamCheckpointInCandid(SRpcMsg *pReq) {
|
||||||
|
|
||||||
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
|
mDebug("start to launch checkpoint for stream:%s %" PRIx64 " in candidate list", pEntry->pName, pEntry->streamId);
|
||||||
|
|
||||||
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId);
|
code = mndProcessStreamCheckpointTrans(pMnode, ps, pEntry->checkpointId, 1, true);
|
||||||
mndReleaseStream(pMnode, ps);
|
mndReleaseStream(pMnode, ps);
|
||||||
|
|
||||||
if (code == TSDB_CODE_SUCCESS) {
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
|
@ -2905,6 +2835,16 @@ static SStreamTask *mndGetStreamTask(STaskId *pId, SStreamObj *pStream) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t mndGetNumOfStreamTasks(const SStreamObj *pStream) {
|
||||||
|
int32_t num = 0;
|
||||||
|
for(int32_t i = 0; i < taosArrayGetSize(pStream->tasks); ++i) {
|
||||||
|
SArray* pLevel = taosArrayGetP(pStream->tasks, i);
|
||||||
|
num += taosArrayGetSize(pLevel);
|
||||||
|
}
|
||||||
|
|
||||||
|
return num;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
int32_t setNodeEpsetExpiredFlag(const SArray *pNodeList) {
|
||||||
int32_t num = taosArrayGetSize(pNodeList);
|
int32_t num = taosArrayGetSize(pNodeList);
|
||||||
mInfo("set node expired for %d nodes", num);
|
mInfo("set node expired for %d nodes", num);
|
||||||
|
@ -3082,6 +3022,18 @@ SStreamObj *mndGetStreamObj(SMnode *pMnode, int64_t streamId) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void doAddTaskId(SArray* pList, int32_t taskId) {
|
||||||
|
int32_t num = taosArrayGetSize(pList);
|
||||||
|
for(int32_t i = 0; i < num; ++i) {
|
||||||
|
int32_t* pId = taosArrayGet(pList, i);
|
||||||
|
if (taskId == *pId) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pList, &taskId);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
SMnode *pMnode = pReq->info.node;
|
SMnode *pMnode = pReq->info.node;
|
||||||
|
|
||||||
|
@ -3101,6 +3053,39 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
|
mDebug("receive stream task checkpoint req msg, vgId:%d, s-task:0x%x", req.nodeId, req.taskId);
|
||||||
|
|
||||||
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
|
// register to the stream task done map, if all tasks has sent this kinds of message, start the checkpoint trans.
|
||||||
|
taosThreadMutexLock(&execInfo.lock);
|
||||||
|
|
||||||
|
SStreamObj *pStream = mndGetStreamObj(pMnode, req.streamId);
|
||||||
|
int32_t numOfTasks = mndGetNumOfStreamTasks(pStream);
|
||||||
|
|
||||||
|
void **pReqTaskList = taosHashGet(execInfo.pTransferStateStreams, &req.streamId, sizeof(req.streamId));
|
||||||
|
if (pReqTaskList == NULL) {
|
||||||
|
SArray *pList = taosArrayInit(4, sizeof(int32_t));
|
||||||
|
doAddTaskId(pList, req.taskId);
|
||||||
|
taosHashPut(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t), &pList, sizeof(void *));
|
||||||
|
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, 1, numOfTasks - 1);
|
||||||
|
|
||||||
|
} else {
|
||||||
|
doAddTaskId(*pReqTaskList, req.taskId);
|
||||||
|
|
||||||
|
int32_t total = taosArrayGetSize(*pReqTaskList);
|
||||||
|
if (total == numOfTasks) { // all tasks has send the reqs
|
||||||
|
int64_t checkpointId = mndStreamGenChkpId(pMnode);
|
||||||
|
mDebug("stream:0x%" PRIx64 " all tasks req, start checkpointId:%" PRId64, pStream->uid, checkpointId);
|
||||||
|
|
||||||
|
// TODO:handle error
|
||||||
|
int32_t code = mndProcessStreamCheckpointTrans(pMnode, pStream, checkpointId, 0, false);
|
||||||
|
|
||||||
|
// remove this entry
|
||||||
|
taosHashRemove(execInfo.pTransferStateStreams, &req.streamId, sizeof(int64_t));
|
||||||
|
int32_t numOfStreams = taosHashGetSize(execInfo.pTransferStateStreams);
|
||||||
|
mDebug("stream:0x%" PRIx64 " removed, remain streams:%d fill-history not completed", pStream->uid, numOfStreams);
|
||||||
|
} else {
|
||||||
|
mDebug("stream:0x%" PRIx64 " receive %d reqs for checkpoint, remain:%d", pStream->uid, total, numOfTasks - total);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
|
@ -1169,8 +1169,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
ETaskStatus status = streamTaskGetStatus(pTask)->state;
|
||||||
|
|
||||||
|
if (req.mndTrigger == 1) {
|
||||||
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
if (status == TASK_STATUS__HALT || status == TASK_STATUS__PAUSE) {
|
||||||
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore this checkpoint:%" PRId64 ", set it failure",
|
tqError("s-task:%s not ready for checkpoint, since it is halt, ignore checkpoint:%" PRId64 ", set it failure",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId);
|
||||||
|
|
||||||
taosThreadMutexUnlock(&pTask->lock);
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
@ -1182,6 +1183,9 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
ASSERT(status == TASK_STATUS__HALT);
|
||||||
|
}
|
||||||
|
|
||||||
// check if the checkpoint msg already sent or not.
|
// check if the checkpoint msg already sent or not.
|
||||||
if (status == TASK_STATUS__CK) {
|
if (status == TASK_STATUS__CK) {
|
||||||
|
|
|
@ -56,13 +56,6 @@ struct SStreamTaskSM {
|
||||||
SArray* pWaitingEventList;
|
SArray* pWaitingEventList;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct SStreamEventInfo {
|
|
||||||
EStreamTaskEvent event;
|
|
||||||
const char* name;
|
|
||||||
} SStreamEventInfo;
|
|
||||||
|
|
||||||
// SStreamTaskSM* streamCreateStateMachine(SStreamTask* pTask);
|
|
||||||
// void* streamDestroyStateMachine(SStreamTaskSM* pSM);
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -153,7 +153,8 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
|
|
||||||
// todo this status may not be set here.
|
// todo this status may not be set here.
|
||||||
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
|
// 1. set task status to be prepared for check point, no data are allowed to put into inputQ.
|
||||||
streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
|
||||||
|
ASSERT(code == TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
pTask->chkInfo.transId = pReq->transId;
|
pTask->chkInfo.transId = pReq->transId;
|
||||||
pTask->chkInfo.checkpointingId = pReq->checkpointId;
|
pTask->chkInfo.checkpointingId = pReq->checkpointId;
|
||||||
|
@ -162,8 +163,7 @@ int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSo
|
||||||
pTask->execInfo.checkpoint += 1;
|
pTask->execInfo.checkpoint += 1;
|
||||||
|
|
||||||
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
// 2. Put the checkpoint block into inputQ, to make sure all blocks with less version have been handled by this task
|
||||||
int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER);
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
static int32_t continueDispatchCheckpointBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
|
||||||
|
@ -461,6 +461,7 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
int64_t startTs = pTask->chkInfo.startTs;
|
int64_t startTs = pTask->chkInfo.startTs;
|
||||||
int64_t ckId = pTask->chkInfo.checkpointingId;
|
int64_t ckId = pTask->chkInfo.checkpointingId;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
|
||||||
|
|
||||||
// sink task do not need to save the status, and generated the checkpoint
|
// sink task do not need to save the status, and generated the checkpoint
|
||||||
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
|
||||||
|
@ -499,6 +500,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if ((code == TSDB_CODE_SUCCESS) && dropRelHTask) {
|
||||||
|
// transferred from the halt status, it is done the fill-history procedure and finish with the checkpoint
|
||||||
|
// free it and remove fill-history task from disk meta-store
|
||||||
|
ASSERT(HAS_RELATED_FILLHISTORY_TASK(pTask));
|
||||||
|
SStreamTaskId hTaskId = {.streamId = pTask->hTaskInfo.id.streamId, .taskId = pTask->hTaskInfo.id.taskId};
|
||||||
|
|
||||||
|
stDebug("s-task:%s fill-history finish checkpoint done, drop related fill-history task:0x%x", id, hTaskId.taskId);
|
||||||
|
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pTask->pMeta->vgId, &hTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
// clear the checkpoint info if failed
|
// clear the checkpoint info if failed
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
taosThreadMutexLock(&pTask->lock);
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
|
|
@ -402,8 +402,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
streamTaskSendCheckpointReq(pStreamTask);
|
streamTaskSendCheckpointReq(pStreamTask);
|
||||||
// streamTaskResume(pStreamTask);
|
// streamTaskResume(pStreamTask);
|
||||||
|
|
||||||
// stDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", id);
|
|
||||||
|
|
||||||
// 4. free it and remove fill-history task from disk meta-store
|
// 4. free it and remove fill-history task from disk meta-store
|
||||||
// streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
// streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
||||||
|
|
||||||
|
@ -413,7 +411,6 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
// 6. open the inputQ for all upstream tasks
|
// 6. open the inputQ for all upstream tasks
|
||||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||||
|
|
||||||
// streamSchedExec(pStreamTask);
|
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -434,11 +431,21 @@ int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
} else { // no state transfer for sink tasks, and drop fill-history task, followed by opening inputQ of sink task.
|
||||||
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
SStreamTask* pStreamTask = streamMetaAcquireTask(pMeta, pTask->streamTaskId.streamId, pTask->streamTaskId.taskId);
|
||||||
if (pStreamTask != NULL) {
|
if (pStreamTask != NULL) {
|
||||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
// halt the related stream sink task
|
||||||
|
code = streamTaskHandleEvent(pStreamTask->status.pSM, TASK_EVENT_HALT);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
stError("s-task:%s halt stream task:%s failed, code:%s not transfer state to stream task", pTask->id.idStr,
|
||||||
|
pStreamTask->id.idStr, tstrerror(code));
|
||||||
streamMetaReleaseTask(pMeta, pStreamTask);
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
return code;
|
||||||
|
} else {
|
||||||
|
stDebug("s-task:%s halt by related fill-history task:%s", pStreamTask->id.idStr, pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamBuildAndSendDropTaskMsg(pTask->pMsgCb, pMeta->vgId, &pTask->id);
|
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||||
|
streamTaskSendCheckpointReq(pStreamTask);
|
||||||
|
streamMetaReleaseTask(pMeta, pStreamTask);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
|
@ -702,7 +709,8 @@ bool streamTaskReadyToRun(const SStreamTask* pTask, char** pStatus) {
|
||||||
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
|
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
|
||||||
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
|
st == TASK_STATUS__PAUSE || st == TASK_STATUS__HALT);
|
||||||
} else {
|
} else {
|
||||||
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK);
|
return (st == TASK_STATUS__READY || st == TASK_STATUS__SCAN_HISTORY || st == TASK_STATUS__CK ||
|
||||||
|
st == TASK_STATUS__HALT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1059,6 +1059,7 @@ int32_t tEncodeStreamTaskCheckpointReq(SEncoder* pEncoder, const SStreamTaskChec
|
||||||
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
if (tEncodeI64(pEncoder, pReq->streamId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->taskId) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
if (tEncodeI32(pEncoder, pReq->nodeId) < 0) return -1;
|
||||||
|
tEndEncode(pEncoder);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -859,7 +859,7 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) {
|
||||||
int32_t vgId = pTask->pMeta->vgId;
|
int32_t vgId = pTask->pMeta->vgId;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
SStreamTaskCheckpointReq req = {0};
|
SStreamTaskCheckpointReq req = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId, .nodeId = vgId};
|
||||||
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
|
tEncodeSize(tEncodeStreamTaskCheckpointReq, &req, tlen, code);
|
||||||
if (code < 0) {
|
if (code < 0) {
|
||||||
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
|
stError("s-task:%s vgId:%d encode stream task req checkpoint failed, code:%s", id, vgId, tstrerror(code));
|
||||||
|
|
|
@ -31,9 +31,13 @@ SStreamTaskState StreamTaskStatusList[9] = {
|
||||||
{.state = TASK_STATUS__HALT, .name = "halt"},
|
{.state = TASK_STATUS__HALT, .name = "halt"},
|
||||||
{.state = TASK_STATUS__PAUSE, .name = "paused"},
|
{.state = TASK_STATUS__PAUSE, .name = "paused"},
|
||||||
{.state = TASK_STATUS__CK, .name = "checkpoint"},
|
{.state = TASK_STATUS__CK, .name = "checkpoint"},
|
||||||
// {.state = TASK_STATUS__STREAM_SCAN_HISTORY, .name = "stream-scan-history"},
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
typedef struct SStreamEventInfo {
|
||||||
|
EStreamTaskEvent event;
|
||||||
|
const char* name;
|
||||||
|
} SStreamEventInfo;
|
||||||
|
|
||||||
SStreamEventInfo StreamTaskEventList[12] = {
|
SStreamEventInfo StreamTaskEventList[12] = {
|
||||||
{.event = 0, .name = ""}, // dummy event, place holder
|
{.event = 0, .name = ""}, // dummy event, place holder
|
||||||
{.event = TASK_EVENT_INIT, .name = "initialize"},
|
{.event = TASK_EVENT_INIT, .name = "initialize"},
|
||||||
|
@ -402,6 +406,10 @@ SStreamTaskState* streamTaskGetStatus(const SStreamTask* pTask) {
|
||||||
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment
|
return &pTask->status.pSM->current; // copy one obj in case of multi-thread environment
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask) {
|
||||||
|
return pTask->status.pSM->prev.state.state;
|
||||||
|
}
|
||||||
|
|
||||||
const char* streamTaskGetStatusStr(ETaskStatus status) {
|
const char* streamTaskGetStatusStr(ETaskStatus status) {
|
||||||
return StreamTaskStatusList[status].name;
|
return StreamTaskStatusList[status].name;
|
||||||
}
|
}
|
||||||
|
@ -497,6 +505,8 @@ void doInitStateTransferTable(void) {
|
||||||
// checkpoint related event
|
// checkpoint related event
|
||||||
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
trans = createStateTransform(TASK_STATUS__READY, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
trans = createStateTransform(TASK_STATUS__HALT, TASK_STATUS__CK, TASK_EVENT_GEN_CHECKPOINT, NULL, streamTaskDoCheckpoint, NULL, true);
|
||||||
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
|
trans = createStateTransform(TASK_STATUS__CK, TASK_STATUS__READY, TASK_EVENT_CHECKPOINT_DONE, NULL, NULL, NULL, true);
|
||||||
taosArrayPush(streamTaskSMTrans, &trans);
|
taosArrayPush(streamTaskSMTrans, &trans);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue