diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 2ae3115c0a..b87e5d5503 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -287,12 +287,17 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 hasRes = true; p->ts = pColVal->ts; - uint8_t* px = p->colVal.value.pData; - p->colVal = pColVal->colVal; + if (!IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + p->colVal = pColVal->colVal; + } else { + if (COL_VAL_IS_VALUE(&pColVal->colVal)) { + memcpy(p->colVal.value.pData, pColVal->colVal.value.pData, pColVal->colVal.value.nData); + } - if (COL_VAL_IS_VALUE(&pColVal->colVal) && IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - p->colVal.value.pData = px; - memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->colVal.value.nData = pColVal->colVal.value.nData; + p->colVal.type = pColVal->colVal.type; + p->colVal.flag = pColVal->colVal.flag; + p->colVal.cid = pColVal->colVal.cid; } } } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index ca3db1e5d8..4b332c3e85 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -305,7 +305,7 @@ static void* getPosInBlockInfoBuf(SBlockInfoBuf* pBuf, int32_t index) { } // NOTE: speedup the whole processing by preparing the buffer for STableBlockScanInfo in batch model -static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableKeyInfo* idList, int32_t numOfTables) { +static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, SBlockInfoBuf* pBuf, const STableKeyInfo* idList, int32_t numOfTables) { // allocate buffer in order to load data blocks from file // todo use simple hash instead, optimize the memory consumption SHashObj* pTableMap = @@ -315,10 +315,10 @@ static SHashObj* createDataBlockScanInfo(STsdbReader* pTsdbReader, const STableK } int64_t st = taosGetTimestampUs(); - initBlockScanInfoBuf(&pTsdbReader->blockInfoBuf, numOfTables); + initBlockScanInfoBuf(pBuf, numOfTables); for (int32_t j = 0; j < numOfTables; ++j) { - STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(&pTsdbReader->blockInfoBuf, j); + STableBlockScanInfo* pScanInfo = getPosInBlockInfoBuf(pBuf, j); pScanInfo->uid = idList[j].uid; if (ASCENDING_TRAVERSE(pTsdbReader->order)) { int64_t skey = pTsdbReader->window.skey; @@ -3785,9 +3785,9 @@ int32_t tsdbReaderOpen(SVnode* pVnode, SQueryTableDataCond* pCond, void* pTableL } STsdbReader* p = (pReader->innerReader[0] != NULL)? pReader->innerReader[0]:pReader; - pReader->status.pTableMap = createDataBlockScanInfo(p, pTableList, numOfTables); + pReader->status.pTableMap = createDataBlockScanInfo(p, &pReader->blockInfoBuf, pTableList, numOfTables); if (pReader->status.pTableMap == NULL) { - tsdbReaderClose(pReader); + tsdbReaderClose(p); *ppReader = NULL; code = TSDB_CODE_TDB_OUT_OF_MEMORY; diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 922ed05653..5e5c01201f 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -26,7 +26,7 @@ #include "ttypes.h" static SSDataBlock* doScanCache(SOperatorInfo* pOperator); -static void destroyLastrowScanOperator(void* param); +static void destroyCacheScanOperator(void* param); static int32_t extractCacheScanSlotId(const SArray* pColMatchInfo, SExecTaskInfo* pTaskInfo, int32_t** pSlotIds); static int32_t removeRedundantTsCol(SLastRowScanPhysiNode* pScanNode, SColMatchInfo* pColMatchInfo); @@ -97,14 +97,14 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyLastrowScanOperator, NULL); + createOperatorFpSet(operatorDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, NULL); pOperator->cost.openCost = 0; return pOperator; _error: pTaskInfo->code = code; - destroyLastrowScanOperator(pInfo); + destroyCacheScanOperator(pInfo); taosMemoryFree(pOperator); return NULL; } @@ -157,9 +157,12 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { SColumnInfoData* pSrc = taosArrayGet(pInfo->pBufferredRes->pDataBlock, slotId); SColumnInfoData* pDst = taosArrayGet(pRes->pDataBlock, slotId); - char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); - bool isNull = colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes); - colDataAppend(pDst, 0, p, isNull); + if (colDataIsNull_s(pSrc, pInfo->indexOfBufferedRes)) { + colDataAppendNULL(pDst, 0); + } else { + char* p = colDataGetData(pSrc, pInfo->indexOfBufferedRes); + colDataAppend(pDst, 0, p, false); + } } pRes->info.uid = *(tb_uid_t*)taosArrayGet(pInfo->pUidList, pInfo->indexOfBufferedRes); @@ -226,6 +229,8 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader); return pInfo->pRes; + } else { + pInfo->pLastrowReader = tsdbCacherowsReaderClose(pInfo->pLastrowReader); } } @@ -234,7 +239,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) { } } -void destroyLastrowScanOperator(void* param) { +void destroyCacheScanOperator(void* param) { SLastrowScanInfo* pInfo = (SLastrowScanInfo*)param; blockDataDestroy(pInfo->pRes); blockDataDestroy(pInfo->pBufferredRes); diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 87e15370e4..4d69b4d45c 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -6568,7 +6568,9 @@ int32_t cachedLastRowFunction(SqlFunctionCtx* pCtx) { for (int32_t i = pInput->numOfRows + pInput->startRowIndex - 1; i >= pInput->startRowIndex; --i) { numOfElems++; - char* data = colDataGetData(pInputCol, i); + bool isNull = colDataIsNull(pInputCol, pInput->numOfRows, i, NULL); + char* data = isNull ? NULL : colDataGetData(pInputCol, i); + TSKEY cts = getRowPTs(pInput->pPTS, i); if (pResInfo->numOfRes == 0 || pInfo->ts < cts) { doSaveLastrow(pCtx, data, i, cts, pInfo);