diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 46f8d85d55..f8077d08eb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -293,6 +293,9 @@ _exit: return code; } +static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds); + int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsReader *pr, int32_t ltype) { static char const *alstring[2] = {"last_row", "last"}; char const *lstring = alstring[ltype]; @@ -328,6 +331,7 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR taosMemoryFree(errs); for (int i = 0; i < num_keys; ++i) { + SArray *pTmpColArray = NULL; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i]); int16_t cid = *(int16_t *)taosArrayGet(pCidList, i); SLastCol noneCol = {.ts = TSKEY_MIN, .colVal = COL_VAL_NONE(cid, pr->pSchema->columns[pr->pSlotIds[i]].type)}; @@ -343,18 +347,29 @@ int32_t tsdbCacheGet(STsdb *pTsdb, tb_uid_t uid, SArray *pLastArray, SCacheRowsR } else { // recalc: load from tsdb if (ltype) { - SArray *pArray = NULL; - // mergeLastCid(uid, pTsdb, &pArray, pr, cid); + int16_t aCols[1] = {cid}; + int16_t slotIds[1] = {pr->pSlotIds[i]}; + pTmpColArray = NULL; + + mergeLastCid(uid, pTsdb, &pTmpColArray, pr, aCols, 1, slotIds); + if (pTmpColArray && TARRAY_SIZE(pTmpColArray) >= 1) { + pLastCol = taosArrayGet(pTmpColArray, 0); + } } else { // mergeLastRowCid(uid, pTsdb, &pArray, pr, cid); } - // still null, then make up a null col value - pLastCol = &noneCol; + + // still null, then make up a none col value + if (!pLastCol) { + pLastCol = &noneCol; + } // maybe store it back to rocks cache } + taosArrayPush(pLastArray, pLastCol); + taosArrayDestroy(pTmpColArray); taosMemoryFree(values_list[i]); } taosMemoryFree(values_list); @@ -1831,6 +1846,21 @@ static int32_t initLastColArray(STSchema *pTSchema, SArray **ppColArray) { return TSDB_CODE_SUCCESS; } +static int32_t initLastColArrayPartial(STSchema *pTSchema, SArray **ppColArray, int16_t *slotIds, int nCols) { + SArray *pColArray = taosArrayInit(nCols, sizeof(SLastCol)); + if (NULL == pColArray) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int32_t i = 0; i < nCols; ++i) { + int16_t slotId = slotIds[i]; + SLastCol col = {.ts = 0, .colVal = COL_VAL_NULL(pTSchema->columns[slotId].colId, pTSchema->columns[slotId].type)}; + taosArrayPush(pColArray, &col); + } + *ppColArray = pColArray; + return TSDB_CODE_SUCCESS; +} + static int32_t cloneTSchema(STSchema *pSrc, STSchema **ppDst) { int32_t len = sizeof(STSchema) + sizeof(STColumn) * pSrc->numOfCols; *ppDst = taosMemoryMalloc(len); @@ -2169,6 +2199,172 @@ _err: return code; } +static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, + int nCols, int16_t *slotIds) { + STSchema *pTSchema = pr->pSchema; // metaGetTbTSchema(pTsdb->pVnode->pMeta, uid, -1, 1); + int16_t nLastCol = nCols; + int16_t noneCol = 0; + bool setNoneCol = false; + bool hasRow = false; + bool ignoreEarlierTs = false; + SArray *pColArray = NULL; + SColVal *pColVal = &(SColVal){0}; + + int32_t code = initLastColArrayPartial(pTSchema, &pColArray, slotIds, nCols); + if (TSDB_CODE_SUCCESS != code) { + return code; + } + SArray *aColArray = taosArrayInit(nCols, sizeof(int16_t)); + if (NULL == aColArray) { + taosArrayDestroy(pColArray); + + return TSDB_CODE_OUT_OF_MEMORY; + } + + for (int i = 0; i < nCols; ++i) { + taosArrayPush(aColArray, &aCols[i]); + } + + TSKEY lastRowTs = TSKEY_MAX; + + CacheNextRowIter iter = {0}; + nextRowIterOpen(&iter, uid, pTsdb, pTSchema, pr->suid, pr->pLoadInfo, pr->pReadSnap, &pr->pDataFReader, + &pr->pDataFReaderLast, pr->lastTs); + + do { + TSDBROW *pRow = NULL; + nextRowIterGet(&iter, &pRow, &ignoreEarlierTs, true, TARRAY_DATA(aColArray), TARRAY_SIZE(aColArray)); + + if (!pRow) { + break; + } + + hasRow = true; + + int32_t sversion = TSDBROW_SVERSION(pRow); + if (sversion != -1) { + code = updateTSchema(sversion, pr, uid); + if (TSDB_CODE_SUCCESS != code) { + goto _err; + } + pTSchema = pr->pCurrSchema; + } + // int16_t nCol = pTSchema->numOfCols; + + TSKEY rowTs = TSDBROW_TS(pRow); + + if (lastRowTs == TSKEY_MAX) { + lastRowTs = rowTs; + + for (int16_t iCol = 0; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + SLastCol *pCol = taosArrayGet(pColArray, iCol); + if (pCol->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + + *pCol = (SLastCol){.ts = lastRowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + pCol->colVal.value.pData = taosMemoryMalloc(pCol->colVal.value.nData); + if (pCol->colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(pCol->colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + if (!COL_VAL_IS_VALUE(pColVal)) { + if (!setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } else { + int32_t aColIndex = taosArraySearchIdx(aColArray, &pColVal->cid, compareInt16Val, TD_EQ); + if (aColIndex >= 0) { + taosArrayRemove(aColArray, aColIndex); + } + } + } + if (!setNoneCol) { + // done, goto return pColArray + break; + } else { + continue; + } + } + + // merge into pColArray + setNoneCol = false; + for (int16_t iCol = noneCol; iCol < nCols; ++iCol) { + if (iCol >= nLastCol) { + break; + } + // high version's column value + SLastCol *lastColVal = (SLastCol *)taosArrayGet(pColArray, iCol); + if (lastColVal->colVal.cid != pTSchema->columns[slotIds[iCol]].colId) { + continue; + } + SColVal *tColVal = &lastColVal->colVal; + + tsdbRowGetColVal(pRow, pTSchema, slotIds[iCol], pColVal); + if (!COL_VAL_IS_VALUE(tColVal) && COL_VAL_IS_VALUE(pColVal)) { + SLastCol lastCol = {.ts = rowTs, .colVal = *pColVal}; + if (IS_VAR_DATA_TYPE(pColVal->type) && pColVal->value.nData > 0) { + SLastCol *pLastCol = (SLastCol *)taosArrayGet(pColArray, iCol); + taosMemoryFree(pLastCol->colVal.value.pData); + + lastCol.colVal.value.pData = taosMemoryMalloc(lastCol.colVal.value.nData); + if (lastCol.colVal.value.pData == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + memcpy(lastCol.colVal.value.pData, pColVal->value.pData, pColVal->value.nData); + } + + taosArraySet(pColArray, iCol, &lastCol); + int32_t aColIndex = taosArraySearchIdx(aColArray, &lastCol.colVal.cid, compareInt16Val, TD_EQ); + taosArrayRemove(aColArray, aColIndex); + } else if (!COL_VAL_IS_VALUE(tColVal) && !COL_VAL_IS_VALUE(pColVal) && !setNoneCol) { + noneCol = iCol; + setNoneCol = true; + } + } + } while (setNoneCol); + + // if (taosArrayGetSize(pColArray) <= 0) { + //*ppLastArray = NULL; + // taosArrayDestroy(pColArray); + //} else { + if (!hasRow) { + if (ignoreEarlierTs) { + taosArrayDestroy(pColArray); + pColArray = NULL; + } else { + taosArrayClear(pColArray); + } + } + *ppLastArray = pColArray; + //} + + nextRowIterClose(&iter); + taosArrayDestroy(aColArray); + // taosMemoryFreeClear(pTSchema); + return code; + +_err: + nextRowIterClose(&iter); + // taosMemoryFreeClear(pTSchema); + *ppLastArray = NULL; + taosArrayDestroy(pColArray); + taosArrayDestroy(aColArray); + return code; +} + int32_t tsdbCacheGetLastrowH(SLRUCache *pCache, tb_uid_t uid, SCacheRowsReader *pr, LRUHandle **handle) { int32_t code = 0; char key[32] = {0};