diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 04a621d4f5..c85316f810 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -922,9 +922,16 @@ typedef struct { SColVal colVal; } SLastCol; +typedef struct { + int8_t lflag; + STsdbRowKey tsdbRowKey; + SColVal colVal; +} SLastUpdateCtx; + int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(STsdb *pTsdb); -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *row); +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, SRow **aRow); +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData); int32_t tsdbCacheDel(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSKEY sKey, TSKEY eKey); int32_t tsdbCacheInsertLast(SLRUCache *pCache, tb_uid_t uid, TSDBROW *row, STsdb *pTsdb); diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index a8daa8f633..7237b5002c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -998,46 +998,30 @@ static void tsdbCacheUpdateLastCol(SLastCol *pLastCol, SRowKey *pRowKey, SColVal } } -int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow) { +static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray) { + if (!updCtxArray || TARRAY_SIZE(updCtxArray) == 0) { + return 0; + } + int32_t code = 0; - // 1, fetch schema - STSchema *pTSchema = NULL; - int32_t sver = TSDBROW_SVERSION(pRow); - - code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return -1; - } - - // 2, iterate col values into array - SArray *aColVal = taosArrayInit(32, sizeof(SColVal)); - - STSDBRowIter iter = {0}; - tsdbRowIterOpen(&iter, pRow, pTSchema); - - for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { - taosArrayPush(aColVal, pColVal); - } - - tsdbRowClose(&iter); - - // 3, build keys & multi get from rocks - int num_keys = TARRAY_SIZE(aColVal); + int num_keys = TARRAY_SIZE(updCtxArray); SArray *remainCols = NULL; SLRUCache *pCache = pTsdb->lruCache; - STsdbRowKey tsdbRowKey = {0}; - tsdbRowGetKey(pRow, &tsdbRowKey); - SRowKey *pRowKey = &tsdbRowKey.key; - taosThreadMutexLock(&pTsdb->lruMutex); for (int i = 0; i < num_keys; ++i) { - SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, i); - int16_t cid = pColVal->cid; + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); - SLastKey *key = &(SLastKey){.lflag = LFLAG_LAST_ROW, .uid = uid, .cid = cid}; + int8_t lflag = updCtx->lflag; + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; + + if (lflag == LFLAG_LAST && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + + SLastKey *key = &(SLastKey){.lflag = lflag, .uid = uid, .cid = pColVal->cid}; size_t klen = ROCKS_KEY_LEN; LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); if (h) { @@ -1053,23 +1037,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow } taosArrayPush(remainCols, &(SIdxKey){i, *key}); } - - if (COL_VAL_IS_VALUE(pColVal)) { - key->lflag = LFLAG_LAST; - LRUHandle *h = taosLRUCacheLookup(pCache, key, klen); - if (h) { - SLastCol *pLastCol = (SLastCol *)taosLRUCacheValue(pCache, h); - if (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1) { - tsdbCacheUpdateLastCol(pLastCol, pRowKey, pColVal); - } - taosLRUCacheRelease(pCache, h, false); - } else { - if (!remainCols) { - remainCols = taosArrayInit(num_keys * 2, sizeof(SIdxKey)); - } - taosArrayPush(remainCols, &(SIdxKey){i, *key}); - } - } } if (remainCols) { @@ -1097,12 +1064,17 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow rocksdb_writebatch_t *wb = pTsdb->rCache.writebatch; for (int i = 0; i < num_keys; ++i) { SIdxKey *idxKey = &((SIdxKey *)TARRAY_DATA(remainCols))[i]; - SColVal *pColVal = (SColVal *)TARRAY_DATA(aColVal) + idxKey->idx; - // SColVal *pColVal = (SColVal *)taosArrayGet(aColVal, idxKey->idx); + SLastUpdateCtx *updCtx = (SLastUpdateCtx *)taosArrayGet(updCtxArray, i); + SRowKey *pRowKey = &updCtx->tsdbRowKey.key; + SColVal *pColVal = &updCtx->colVal; SLastCol *pLastCol = tsdbCacheDeserialize(values_list[i], values_list_sizes[i]); SLastCol *PToFree = pLastCol; + if (IS_LAST_KEY(idxKey->key) && !COL_VAL_IS_VALUE(pColVal)) { + continue; + } + if (IS_LAST_ROW_KEY(idxKey->key)) { int32_t cmp_res = 1; if (pLastCol) { @@ -1148,48 +1120,6 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosMemoryFree(value); } - } else { - if (COL_VAL_IS_VALUE(pColVal)) { - if (NULL == pLastCol || (tRowKeyCompare(&pLastCol->rowKey, pRowKey) != 1)) { - char *value = NULL; - size_t vlen = 0; - SLastCol lastColTmp = {.rowKey = *pRowKey, .colVal = *pColVal}; - tsdbCacheSerialize(&lastColTmp, &value, &vlen); - - taosThreadMutexLock(&pTsdb->rCache.rMutex); - - rocksdb_writebatch_put(wb, (char *)&idxKey->key, ROCKS_KEY_LEN, value, vlen); - - taosThreadMutexUnlock(&pTsdb->rCache.rMutex); - - pLastCol = &lastColTmp; - SLastCol *pTmpLastCol = taosMemoryCalloc(1, sizeof(SLastCol)); - *pTmpLastCol = *pLastCol; - pLastCol = pTmpLastCol; - - size_t charge = sizeof(*pLastCol); - for (int8_t i = 0; i < pLastCol->rowKey.numOfPKs; i++) { - SValue *pValue = &pLastCol->rowKey.pks[i]; - if (IS_VAR_DATA_TYPE(pValue->type)) { - reallocVarDataVal(pValue); - charge += pValue->nData; - } - } - - if (IS_VAR_DATA_TYPE(pLastCol->colVal.value.type)) { - reallocVarData(&pLastCol->colVal); - charge += pLastCol->colVal.value.nData; - } - - LRUStatus status = taosLRUCacheInsert(pTsdb->lruCache, &idxKey->key, ROCKS_KEY_LEN, pLastCol, charge, - tsdbCacheDeleter, NULL, TAOS_LRU_PRIORITY_LOW, &pTsdb->flushState); - if (status != TAOS_LRU_STATUS_OK) { - code = -1; - } - - taosMemoryFree(value); - } - } } taosMemoryFreeClear(PToFree); @@ -1209,11 +1139,152 @@ int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, TSDBROW *pRow taosThreadMutexUnlock(&pTsdb->lruMutex); _exit: - taosArrayDestroy(aColVal); - taosMemoryFree(pTSchema); return code; } +int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int64_t version, int32_t nRow, + SRow **aRow) { + int32_t code = 0; + + // 1. prepare last + TSDBROW lRow = {.type = TSDBROW_ROW_FMT, .pTSRow = aRow[nRow - 1], .version = version}; + + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); + SArray *ctxArray = NULL; + SSHashObj *iColHash = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; + int32_t nCol = pTSchema->numOfCols; + + ctxArray = taosArrayInit(nCol, sizeof(SLastUpdateCtx)); + iColHash = tSimpleHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + + // 1. prepare by lrow + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&lRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, &lRow, pTSchema); + int32_t iCol = 0; + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal && iCol < nCol; pColVal = tsdbRowIterNext(&iter), iCol++) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + + if (!COL_VAL_IS_VALUE(pColVal)) { + tSimpleHashPut(iColHash, &iCol, sizeof(iCol), NULL, 0); + continue; + } + updateCtx.lflag = LFLAG_LAST; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 2. prepare by the other rows + for (int32_t iRow = nRow - 2; iRow >= 0; --iRow) { + if (tSimpleHashGetSize(iColHash) == 0) { + break; + } + + tRow.pTSRow = aRow[iRow]; + + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + void *pIte = NULL; + int32_t iter = 0; + while ((pIte = tSimpleHashIterate(iColHash, pIte, &iter)) != NULL) { + int32_t iCol = ((int32_t *)pIte)[0]; + SColVal colVal = COL_VAL_NONE(0, 0); + tsdbRowGetColVal(&tRow, pTSchema, iCol, &colVal); + + if (COL_VAL_IS_VALUE(&colVal)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + tSimpleHashIterateRemove(iColHash, &iCol, sizeof(iCol), &pIte, &iter); + } + } + } + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + tSimpleHashCleanup(iColHash); + return code; +} + +int32_t tsdbCacheColFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SBlockData *pBlockData) { + int32_t code = 0; + + TSDBROW lRow = tsdbRowFromBlockData(pBlockData, pBlockData->nRow - 1); + + STSchema *pTSchema = NULL; + int32_t sver = TSDBROW_SVERSION(&lRow); + SArray *ctxArray = NULL; + + code = metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, sver, &pTSchema); + if (code != TSDB_CODE_SUCCESS) { + terrno = code; + goto _exit; + } + + ctxArray = taosArrayInit(pBlockData->nColData, sizeof(SLastUpdateCtx)); + + // 1. prepare last + TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); + + for (int32_t iColData = 0; iColData < pBlockData->nColData; ++iColData) { + SColData *pColData = &pBlockData->aColData[iColData]; + if ((pColData->flag & HAS_VALUE) != HAS_VALUE) { + continue; + } + + for (tRow.iRow = pBlockData->nRow - 1; tRow.iRow >= 0; --tRow.iRow) { + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&tRow, &tsdbRowKey); + + uint8_t colType = tColDataGetBitValue(pColData, tRow.iRow); + if (colType == 2) { + SColVal colVal = COL_VAL_NONE(pColData->cid, pColData->type); + tColDataGetValue(pColData, tRow.iRow, &colVal); + + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST, .tsdbRowKey = tsdbRowKey, .colVal = colVal}; + taosArrayPush(ctxArray, &updateCtx); + break; + } + } + } + + // 2. prepare last row + STsdbRowKey tsdbRowKey = {0}; + tsdbRowGetKey(&lRow, &tsdbRowKey); + + STSDBRowIter iter = {0}; + tsdbRowIterOpen(&iter, &lRow, pTSchema); + for (SColVal *pColVal = tsdbRowIterNext(&iter); pColVal; pColVal = tsdbRowIterNext(&iter)) { + SLastUpdateCtx updateCtx = {.lflag = LFLAG_LAST_ROW, .tsdbRowKey = tsdbRowKey, .colVal = *pColVal}; + taosArrayPush(ctxArray, &updateCtx); + } + tsdbRowClose(&iter); + + // 3. do update + tsdbCacheUpdate(pTsdb, suid, uid, ctxArray); + +_exit: + taosMemoryFreeClear(pTSchema); + taosArrayDestroy(ctxArray); + return 0; +} + static int32_t mergeLastCid(tb_uid_t uid, STsdb *pTsdb, SArray **ppLastArray, SCacheRowsReader *pr, int16_t *aCols, int nCols, int16_t *slotIds); diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index 40cf9adde1..211855b245 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -629,14 +629,12 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = tsdbRowFromBlockData(pBlockData, 0); STsdbRowKey key; - TSDBROW lRow; // last row // first row tsdbRowGetKey(&tRow, &key); tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0))) goto _exit; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); - lRow = tRow; // remain row ++tRow.iRow; @@ -653,7 +651,6 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if ((code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1))) goto _exit; - lRow = tRow; ++tRow.iRow; } @@ -664,7 +661,7 @@ static int32_t tsdbInsertColDataToTable(SMemTable *pMemTable, STbData *pTbData, } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheColFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, pBlockData); } // SMemTable @@ -688,7 +685,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, SMemSkipListNode *pos[SL_MAX_LEVEL]; TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version}; int32_t iRow = 0; - TSDBROW lRow; // backward put first data tRow.pTSRow = aRow[iRow++]; @@ -696,7 +692,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, tbDataMovePosTo(pTbData, pos, &key, SL_MOVE_BACKWARD); code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 0); if (code) goto _exit; - lRow = tRow; pTbData->minKey = TMIN(pTbData->minKey, key.key.ts); @@ -717,8 +712,6 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, code = tbDataDoPut(pMemTable, pTbData, pos, &tRow, 1); if (code) goto _exit; - lRow = tRow; - iRow++; } } @@ -727,7 +720,7 @@ static int32_t tsdbInsertRowDataToTable(SMemTable *pMemTable, STbData *pTbData, pTbData->maxKey = key.key.ts; } if (!TSDB_CACHE_NO(pMemTable->pTsdb->pVnode->config)) { - tsdbCacheUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, &lRow); + tsdbCacheRowFormatUpdate(pMemTable->pTsdb, pTbData->suid, pTbData->uid, version, nRow, aRow); } // SMemTable diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index fcd9669f9a..58075cf0ac 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -601,17 +601,21 @@ void tsdbRowGetColVal(TSDBROW *pRow, STSchema *pTSchema, int32_t iCol, SColVal * STColumn *pTColumn = &pTSchema->columns[iCol]; SValue value; - ASSERT(iCol > 0); - if (pRow->type == TSDBROW_ROW_FMT) { tRowGet(pRow->pTSRow, pTSchema, iCol, pColVal); } else if (pRow->type == TSDBROW_COL_FMT) { - SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); - - if (pColData) { - tColDataGetValue(pColData, pRow->iRow, pColVal); + if (iCol == 0) { + *pColVal = + COL_VAL_VALUE(PRIMARYKEY_TIMESTAMP_COL_ID, + ((SValue){.type = TSDB_DATA_TYPE_TIMESTAMP, .val = pRow->pBlockData->aTSKEY[pRow->iRow]})); } else { - *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + SColData *pColData = tBlockDataGetColData(pRow->pBlockData, pTColumn->colId); + + if (pColData) { + tColDataGetValue(pColData, pRow->iRow, pColVal); + } else { + *pColVal = COL_VAL_NONE(pTColumn->colId, pTColumn->type); + } } } else { ASSERT(0); @@ -1817,4 +1821,4 @@ uint32_t tsdbCvtTimestampAlg(uint32_t alg) { DEFINE_VAR(alg) return 0; -} \ No newline at end of file +}