From 6cbedc5d4a7fe802dba7b273daff192444452b88 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 10:45:01 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tqRead.c | 102 ++++++++++++++++------------- 1 file changed, 55 insertions(+), 47 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 1fbdb25528..ed5d7d4516 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -503,6 +503,55 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } +static int32_t buildResSDataBlock(SSDataBlock* pBlock, SSchemaWrapper* pSchema, const SArray* pColIdList) { + if (blockDataGetNumOfCols(pBlock) > 0) { + return TSDB_CODE_SUCCESS; + } + + int32_t numOfCols = taosArrayGetSize(pColIdList); + + if (numOfCols == 0) { // all columns are required + for (int32_t i = 0; i < pSchema->nCols; ++i) { + SSchema* pColSchema = &pSchema->pSchema[i]; + SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + blockDataFreeRes(pBlock); + return TSDB_CODE_OUT_OF_MEMORY; + } + } + } else { + if (numOfCols > pSchema->nCols) { + numOfCols = pSchema->nCols; + } + + int32_t i = 0; + int32_t j = 0; + while (i < pSchema->nCols && j < numOfCols) { + SSchema* pColSchema = &pSchema->pSchema[i]; + col_id_t colIdSchema = pColSchema->colId; + + col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pColIdList, j); + if (colIdSchema < colIdNeed) { + i++; + } else if (colIdSchema > colIdNeed) { + j++; + } else { + SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); + int32_t code = blockDataAppendColInfo(pBlock, &colInfo); + if (code != TSDB_CODE_SUCCESS) { + return -1; + } + i++; + j++; + } + } + } + + return TSDB_CODE_SUCCESS; +} + int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); @@ -534,57 +583,16 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet return -1; } + ASSERT(pReader->cachedSchemaVer == pReader->pSchemaWrapper->version); + pReader->cachedSchemaUid = uid; pReader->cachedSchemaSuid = suid; pReader->cachedSchemaVer = sversion; - SSchemaWrapper* pSchemaWrapper = pReader->pSchemaWrapper; - if (blockDataGetNumOfCols(pBlock) > 0) { - blockDataDestroy(pReader->pResBlock); - pReader->pResBlock = createDataBlock(); - pBlock = pReader->pResBlock; - - pBlock->info.id.uid = uid; - pBlock->info.version = pReader->msg.ver; - } - - int32_t numOfCols = taosArrayGetSize(pReader->pColIdList); - if (numOfCols == 0) { // all columns are required - for (int32_t i = 0; i < pSchemaWrapper->nCols; ++i) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[i]; - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - blockDataFreeRes(pBlock); - return -1; - } - } - } else { - if (numOfCols > pSchemaWrapper->nCols) { - numOfCols = pSchemaWrapper->nCols; - } - - int32_t i = 0; - int32_t j = 0; - while (i < pSchemaWrapper->nCols && j < numOfCols) { - SSchema* pColSchema = &pSchemaWrapper->pSchema[i]; - col_id_t colIdSchema = pColSchema->colId; - - col_id_t colIdNeed = *(col_id_t*)taosArrayGet(pReader->pColIdList, j); - if (colIdSchema < colIdNeed) { - i++; - } else if (colIdSchema > colIdNeed) { - j++; - } else { - SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); - int32_t code = blockDataAppendColInfo(pBlock, &colInfo); - if (code != TSDB_CODE_SUCCESS) { - return -1; - } - i++; - j++; - } + if (blockDataGetNumOfCols(pBlock) == 0) { + int32_t code = buildResSDataBlock(pReader->pResBlock, pReader->pSchemaWrapper, pReader->pColIdList); + if (code != TSDB_CODE_SUCCESS) { + return code; } } }