enh(stream): stop stream asap.
This commit is contained in:
parent
8a6b07347c
commit
f9801ba9c5
|
@ -119,6 +119,7 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
pVnode->pFetchQ->threadId);
|
||||
while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10);
|
||||
|
||||
tqNotifyClose(pVnode->pImpl->pTq);
|
||||
dInfo("vgId:%d, wait for vnode stream queue:%p is empty", pVnode->vgId, pVnode->pStreamQ);
|
||||
while (!taosQueueEmpty(pVnode->pStreamQ)) taosMsleep(10);
|
||||
|
||||
|
@ -141,7 +142,6 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
|
|||
dInfo("vgId:%d, vnode is closed", pVnode->vgId);
|
||||
|
||||
if (commitAndRemoveWal) {
|
||||
char path[TSDB_FILENAME_LEN] = {0};
|
||||
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d%swal", TD_DIRSEP, pVnode->vgId, TD_DIRSEP);
|
||||
dInfo("vgId:%d, remove all wals, path:%s", pVnode->vgId, path);
|
||||
tfsRmdir(pMgmt->pTfs, path);
|
||||
|
|
|
@ -121,6 +121,7 @@ struct STQ {
|
|||
TTB* pExecStore;
|
||||
TTB* pCheckStore;
|
||||
SStreamMeta* pStreamMeta;
|
||||
bool closing;
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
|
|
|
@ -190,6 +190,7 @@ int32_t tsdbSetKeepCfg(STsdb* pTsdb, STsdbCfg* pCfg);
|
|||
int tqInit();
|
||||
void tqCleanUp();
|
||||
STQ* tqOpen(const char* path, SVnode* pVnode);
|
||||
void tqNotifyClose(STQ*);
|
||||
void tqClose(STQ*);
|
||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||
int tqRegisterPushHandle(STQ* pTq, void* pHandle, const SMqPollReq* pRequest, SRpcMsg* pRpcMsg, SMqDataRsp* pDataRsp,
|
||||
|
|
|
@ -97,6 +97,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pTq->closing = false;
|
||||
pTq->path = taosStrdup(path);
|
||||
pTq->pVnode = pVnode;
|
||||
pTq->walLogLastVer = pVnode->pWal->vers.lastVer;
|
||||
|
@ -154,6 +155,28 @@ void tqClose(STQ* pTq) {
|
|||
taosMemoryFree(pTq);
|
||||
}
|
||||
|
||||
void tqNotifyClose(STQ* pTq) {
|
||||
if (pTq != NULL) {
|
||||
pTq->closing = true;
|
||||
taosWLockLatch(&pTq->pStreamMeta->lock);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
|
||||
if (pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
||||
SStreamTask* pTask = *(SStreamTask**)pIter;
|
||||
tqDebug("vgId:%d s-task:%s set dropping flag", pTq->pStreamMeta->vgId, pTask->id.idStr);
|
||||
pTask->status.taskStatus = TASK_STATUS__DROPPING;
|
||||
qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
taosWUnLockLatch(&pTq->pStreamMeta->lock);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t doSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch,
|
||||
int64_t consumerId, int32_t type) {
|
||||
int32_t len = 0;
|
||||
|
|
|
@ -2541,6 +2541,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
}
|
||||
|
||||
while (1) {
|
||||
if (isTaskKilled(pTaskInfo)) {
|
||||
T_LONG_JMP(pTaskInfo->env, pTaskInfo->code);
|
||||
}
|
||||
|
||||
SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream);
|
||||
if (pBlock == NULL) {
|
||||
pOperator->status = OP_RES_TO_RETURN;
|
||||
|
@ -2635,6 +2639,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
|
|||
while ((pIte = tSimpleHashIterate(pInfo->pUpdatedMap, pIte, &iter)) != NULL) {
|
||||
taosArrayPush(pInfo->pUpdated, pIte);
|
||||
}
|
||||
|
||||
tSimpleHashCleanup(pInfo->pUpdatedMap);
|
||||
pInfo->pUpdatedMap = NULL;
|
||||
taosArraySort(pInfo->pUpdated, winKeyCmprImpl);
|
||||
|
|
Loading…
Reference in New Issue