fix(stream): update the nodeEp even during checkpoint trans.

This commit is contained in:
Haojun Liao 2023-10-21 01:42:43 +08:00
parent 0d4892f25b
commit 48ff521968
2 changed files with 43 additions and 37 deletions

View File

@ -83,14 +83,14 @@ static SArray *mndTakeVgroupSnapshot(SMnode *pMnode);
static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pPrevNodeList, const SArray *pNodeList);
static STrans *doCreateTrans(SMnode *pMnode, SStreamObj *pStream, const char *name);
static STrans *doCreateTrans1(SMnode *pMnode, const char *name, const char* pDbName);
static int32_t mndPersistTransLog(SStreamObj *pStream, STrans *pTrans);
static void initTransAction(STransAction *pAction, void *pCont, int32_t contLen, int32_t msgType, const SEpSet *pEpset);
static int32_t createStreamUpdateTrans(SStreamObj *pStream, SVgroupChangeInfo *pInfo, STrans *pTrans);
static void removeStreamTasksInBuf(SStreamObj* pStream, SStreamExecNodeInfo * pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
static void removeStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
static void keepStreamTasksInBuf(SStreamObj *pStream, SStreamExecNodeInfo *pExecNode);
static int32_t removeExpirednodeEntryAndTask(SArray *pNodeSnapshot);
static int32_t doKillActiveCheckpointTrans(SMnode *pMnode);
int32_t mndInitStream(SMnode *pMnode) {
SSdbTable table = {
@ -1189,7 +1189,6 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
}
}
taosThreadMutexUnlock(&execNodeList.lock);
if (!ready) {
return 0;
}
@ -1203,7 +1202,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
mError("failed to trigger checkpoint, reason: %s", tstrerror(TSDB_CODE_OUT_OF_MEMORY));
return -1;
}
mDebug("start to trigger checkpoint, checkpointId: %" PRId64 "", checkpointId);
mDebug("start to trigger checkpoint, checkpointId: %" PRId64, checkpointId);
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, "checkpoint");
@ -2082,8 +2082,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
// check all streams that involved this vnode should update the epset info
SStreamObj *pStream = NULL;
void *pIter = NULL;
STrans *pTrans = NULL;
STrans *pTrans = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_STREAM, pIter, (void **)&pStream);
@ -2091,6 +2090,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
break;
}
// here create only one trans
if (pTrans == NULL) {
pTrans = doCreateTrans(pMnode, pStream, "stream-task-update");
if (pTrans == NULL) {
@ -2137,8 +2137,6 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
// return TSDB_CODE_ACTION_IN_PROGRESS;
return 0;
}
@ -2295,7 +2293,6 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
if (execNodeList.pNodeEntryList != NULL) {
execNodeList.pNodeEntryList = taosArrayDestroy(execNodeList.pNodeEntryList);
}
execNodeList.pNodeEntryList = extractNodeListFromStream(pMnode);
}
@ -2313,6 +2310,9 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
// kill current active checkpoint transaction, since the transaction is vnode wide.
doKillActiveCheckpointTrans(pMnode);
code = mndProcessVgroupChange(pMnode, &changeInfo);
// keep the new vnode snapshot
@ -2498,29 +2498,26 @@ int32_t createStreamResetStatusTrans(SMnode* pMnode, SStreamObj* pStream) {
return TSDB_CODE_ACTION_IN_PROGRESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
// find the checkpoint trans id
int32_t doKillActiveCheckpointTrans(SMnode *pMnode) {
int32_t transId = 0;
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void *pIter = NULL;
{
SSdb *pSdb = pMnode->pSdb;
STrans *pTrans = NULL;
void* pIter = NULL;
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
while (1) {
pIter = sdbFetch(pSdb, SDB_TRANS, pIter, (void **)&pTrans);
if (pIter == NULL) {
break;
}
if (strncmp(pTrans->opername, MND_STREAM_CHECKPOINT_NAME, tListLen(pTrans->opername) - 1) == 0) {
transId = pTrans->id;
sdbRelease(pSdb, pTrans);
sdbCancelFetch(pSdb, pIter);
break;
}
sdbRelease(pSdb, pTrans);
}
if (transId == 0) {
@ -2528,8 +2525,16 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
return TSDB_CODE_SUCCESS;
}
STrans* pTrans = mndAcquireTrans(pMnode, transId);
pTrans = mndAcquireTrans(pMnode, transId);
mInfo("kill checkpoint trans:%d", transId);
mndKillTrans(pMnode, pTrans);
mndReleaseTrans(pMnode, pTrans);
return TSDB_CODE_SUCCESS;
}
int32_t mndResetFromCheckpoint(SMnode* pMnode) {
doKillActiveCheckpointTrans(pMnode);
// set all tasks status to be normal, refactor later to be stream level, instead of vnode level.
SSdb *pSdb = pMnode->pSdb;
@ -2541,6 +2546,7 @@ int32_t mndResetFromCheckpoint(SMnode* pMnode) {
break;
}
// todo this transaction should exist be only one
mDebug("stream:%s (0x%" PRIx64 ") reset checkpoint procedure, create reset trans", pStream->name, pStream->uid);
int32_t code = createStreamResetStatusTrans(pMnode, pStream);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -1116,16 +1116,16 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
ASSERT(leftRsp >= 0);
if (leftRsp > 0) {
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp);
stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, waiting for %d rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code), leftRsp);
} else {
stDebug(
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, all rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
"s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s, all rsp",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
} else {
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code);
stDebug("s-task:%s recv fix-dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%s",
id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, tstrerror(code));
}
ASSERT(leftRsp >= 0);