tsdb/cache: Not lazy init ctxArray
This commit is contained in:
parent
4113921448
commit
270e9f0bcf
|
@ -234,10 +234,15 @@ static int32_t tsdbOpenRocksCache(STsdb *pTsdb) {
|
||||||
pTsdb->rCache.suid = -1;
|
pTsdb->rCache.suid = -1;
|
||||||
pTsdb->rCache.uid = -1;
|
pTsdb->rCache.uid = -1;
|
||||||
pTsdb->rCache.pTSchema = NULL;
|
pTsdb->rCache.pTSchema = NULL;
|
||||||
pTsdb->rCache.ctxArray = NULL;
|
pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
|
||||||
|
if (!pTsdb->rCache.ctxArray) {
|
||||||
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _err7);
|
||||||
|
}
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
|
|
||||||
|
_err7:
|
||||||
|
(void)taosThreadMutexDestroy(&pTsdb->rCache.writeBatchMutex);
|
||||||
_err6:
|
_err6:
|
||||||
rocksdb_writebatch_destroy(writebatch);
|
rocksdb_writebatch_destroy(writebatch);
|
||||||
_err5:
|
_err5:
|
||||||
|
@ -594,7 +599,9 @@ static int32_t tsdbUpdateSkm(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int32_t
|
||||||
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
|
return metaGetTbTSchemaEx(pTsdb->pVnode->pMeta, suid, uid, -1, &pRCache->pTSchema);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray) {
|
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
|
||||||
|
|
||||||
|
static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
int32_t lino = 0;
|
int32_t lino = 0;
|
||||||
STsdb *pTsdb = imem->pTsdb;
|
STsdb *pTsdb = imem->pTsdb;
|
||||||
|
@ -608,6 +615,7 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray)
|
||||||
SSHashObj *iColHash = NULL;
|
SSHashObj *iColHash = NULL;
|
||||||
int32_t sver;
|
int32_t sver;
|
||||||
int32_t nCol;
|
int32_t nCol;
|
||||||
|
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
||||||
|
|
||||||
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
|
STbData *pIMem = tsdbGetTbDataFromMemTable(imem, tid.suid, tid.uid);
|
||||||
|
|
||||||
|
@ -702,11 +710,14 @@ static int32_t tsdbLoadFromImem(SMemTable *imem, TABLEID tid, SArray *ctxArray)
|
||||||
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
pMemRow = tsdbImemGetNextRow(&tbIter, pSkyline, &iSkyline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit);
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
if (code) {
|
if (code) {
|
||||||
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
tsdbError("vgId:%d %s failed at %s:%d since %s", TD_VID(pTsdb->pVnode), __func__, __FILE__, lino, tstrerror(code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taosArrayClear(ctxArray);
|
||||||
// destroy any allocated resource
|
// destroy any allocated resource
|
||||||
tSimpleHashCleanup(iColHash);
|
tSimpleHashCleanup(iColHash);
|
||||||
if (pMemDelData) {
|
if (pMemDelData) {
|
||||||
|
@ -719,8 +730,6 @@ _exit:
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t tsdbCacheUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, SArray *updCtxArray);
|
|
||||||
|
|
||||||
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
|
static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
|
||||||
if (!pTsdb) return 0;
|
if (!pTsdb) return 0;
|
||||||
if (!pTsdb->imem) return 0;
|
if (!pTsdb->imem) return 0;
|
||||||
|
@ -732,31 +741,21 @@ static int32_t tsdbCacheUpdateFromIMem(STsdb *pTsdb) {
|
||||||
int64_t nRow = imem->nRow;
|
int64_t nRow = imem->nRow;
|
||||||
int64_t nDel = imem->nDel;
|
int64_t nDel = imem->nDel;
|
||||||
SArray *aUid = NULL;
|
SArray *aUid = NULL;
|
||||||
SArray *ctxArray = NULL;
|
|
||||||
|
|
||||||
if (nRow == 0 || nTbData == 0) return 0;
|
if (nRow == 0 || nTbData == 0) return 0;
|
||||||
|
|
||||||
// get all last/last_row cols of all uids from imem
|
// get all last/last_row cols of all uids from imem
|
||||||
// 1, get all uids from imem
|
// 1, get all uids from imem
|
||||||
aUid = taosArrayInit(nTbData, sizeof(TABLEID));
|
aUid = taosArrayInit(nTbData, sizeof(TABLEID));
|
||||||
TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit);
|
if (!aUid) {
|
||||||
|
|
||||||
// 2, read all uid's last/last with tomb filtered
|
|
||||||
if (!pTsdb->rCache.ctxArray) {
|
|
||||||
pTsdb->rCache.ctxArray = taosArrayInit(16, sizeof(SLastUpdateCtx));
|
|
||||||
if (!pTsdb->rCache.ctxArray) {
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
||||||
}
|
}
|
||||||
}
|
TAOS_CHECK_GOTO(tsdbMemTableUids(imem, aUid), &lino, _exit);
|
||||||
ctxArray = pTsdb->rCache.ctxArray;
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
|
for (int32_t i = 0; i < TARRAY_SIZE(aUid); ++i) {
|
||||||
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
|
TABLEID tid = ((TABLEID *)TARRAY_DATA(aUid))[i];
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid, ctxArray), &lino, _exit);
|
TAOS_CHECK_GOTO(tsdbLoadFromImem(imem, tid), &lino, _exit);
|
||||||
|
|
||||||
TAOS_CHECK_GOTO(tsdbCacheUpdate(pTsdb, tid.suid, tid.uid, ctxArray), &lino, _exit);
|
|
||||||
taosArrayClear(ctxArray);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
@ -767,7 +766,6 @@ _exit:
|
||||||
}
|
}
|
||||||
|
|
||||||
taosArrayDestroy(aUid);
|
taosArrayDestroy(aUid);
|
||||||
taosArrayClear(ctxArray);
|
|
||||||
|
|
||||||
TAOS_RETURN(code);
|
TAOS_RETURN(code);
|
||||||
}
|
}
|
||||||
|
@ -778,6 +776,7 @@ int32_t tsdbCacheCommit(STsdb *pTsdb) {
|
||||||
// 0, tsdbCacheUpdateFromIMem if updateCacheBatch
|
// 0, tsdbCacheUpdateFromIMem if updateCacheBatch
|
||||||
// flush dirty data of lru into rocks with
|
// flush dirty data of lru into rocks with
|
||||||
// 4, and update when writing if !updateCacheBatch
|
// 4, and update when writing if !updateCacheBatch
|
||||||
|
// 5, merge cache & mem if updateCacheBatch
|
||||||
|
|
||||||
if (tsUpdateCacheBatch) {
|
if (tsUpdateCacheBatch) {
|
||||||
code = tsdbCacheUpdateFromIMem(pTsdb);
|
code = tsdbCacheUpdateFromIMem(pTsdb);
|
||||||
|
@ -1609,13 +1608,6 @@ int32_t tsdbCacheRowFormatUpdate(STsdb *pTsdb, tb_uid_t suid, tb_uid_t uid, int6
|
||||||
|
|
||||||
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
TSDBROW tRow = {.type = TSDBROW_ROW_FMT, .version = version};
|
||||||
int32_t nCol = pTSchema->numOfCols;
|
int32_t nCol = pTSchema->numOfCols;
|
||||||
|
|
||||||
if (!pTsdb->rCache.ctxArray) {
|
|
||||||
pTsdb->rCache.ctxArray = taosArrayInit(nCol * 2, sizeof(SLastUpdateCtx));
|
|
||||||
if (!pTsdb->rCache.ctxArray) {
|
|
||||||
TAOS_CHECK_GOTO(TSDB_CODE_OUT_OF_MEMORY, &lino, _exit);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
SArray *ctxArray = pTsdb->rCache.ctxArray;
|
||||||
|
|
||||||
// 1. prepare by lrow
|
// 1. prepare by lrow
|
||||||
|
|
Loading…
Reference in New Issue