fix(stream): fix invalid access when handling error, not start stream if tsDisablestream is set.
This commit is contained in:
parent
a0d08b7fe4
commit
05d416f3b9
|
@ -555,7 +555,12 @@ static void vnodeRestoreFinish(const SSyncFSM *pFsm, const SyncIndex commitIdx)
|
||||||
vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
|
vInfo("vgId:%d, sync restore finished, start to restore stream tasks by replay wal", pVnode->config.vgId);
|
||||||
|
|
||||||
// start to restore all stream tasks
|
// start to restore all stream tasks
|
||||||
tqStartStreamTasks(pVnode->pTq);
|
if (tsDisableStream) {
|
||||||
|
vInfo("vgId:%d, not restore stream tasks, since disabled", pVnode->config.vgId);
|
||||||
|
} else {
|
||||||
|
vInfo("vgId:%d start to restore stream tasks", pVnode->config.vgId);
|
||||||
|
tqStartStreamTasks(pVnode->pTq);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
static void vnodeBecomeFollower(const SSyncFSM *pFsm) {
|
||||||
|
|
|
@ -312,7 +312,6 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, SReadHandle* readers, int32_t v
|
||||||
qTaskInfo_t pTaskInfo = NULL;
|
qTaskInfo_t pTaskInfo = NULL;
|
||||||
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
code = qCreateExecTask(readers, vgId, 0, pPlan, &pTaskInfo, NULL, NULL, OPTR_EXEC_MODEL_STREAM);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
nodesDestroyNode((SNode*)pPlan);
|
|
||||||
qDestroyTask(pTaskInfo);
|
qDestroyTask(pTaskInfo);
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -99,6 +99,7 @@ int32_t createExecTaskInfo(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHand
|
||||||
if (NULL == (*pTaskInfo)->pRoot) {
|
if (NULL == (*pTaskInfo)->pRoot) {
|
||||||
int32_t code = (*pTaskInfo)->code;
|
int32_t code = (*pTaskInfo)->code;
|
||||||
doDestroyTask(*pTaskInfo);
|
doDestroyTask(*pTaskInfo);
|
||||||
|
(*pTaskInfo) = NULL;
|
||||||
return code;
|
return code;
|
||||||
} else {
|
} else {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -206,11 +207,14 @@ static void freeBlock(void* pParam) {
|
||||||
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
void doDestroyTask(SExecTaskInfo* pTaskInfo) {
|
||||||
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
qDebug("%s execTask is freed", GET_TASKID(pTaskInfo));
|
||||||
destroyOperator(pTaskInfo->pRoot);
|
destroyOperator(pTaskInfo->pRoot);
|
||||||
|
pTaskInfo->pRoot = NULL;
|
||||||
|
|
||||||
cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo);
|
cleanupQueriedTableScanInfo(&pTaskInfo->schemaInfo);
|
||||||
cleanupStreamInfo(&pTaskInfo->streamInfo);
|
cleanupStreamInfo(&pTaskInfo->streamInfo);
|
||||||
|
|
||||||
if (!pTaskInfo->localFetch.localExec) {
|
if (!pTaskInfo->localFetch.localExec) {
|
||||||
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
|
nodesDestroyNode((SNode*)pTaskInfo->pSubplan);
|
||||||
|
pTaskInfo->pSubplan = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
|
taosArrayDestroyEx(pTaskInfo->pResultBlockList, freeBlock);
|
||||||
|
|
Loading…
Reference in New Issue