last_row/del-skyline: first round skyline building
This commit is contained in:
parent
81897d32af
commit
c999aead58
|
@ -133,6 +133,7 @@ void tBlockDataReset(SBlockData *pBlockData);
|
|||
void tBlockDataClear(SBlockData *pBlockData);
|
||||
int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema);
|
||||
// SDelIdx
|
||||
int32_t tCmprDelIdx(void const *lhs, void const *rhs);
|
||||
int32_t tPutDelIdx(uint8_t *p, void *ph);
|
||||
int32_t tGetDelIdx(uint8_t *p, void *ph);
|
||||
// SDelData
|
||||
|
@ -221,7 +222,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SMapData *pDelIdxMap, uint8_t **ppB
|
|||
int32_t tsdbOpenCache(STsdb *pTsdb);
|
||||
void tsdbCloseCache(SLRUCache *pCache);
|
||||
int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row);
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow);
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow);
|
||||
int32_t tsdbCacheDeleteLastrow(SLRUCache *pCache, tb_uid_t uid);
|
||||
|
||||
// structs =======================
|
||||
|
|
|
@ -81,7 +81,247 @@ int32_t tsdbCacheInsertLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow *row) {
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) {
|
||||
static tb_uid_t getTableSuidByUid(tb_uid_t uid, STsdb *pTsdb) {
|
||||
tb_uid_t suid = 0;
|
||||
|
||||
SMetaReader mr = {0};
|
||||
metaReaderInit(&mr, pTsdb->pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||
metaReaderClear(&mr); // table not esist
|
||||
return 0;
|
||||
}
|
||||
|
||||
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||
suid = mr.me.ctbEntry.suid;
|
||||
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
|
||||
suid = 0;
|
||||
} else {
|
||||
suid = 0;
|
||||
}
|
||||
|
||||
metaReaderClear(&mr);
|
||||
|
||||
return suid;
|
||||
}
|
||||
/*
|
||||
static int32_t getMemLastRow(SMemTable *mem, tb_uid_t suid, tb_uid_t uid, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (mem) {
|
||||
STbData *pMem = NULL;
|
||||
STbDataIter* iter; // mem buffer skip list iterator
|
||||
|
||||
tsdbGetTbDataFromMemTable(mem, suid, uid, &pMem);
|
||||
if (pMem != NULL) {
|
||||
tsdbTbDataIterCreate(pMem, NULL, 1, &iter);
|
||||
|
||||
if (iter != NULL) {
|
||||
TSDBROW *row = tsdbTbDataIterGet(iter);
|
||||
|
||||
tsdbTbDataIterDestroy(iter);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
*ppRow = NULL;
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
*/
|
||||
static int32_t getTableDelDataFromDelIdx(SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
|
||||
SMapData delDataMap;
|
||||
SDelData delData;
|
||||
|
||||
if (pDelIdx) {
|
||||
tMapDataReset(&delDataMap);
|
||||
|
||||
code = tsdbReadDelData(pDelReader, pDelIdx, &delDataMap, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
for (int32_t iDelData = 0; iDelData < delDataMap.nItem; ++iDelData) {
|
||||
code = tMapDataGetItemByIdx(&delDataMap, iDelData, &delData, tGetDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
taosArrayPush(aDelData, &delData);
|
||||
}
|
||||
}
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableDelDataFromTbData(STbData *pTbData, SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
SDelData *pDelData = pTbData ? pTbData->pHead : NULL;
|
||||
|
||||
for (; pDelData; pDelData = pDelData->pNext) {
|
||||
taosArrayPush(aDelData, pDelData);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableDelData(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aDelData) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (pMem) {
|
||||
code = getTableDelDataFromTbData(pMem, aDelData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pIMem) {
|
||||
code = getTableDelDataFromTbData(pIMem, aDelData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
if (pDelIdx) {
|
||||
code = getTableDelDataFromDelIdx(pDelReader, pDelIdx, aDelData);
|
||||
if (code) goto _err;
|
||||
}
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableDelSkyline(STbData *pMem, STbData *pIMem, SDelFReader *pDelReader, SDelIdx *pDelIdx, SArray *aSkyline) {
|
||||
int32_t code = 0;
|
||||
|
||||
SArray *aDelData = taosArrayInit(32, sizeof(SDelData));
|
||||
code = getTableDelData(pMem, pIMem, pDelReader, pDelIdx, aDelData);
|
||||
if (code) goto _err;
|
||||
|
||||
size_t nDelData = taosArrayGetSize(aDelData);
|
||||
code = tsdbBuildDeleteSkyline(aDelData, 0, (int32_t)(nDelData - 1), aSkyline);
|
||||
if (code) goto _err;
|
||||
|
||||
taosArrayDestroy(aDelData);
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t getTableDelIdx(SDelFReader *pDelFReader, tb_uid_t suid, tb_uid_t uid, SDelIdx *pDelIdx) {
|
||||
int32_t code = 0;
|
||||
|
||||
SMapData delIdxMap;
|
||||
SDelIdx idx = {.suid = suid, .uid = uid};
|
||||
|
||||
tMapDataReset(&delIdxMap);
|
||||
code = tsdbReadDelIdx(pDelFReader, &delIdxMap, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
code = tMapDataSearch(&delIdxMap, &idx, tGetDelIdx, tCmprDelIdx, pDelIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRowFileSet(STbDataIter *iter, STbDataIter *iiter, SDFileSet *pFileSet,
|
||||
SArray *pSkyline,
|
||||
STsdb *pTsdb,
|
||||
STSRow **pLastRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
TSDBROW *pMemRow = NULL;
|
||||
TSDBROW *pIMemRow = NULL;
|
||||
|
||||
if (iter != NULL) {
|
||||
pMemRow = tsdbTbDataIterGet(iter);
|
||||
}
|
||||
|
||||
if (iter != NULL) {
|
||||
pIMemRow = tsdbTbDataIterGet(iiter);
|
||||
}
|
||||
|
||||
SDataFReader *pDataFReader;
|
||||
code = tsdbDataFReaderOpen(&pDataFReader, pTsdb, pFileSet);
|
||||
if (code) goto _err;
|
||||
|
||||
SMapData blockIdxMap;
|
||||
tMapDataReset(&blockIdxMap);
|
||||
code = tsdbReadBlockIdx(pDataFReader, &blockIdxMap, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
SBlockData *pBlockData;
|
||||
|
||||
tsdbDataFReaderClose(pDataFReader);
|
||||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mergeLastRow(tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
|
||||
tb_uid_t suid = getTableSuidByUid(uid, pTsdb);
|
||||
|
||||
STbData *pMem = NULL;
|
||||
STbData *pIMem = NULL;
|
||||
STbDataIter iter; // mem buffer skip list iterator
|
||||
STbDataIter iiter; // imem buffer skip list iterator
|
||||
|
||||
if (pTsdb->mem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->mem, suid, uid, &pMem);
|
||||
if (pMem != NULL) {
|
||||
tsdbTbDataIterOpen(pMem, NULL, 1, &iter);
|
||||
}
|
||||
}
|
||||
|
||||
if (pTsdb->imem) {
|
||||
tsdbGetTbDataFromMemTable(pTsdb->imem, suid, uid, &pIMem);
|
||||
if (pIMem != NULL) {
|
||||
tsdbTbDataIterOpen(pIMem, NULL, 1, &iiter);
|
||||
}
|
||||
}
|
||||
|
||||
*ppRow = NULL;
|
||||
|
||||
SDelFReader *pDelFReader;
|
||||
//code = tsdbDelFReaderOpen(&pDelFReader, pTsdb->fs->cState->pDelFile, pTsdb, NULL);
|
||||
if (code) goto _err;
|
||||
|
||||
SDelIdx delIdx;
|
||||
code = getTableDelIdx(pDelFReader, suid, uid, &delIdx);
|
||||
if (code) goto _err;
|
||||
|
||||
SArray *pSkyline = taosArrayInit(32, sizeof(TSDBKEY));
|
||||
code = getTableDelSkyline(pMem, pIMem, pDelFReader, &delIdx, pSkyline);
|
||||
if (code) goto _err;
|
||||
/*
|
||||
SFSIter fsiter;
|
||||
bool fsHasNext = false;
|
||||
|
||||
tsdbFSIterOpen(pTsdb->fs, TSDB_FS_ITER_BACKWARD, &fsiter);
|
||||
do {
|
||||
*/
|
||||
SDFileSet *pFileSet = NULL;
|
||||
//pFileSet = tsdbFSIterGet(fsiter);
|
||||
|
||||
code = mergeLastRowFileSet(&iter, &iiter, pFileSet, pSkyline, pTsdb, ppRow);
|
||||
if (code < 0) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
if (*ppRow != NULL) {
|
||||
//break;
|
||||
}
|
||||
/*
|
||||
} while (fsHasNext = tsdbFSIterNext(fsiter))
|
||||
*/
|
||||
|
||||
tsdbDelFReaderClose(pDelFReader);
|
||||
|
||||
return code;
|
||||
|
||||
_err:
|
||||
tsdbError("vgId:%d merge last_row failed since %s", TD_VID(pTsdb->pVnode), tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STsdb *pTsdb, STSRow **ppRow) {
|
||||
int32_t code = 0;
|
||||
char key[32] = {0};
|
||||
int keyLen = 0;
|
||||
|
@ -91,9 +331,12 @@ int32_t tsdbCacheGetLastrow(SLRUCache *pCache, tb_uid_t uid, STSRow **ppRow) {
|
|||
if (h) {
|
||||
*ppRow = (STSRow *) taosLRUCacheValue(pCache, h);
|
||||
} else {
|
||||
// TODO: load lastrow from mem, imem, and files
|
||||
// if table's empty, return code of -1
|
||||
code = -1;
|
||||
STSRow *pRow = NULL;
|
||||
code = mergeLastRow(uid, pTsdb, &pRow);
|
||||
// if table's empty or error, return code of -1
|
||||
if (code < 0 || pRow == NULL) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return code;
|
||||
|
|
|
@ -52,7 +52,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
|
|||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
|
||||
|
||||
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
|
||||
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
@ -77,7 +77,7 @@ int32_t tsdbRetrieveLastRow(void* pVnode, const SArray* pTableIdList, int32_t ty
|
|||
for (int32_t i = 0; i < numOfTables; ++i) {
|
||||
tb_uid_t* uid = taosArrayGet(pTableIdList, i);
|
||||
|
||||
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, &pRow);
|
||||
int32_t code = tsdbCacheGetLastrow(pv->pTsdb->lruCache, *uid, pv->pTsdb, &pRow);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
|
|
@ -416,6 +416,25 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
|
|||
}
|
||||
|
||||
// SDelIdx ======================================================
|
||||
int32_t tCmprDelIdx(void const *lhs, void const *rhs) {
|
||||
SDelIdx *lDelIdx = *(SDelIdx **)lhs;
|
||||
SDelIdx *rDelIdx = *(SDelIdx **)rhs;
|
||||
|
||||
if (lDelIdx->suid < lDelIdx->suid) {
|
||||
return -1;
|
||||
} else if (lDelIdx->suid > lDelIdx->suid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
if (lDelIdx->uid < lDelIdx->uid) {
|
||||
return -1;
|
||||
} else if (lDelIdx->uid > lDelIdx->uid) {
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tPutDelIdx(uint8_t *p, void *ph) {
|
||||
SDelIdx *pDelIdx = (SDelIdx *)ph;
|
||||
int32_t n = 0;
|
||||
|
@ -1101,4 +1120,4 @@ int32_t tBlockDataAppendRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
|
|||
|
||||
_err:
|
||||
return code;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue