fix(stream): disable the stream in the follower nodes.

This commit is contained in:
Haojun Liao 2023-08-11 15:58:15 +08:00
parent 4afe603c6e
commit e984affe90
8 changed files with 44 additions and 67 deletions

View File

@ -637,7 +637,7 @@ void streamTaskCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskDoCheckDownstreamTasks(SStreamTask* pTask);
int32_t streamTaskLaunchScanHistory(SStreamTask* pTask);
int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t stage);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir);
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask);
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList);
int32_t streamTaskStop(SStreamTask* pTask);
int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp,

View File

@ -773,16 +773,16 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
}
}
pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
if (pDb->cfg.replications != 1) {
mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
mndReleaseDb(pMnode, pDb);
pDb = NULL;
goto _OVER;
}
// pDb = mndAcquireDb(pMnode, streamObj.sourceDb);
// if (pDb->cfg.replications != 1) {
// mError("stream source db must have only 1 replica, but %s has %d", pDb->name, pDb->cfg.replications);
// terrno = TSDB_CODE_MND_MULTI_REPLICA_SOURCE_DB;
// mndReleaseDb(pMnode, pDb);
// pDb = NULL;
// goto _OVER;
// }
mndReleaseDb(pMnode, pDb);
// mndReleaseDb(pMnode, pDb);
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB_INSIDE, pReq, "create-stream");
if (pTrans == NULL) {

View File

@ -217,7 +217,7 @@ void tqCleanUp();
STQ* tqOpen(const char* path, SVnode* pVnode);
void tqNotifyClose(STQ*);
void tqClose(STQ*);
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
int tqPushMsg(STQ*, tmsg_t msgType);
int tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg);
int tqUnregisterPushHandle(STQ* pTq, void* pHandle);
int tqStartStreamTasks(STQ* pTq); // restore all stream tasks after vnode launching completed.

View File

