From edaec52bca3337abc5d6314fbd67764d0228ec7c Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 19 Jul 2022 19:47:47 +0800 Subject: [PATCH] refactor: add debug flag --- source/libs/executor/src/executor.c | 25 +++++++++++++++---------- source/libs/stream/src/streamExec.c | 5 +++-- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 529779e2c2..9491c675c1 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -42,23 +42,27 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu // TODO: if a block was set but not consumed, // prevent setting a different type of block - pInfo->blockType = type; pInfo->validBlockIndex = 0; taosArrayClear(pInfo->pBlockLists); - if (type == STREAM_INPUT__DATA_SUBMIT) { + if (type == STREAM_INPUT__MERGED_SUBMIT) { + ASSERT(numOfBlocks > 1); + for (int32_t i = 0; i < numOfBlocks; i++) { + SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); + taosArrayPush(pInfo->pBlockLists, &pReq); + } + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + } else if (type == STREAM_INPUT__DATA_SUBMIT) { /*if (tqReaderSetDataMsg(pInfo->tqReader, input, 0) < 0) {*/ /*qError("submit msg messed up when initing stream block, %s" PRIx64, id);*/ /*return TSDB_CODE_QRY_APP_ERROR;*/ /*}*/ - if (numOfBlocks == 1) { - taosArrayPush(pInfo->pBlockLists, &input); - } else { - for (int32_t i = 0; i < numOfBlocks; i++) { - SSubmitReq* pReq = *(void**)POINTER_SHIFT(input, i * sizeof(void*)); - taosArrayPush(pInfo->pBlockLists, &pReq); - } - } + ASSERT(numOfBlocks == 1); + /*if (numOfBlocks == 1) {*/ + taosArrayPush(pInfo->pBlockLists, &input); + pInfo->blockType = STREAM_INPUT__DATA_SUBMIT; + /*} else {*/ + /*}*/ } else if (type == STREAM_INPUT__DATA_BLOCK) { for (int32_t i = 0; i < numOfBlocks; ++i) { SSDataBlock* pDataBlock = &((SSDataBlock*)input)[i]; @@ -71,6 +75,7 @@ static int32_t doSetStreamBlock(SOperatorInfo* pOperator, void* input, size_t nu taosArrayAddAll(p->pDataBlock, pDataBlock->pDataBlock); taosArrayPush(pInfo->pBlockLists, &p); } + pInfo->blockType = STREAM_INPUT__DATA_BLOCK; } else { ASSERT(0); } diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 33d6762646..a8192b49f3 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -36,7 +36,8 @@ static int32_t streamTaskExecImpl(SStreamTask* pTask, void* data, SArray* pRes) } else if (pItem->type == STREAM_INPUT__MERGED_SUBMIT) { SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)data; SArray* blocks = pMerged->reqs; - qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__DATA_SUBMIT, false); + qDebug("task %d %p set submit input (merged), batch num: %d", pTask->taskId, pTask, (int32_t)blocks->size); + qSetMultiStreamInput(exec, blocks->pData, blocks->size, STREAM_INPUT__MERGED_SUBMIT, false); } else { ASSERT(0); } @@ -147,7 +148,7 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum) { static SArray* streamExecForQall(SStreamTask* pTask, SArray* pRes) { while (1) { - int32_t cnt = 0; + int32_t cnt = 1; void* data = NULL; while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);