diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index be36848fec..791c73047c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -30,36 +30,69 @@ typedef struct SCacheRowsReader { } SCacheRowsReader; static void saveOneRow(SArray* pRow, SSDataBlock* pBlock, SCacheRowsReader* pReader, const int32_t* slotIds, - SFirstLastRes** pRes) { + void** pRes) { ASSERT(pReader->numOfCols <= taosArrayGetSize(pBlock->pDataBlock)); int32_t numOfRows = pBlock->info.rows; - for (int32_t i = 0; i < pReader->numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + if ((pReader->type & CACHESCAN_RETRIEVE_LAST) == CACHESCAN_RETRIEVE_LAST) { + for (int32_t i = 0; i < pReader->numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[i]); - if (slotIds[i] == -1) { // the primary timestamp - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); - pRes[i]->ts = pColVal->ts; - memcpy(pRes[i]->buf, &pColVal->ts, TSDB_KEYSIZE); - } else { - int32_t slotId = slotIds[i]; - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + if (slotIds[i] == -1) { // the primary timestamp + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + p->ts = pColVal->ts; + p->bytes = TSDB_KEYSIZE; + *(int64_t*)p->buf = pColVal->ts; + } else { + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - pRes[i]->ts = pColVal->ts; - pRes[i]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + p->ts = pColVal->ts; + p->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); + if (!p->isNull) { + if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { + varDataSetLen(p->buf, pColVal->colVal.value.nData); + memcpy(varDataVal(p->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->bytes = pColVal->colVal.value.nData; + } else { + memcpy(p->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); + p->bytes = pReader->pSchema->columns[slotId].bytes; + } + } + } - if (!pRes[i]->isNull) { - if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - varDataSetLen(pRes[i]->buf, pColVal->colVal.value.nData); - memcpy(varDataVal(pRes[i]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + p->hasResult = true; + varDataSetLen(pRes[i], pColInfoData->info.bytes); + colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); + } + } else { + ASSERT((pReader->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW); + + SColVal colVal = {0}; + for (int32_t i = 0; i < pReader->numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + if (slotIds[i] == -1) { + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, 0); + colDataAppend(pColInfoData, numOfRows, (const char*)&pColVal->ts, false); + } else { + int32_t slotId = slotIds[i]; + SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); + + if (IS_VAR_DATA_TYPE(colVal.type)) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { + colDataAppendNULL(pColInfoData, numOfRows); + } else { + varDataSetLen(pReader->transferBuf[slotId], colVal.value.nData); + memcpy(varDataVal(pReader->transferBuf[slotId]), colVal.value.pData, colVal.value.nData); + colDataAppend(pColInfoData, numOfRows, pReader->transferBuf[slotId], false); + } } else { - memcpy(pRes[i]->buf, &pColVal->colVal.value, pReader->pSchema->columns[slotId].bytes); + colDataAppend(pColInfoData, numOfRows, (const char*)&colVal.value, !COL_VAL_IS_VALUE(&pColVal->colVal)); } } } - - pRes[i]->hasResult = true; - colDataAppend(pColInfoData, numOfRows, (const char*)pRes[i], false); } pBlock->info.rows += 1; @@ -126,24 +159,17 @@ static int32_t doExtractCacheRow(SCacheRowsReader* pr, SLRUCache* lruCache, uint int32_t code = TSDB_CODE_SUCCESS; if ((pr->type & CACHESCAN_RETRIEVE_LAST_ROW) == CACHESCAN_RETRIEVE_LAST_ROW) { code = tsdbCacheGetLastrowH(lruCache, uid, pr->pVnode->pTsdb, h); - if (code != TSDB_CODE_SUCCESS) { - return code; - } - - // no data in the table of Uid - if (*h != NULL) { - *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); - } } else { code = tsdbCacheGetLastH(lruCache, uid, pr->pVnode->pTsdb, h); - if (code != TSDB_CODE_SUCCESS) { - return code; - } + } - // no data in the table of Uid - if (*h != NULL) { - *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); - } + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + // no data in the table of Uid + if (*h != NULL) { + *pRow = (SArray*)taosLRUCacheValue(lruCache, *h); } return code; @@ -163,10 +189,26 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 size_t numOfTables = taosArrayGetSize(pr->pTableList); bool hasRes = false; - SFirstLastRes** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); + void** pRes = taosMemoryCalloc(pr->numOfCols, POINTER_BYTES); for (int32_t j = 0; j < pr->numOfCols; ++j) { - pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes); - pRes[j]->ts = INT64_MIN; + pRes[j] = taosMemoryCalloc(1, sizeof(SFirstLastRes) + pr->pSchema->columns[slotIds[j]].bytes + VARSTR_HEADER_SIZE); + SFirstLastRes* p = (SFirstLastRes*)varDataVal(pRes[j]); + p->ts = INT64_MIN; + } + + SArray* pLastCols = taosArrayInit(pr->pSchema->numOfCols, sizeof(SLastCol)); + for (int32_t i = 0; i < pr->numOfCols; ++i) { + SLastCol p = {0}; + p.ts = INT64_MIN; + + struct STColumn* pCol = &pr->pSchema->columns[i]; + p.colVal.type = pCol->type; + + if (IS_VAR_DATA_TYPE(pCol->type)) { + p.colVal.value.pData = taosMemoryCalloc(pCol->bytes, sizeof(char)); + } + + taosArrayPush(pLastCols, &p); } // retrieve the only one last row of all tables in the uid list. @@ -185,38 +227,32 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 { for (int32_t k = 0; k < pr->numOfCols; ++k) { - SColumnInfoData* pColInfoData = taosArrayGet(pResBlock->pDataBlock, k); - + SLastCol* p = taosArrayGet(pLastCols, k); if (slotIds[k] == -1) { // the primary timestamp - SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, k); - if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + SLastCol* pCol = (SLastCol*)taosArrayGet(pRow, k); + if (pCol->ts > p->ts) { hasRes = true; - pRes[k]->hasResult = true; - pRes[k]->ts = pColVal->ts; - memcpy(pRes[k]->buf, &pColVal->ts, TSDB_KEYSIZE); - - colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); + p->ts = pCol->ts; + p->colVal = pCol->colVal; } } else { int32_t slotId = slotIds[k]; SLastCol* pColVal = (SLastCol*)taosArrayGet(pRow, slotId); - if (pColVal->ts > pRes[k]->ts || !pRes[k]->hasResult) { + if (pColVal->ts > p->ts) { hasRes = true; - pRes[k]->hasResult = true; - pRes[k]->ts = pColVal->ts; + p->ts = pColVal->ts; - pRes[k]->isNull = !COL_VAL_IS_VALUE(&pColVal->colVal); - if (!pRes[k]->isNull) { + if (!COL_VAL_IS_VALUE(&pColVal->colVal)) { if (IS_VAR_DATA_TYPE(pColVal->colVal.type)) { - varDataSetLen(pRes[k]->buf, pColVal->colVal.value.nData); - memcpy(varDataVal(pRes[k]->buf), pColVal->colVal.value.pData, pColVal->colVal.value.nData); + uint8_t* px = p->colVal.value.pData; + p->colVal = pColVal->colVal; + p->colVal.value.pData = px; + memcpy(px, pColVal->colVal.value.pData, pColVal->colVal.value.nData); } else { - memcpy(pRes[k]->buf, &pColVal->colVal.value, pr->pSchema->columns[slotId].bytes); + p->colVal = pColVal->colVal; } } - - colDataAppend(pColInfoData, 1, (const char*)pRes[k], false); } } } @@ -227,6 +263,7 @@ int32_t tsdbRetrieveCacheRows(void* pReader, SSDataBlock* pResBlock, const int32 if (hasRes) { pResBlock->info.rows = 1; + saveOneRow(pLastCols, pResBlock, pr, slotIds, pRes); } } else if ((pr->type & CACHESCAN_RETRIEVE_TYPE_ALL) == CACHESCAN_RETRIEVE_TYPE_ALL) { diff --git a/source/libs/planner/src/planOptimizer.c b/source/libs/planner/src/planOptimizer.c index 0fc1be8a70..7a145e7dde 100644 --- a/source/libs/planner/src/planOptimizer.c +++ b/source/libs/planner/src/planOptimizer.c @@ -2232,6 +2232,7 @@ static int32_t lastRowScanOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogic cxt.doAgg = false; nodesWalkExprs(pScan->pScanCols, lastRowScanOptSetColDataType, &cxt); nodesWalkExprs(pScan->pScanPseudoCols, lastRowScanOptSetColDataType, &cxt); + nodesWalkExprs(pScan->node.pTargets, lastRowScanOptSetColDataType, &cxt); nodesClearList(cxt.pLastCols); } pAgg->hasLastRow = false;