fix(stream): add snode check, and handle the task status reset from checkpoint when it is in the snode.

This commit is contained in:
Haojun Liao 2023-12-19 18:34:30 +08:00
parent 17e9e84aa9
commit 92045f5485
6 changed files with 120 additions and 55 deletions

View File

@ -33,5 +33,6 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead
int32_t startStreamTasks(SStreamMeta* pMeta);
int32_t tqStreamTaskResetStatus(SStreamMeta* pMeta);
int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta);
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg);
#endif // TDENGINE_TQ_COMMON_H

View File

@ -90,7 +90,7 @@ SArray *smGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_SCAN_HISTORY_FINISH_RSP, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_STREAM_TASK_CHECKPOINT_READY, smPutNodeMsgToStreamQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_STREAM_TASK_RESET, smPutNodeMsgToMgmtQueue, 1) == NULL) goto _OVER;
code = 0;
_OVER:

View File

@ -685,6 +685,40 @@ _OVER:
return -1;
}
static int32_t extractNodeEpset(SMnode *pMnode, SEpSet *pEpSet, bool* hasEpset, int32_t taskId, int32_t nodeId) {
*hasEpset = false;
if (nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter != NULL) {
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
sdbCancelFetch(pMnode->pSdb, pIter);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mError("failed to acquire snode epset");
return TSDB_CODE_INVALID_PARA;
}
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, nodeId);
if (pVgObj != NULL) {
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
epsetAssign(pEpSet, &epset);
*hasEpset = true;
return TSDB_CODE_SUCCESS;
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", taskId, nodeId);
return TSDB_CODE_SUCCESS;
}
}
}
static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask *pTask) {
SVDropStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVDropStreamTaskReq));
if (pReq == NULL) {
@ -698,28 +732,17 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
STransAction action = {0};
SEpSet epset = {0};
if (pTask->info.nodeId == SNODE_HANDLE) {
SSnodeObj *pObj = NULL;
void *pIter = NULL;
while (1) {
pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj);
if (pIter == NULL) {
break;
}
bool hasEpset = false;
addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pMnode->pSdb, pObj);
}
} else {
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
if (pVgObj != NULL) {
epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
} else {
mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId);
taosMemoryFree(pReq);
return 0;
}
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
return TSDB_CODE_SUCCESS;
}
// The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode.
@ -1776,9 +1799,20 @@ static int32_t mndPauseStreamTask(SMnode *pMnode, STrans *pTrans, SStreamTask *p
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
SEpSet epset;
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
// no valid epset, return directly without redoAction
if (!hasEpset) {
taosMemoryFree(pReq);
return TSDB_CODE_SUCCESS;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVPauseStreamTaskReq), TDMT_STREAM_TASK_PAUSE, &epset, 0);
@ -1920,17 +1954,25 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
static int32_t mndResumeStreamTask(STrans *pTrans, SMnode *pMnode, SStreamTask *pTask, int8_t igUntreated) {
SVResumeStreamTaskReq *pReq = taosMemoryCalloc(1, sizeof(SVResumeStreamTaskReq));
if (pReq == NULL) {
mError("failed to malloc in resume stream, size:%" PRIzu ", code:%s", sizeof(SVResumeStreamTaskReq),
tstrerror(TSDB_CODE_OUT_OF_MEMORY));
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
pReq->head.vgId = htonl(pTask->info.nodeId);
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
pReq->igUntreated = igUntreated;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
SEpSet epset;
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
taosMemoryFree(pReq);
return -1;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResumeStreamTaskReq), TDMT_STREAM_TASK_RESUME, &epset, 0);
@ -2208,6 +2250,8 @@ static SVgroupChangeInfo mndFindChangedNodeInfo(SMnode *pMnode, const SArray *pP
epsetAssign(&updateInfo.newEp, &pCurrent->epset);
taosArrayPush(info.pUpdateNodeList, &updateInfo);
}
// todo handle the snode info
if (pCurrent->nodeId != SNODE_HANDLE) {
SVgObj *pVgroup = mndAcquireVgroup(pMnode, pCurrent->nodeId);
taosHashPut(info.pDBMap, pVgroup->dbName, strlen(pVgroup->dbName), NULL, 0);
@ -2704,9 +2748,18 @@ int32_t createStreamResetStatusTrans(SMnode *pMnode, SStreamObj *pStream) {
pReq->taskId = pTask->id.taskId;
pReq->streamId = pTask->id.streamId;
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
SEpSet epset;
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(pReq);
continue;
}
if (!hasEpset) {
taosMemoryFree(pReq);
continue;
}
STransAction action = {0};
initTransAction(&action, pReq, sizeof(SVResetStreamTaskReq), TDMT_VND_STREAM_TASK_RESET, &epset, 0);
@ -2878,11 +2931,17 @@ static int32_t mndDropRelatedFillhistoryTask(SMnode *pMnode, STaskStatusEntry *p
mDebug("build and send drop related fill-history task for task:0x%x", pTask->id.taskId);
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
SEpSet epset = mndGetVgroupEpset(pMnode, pVgObj);
mndReleaseVgroup(pMnode, pVgObj);
SEpSet epset;
bool hasEpset = false;
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (hasEpset) {
tmsgSendReq(&epset, &msg);
}
tmsgSendReq(&epset, &msg);
return TSDB_CODE_SUCCESS;
}

View File

@ -205,6 +205,8 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) {
return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen);
case TDMT_VND_STREAM_TASK_UPDATE:
return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true);
case TDMT_VND_STREAM_TASK_RESET:
return tqStreamTaskProcessTaskResetReq(pSnode->pMeta, pMsg);
default:
ASSERT(0);
}

View File

@ -1336,26 +1336,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t tqProcessTaskResetReq(STQ* pTq, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
SStreamMeta* pMeta = pTq->pStreamMeta;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
return tqStreamTaskProcessTaskResetReq(pTq->pStreamMeta, pMsg);
}
int32_t tqProcessTaskDropHTask(STQ* pTq, SRpcMsg* pMsg) {

View File

@ -834,3 +834,25 @@ int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta) {
return TSDB_CODE_SUCCESS;
}
int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) {
SVPauseStreamTaskReq* pReq = (SVPauseStreamTaskReq*)pMsg->pCont;
SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, pReq->taskId);
if (pTask == NULL) {
tqError("vgId:%d process task-reset req, failed to acquire task:0x%x, it may have been dropped already",
pMeta->vgId, pReq->taskId);
return TSDB_CODE_SUCCESS;
}
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
if (streamTaskGetStatus(pTask, NULL) == TASK_STATUS__CK) {
streamTaskClearCheckInfo(pTask, true);
streamTaskSetStatusReady(pTask);
}
streamMetaReleaseTask(pMeta, pTask);
return TSDB_CODE_SUCCESS;
}