@ -14,6 +14,7 @@
*/
#include "tq.h"
#include "vnd.h"
typedef struct {
int8_t inited;
@ -1136,14 +1137,13 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
code = tDecodeStreamTask(&decoder, pTask);
tDecoderClear(&decoder);
if (code < 0) {
tDecoderClear(&decoder);
taosMemoryFree(pTask);
return -1;
}
tDecoderClear(&decoder);
SStreamMeta* pStreamMeta = pTq->pStreamMeta;
// 2.save task, use the latest commit version as the initial start version of stream task.
@ -1165,16 +1165,22 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t sversion, char* msg, int32_t ms
// once pTask is added into the meta store, it must not be referenced any more, since it may already have been
// destroyed by other threads
if (added) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
// only handled in the leader node
if (vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d s-task:0x%x is deployed and add into meta, numOfTasks:%d", vgId, taskId, numOfTasks);
SStreamTask* p = streamMetaAcquireTask(pStreamMeta, streamId, taskId);
bool restored = pTq->pVnode->restored;
if (p != NULL && restored) { // reset the downstreamReady flag.
streamTaskCheckDownstreamTasks(p);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
bool restored = pTq->pVnode->restored;
if (p != NULL && restored) {
streamTaskCheckDownstreamTasks(p);
} else if (!restored) {
tqWarn("s-task:%s not launched since vnode(vgId:%d) not ready", p->id.idStr, vgId);
}
if (p != NULL) {
streamMetaReleaseTask(pStreamMeta, p);
}
}
streamMetaReleaseTask(pStreamMeta, p);
} else {
tqWarn("vgId:%d failed to add s-task:0x%x, since already exists in meta store", vgId, taskId);
tFreeStreamTask(pTask);
@ -1830,8 +1836,9 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
bool startTask = vnodeIsRoleLeader(pTq->pVnode); // in case of follower, do not launch task
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
SStreamTaskNodeUpdateMsg req = {0};
SDecoder decoder;
@ -1861,7 +1868,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
tqDebug("s-task:%s fill-history task handle task update along with related stream task", pHistoryTask->id.idStr);
streamTaskUpdateEpsetInfo(pHistoryTask, req.pNodeList);
streamTaskRestart(pHistoryTask, NULL);
streamTaskRestart(pHistoryTask, NULL, startTask);
streamMetaReleaseTask(pMeta, pHistoryTask);
} else {
tqError("vgId:%d failed to get fill-history task:0x%x when handling task update, it may have been dropped",
@ -1870,7 +1877,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
streamTaskRestart(pTask, NULL);
streamTaskRestart(pTask, NULL, startTask);
streamMetaReleaseTask(pMeta, pTask);
int32_t code = rsp.code;

View File

@ -30,7 +30,7 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) {
return 0;
}
int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) {
int32_t tqPushMsg(STQ* pTq, tmsg_t msgType) {
if (msgType == TDMT_VND_SUBMIT) {
tqProcessSubmitReqForSubscribe(pTq);
}
@ -39,20 +39,14 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v
int32_t numOfTasks = streamMetaGetNumOfTasks(pTq->pStreamMeta);
taosRUnLockLatch(&pTq->pStreamMeta->lock);
tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, numOfTasks);
tqDebug("handle submit, restore:%d, numOfTasks:%d", pTq->pVnode->restored, numOfTasks);
// push data for stream processing:
// 1. the vnode has already been restored.
// 2. the vnode should be the leader.
// 3. the stream is not suspended yet.
if (!tsDisableStream && vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored) {
if (numOfTasks == 0) {
return 0;
}
if (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE) {
tqStartStreamTasks(pTq);
}
if ((!tsDisableStream) && (numOfTasks > 0) && (msgType == TDMT_VND_SUBMIT || msgType == TDMT_VND_DELETE)) {
tqStartStreamTasks(pTq);
}
return 0;

View File

@ -13,6 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnd.h"
#include "tq.h"
static int32_t createStreamTaskRunReq(SStreamMeta* pStreamMeta, bool* pScanIdle);
@ -130,6 +131,11 @@ int32_t tqStartStreamTasks(STQ* pTq) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
// for a follower, or a vnode that has not been restored yet, do not launch the stream tasks.
if (!(vnodeIsRoleLeader(pTq->pVnode) && pTq->pVnode->restored)) {
return TSDB_CODE_SUCCESS;
}
taosWLockLatch(&pMeta->lock);
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);

View File

@ -524,7 +524,7 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t ver, SRpcMsg
walApplyVer(pVnode->pWal, ver);
if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, ver) < 0) {
if (tqPushMsg(pVnode->pTq, pMsg->msgType) < 0) {
vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno));
return -1;
}

View File

@ -465,7 +465,7 @@ int32_t streamTaskStop(SStreamTask* pTask) {
return 0;
}
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir, bool startTask) {
const char* id = pTask->id.idStr;
int32_t vgId = pTask->pMeta->vgId;
@ -499,36 +499,6 @@ int32_t streamTaskRestart(SStreamTask* pTask, const char* pDir) {
return 0;
}
// todo remove it
//int32_t streamTaskUpdateEpInfo(SArray* pTaskList, int32_t nodeId, SEpSet* pEpSet) {
// int32_t numOfLevels = taosArrayGetSize(pTaskList);
//
// for (int32_t j = 0; j < numOfLevels; ++j) {
// SArray *pLevel = taosArrayGetP(pTaskList, j);
//
// int32_t numOfTasks = taosArrayGetSize(pLevel);
// for (int32_t k = 0; k < numOfTasks; ++k) {
// SStreamTask *pTask = taosArrayGetP(pLevel, k);
// if (pTask->info.nodeId == nodeId) {
// pTask->info.epSet = *pEpSet;
// continue;
// }
//
// // check for the dispath info and the upstream task info
// int32_t level = pTask->info.taskLevel;
// if (level == TASK_LEVEL__SOURCE) {
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
// } else if (level == TASK_LEVEL__AGG) {
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
// streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
// } else { // TASK_LEVEL__SINK
// streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
// }
// }
// }
// return 0;
//}
int32_t doUpdateEpsetInfo(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
if (pTask->info.nodeId == nodeId) { // execution task should be moved away
epsetAssign(&pTask->info.epSet, pEpSet);