refactor(stream): do not merge output

This commit is contained in:
Liu Jicong 2022-07-22 10:43:49 +08:00
parent c82bda25e2
commit b1f3428c05
2 changed files with 6 additions and 4 deletions

View File

@ -215,10 +215,10 @@ int32_t tqCheckColModifiable(STQ* pTq, int32_t colId) {
if (pIter == NULL) break; if (pIter == NULL) break;
STqHandle* pExec = (STqHandle*)pIter; STqHandle* pExec = (STqHandle*)pIter;
if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { if (pExec->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) {
int32_t sz = taosArrayGetSize(pExec->colIdList); int32_t sz = pExec->execHandle.pSchemaWrapper->nCols;
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int32_t forbidColId = *(int32_t*)taosArrayGet(pExec->colIdList, i); SSchema* pSchema = &pExec->execHandle.pSchemaWrapper->pSchema[i];
if (forbidColId == colId) { if (pSchema->colId == colId) {
taosHashCancelIterate(pTq->handles, pIter); taosHashCancelIterate(pTq->handles, pIter);
return -1; return -1;
} }
@ -523,7 +523,8 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
.version = ver, .version = ver,
}; };
pHandle->execHandle.execCol.task = pHandle->execHandle.execCol.task =
qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols, &pHandle->execHandle.pSchemaWrapper); qCreateQueueExecTaskInfo(pHandle->execHandle.execCol.qmsg, &handle, &pHandle->execHandle.numOfCols,
&pHandle->execHandle.pSchemaWrapper);
ASSERT(pHandle->execHandle.execCol.task); ASSERT(pHandle->execHandle.execCol.task);
void* scanner = NULL; void* scanner = NULL;
qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner); qExtractStreamScanner(pHandle->execHandle.execCol.task, &scanner);

View File

@ -159,6 +159,7 @@ static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) {
if (data == NULL) { if (data == NULL) {
data = qItem; data = qItem;
streamQueueProcessSuccess(pTask->inputQueue); streamQueueProcessSuccess(pTask->inputQueue);
if (pTask->execType == TASK_EXEC__NONE) break;
/*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/ /*if (qItem->type == STREAM_INPUT__DATA_BLOCK) {*/
/*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/ /*streamUpdateVer(pTask, (SStreamDataBlock*)qItem);*/
/*}*/ /*}*/