Merge pull request #29718 from taosdata/enh/streamqueue
refactor(stream): notify close stream in main thread
This commit is contained in:
commit
dae17cd1b0
|
@ -711,6 +711,21 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||||
pMgmt->state.openVnodes = 0;
|
pMgmt->state.openVnodes = 0;
|
||||||
dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
|
dInfo("close %d vnodes with %d threads", numOfVnodes, threadNum);
|
||||||
|
|
||||||
|
int64_t st = taosGetTimestampMs();
|
||||||
|
dInfo("notify all streams closed in all %d vnodes, ts:%" PRId64, numOfVnodes, st);
|
||||||
|
if (ppVnodes != NULL) {
|
||||||
|
for (int32_t i = 0; i < numOfVnodes; ++i) {
|
||||||
|
if (ppVnodes[i] != NULL) {
|
||||||
|
if (ppVnodes[i]->pImpl != NULL) {
|
||||||
|
tqNotifyClose(ppVnodes[i]->pImpl->pTq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int64_t et = taosGetTimestampMs();
|
||||||
|
dInfo("notify close stream completed in %d vnodes, elapsed time: %" PRId64 "ms", numOfVnodes, et - st);
|
||||||
|
|
||||||
for (int32_t t = 0; t < threadNum; ++t) {
|
for (int32_t t = 0; t < threadNum; ++t) {
|
||||||
SVnodeThread *pThread = &threads[t];
|
SVnodeThread *pThread = &threads[t];
|
||||||
if (pThread->vnodeNum == 0) continue;
|
if (pThread->vnodeNum == 0) continue;
|
||||||
|
@ -718,6 +733,7 @@ static void vmCloseVnodes(SVnodeMgmt *pMgmt) {
|
||||||
TdThreadAttr thAttr;
|
TdThreadAttr thAttr;
|
||||||
(void)taosThreadAttrInit(&thAttr);
|
(void)taosThreadAttrInit(&thAttr);
|
||||||
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
|
if (taosThreadCreate(&pThread->thread, &thAttr, vmCloseVnodeInThread, pThread) != 0) {
|
||||||
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
|
dError("thread:%d, failed to create thread to close vnode since %s", pThread->threadIndex, strerror(errno));
|
||||||
}
|
}
|
||||||
|
|
|
@ -1705,9 +1705,6 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(mndSetDropDbPrepareLogs(pMnode, pTrans, pDb), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetDropDbPrepareLogs(pMnode, pTrans, pDb), NULL, _OVER);
|
||||||
TAOS_CHECK_GOTO(mndSetDropDbCommitLogs(pMnode, pTrans, pDb), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndSetDropDbCommitLogs(pMnode, pTrans, pDb), NULL, _OVER);
|
||||||
/*if (mndDropOffsetByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
|
||||||
/*if (mndDropSubByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
|
||||||
/*if (mndDropTopicByDB(pMnode, pTrans, pDb) != 0) goto _OVER;*/
|
|
||||||
TAOS_CHECK_GOTO(mndDropStreamByDb(pMnode, pTrans, pDb), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndDropStreamByDb(pMnode, pTrans, pDb), NULL, _OVER);
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
TAOS_CHECK_GOTO(mndDropViewByDb(pMnode, pTrans, pDb), NULL, _OVER);
|
TAOS_CHECK_GOTO(mndDropViewByDb(pMnode, pTrans, pDb), NULL, _OVER);
|
||||||
|
|
|
@ -2643,6 +2643,7 @@ int32_t mndProcessConsensusInTmr(SRpcMsg *pMsg) {
|
||||||
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
|
code = mndGetStreamObj(pMnode, pInfo->streamId, &pStream);
|
||||||
if (pStream == NULL || code != 0) { // stream has been dropped already
|
if (pStream == NULL || code != 0) { // stream has been dropped already
|
||||||
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
|
mDebug("stream:0x%" PRIx64 " dropped already, continue", pInfo->streamId);
|
||||||
|
void *p = taosArrayPush(pStreamList, &pInfo->streamId);
|
||||||
taosArrayDestroy(pList);
|
taosArrayDestroy(pList);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -174,7 +174,10 @@ void tqNotifyClose(STQ* pTq) {
|
||||||
if (pTq == NULL) {
|
if (pTq == NULL) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
streamMetaNotifyClose(pTq->pStreamMeta);
|
|
||||||
|
if (pTq->pStreamMeta != NULL) {
|
||||||
|
streamMetaNotifyClose(pTq->pStreamMeta);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
|
||||||
|
|
|
@ -960,9 +960,9 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, char* pMsg) {
|
||||||
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr);
|
||||||
|
|
||||||
streamMutexLock(&pTask->lock);
|
streamMutexLock(&pTask->lock);
|
||||||
streamTaskClearCheckInfo(pTask, true);
|
|
||||||
|
|
||||||
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
|
streamTaskSetFailedCheckpointId(pTask, pReq->chkptId);
|
||||||
|
streamTaskClearCheckInfo(pTask, true);
|
||||||
|
|
||||||
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
// clear flag set during do checkpoint, and open inputQ for all upstream tasks
|
||||||
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
SStreamTaskState pState = streamTaskGetStatus(pTask);
|
||||||
|
|
|
@ -464,8 +464,8 @@ int32_t streamTransferStateDoPrepare(SStreamTask* pTask) {
|
||||||
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream
|
// 2. send msg to mnode to launch a checkpoint to keep the state for current stream
|
||||||
code = streamTaskSendCheckpointReq(pStreamTask);
|
code = streamTaskSendCheckpointReq(pStreamTask);
|
||||||
|
|
||||||
// 3. assign the status to the value that will be kept in disk
|
// 3. the default task status should be ready or something, not halt.
|
||||||
pStreamTask->status.taskStatus = streamTaskGetStatus(pStreamTask).state;
|
// status to the value that will be kept in disk
|
||||||
|
|
||||||
// 4. open the inputQ for all upstream tasks
|
// 4. open the inputQ for all upstream tasks
|
||||||
streamTaskOpenAllUpstreamInput(pStreamTask);
|
streamTaskOpenAllUpstreamInput(pStreamTask);
|
||||||
|
|
Loading…
Reference in New Issue