fix(stream): notify close for snode.

This commit is contained in:
Haojun Liao 2023-08-28 09:53:08 +08:00
parent 6b327886e9
commit b75ae217f6
4 changed files with 65 additions and 80 deletions

View File

@ -732,6 +732,8 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
void streamMetaNotifyClose(SStreamMeta* pMeta);
#ifdef __cplusplus
}
#endif

View File

@ -116,6 +116,7 @@ FAIL:
}
void sndClose(SSnode *pSnode) {
streamMetaNotifyClose(pSnode->pMeta);
streamMetaCommit(pSnode->pMeta);
streamMetaClose(pSnode->pMeta);
taosMemoryFree(pSnode->path);

View File

@ -171,70 +171,11 @@ void tqClose(STQ* pTq) {
taosMemoryFree(pTq);
}
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) {
inTimer = true;
}
}
taosWUnLockLatch(&pMeta->lock);
return inTimer;
}
void tqNotifyClose(STQ* pTq) {
if (pTq == NULL) {
return;
}
SStreamMeta* pMeta = pTq->pStreamMeta;
int32_t vgId = pMeta->vgId;
tqDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
tqDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
taosWUnLockLatch(&pMeta->lock);
// wait for the stream meta hb function stopping
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
tqDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
tqDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) {
tqDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
tqDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
streamMetaNotifyClose(pTq->pStreamMeta);
}
int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
@ -258,26 +199,6 @@ int32_t tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
return 0;
}
// int32_t tqPushDataRsp(STqHandle* pHandle, int32_t vgId) {
// SMqDataRsp dataRsp = {0};
// dataRsp.head.consumerId = pHandle->consumerId;
// dataRsp.head.epoch = pHandle->epoch;
// dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
//
// int64_t sver = 0, ever = 0;
// walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
// tqDoSendDataRsp(&pHandle->msg->info, &dataRsp, pHandle->epoch, pHandle->consumerId, TMQ_MSG_TYPE__POLL_RSP, sver,
// ever);
//
// char buf1[TSDB_OFFSET_LEN] = {0};
// char buf2[TSDB_OFFSET_LEN] = {0};
// tFormatOffset(buf1, tListLen(buf1), &dataRsp.reqOffset);
// tFormatOffset(buf2, tListLen(buf2), &dataRsp.rspOffset);
// tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) push rsp, block num: %d, req:%s, rsp:%s", vgId,
// dataRsp.head.consumerId, dataRsp.head.epoch, dataRsp.blockNum, buf1, buf2);
// return 0;
// }
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId) {
int64_t sver = 0, ever = 0;

View File

@ -775,4 +775,65 @@ void metaHbToMnode(void* param, void* tmrId) {
tmsgSendReq(&epset, &msg);
taosTmrReset(metaHbToMnode, META_HB_CHECK_INTERVAL, param, streamEnv.timer, &pMeta->hbInfo.hbTmr);
taosReleaseRef(streamMetaId, rid);
}
static bool hasStreamTaskInTimer(SStreamMeta* pMeta) {
bool inTimer = false;
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->status.timerActive >= 1) {
inTimer = true;
}
}
taosWUnLockLatch(&pMeta->lock);
return inTimer;
}
void streamMetaNotifyClose(SStreamMeta* pMeta) {
int32_t vgId = pMeta->vgId;
qDebug("vgId:%d notify all stream tasks that the vnode is closing", vgId);
taosWLockLatch(&pMeta->lock);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) {
break;
}
SStreamTask* pTask = *(SStreamTask**)pIter;
qDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
taosWUnLockLatch(&pMeta->lock);
// wait for the stream meta hb function stopping
pMeta->hbInfo.stopFlag = STREAM_META_WILL_STOP;
while(pMeta->hbInfo.stopFlag != STREAM_META_OK_TO_STOP) {
taosMsleep(100);
qDebug("vgId:%d wait for meta to stop timer", pMeta->vgId);
}
qDebug("vgId:%d start to check all tasks", vgId);
int64_t st = taosGetTimestampMs();
while (hasStreamTaskInTimer(pMeta)) {
qDebug("vgId:%d some tasks in timer, wait for 100ms and recheck", pMeta->vgId);
taosMsleep(100);
}
int64_t el = taosGetTimestampMs() - st;
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
}