fix(stream): clean up timer
This commit is contained in:
parent
82e0f77715
commit
1e7d9ada19
|
@ -46,6 +46,7 @@ void tqCleanUp() {
|
|||
|
||||
if (old == 1) {
|
||||
taosTmrCleanUp(tqMgmt.timer);
|
||||
streamCleanUp();
|
||||
atomic_store_8(&tqMgmt.inited, 0);
|
||||
}
|
||||
}
|
||||
|
@ -529,19 +530,16 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
|
|||
if (pTask->execType != TASK_EXEC__NONE) {
|
||||
// expand runners
|
||||
if (pTask->isDataScan) {
|
||||
/*SStreamReader* pStreamReader = tqInitSubmitMsgScanner(pTq->pVnode->pMeta);*/
|
||||
SReadHandle handle = {
|
||||
.meta = pTq->pVnode->pMeta,
|
||||
.vnode = pTq->pVnode,
|
||||
.initStreamReader = 1,
|
||||
};
|
||||
/*pTask->exec.inputHandle = pStreamReader;*/
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, &handle);
|
||||
ASSERT(pTask->exec.executor);
|
||||
} else {
|
||||
pTask->exec.executor = qCreateStreamExecTaskInfo(pTask->exec.qmsg, NULL);
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
ASSERT(pTask->exec.executor);
|
||||
}
|
||||
|
||||
// sink
|
||||
|
|
Loading…
Reference in New Issue