fix(stream): stop followers from sending heartbeats to the mnode, and do some internal refactoring.
This commit is contained in:
parent
1256eafddb
commit
6fa54789cb
|
@ -400,6 +400,8 @@ typedef struct SStreamMeta {
|
||||||
FTaskExpand* expandFunc;
|
FTaskExpand* expandFunc;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t stage;
|
int64_t stage;
|
||||||
|
bool leader;
|
||||||
|
int8_t taskWillbeLaunched;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
// TdThreadRwlock lock;
|
// TdThreadRwlock lock;
|
||||||
int32_t walScanCounter;
|
int32_t walScanCounter;
|
||||||
|
@ -408,7 +410,8 @@ typedef struct SStreamMeta {
|
||||||
SHashObj* pTaskBackendUnique;
|
SHashObj* pTaskBackendUnique;
|
||||||
TdThreadMutex backendMutex;
|
TdThreadMutex backendMutex;
|
||||||
SMetaHbInfo hbInfo;
|
SMetaHbInfo hbInfo;
|
||||||
int32_t closedTask;
|
SHashObj* pUpdateTaskList;
|
||||||
|
// int32_t closedTask;
|
||||||
int32_t totalTasks; // this value should be increased when a new task is added into the meta
|
int32_t totalTasks; // this value should be increased when a new task is added into the meta
|
||||||
int32_t chkptNotReadyTasks;
|
int32_t chkptNotReadyTasks;
|
||||||
int64_t rid;
|
int64_t rid;
|
||||||
|
@ -722,6 +725,7 @@ int32_t streamMetaReopen(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
int32_t streamMetaCommit(SStreamMeta* pMeta);
|
||||||
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta);
|
||||||
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
void streamMetaNotifyClose(SStreamMeta* pMeta);
|
||||||
|
void streamMetaStartHb(SStreamMeta* pMeta);
|
||||||
|
|
||||||
// checkpoint
|
// checkpoint
|
||||||
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq);
|
||||||
|
|
|
@ -174,7 +174,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
|
||||||
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
|
||||||
int32_t type, int64_t sver, int64_t ever);
|
int32_t type, int64_t sver, int64_t ever);
|
||||||
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
|
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
|
||||||
void tqUpdateNodeStage(STQ* pTq);
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -1756,10 +1756,10 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
streamTaskStop(*ppHTask);
|
streamTaskStop(*ppHTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
pMeta->closedTask += 1;
|
taosHashPut(pMeta->pUpdateTaskList, &pTask->id, sizeof(pTask->id), NULL, 0);
|
||||||
if (ppHTask != NULL) {
|
if (ppHTask != NULL) {
|
||||||
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
|
tqDebug("s-task:%s task nodeEp update completed, streamTask and related fill-history task closed", pTask->id.idStr);
|
||||||
pMeta->closedTask += 1;
|
taosHashPut(pMeta->pUpdateTaskList, &(*ppHTask)->id, sizeof(pTask->id), NULL, 0);
|
||||||
} else {
|
} else {
|
||||||
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
|
tqDebug("s-task:%s task nodeEp update completed, streamTask closed", pTask->id.idStr);
|
||||||
}
|
}
|
||||||
|
@ -1768,11 +1768,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
|
|
||||||
// possibly only handle the stream task.
|
// possibly only handle the stream task.
|
||||||
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
|
||||||
if (pMeta->closedTask < numOfTasks) {
|
int32_t updateTasks = taosHashGetSize(pMeta->pUpdateTaskList);
|
||||||
tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, pMeta->closedTask, (numOfTasks - pMeta->closedTask));
|
if (updateTasks < numOfTasks) {
|
||||||
|
pMeta->taskWillbeLaunched = 1;
|
||||||
|
|
||||||
|
tqDebug("vgId:%d closed tasks:%d, unclosed:%d", vgId, updateTasks, (numOfTasks - updateTasks));
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
} else {
|
} else {
|
||||||
pMeta->closedTask = 0;
|
taosHashClear(pMeta->pUpdateTaskList);
|
||||||
|
|
||||||
if (!pTq->pVnode->restored) {
|
if (!pTq->pVnode->restored) {
|
||||||
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
|
tqDebug("vgId:%d vnode restore not completed, not restart the tasks", vgId);
|
||||||
|
@ -1794,12 +1797,14 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
if (vnodeIsRoleLeader(pTq->pVnode) && !tsDisableStream) {
|
||||||
vInfo("vgId:%d, restart all stream tasks", vgId);
|
vInfo("vgId:%d, restart all stream tasks", vgId);
|
||||||
tqStartStreamTasks(pTq);
|
tqStartStreamTasks(pTq);
|
||||||
tqCheckAndRunStreamTaskAsync(pTq);
|
tqCheckAndRunStreamTaskAsync(pTq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMeta->taskWillbeLaunched = 0;
|
||||||
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -111,12 +111,12 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
|
||||||
int32_t vgId = TD_VID(pTq->pVnode);
|
int32_t vgId = TD_VID(pTq->pVnode);
|
||||||
SStreamMeta* pMeta = pTq->pStreamMeta;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
// taosWLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
|
||||||
if (numOfTasks == 0) {
|
if (numOfTasks == 0) {
|
||||||
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
tqDebug("vgId:%d no stream tasks existed to run", vgId);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
// taosWUnLockLatch(&pMeta->lock);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +124,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
|
||||||
if (pRunReq == NULL) {
|
if (pRunReq == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
tqError("vgId:%d failed to create msg to start wal scanning to launch stream tasks, code:%s", vgId, terrstr());
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
// taosWUnLockLatch(&pMeta->lock);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,7 +135,7 @@ int32_t tqCheckAndRunStreamTaskAsync(STQ* pTq) {
|
||||||
|
|
||||||
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_RUN, .pCont = pRunReq, .contLen = sizeof(SStreamTaskRunReq)};
|
||||||
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
tmsgPutToQueue(&pTq->pVnode->msgCb, STREAM_QUEUE, &msg);
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
// taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -237,8 +237,6 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWLockLatch(&pMeta->lock);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < numOfTasks; ++i) {
|
for (int32_t i = 0; i < numOfTasks; ++i) {
|
||||||
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
|
||||||
|
|
||||||
|
@ -246,12 +244,11 @@ int32_t tqStartStreamTasks(STQ* pTq) {
|
||||||
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
|
SStreamTask** pTask = taosHashGet(pMeta->pTasks, key, sizeof(key));
|
||||||
|
|
||||||
int8_t status = (*pTask)->status.taskStatus;
|
int8_t status = (*pTask)->status.taskStatus;
|
||||||
if (status == TASK_STATUS__STOP) {
|
if (status == TASK_STATUS__STOP && (*pTask)->info.fillHistory != 1) {
|
||||||
streamSetStatusNormal(*pTask);
|
streamSetStatusNormal(*pTask);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -36,10 +36,15 @@ int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqUpdateNodeStage(STQ* pTq) {
|
void tqUpdateNodeStage(STQ* pTq, bool isLeader) {
|
||||||
SSyncState state = syncGetState(pTq->pVnode->sync);
|
SSyncState state = syncGetState(pTq->pVnode->sync);
|
||||||
pTq->pStreamMeta->stage = state.term;
|
SStreamMeta* pMeta = pTq->pStreamMeta;
|
||||||
tqDebug("vgId:%d update the meta stage to be:%"PRId64, pTq->pStreamMeta->vgId, pTq->pStreamMeta->stage);
|
tqDebug("vgId:%d update the meta stage:%"PRId64", prev:%"PRId64" leader:%d", pMeta->vgId, state.term, pMeta->stage, isLeader);
|
||||||
|
pMeta->stage = state.term;
|
||||||
|
pMeta->leader = isLeader;
|
||||||
|
if (isLeader) {
|
||||||
|
streamMetaStartHb(pMeta);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
static int32_t tqInitTaosxRsp(STaosxRsp* pRsp, STqOffsetVal pOffset) {
|
||||||
|
|
|
@ -549,9 +549,20 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
|
|
||||||
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
|
ASSERT(commitIdx == vnodeSyncAppliedIndex(pFsm));
|
||||||
walApplyVer(pVnode->pWal, commitIdx);
|
walApplyVer(pVnode->pWal, commitIdx);
|
||||||
|
|
||||||
pVnode->restored = true;
|
pVnode->restored = true;
|
||||||
|
|
||||||
|
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
|
||||||
|
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosWLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
||||||
|
if (!pVnode->pTq->pStreamMeta->taskWillbeLaunched) {
|
||||||
|
vInfo("vgId:%d, sync restore finished, stream tasks will be launched by other thread", vgId);
|
||||||
|
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (vnodeIsRoleLeader(pVnode)) {
|
if (vnodeIsRoleLeader(pVnode)) {
|
||||||
// start to restore all stream tasks
|
// start to restore all stream tasks
|
||||||
if (tsDisableStream) {
|
if (tsDisableStream) {
|
||||||
|
@ -564,6 +575,8 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
} else {
|
} else {
|
||||||
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
vInfo("vgId:%d, sync restore finished, not launch stream tasks since not leader", vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosWUnLockLatch(&pVnode->pTq->pStreamMeta->lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
@ -578,7 +591,10 @@ static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
}
|
}
|
||||||
taosThreadMutexUnlock(&pVnode->lock);
|
taosThreadMutexUnlock(&pVnode->lock);
|
||||||
|
|
||||||
tqStopStreamTasks(pVnode->pTq);
|
if (pVnode->pTq) {
|
||||||
|
tqUpdateNodeStage(pVnode->pTq, false);
|
||||||
|
tqStopStreamTasks(pVnode->pTq);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
|
@ -597,7 +613,7 @@ static void vnodeBecomeLearner(const SSyncFSM *pFsm) {
|
||||||
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
static void vnodeBecomeLeader(const SSyncFSM *pFsm) {
|
||||||
SVnode *pVnode = pFsm->data;
|
SVnode *pVnode = pFsm->data;
|
||||||
if (pVnode->pTq) {
|
if (pVnode->pTq) {
|
||||||
tqUpdateNodeStage(pVnode->pTq);
|
tqUpdateNodeStage(pVnode->pTq, true);
|
||||||
}
|
}
|
||||||
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
vDebug("vgId:%d, become leader", pVnode->config.vgId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -140,6 +140,11 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pMeta->pUpdateTaskList = taosHashInit(64, fp, false, HASH_NO_LOCK);
|
||||||
|
if (pMeta->pUpdateTaskList == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
// task list
|
// task list
|
||||||
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
|
pMeta->pTaskList = taosArrayInit(4, sizeof(SStreamTaskId));
|
||||||
if (pMeta->pTaskList == NULL) {
|
if (pMeta->pTaskList == NULL) {
|
||||||
|
@ -316,6 +321,7 @@ void streamMetaCloseImpl(void* arg) {
|
||||||
|
|
||||||
taosHashCleanup(pMeta->pTasks);
|
taosHashCleanup(pMeta->pTasks);
|
||||||
taosHashCleanup(pMeta->pTaskBackendUnique);
|
taosHashCleanup(pMeta->pTaskBackendUnique);
|
||||||
|
taosHashCleanup(pMeta->pUpdateTaskList);
|
||||||
|
|
||||||
taosMemoryFree(pMeta->path);
|
taosMemoryFree(pMeta->path);
|
||||||
taosThreadMutexDestroy(&pMeta->backendMutex);
|
taosThreadMutexDestroy(&pMeta->backendMutex);
|
||||||
|
@ -758,9 +764,8 @@ int32_t tDecodeStreamHbMsg(SDecoder* pDecoder, SStreamHbMsg* pReq) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static bool readyToSendHb(SMetaHbInfo* pInfo) {
|
static bool enoughTimeDuration(SMetaHbInfo* pInfo) {
|
||||||
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) {
|
if ((++pInfo->tickCounter) >= META_HB_SEND_IDLE_COUNTER) { // reset the counter
|
||||||
// reset the counter
|
|
||||||
pInfo->tickCounter = 0;
|
pInfo->tickCounter = 0;
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -784,7 +789,14 @@ void metaHbToMnode(void* param, void* tmrId) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!readyToSendHb(&pMeta->hbInfo)) {
|
// not leader not send msg
|
||||||
|
if (!pMeta->leader) {
|
||||||
|
qInfo("vgId:%d follower not send hb to mnode", pMeta->vgId);
|
||||||
|
taosReleaseRef(streamMetaId, rid);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!enoughTimeDuration(&pMeta->hbInfo)) {
|
||||||
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
|
||||||
taosReleaseRef(streamMetaId, rid);
|
taosReleaseRef(streamMetaId, rid);
|
||||||
return;
|
return;
|
||||||
|
@ -907,10 +919,12 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
taosWUnLockLatch(&pMeta->lock);
|
taosWUnLockLatch(&pMeta->lock);
|
||||||
|
|
||||||
// wait for the stream meta hb function stopping
|
// wait for the stream meta hb function stopping
|
||||||
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
|
if (pMeta->leader) {
|
||||||
while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
|
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
|
||||||
taosMsleep(100);
|
while (pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
|
||||||
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
taosMsleep(100);
|
||||||
|
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
qDebug("vgId:%d start to check all tasks", vgId);
|
qDebug("vgId:%d start to check all tasks", vgId);
|
||||||
|
@ -924,3 +938,5 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
|
||||||
int64_t el = taosGetTimestampMs() - st;
|
int64_t el = taosGetTimestampMs() - st;
|
||||||
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
|
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Kick off the stream-meta heartbeat by calling the heartbeat handler
// metaHbToMnode() directly, passing pMeta as its parameter and no timer id.
//
// NOTE(review): metaHbToMnode() releases a reference identified by `rid`
// (taosReleaseRef(streamMetaId, rid)); confirm that it really expects pMeta
// itself as `param` rather than a pointer to the meta's reference id.
void streamMetaStartHb(SStreamMeta* pMeta) {
  metaHbToMnode(pMeta, NULL);
}
|
||||||
|
|
Loading…
Reference in New Issue