Merge pull request #27260 from taosdata/fix/3_liaohj
fix(stream): perform node update when mnode leader/follower switches.
This commit is contained in:
commit
e9cf8471ef
|
@ -473,7 +473,9 @@ typedef struct STaskStartInfo {
|
||||||
|
|
||||||
typedef struct STaskUpdateInfo {
|
typedef struct STaskUpdateInfo {
|
||||||
SHashObj* pTasks;
|
SHashObj* pTasks;
|
||||||
int32_t transId;
|
int32_t activeTransId;
|
||||||
|
int32_t completeTransId;
|
||||||
|
int64_t completeTs;
|
||||||
} STaskUpdateInfo;
|
} STaskUpdateInfo;
|
||||||
|
|
||||||
typedef struct SScanWalInfo {
|
typedef struct SScanWalInfo {
|
||||||
|
@ -753,8 +755,8 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta
|
||||||
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
|
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs);
|
||||||
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SStreamTask* pHTask, int32_t transId,
|
||||||
int64_t startTs);
|
int64_t startTs);
|
||||||
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta);
|
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta);
|
||||||
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
|
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId);
|
||||||
|
|
||||||
void streamMetaRLock(SStreamMeta* pMeta);
|
void streamMetaRLock(SStreamMeta* pMeta);
|
||||||
void streamMetaRUnLock(SStreamMeta* pMeta);
|
void streamMetaRUnLock(SStreamMeta* pMeta);
|
||||||
|
|
|
@ -64,6 +64,8 @@ typedef struct SChkptReportInfo {
|
||||||
} SChkptReportInfo;
|
} SChkptReportInfo;
|
||||||
|
|
||||||
typedef struct SStreamExecInfo {
|
typedef struct SStreamExecInfo {
|
||||||
|
int32_t role;
|
||||||
|
bool switchFromFollower;
|
||||||
bool initTaskList;
|
bool initTaskList;
|
||||||
SArray *pNodeList;
|
SArray *pNodeList;
|
||||||
int64_t ts; // snapshot ts
|
int64_t ts; // snapshot ts
|
||||||
|
@ -153,6 +155,7 @@ int32_t streamTaskIterGetCurrent(SStreamTaskIter *pIter, SStreamTask **pTask);
|
||||||
int32_t mndInitExecInfo();
|
int32_t mndInitExecInfo();
|
||||||
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
void mndInitStreamExecInfo(SMnode *pMnode, SStreamExecInfo *pExecInfo);
|
||||||
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
|
void mndInitStreamExecInfoForLeader(SMnode *pMnode);
|
||||||
|
void mndInitStreamExecInfoUpdateRole(SMnode *pMnode, int32_t role);
|
||||||
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
|
int32_t removeExpiredNodeEntryAndTaskInBuf(SArray *pNodeSnapshot);
|
||||||
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecInfo *pExecNode);
|
||||||
|
|
||||||
|
|
|
@ -144,7 +144,7 @@ int32_t mndInitStream(SMnode *pMnode) {
|
||||||
|
|
||||||
code = sdbSetTable(pMnode->pSdb, table);
|
code = sdbSetTable(pMnode->pSdb, table);
|
||||||
if (code) {
|
if (code) {
|
||||||
return terrno;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = sdbSetTable(pMnode->pSdb, tableSeq);
|
code = sdbSetTable(pMnode->pSdb, tableSeq);
|
||||||
|
@ -2024,7 +2024,7 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo, bool includeAllNodes) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
@ -2069,12 +2069,14 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void *p = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
if (!includeAllNodes) {
|
||||||
void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
void *p1 = taosHashGet(pChangeInfo->pDBMap, pStream->targetDb, strlen(pStream->targetDb));
|
||||||
if (p == NULL && p1 == NULL) {
|
void *p2 = taosHashGet(pChangeInfo->pDBMap, pStream->sourceDb, strlen(pStream->sourceDb));
|
||||||
mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
|
if (p1 == NULL && p2 == NULL) {
|
||||||
sdbRelease(pSdb, pStream);
|
mDebug("stream:0x%" PRIx64 " %s not involved nodeUpdate, ignore", pStream->uid, pStream->name);
|
||||||
continue;
|
sdbRelease(pSdb, pStream);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
mDebug("stream:0x%" PRIx64 " %s involved node changed, create update trans, transId:%d", pStream->uid,
|
||||||
|
@ -2192,11 +2194,36 @@ static int32_t refreshNodeListFromExistedStreams(SMnode *pMnode, SArray *pNodeLi
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t addAllDbsIntoHashmap(SHashObj *pDBMap, SSdb *pSdb) {
|
||||||
|
void *pIter = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
while (1) {
|
||||||
|
SVgObj *pVgroup = NULL;
|
||||||
|
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||||
|
if (pIter == NULL) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
code = taosHashPut(pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
|
||||||
|
sdbRelease(pSdb, pVgroup);
|
||||||
|
|
||||||
|
if (code == 0) {
|
||||||
|
int32_t size = taosHashGetSize(pDBMap);
|
||||||
|
mDebug("add Db:%s into Dbs list (total:%d) for kill checkpoint trans", pVgroup->dbName, size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
// this function runs by only one thread, so it is not multi-thread safe
|
// this function runs by only one thread, so it is not multi-thread safe
|
||||||
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
bool allReady = true;
|
bool allReady = true;
|
||||||
SArray *pNodeSnapshot = NULL;
|
SArray *pNodeSnapshot = NULL;
|
||||||
|
SMnode *pMnode = pMsg->info.node;
|
||||||
|
int64_t ts = taosGetTimestampSec();
|
||||||
|
bool updateAllVgroups = false;
|
||||||
|
|
||||||
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
int32_t old = atomic_val_compare_exchange_32(&mndNodeCheckSentinel, 0, 1);
|
||||||
if (old != 0) {
|
if (old != 0) {
|
||||||
|
@ -2204,10 +2231,7 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("start to do node change checking");
|
mDebug("start to do node changing check");
|
||||||
int64_t ts = taosGetTimestampSec();
|
|
||||||
|
|
||||||
SMnode *pMnode = pMsg->info.node;
|
|
||||||
|
|
||||||
streamMutexLock(&execInfo.lock);
|
streamMutexLock(&execInfo.lock);
|
||||||
int32_t numOfNodes = extractStreamNodeList(pMnode);
|
int32_t numOfNodes = extractStreamNodeList(pMnode);
|
||||||
|
@ -2240,10 +2264,20 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execInfo.pNodeList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
|
||||||
|
{
|
||||||
|
if (execInfo.role == NODE_ROLE_LEADER && execInfo.switchFromFollower) {
|
||||||
|
mInfo("rollback all stream due to mnode leader/follower switch by using nodeUpdate trans");
|
||||||
|
updateAllVgroups = true;
|
||||||
|
execInfo.switchFromFollower = false; // reset the flag
|
||||||
|
(void) addAllDbsIntoHashmap(changeInfo.pDBMap, pMnode->pSdb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0 || updateAllVgroups) {
|
||||||
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
// kill current active checkpoint transaction, since the transaction is vnode wide.
|
||||||
killAllCheckpointTrans(pMnode, &changeInfo);
|
killAllCheckpointTrans(pMnode, &changeInfo);
|
||||||
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
code = mndProcessVgroupChange(pMnode, &changeInfo, updateAllVgroups);
|
||||||
|
|
||||||
// keep the new vnode snapshot if success
|
// keep the new vnode snapshot if success
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
@ -2284,6 +2318,9 @@ static int32_t mndProcessNodeCheck(SRpcMsg *pReq) {
|
||||||
|
|
||||||
int32_t size = sizeof(SMStreamNodeCheckMsg);
|
int32_t size = sizeof(SMStreamNodeCheckMsg);
|
||||||
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
|
SMStreamNodeCheckMsg *pMsg = rpcMallocCont(size);
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_STREAM_NODECHANGE_CHECK, .pCont = pMsg, .contLen = size};
|
||||||
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
return tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
@ -2459,6 +2496,10 @@ int32_t mndProcessStreamReqCheckpoint(SRpcMsg *pReq) {
|
||||||
{
|
{
|
||||||
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
|
SRpcMsg rsp = {.code = 0, .info = pReq->info, .contLen = sizeof(SMStreamReqCheckpointRsp)};
|
||||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
|
if (rsp.pCont == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
SMsgHead *pHead = rsp.pCont;
|
SMsgHead *pHead = rsp.pCont;
|
||||||
pHead->vgId = htonl(req.nodeId);
|
pHead->vgId = htonl(req.nodeId);
|
||||||
|
|
||||||
|
@ -2667,11 +2708,13 @@ static int64_t getConsensusId(int64_t streamId, int32_t numOfTasks, int32_t* pEx
|
||||||
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
|
static void doSendQuickRsp(SRpcHandleInfo *pInfo, int32_t msgSize, int32_t vgId, int32_t code) {
|
||||||
SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
|
SRpcMsg rsp = {.code = code, .info = *pInfo, .contLen = msgSize};
|
||||||
rsp.pCont = rpcMallocCont(rsp.contLen);
|
rsp.pCont = rpcMallocCont(rsp.contLen);
|
||||||
SMsgHead *pHead = rsp.pCont;
|
if (rsp.pCont != NULL) {
|
||||||
pHead->vgId = htonl(vgId);
|
SMsgHead *pHead = rsp.pCont;
|
||||||
|
pHead->vgId = htonl(vgId);
|
||||||
|
|
||||||
tmsgSendRsp(&rsp);
|
tmsgSendRsp(&rsp);
|
||||||
pInfo->handle = NULL; // disable auto rsp
|
pInfo->handle = NULL; // disable auto rsp
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
|
@ -2808,6 +2851,10 @@ static int32_t mndProcessCreateStreamReqFromMNode(SRpcMsg *pReq) {
|
||||||
int32_t code = mndProcessCreateStreamReq(pReq);
|
int32_t code = mndProcessCreateStreamReq(pReq);
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
pReq->info.rsp = rpcMallocCont(1);
|
pReq->info.rsp = rpcMallocCont(1);
|
||||||
|
if (pReq->info.rsp == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
pReq->info.rspLen = 1;
|
pReq->info.rspLen = 1;
|
||||||
pReq->info.noResp = false;
|
pReq->info.noResp = false;
|
||||||
pReq->code = code;
|
pReq->code = code;
|
||||||
|
@ -2819,6 +2866,10 @@ static int32_t mndProcessDropStreamReqFromMNode(SRpcMsg *pReq) {
|
||||||
int32_t code = mndProcessDropStreamReq(pReq);
|
int32_t code = mndProcessDropStreamReq(pReq);
|
||||||
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
pReq->info.rsp = rpcMallocCont(1);
|
pReq->info.rsp = rpcMallocCont(1);
|
||||||
|
if (pReq->info.rsp == NULL) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
pReq->info.rspLen = 1;
|
pReq->info.rspLen = 1;
|
||||||
pReq->info.noResp = false;
|
pReq->info.noResp = false;
|
||||||
pReq->code = code;
|
pReq->code = code;
|
||||||
|
@ -2841,6 +2892,36 @@ void mndInitStreamExecInfoForLeader(SMnode* pMnode) {
|
||||||
mndInitStreamExecInfo(pMnode, &execInfo);
|
mndInitStreamExecInfo(pMnode, &execInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void mndInitStreamExecInfoUpdateRole(SMnode* pMnode, int32_t role) {
|
||||||
|
execInfo.switchFromFollower = false;
|
||||||
|
|
||||||
|
if (execInfo.role == NODE_ROLE_UNINIT) {
|
||||||
|
execInfo.role = role;
|
||||||
|
if (role == NODE_ROLE_LEADER) {
|
||||||
|
mInfo("init mnode is set to leader");
|
||||||
|
} else {
|
||||||
|
mInfo("init mnode is set to follower");
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (role == NODE_ROLE_LEADER) {
|
||||||
|
if (execInfo.role == NODE_ROLE_FOLLOWER) {
|
||||||
|
execInfo.role = role;
|
||||||
|
execInfo.switchFromFollower = true;
|
||||||
|
mInfo("mnode switch to be leader from follower");
|
||||||
|
} else {
|
||||||
|
mInfo("mnode remain to be leader, do nothing");
|
||||||
|
}
|
||||||
|
} else { // follower's
|
||||||
|
if (execInfo.role == NODE_ROLE_LEADER) {
|
||||||
|
execInfo.role = role;
|
||||||
|
mInfo("mnode switch to be follower from leader");
|
||||||
|
} else {
|
||||||
|
mInfo("mnode remain to be follower, do nothing");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
void addAllStreamTasksIntoBuf(SMnode *pMnode, SStreamExecInfo *pExecInfo) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SStreamObj *pStream = NULL;
|
SStreamObj *pStream = NULL;
|
||||||
|
|
|
@ -334,5 +334,5 @@ void killAllCheckpointTrans(SMnode *pMnode, SVgroupChangeInfo *pChangeInfo) {
|
||||||
taosMemoryFree(p);
|
taosMemoryFree(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
mDebug("complete clear checkpoints in Dbs");
|
mDebug("complete clear checkpoints in all Dbs");
|
||||||
}
|
}
|
||||||
|
|
|
@ -785,6 +785,9 @@ int32_t mndInitExecInfo() {
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
execInfo.role = NODE_ROLE_UNINIT;
|
||||||
|
execInfo.switchFromFollower = false;
|
||||||
|
|
||||||
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
taosHashSetFreeFp(execInfo.pTransferStateStreams, freeTaskList);
|
||||||
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
|
taosHashSetFreeFp(execInfo.pChkptStreams, freeTaskList);
|
||||||
taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
|
taosHashSetFreeFp(execInfo.pStreamConsensus, freeTaskList);
|
||||||
|
|
|
@ -363,6 +363,8 @@ static void mndBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
(void)tsem_post(&pMgmt->syncSem);
|
(void)tsem_post(&pMgmt->syncSem);
|
||||||
}
|
}
|
||||||
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
(void)taosThreadMutexUnlock(&pMgmt->lock);
|
||||||
|
|
||||||
|
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_FOLLOWER);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
|
@ -385,6 +387,8 @@ static void mndBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
static void mndBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
mInfo("vgId:1, become leader");
|
mInfo("vgId:1, become leader");
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
|
|
||||||
|
mndInitStreamExecInfoUpdateRole(pMnode, NODE_ROLE_LEADER);
|
||||||
mndInitStreamExecInfoForLeader(pMnode);
|
mndInitStreamExecInfoForLeader(pMnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -193,28 +193,23 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
SStreamTask* pTask = *ppTask;
|
SStreamTask* pTask = *ppTask;
|
||||||
const char* idstr = pTask->id.idStr;
|
const char* idstr = pTask->id.idStr;
|
||||||
|
|
||||||
if (pMeta->updateInfo.transId == -1) { // info needs to be kept till the new trans to update the nodeEp arrived.
|
if (req.transId <= 0) {
|
||||||
streamMetaInitUpdateTaskList(pMeta, req.transId);
|
tqError("vgId:%d invalid update nodeEp task, transId:%d, discard", vgId, req.taskId);
|
||||||
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
|
taosArrayDestroy(req.pNodeList);
|
||||||
|
return rsp.code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pMeta->updateInfo.transId != req.transId) {
|
// info needs to be kept till the new trans to update the nodeEp arrived.
|
||||||
if (req.transId < pMeta->updateInfo.transId) {
|
bool update = streamMetaInitUpdateTaskList(pMeta, req.transId);
|
||||||
tqError("s-task:%s vgId:%d disorder update nodeEp msg recv, discarded, newest transId:%d, recv:%d", idstr, vgId,
|
if (!update) {
|
||||||
pMeta->updateInfo.transId, req.transId);
|
rsp.code = TSDB_CODE_SUCCESS;
|
||||||
rsp.code = TSDB_CODE_SUCCESS;
|
streamMetaWUnLock(pMeta);
|
||||||
streamMetaWUnLock(pMeta);
|
|
||||||
|
|
||||||
taosArrayDestroy(req.pNodeList);
|
taosArrayDestroy(req.pNodeList);
|
||||||
return rsp.code;
|
return rsp.code;
|
||||||
} else {
|
|
||||||
tqInfo("s-task:%s vgId:%d receive new trans to update nodeEp msg from mnode, transId:%d, prev transId:%d", idstr,
|
|
||||||
vgId, req.transId, pMeta->updateInfo.transId);
|
|
||||||
// info needs to be kept till the new trans to update the nodeEp arrived.
|
|
||||||
streamMetaInitUpdateTaskList(pMeta, req.transId);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
tqDebug("s-task:%s vgId:%d recv trans to update nodeEp from mnode, transId:%d, recorded update transId:%d", idstr,
|
|
||||||
vgId, req.transId, pMeta->updateInfo.transId);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// duplicate update epset msg received, discard this redundant message
|
// duplicate update epset msg received, discard this redundant message
|
||||||
|
@ -311,7 +306,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaClearUpdateTaskList(pMeta);
|
streamMetaClearSetUpdateTaskListComplete(pMeta);
|
||||||
|
|
||||||
if (!restored) {
|
if (!restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not start all tasks", vgId);
|
||||||
|
@ -775,8 +770,8 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) {
|
||||||
streamMetaWUnLock(pMeta);
|
streamMetaWUnLock(pMeta);
|
||||||
|
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d", vgId,
|
tqInfo("vgId:%d tasks are all updated and stopped, restart all tasks, triggered by transId:%d, ts:%" PRId64, vgId,
|
||||||
pMeta->updateInfo.transId);
|
pMeta->updateInfo.completeTransId, pMeta->updateInfo.completeTs);
|
||||||
|
|
||||||
while (streamMetaTaskInTimer(pMeta)) {
|
while (streamMetaTaskInTimer(pMeta)) {
|
||||||
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
|
||||||
|
@ -902,7 +897,7 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
|
||||||
return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
|
return restartStreamTasks(pMeta, (pMeta->role == NODE_ROLE_LEADER));
|
||||||
} else {
|
} else {
|
||||||
if (pStartInfo->restartCount == 0) {
|
if (pStartInfo->restartCount == 0) {
|
||||||
tqDebug("vgId:%d start all tasks completed in callbackFn, restartCount is 0", pMeta->vgId);
|
tqDebug("vgId:%d start all tasks completed in callbackFn, restartCounter is 0", pMeta->vgId);
|
||||||
} else if (allReady) {
|
} else if (allReady) {
|
||||||
pStartInfo->restartCount = 0;
|
pStartInfo->restartCount = 0;
|
||||||
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
tqDebug("vgId:%d all tasks are ready, reset restartCounter 0, not restart tasks", vgId);
|
||||||
|
|
|
@ -561,12 +561,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
|
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
|
||||||
id, vgId, pReq->taskId, numOfTasks);
|
id, vgId, pReq->taskId, numOfTasks);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMetaWLock(pMeta);
|
streamMetaWLock(pMeta);
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (pReq->dropRelHTask) {
|
||||||
// persist to disk
|
code = streamMetaCommit(pMeta);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// always return true
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -594,13 +596,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
|
||||||
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
|
||||||
pInfo->processedVer <= pReq->checkpointVer);
|
pInfo->processedVer <= pReq->checkpointVer);
|
||||||
|
|
||||||
// update only it is in checkpoint status.
|
// update only it is in checkpoint status, or during restore procedure.
|
||||||
if (pStatus.state == TASK_STATUS__CK) {
|
if (pStatus.state == TASK_STATUS__CK || (!restored)) {
|
||||||
pInfo->checkpointId = pReq->checkpointId;
|
pInfo->checkpointId = pReq->checkpointId;
|
||||||
pInfo->checkpointVer = pReq->checkpointVer;
|
pInfo->checkpointVer = pReq->checkpointVer;
|
||||||
pInfo->checkpointTime = pReq->checkpointTs;
|
pInfo->checkpointTime = pReq->checkpointTs;
|
||||||
|
|
||||||
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
if (restored) {
|
||||||
|
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
|
|
|
@ -631,7 +631,13 @@ void flushStateDataInExecutor(SStreamTask* pTask, SStreamQueueItem* pCheckpointB
|
||||||
(void) streamTaskReloadState(pTask);
|
(void) streamTaskReloadState(pTask);
|
||||||
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
|
stDebug("s-task:%s transfer state from fill-history task:%s, status:%s completed", id, pHTask->id.idStr,
|
||||||
streamTaskGetStatus(pHTask).name);
|
streamTaskGetStatus(pHTask).name);
|
||||||
|
// todo execute qExecTask to fetch the reload-generated result, if this is stream is for session window query.
|
||||||
|
/*
|
||||||
|
* while(1) {
|
||||||
|
* qExecTask()
|
||||||
|
* }
|
||||||
|
* // put into the output queue.
|
||||||
|
*/
|
||||||
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
streamMetaReleaseTask(pTask->pMeta, pHTask);
|
||||||
} else {
|
} else {
|
||||||
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
|
stError("s-task:%s related fill-history task:0x%x failed to acquire, transfer state failed", id,
|
||||||
|
|
|
@ -431,7 +431,8 @@ int32_t streamMetaOpen(const char* path, void* ahandle, FTaskBuild buildTaskFn,
|
||||||
pMeta->expandTaskFn = expandTaskFn;
|
pMeta->expandTaskFn = expandTaskFn;
|
||||||
pMeta->stage = stage;
|
pMeta->stage = stage;
|
||||||
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
|
pMeta->role = (vgId == SNODE_HANDLE) ? NODE_ROLE_LEADER : NODE_ROLE_UNINIT;
|
||||||
pMeta->updateInfo.transId = -1;
|
pMeta->updateInfo.activeTransId = -1;
|
||||||
|
pMeta->updateInfo.completeTransId = -1;
|
||||||
|
|
||||||
pMeta->startInfo.completeFn = fn;
|
pMeta->startInfo.completeFn = fn;
|
||||||
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
pMeta->pTaskDbUnique = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK);
|
||||||
|
@ -890,24 +891,28 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
int32_t streamMetaCommit(SStreamMeta* pMeta) {
|
||||||
if (tdbCommit(pMeta->db, pMeta->txn) < 0) {
|
int32_t code = 0;
|
||||||
|
code = tdbCommit(pMeta->db, pMeta->txn);
|
||||||
|
if (code != 0) {
|
||||||
stError("vgId:%d failed to commit stream meta", pMeta->vgId);
|
stError("vgId:%d failed to commit stream meta", pMeta->vgId);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) {
|
code = tdbPostCommit(pMeta->db, pMeta->txn);
|
||||||
|
if (code != 0) {
|
||||||
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
|
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
|
||||||
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) {
|
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
|
||||||
|
if (code != 0) {
|
||||||
stError("vgId:%d failed to begin trans", pMeta->vgId);
|
stError("vgId:%d failed to begin trans", pMeta->vgId);
|
||||||
return -1;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
|
||||||
return 0;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {
|
||||||
|
@ -1781,12 +1786,56 @@ void streamMetaAddIntoUpdateTaskList(SStreamMeta* pMeta, SStreamTask* pTask, SSt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaClearUpdateTaskList(SStreamMeta* pMeta) {
|
void streamMetaClearSetUpdateTaskListComplete(SStreamMeta* pMeta) {
|
||||||
taosHashClear(pMeta->updateInfo.pTasks);
|
STaskUpdateInfo* pInfo = &pMeta->updateInfo;
|
||||||
pMeta->updateInfo.transId = -1;
|
|
||||||
|
taosHashClear(pInfo->pTasks);
|
||||||
|
|
||||||
|
int32_t prev = pInfo->completeTransId;
|
||||||
|
pInfo->completeTransId = pInfo->activeTransId;
|
||||||
|
pInfo->activeTransId = -1;
|
||||||
|
pInfo->completeTs = taosGetTimestampMs();
|
||||||
|
|
||||||
|
stDebug("vgId:%d set the nodeEp update complete, ts:%" PRId64 ", complete transId:%d->%d, reset active transId",
|
||||||
|
pMeta->vgId, pInfo->completeTs, prev, pInfo->completeTransId);
|
||||||
}
|
}
|
||||||
|
|
||||||
void streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
|
bool streamMetaInitUpdateTaskList(SStreamMeta* pMeta, int32_t transId) {
|
||||||
taosHashClear(pMeta->updateInfo.pTasks);
|
STaskUpdateInfo* pInfo = &pMeta->updateInfo;
|
||||||
pMeta->updateInfo.transId = transId;
|
|
||||||
|
if (transId > pInfo->completeTransId) {
|
||||||
|
if (pInfo->activeTransId == -1) {
|
||||||
|
taosHashClear(pInfo->pTasks);
|
||||||
|
pInfo->activeTransId = transId;
|
||||||
|
|
||||||
|
stInfo("vgId:%d set the active epset update transId:%d, prev complete transId:%d", pMeta->vgId, transId,
|
||||||
|
pInfo->completeTransId);
|
||||||
|
return true;
|
||||||
|
} else {
|
||||||
|
if (pInfo->activeTransId == transId) {
|
||||||
|
// do nothing
|
||||||
|
return true;
|
||||||
|
} else if (transId < pInfo->activeTransId) {
|
||||||
|
stError("vgId:%d invalid(out of order)epset update transId:%d, active transId:%d, complete transId:%d, discard",
|
||||||
|
pMeta->vgId, transId, pInfo->activeTransId, pInfo->completeTransId);
|
||||||
|
return false;
|
||||||
|
} else { // transId > pInfo->activeTransId
|
||||||
|
taosHashClear(pInfo->pTasks);
|
||||||
|
int32_t prev = pInfo->activeTransId;
|
||||||
|
pInfo->activeTransId = transId;
|
||||||
|
|
||||||
|
stInfo("vgId:%d active epset update transId updated from:%d to %d, prev complete transId:%d", pMeta->vgId,
|
||||||
|
transId, prev, pInfo->completeTransId);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if (transId == pInfo->completeTransId) {
|
||||||
|
stError("vgId:%d already handled epset update transId:%d, completeTs:%" PRId64 " ignore", pMeta->vgId, transId,
|
||||||
|
pInfo->completeTs);
|
||||||
|
return false;
|
||||||
|
} else { // pInfo->completeTransId > transId
|
||||||
|
stError("vgId:%d disorder update nodeEp msg recv, prev completed epset update transId:%d, recv:%d, discard",
|
||||||
|
pMeta->vgId, pInfo->activeTransId, transId);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -637,7 +637,7 @@ bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
|
||||||
numOfNodes, p->updateCount, prevTs);
|
numOfNodes, p->updateCount, prevTs);
|
||||||
|
|
||||||
bool updated = false;
|
bool updated = false;
|
||||||
for (int32_t i = 0; i < taosArrayGetSize(pNodeList); ++i) {
|
for (int32_t i = 0; i < numOfNodes; ++i) {
|
||||||
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
SNodeUpdateInfo* pInfo = taosArrayGet(pNodeList, i);
|
||||||
if (pInfo == NULL) {
|
if (pInfo == NULL) {
|
||||||
continue;
|
continue;
|
||||||
|
|
Loading…
Reference in New Issue