diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index aee46d421e..66f8e304ea 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -137,6 +137,7 @@ void tBlockDataReset(SBlockData *pBlockData); void tBlockDataClear(SBlockData *pBlockData); int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema); // SDelIdx +int32_t tCmprDelIdx(void const *lhs, void const *rhs); int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); // SDelData @@ -225,7 +226,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB int32_t tsdbOpenCache(STsdb *pTsdb); void tsdbCloseCache(SLRUCache *pCache); int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row); -int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow); +int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow); int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid); // structs ======================= diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 05df2672e6..02d659cc16 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -81,7 +81,247 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) { return code; } -int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) { +static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) { + tb_uid_t suid = 0; + + SMetaReader mr = {0}; + metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0); + if (metaGetTableEntryByUid(&mr, uid) < 0) { + metaReaderClear(&mr); // table not esist + return 0; + } + + if (mr.me.type == TSDB_CHILD_TABLE) { + suid = mr.me.ctbEntry.suid; + } else if (mr.me.type == TSDB_NORMAL_TABLE) { + suid = 0; + } else { + suid = 0; + } + + metaReaderClear(&mr); + + return suid; +} +/* +static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow **ppRow) { + int32_t code = 0; + + if (mem) { + STbData *pMem = NULL; + STbDataIter* iter; // mem buffer skip list iterator + + tsdbGetTbDataFromMemTable(mem, suid, uid, &pMem); + if (pMem != NULL) { + tsdbTbDataIterCreate(pMem, NULL, 1, &iter); + + if (iter != NULL) { + TSDBROW *row = tsdbTbDataIterGet(iter); + + tsdbTbDataIterDestroy(iter); + } + } + } else { + *ppRow = NULL; + } + + return code; +} +*/ +static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) { + int32_t code = 0; + + SMapData delDataMap; + SDelData delData; + + if (pDelIdx) { + tMapDataReset(&delDataMap); + + code = tsdbReadDelData(pDelReader, pDelIdx, &delDataMap, NULL); + if (code) goto _err; + + for (int32_t iDelData = 0; iDelData < delDataMap.nItem; ++iDelData) { + code = tMapDataGetItemByIdx(&delDataMap, iDelData, &delData, tGetDelData); + if (code) goto _err; + + taosArrayPush(aDelData, &delData); + } + } + +_err: + return code; +} + +static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) { + int32_t code = 0; + SDelData *pDelData = pTbData ? pTbData->pHead : NULL; + + for (; pDelData; pDelData = pDelData->pNext) { + taosArrayPush(aDelData, pDelData); + } + + return code; +} + +static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) { + int32_t code = 0; + + if (pMem) { + code = getTableDelDataFromTbData(pMem, aDelData); + if (code) goto _err; + } + + if (pIMem) { + code = getTableDelDataFromTbData(pIMem, aDelData); + if (code) goto _err; + } + + if (pDelIdx) { + code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData); + if (code) goto _err; + } + +_err: + return code; +} + +static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aSkyline) { + int32_t code = 0; + + SArray *aDelData = taosArrayInit(32, sizeof(SDelData)); + code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData); + if (code) goto _err; + + size_t nDelData = taosArrayGetSize(aDelData); + code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline); + if (code) goto _err; + + taosArrayDestroy(aDelData); + +_err: + return code; +} + +static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) { + int32_t code = 0; + + SMapData delIdxMap; + SDelIdx idx = {.suid = suid, .uid = uid}; + + tMapDataReset(&delIdxMap); + code = tsdbReadDelIdx(pDelFReader, &delIdxMap, NULL); + if (code) goto _err; + + code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx); + if (code) goto _err; + +_err: + return code; +} + +static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet, + SArray *pSkyline, + STsdb *pTsdb, + STSRow **pLastRow) { + int32_t code = 0; + + TSDBROW *pMemRow = NULL; + TSDBROW *pIMemRow = NULL; + + if (iter != NULL) { + pMemRow = tsdbTbDataIterGet(iter); + } + + if (iter != NULL) { + pIMemRow = tsdbTbDataIterGet(iiter); + } + + SDataFReader *pDataFReader; + code = tsdbDataFReaderOpen(&pDataFReader, pTsdb, pFileSet); + if (code) goto _err; + + SMapData blockIdxMap; + tMapDataReset(&blockIdxMap); + code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL); + if (code) goto _err; + + SBlockData *pBlockData; + + tsdbDataFReaderClose(pDataFReader); + +_err: + return code; +} + +static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { + int32_t code = 0; + + tb_uid_t suid = getTableSuidByUid(uid, pTsdb); + + STbData *pMem = NULL; + STbData *pIMem = NULL; + STbDataIter iter; // mem buffer skip list iterator + STbDataIter iiter; // imem buffer skip list iterator + + if (pTsdb->mem) { + tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem); + if (pMem != NULL) { + tsdbTbDataIterOpen(pMem, NULL, 1, &iter); + } + } + + if (pTsdb->imem) { + tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem); + if (pIMem != NULL) { + tsdbTbDataIterOpen(pIMem, NULL, 1, &iiter); + } + } + + *ppRow = NULL; + + SDelFReader *pDelFReader; + //code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL); + if (code) goto _err; + + SDelIdx delIdx; + code = getTableDelIdx(pDelFReader, suid, uid, &delIdx); + if (code) goto _err; + + SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY)); + code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline); + if (code) goto _err; + /* + SFSIter fsiter; + bool fsHasNext = false; + + tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter); + do { + */ + SDFileSet *pFileSet = NULL; + //pFileSet = tsdbFSIterGet(fsiter); + + code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow); + if (code < 0) { + goto _err; + } + + if (*ppRow != NULL) { + //break; + } + /* + } while (fsHasNext = tsdbFSIterNext(fsiter)) + */ + + tsdbDelFReaderClose(pDelFReader); + + return code; + +_err: + tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code)); + return code; +} + +int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) { int32_t code = 0; char key[32] = {0}; int keyLen = 0; @@ -91,9 +331,12 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) { if (h) { *ppRow = (STSRow *) taosLRUCacheValue(pCache, h); } else { - // TODO: load lastrow from mem, imem, and files - // if table's empty, return code of -1 - code = -1; + STSRow *pRow = NULL; + code = mergeLastRow(uid, pTsdb, &pRow); + // if table's empty or error, return code of -1 + if (code < 0 || pRow == NULL) { + return -1; + } } return code; diff --git a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c index 74ecf806fd..42d6969518 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCacheRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbCacheRead.c @@ -52,7 +52,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty for (int32_t i = 0; i < numOfTables; ++i) { tb_uid_t* uid = taosArrayGet(pTableIdList, i); - int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow); + int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow); if (code != TSDB_CODE_SUCCESS) { return code; } @@ -77,7 +77,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty for (int32_t i = 0; i < numOfTables; ++i) { tb_uid_t* uid = taosArrayGet(pTableIdList, i); - int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow); + int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow); if (code != TSDB_CODE_SUCCESS) { return code; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index be03ff89ca..158aab3f12 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -440,6 +440,25 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { } // SDelIdx ====================================================== +int32_t tCmprDelIdx(void const *lhs, void const *rhs) { + SDelIdx *lDelIdx = *(SDelIdx **)lhs; + SDelIdx *rDelIdx = *(SDelIdx **)rhs; + + if (lDelIdx->suid < lDelIdx->suid) { + return -1; + } else if (lDelIdx->suid > lDelIdx->suid) { + return 1; + } + + if (lDelIdx->uid < lDelIdx->uid) { + return -1; + } else if (lDelIdx->uid > lDelIdx->uid) { + return 1; + } + + return 0; +} + int32_t tPutDelIdx(uint8_t *p, void *ph) { SDelIdx *pDelIdx = (SDelIdx *)ph; int32_t n = 0; @@ -1125,4 +1144,4 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS _err: return code; -} \ No newline at end of file +}