fix(stream): fix bug in multi-replica vnode redistribute.
This commit is contained in:
parent
9612704fa6
commit
e8294ed8dc
|
@ -2009,14 +2009,11 @@ static int32_t createStreamUpdateTrans(SMnode *pMnode, SStreamObj *pStream, SVgr
|
||||||
|
|
||||||
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
|
static bool isNodeEpsetChanged(const SEpSet *pPrevEpset, const SEpSet *pCurrent) {
|
||||||
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
|
const SEp *pEp = GET_ACTIVE_EP(pPrevEpset);
|
||||||
|
const SEp* p = GET_ACTIVE_EP(pCurrent);
|
||||||
|
|
||||||
for (int32_t i = 0; i < pCurrent->numOfEps; ++i) {
|
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
|
||||||
const SEp *p = &(pCurrent->eps[i]);
|
return false;
|
||||||
if (pEp->port == p->port && strncmp(pEp->fqdn, p->fqdn, TSDB_FQDN_LEN) == 0) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2113,6 +2110,7 @@ static int32_t mndProcessVgroupChange(SMnode *pMnode, SVgroupChangeInfo *pChange
|
||||||
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
|
mDebug("stream:0x%" PRIx64 " involved node changed, create update trans", pStream->uid);
|
||||||
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
|
int32_t code = createStreamUpdateTrans(pMnode, pStream, pChangeInfo);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
sdbCancelFetch(pSdb, pIter);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2216,18 +2214,21 @@ static int32_t mndProcessNodeCheckReq(SRpcMsg *pMsg) {
|
||||||
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
SVgroupChangeInfo changeInfo = mndFindChangedNodeInfo(pMnode, execNodeList.pNodeEntryList, pNodeSnapshot);
|
||||||
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
if (taosArrayGetSize(changeInfo.pUpdateNodeList) > 0) {
|
||||||
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
code = mndProcessVgroupChange(pMnode, &changeInfo);
|
||||||
|
|
||||||
|
// keep the new vnode snapshot
|
||||||
|
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
||||||
|
mDebug("create trans successfully, update cached node list");
|
||||||
|
taosArrayDestroy(execNodeList.pNodeEntryList);
|
||||||
|
execNodeList.pNodeEntryList = pNodeSnapshot;
|
||||||
|
execNodeList.ts = ts;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
mDebug("no update found in nodeList");
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
taosArrayDestroy(changeInfo.pUpdateNodeList);
|
||||||
taosHashCleanup(changeInfo.pDBMap);
|
taosHashCleanup(changeInfo.pDBMap);
|
||||||
|
|
||||||
// keep the new vnode snapshot
|
|
||||||
if (code == TSDB_CODE_SUCCESS || code == TSDB_CODE_ACTION_IN_PROGRESS) {
|
|
||||||
taosArrayDestroy(execNodeList.pNodeEntryList);
|
|
||||||
execNodeList.pNodeEntryList = pNodeSnapshot;
|
|
||||||
execNodeList.ts = ts;
|
|
||||||
}
|
|
||||||
|
|
||||||
mDebug("end to do stream task node change checking");
|
mDebug("end to do stream task node change checking");
|
||||||
atomic_store_32(&mndNodeCheckSentinel, 0);
|
atomic_store_32(&mndNodeCheckSentinel, 0);
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -165,6 +165,7 @@ int32_t tqOffsetRestoreFromFile(STqOffsetStore* pStore, const char* fname);
|
||||||
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver);
|
||||||
int32_t tqScanWal(STQ* pTq);
|
int32_t tqScanWal(STQ* pTq);
|
||||||
int32_t tqCheckAndRunStreamTask(STQ* pTq);
|
int32_t tqCheckAndRunStreamTask(STQ* pTq);
|
||||||
|
int32_t tqStartStreamTasks(STQ* pTq);
|
||||||
int32_t tqStopStreamTasks(STQ* pTq);
|
int32_t tqStopStreamTasks(STQ* pTq);
|
||||||
|
|
||||||
// tq util
|
// tq util
|
||||||
|
|
|
@ -1772,20 +1772,47 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
tqDebug("s-task:%s receive task nodeEp update msg from mnode", pTask->id.idStr);
|
||||||
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
|
streamSetStatusNormal(pTask);
|
||||||
|
|
||||||
|
SStreamTask** ppHTask = NULL;
|
||||||
|
if (pTask->historyTaskId.taskId != 0) {
|
||||||
|
keys[0] = pTask->historyTaskId.streamId;
|
||||||
|
keys[1] = pTask->historyTaskId.taskId;
|
||||||
|
|
||||||
|
ppHTask = (SStreamTask**)taosHashGet(pMeta->pTasks, keys, sizeof(keys));
|
||||||
|
if (ppHTask == NULL || *ppHTask == NULL) {
|
||||||
|
tqError("vgId:%d failed to acquire fill-history task:0x%x when handling update, it may have been dropped already",
|
||||||
|
pMeta->vgId, req.taskId);
|
||||||
|
} else {
|
||||||
|
tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr);
|
||||||
|
streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
streamSetStatusNormal(pTask);
|
|
||||||
streamMetaSaveTask(pMeta, pTask);
|
streamMetaSaveTask(pMeta, pTask);
|
||||||
|
if (ppHTask != NULL) {
|
||||||
|
streamMetaSaveTask(pMeta, *ppHTask);
|
||||||
|
}
|
||||||
|
|
||||||
if (streamMetaCommit(pMeta) < 0) {
|
if (streamMetaCommit(pMeta) < 0) {
|
||||||
// persist to disk
|
// persist to disk
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
streamTaskStop(pTask);
|
streamTaskStop(pTask);
|
||||||
|
if (ppHTask != NULL) {
|
||||||
|
streamTaskStop(*ppHTask);
|
||||||
|
}
|
||||||
|
|
||||||
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
|
tqDebug("s-task:%s task nodeEp update completed", pTask->id.idStr);
|
||||||
|
|
||||||
pMeta->closedTask += 1;
|
pMeta->closedTask += 1;
|
||||||
|
if (ppHTask != NULL) {
|
||||||
|
pMeta->closedTask += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
// possibly only handle the stream task.
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
bool allStopped = (pMeta->closedTask == numOfTasks);
|
bool allStopped = (pMeta->closedTask == numOfTasks);
|
||||||
if (allStopped) {
|
if (allStopped) {
|
||||||
|
@ -1824,6 +1851,7 @@ _end:
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
vInfo("vgId:%d, restart all stream tasks", vgId);
|
vInfo("vgId:%d, restart all stream tasks", vgId);
|
||||||
|
tqStartStreamTasks(pTq);
|
||||||
tqCheckAndRunStreamTaskAsync(pTq);
|
tqCheckAndRunStreamTaskAsync(pTq);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -224,6 +224,35 @@ int32_t tqStopStreamTasks(STQ* pTq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
|
|
||||||
|
tqDebug("vgId:%d start to stop all %d stream task(s)", vgId, numOfTasks);
|
||||||
|
|
||||||
|
if (numOfTasks == 0) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
|
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
|
int64_t key[2] = {pTaskId->streamId, pTaskId->taskId};
|
||||||
|
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
|
||||||
|
|
||||||
|
int8_t status = (*pTask)->status.taskStatus;
|
||||||
|
if (status == TASK_STATUS__STOP) {
|
||||||
|
streamSetStatusNormal(*pTask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
||||||
// seek the stored version and extract data from WAL
|
// seek the stored version and extract data from WAL
|
||||||
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
int64_t firstVer = walReaderGetValidFirstVer(pTask->exec.pWalReader);
|
||||||
|
|
|
@ -560,6 +560,7 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
|
vInfo("vgId:%d, not launch stream tasks, since stream tasks are disabled", vgId);
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
|
vInfo("vgId:%d start to launch stream tasks", pVnode->config.vgId);
|
||||||
|
tqStartStreamTasks(pVnode->pTq);
|
||||||
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
|
tqCheckAndRunStreamTaskAsync(pVnode->pTq);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue