From 85ddb63e53245e17cd6d6fb611b043c38ffc715b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Jul 2022 17:44:10 +0800 Subject: [PATCH 1/3] fix(query): free resources in tsdbread --- source/dnode/vnode/src/tsdb/tsdbRead.c | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 5f796bbab9..b1a1cc87cc 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -304,6 +304,10 @@ static bool filesetIteratorNext(SFilesetIter* pIter, STsdbReader* pReader) { STimeWindow win = {0}; while (1) { + if (pReader->pFileReader != NULL) { + tsdbDataFReaderClose(&pReader->pFileReader); + } + pReader->status.pCurrentFileset = (SDFileSet*)taosArrayGet(pIter->pFileList, pIter->index); int32_t code = tsdbDataFReaderOpen(&pReader->pFileReader, pReader->pTsdb, pReader->status.pCurrentFileset); @@ -2437,6 +2441,7 @@ static int32_t doMergeRowsInFileBlockImpl(SBlockData* pBlockData, int32_t rowInd SVersionRange* pVerRange, int32_t step) { while (pBlockData->aTSKEY[rowIndex] == key && rowIndex < pBlockData->nRow && rowIndex >= 0) { if (pBlockData->aVersion[rowIndex] > pVerRange->maxVer || pBlockData->aVersion[rowIndex] < pVerRange->minVer) { + rowIndex += step; continue; } @@ -2834,7 +2839,7 @@ void tsdbReaderClose(STsdbReader* pReader) { cleanupDataBlockIterator(&pReader->status.blockIter); destroyBlockScanInfo(pReader->status.pTableMap); blockDataDestroy(pReader->pResBlock); - + tsdbDataFReaderClose(&pReader->pFileReader); #if 0 // if (pReader->status.pTableScanInfo != NULL) { @@ -3023,6 +3028,7 @@ int32_t tsdbReaderReset(STsdbReader* pReader, SQueryTableDataCond* pCond) { memset(pReader->suppInfo.plist, 0, POINTER_BYTES); pReader->suppInfo.tsColAgg.colId = PRIMARYKEY_TIMESTAMP_COL_ID; + tsdbDataFReaderClose(&pReader->pFileReader); // todo set the correct numOfTables int32_t numOfTables = 1; From e99231c1c8c5c5b5d8ed53d39c49f66f35cae6cf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Jul 2022 17:59:25 +0800 Subject: [PATCH 2/3] fix(query): set correct output buffer. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 417c014b6e..750797cd69 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -22,7 +22,6 @@ typedef struct SLastrowReader { SVnode* pVnode; STSchema* pSchema; uint64_t uid; - // int32_t* pSlotIds; char** transferBuf; // todo remove it soon int32_t numOfCols; int32_t type; @@ -40,15 +39,17 @@ static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReade if (slotIds[i] == -1) { colDataAppend(pColInfoData, numOfRows, (const char*)&pRow->ts, false); } else { - tTSRowGetVal(pRow, pReader->pSchema, slotIds[i], &colVal); + int32_t slotId = slotIds[i]; + + tTSRowGetVal(pRow, pReader->pSchema, slotId, &colVal); if (IS_VAR_DATA_TYPE(colVal.type)) { if (colVal.isNull || colVal.isNone) { colDataAppendNULL(pColInfoData, numOfRows); } else { - varDataSetLen(pReader->transferBuf[i], colVal.value.nData); - memcpy(varDataVal(pReader->transferBuf[i]), colVal.value.pData, colVal.value.nData); - colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[i], false); + varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); + memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); } } else { colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, colVal.isNull || colVal.isNone); @@ -79,7 +80,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); p->pTableList = pTableIdList; - for (int32_t i = 0; i < p->numOfCols; ++i) { + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); } @@ -92,10 +93,11 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, int32_t tsdbLastrowReaderClose(void* pReader) { SLastrowReader* p = pReader; - for (int32_t i = 0; i < p->numOfCols; ++i) { + for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { taosMemoryFreeClear(p->transferBuf[i]); } + taosMemoryFree(p->pSchema); taosMemoryFree(p->transferBuf); taosMemoryFree(pReader); return TSDB_CODE_SUCCESS; From 6f2ac5ba7221a3c90e2e7e02ab762b7bf7e28e6f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Tue, 12 Jul 2022 21:07:12 +0800 Subject: [PATCH 3/3] fix(query): always generated results for last_row query if rows exists. --- source/dnode/vnode/src/tsdb/tsdbCacheRead.c | 3 ++- source/libs/function/src/builtins.c | 2 +- source/libs/function/src/builtinsimpl.c | 28 +++++++++++++-------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 750797cd69..5c09c7663f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,6 +30,7 @@ typedef struct SLastrowReader { } SLastrowReader; static void saveOneRow(STSRow* pRow, SSDataBlock* pBlock, SLastrowReader* pReader, const int32_t* slotIds) { + ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; SColVal colVal = {0}; @@ -69,7 +70,6 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->type = type; p->pVnode = pVnode; p->numOfCols = numOfCols; - p->transferBuf = taosMemoryCalloc(p->numOfCols, POINTER_BYTES); if (taosArrayGetSize(pTableIdList) == 0) { *pReader = p; @@ -80,6 +80,7 @@ int32_t tsdbLastRowReaderOpen(void* pVnode, int32_t type, SArray* pTableIdList, p->pSchema = metaGetTbTSchema(p->pVnode->pMeta, pKeyInfo->uid, -1); p->pTableList = pTableIdList; + p->transferBuf = taosMemoryCalloc(p->pSchema->numOfCols, POINTER_BYTES); for (int32_t i = 0; i < p->pSchema->numOfCols; ++i) { if (IS_VAR_DATA_TYPE(p->pSchema->columns[i].type)) { p->transferBuf[i] = taosMemoryMalloc(p->pSchema->columns[i].bytes); diff --git a/source/libs/function/src/builtins.c b/source/libs/function/src/builtins.c index b4e0e82ac6..1b8b39a76e 100644 --- a/source/libs/function/src/builtins.c +++ b/source/libs/function/src/builtins.c @@ -2217,7 +2217,7 @@ const SBuiltinFuncDefinition funcMgtBuiltins[] = { { .name = "last_row", .type = FUNCTION_TYPE_LAST_ROW, - .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC, + .classification = FUNC_MGT_AGG_FUNC | FUNC_MGT_MULTI_RES_FUNC | FUNC_MGT_SELECT_FUNC | FUNC_MGT_TIMELINE_FUNC | FUNC_MGT_IMPLICIT_TS_FUNC, .translateFunc = translateFirstLast, .getEnvFunc = getFirstLastFuncEnv, .initFunc = functionSetup, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 5dee2e8480..0324a6bae0 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -80,8 +80,10 @@ typedef struct STopBotRes { } STopBotRes; typedef struct SFirstLastRes { - bool hasResult; - bool isNull; // used for last_row function only + bool hasResult; + // used for last_row function only, isNullRes in SResultRowEntry can not be passed to downstream.So, + // this attribute is required + bool isNull; int32_t bytes; char buf[]; } SFirstLastRes; @@ -2900,7 +2902,7 @@ int32_t firstLastFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { pResInfo->isNullRes = (pResInfo->numOfRes == 0) ? 1 : 0; SFirstLastRes* pRes = GET_ROWCELL_INTERBUF(pResInfo); - colDataAppend(pCol, pBlock->info.rows, pRes->buf, pResInfo->isNullRes); + colDataAppend(pCol, pBlock->info.rows, pRes->buf, pRes->isNull||pResInfo->isNullRes); // handle selectivity STuplePos* pTuplePos = (STuplePos*)(pRes->buf + pRes->bytes + sizeof(TSKEY)); setSelectivityValue(pCtx, pBlock, pTuplePos, pBlock->info.rows); @@ -5956,24 +5958,28 @@ int32_t lastrowFunction(SqlFunctionCtx* pCtx) { int32_t type = pInputCol->info.type; int32_t bytes = pInputCol->info.bytes; + pInfo->bytes = bytes; + // last_row function does not ignore the null value for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { - if (pInputCol->hasNull && colDataIsNull_s(pInputCol, i)) { - continue; - } - numOfElems++; char* data = colDataGetData(pInputCol, i); TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || *(TSKEY*)(pInfo->buf + bytes) < cts) { - if (IS_VAR_DATA_TYPE(type)) { - bytes = varDataTLen(data); - pInfo->bytes = bytes; + + if (colDataIsNull_s(pInputCol, i)) { + pInfo->isNull = true; + } else { + if (IS_VAR_DATA_TYPE(type)) { + bytes = varDataTLen(data); + pInfo->bytes = bytes; + } + + memcpy(pInfo->buf, data, bytes); } - memcpy(pInfo->buf, data, bytes); *(TSKEY*)(pInfo->buf + bytes) = cts; pInfo->hasResult = true;