From b1f3428c05042f29ca3112353ec12e7ffbeebdbc Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 22 Jul 2022 10:43:49 +0800 Subject: [PATCH] refactor(stream): do not merge output --- source/dnode/vnode/src/tq/tq.c | 9 +++++---- source/libs/stream/src/streamExec.c | 1 + 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 89e330b78d..3739897ec0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -215,10 +215,10 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) { if (pIter == NULL) break; STqHandle* pExec = (STqHandle*)pIter; if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - int32_t sz = taosArrayGetSize(pExec->colIdList); + int32_t sz = pExec->execHandle.pSchemaWrapper->nCols; for (int32_t i = 0; i < sz; i++) { - int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i); - if (forbidColId == colId) { + SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i]; + if (pSchema->colId == colId) { taosHashCancelIterate(pTq->handles, pIter); return -1; } @@ -523,7 +523,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) { .version = ver, }; pHandle->execHandle.execCol.task = - qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); + qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, + &pHandle->execHandle.pSchemaWrapper); ASSERT(pHandle->execHandle.execCol.task); void* scanner = NULL; qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a8192b49f3..52b610228e 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { if (data == NULL) { data = qItem; streamQueueProcessSuccess(pTask->inputQueue); + if (pTask->execType == TASK_EXEC__NONE) break; /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*}*/