TD-2354<feature>: implement last_row cache
This commit is contained in:
parent
0f1d6ee6c1
commit
5808280ace
|
@ -66,6 +66,7 @@ typedef struct {
|
|||
int8_t precision;
|
||||
int8_t compression;
|
||||
int8_t update;
|
||||
int8_t cacheLastRow;
|
||||
} STsdbCfg;
|
||||
|
||||
// --------- TSDB REPOSITORY USAGE STATISTICS
|
||||
|
@ -119,7 +120,7 @@ STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
|
|||
int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
|
||||
int tsdbDropTable(TSDB_REPO_T *pRepo, STableId tableId);
|
||||
int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg);
|
||||
TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
|
||||
// TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid);
|
||||
|
||||
uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
|
||||
|
||||
|
|
|
@ -66,7 +66,10 @@ typedef struct STable {
|
|||
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||
void* eventHandler; // TODO
|
||||
void* streamHandler; // TODO
|
||||
TSKEY lastKey; // lastkey inserted in this table, initialized as 0, TODO: make a structure
|
||||
union {
|
||||
TSKEY lastKey;
|
||||
SDataRow lastRow;
|
||||
};
|
||||
char* sql;
|
||||
void* cqhandle;
|
||||
SRWLatch latch; // TODO: implementa latch functions
|
||||
|
@ -360,8 +363,11 @@ typedef struct {
|
|||
#define TABLE_UID(t) (t)->tableId.uid
|
||||
#define TABLE_TID(t) (t)->tableId.tid
|
||||
#define TABLE_SUID(t) (t)->suid
|
||||
#define TABLE_LASTKEY(t) (t)->lastKey
|
||||
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
|
||||
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
|
||||
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
|
||||
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
|
||||
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
|
||||
|
||||
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
|
||||
void tsdbFreeMeta(STsdbMeta* pMeta);
|
||||
|
@ -391,7 +397,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
|
|||
STSchema* pSchema = NULL;
|
||||
STSchema* pTSchema = NULL;
|
||||
|
||||
if (lock) taosRLockLatch(&(pDTable->latch));
|
||||
if (lock) TSDB_RLOCK_TABLE(pDTable);
|
||||
if (version < 0) { // get the latest version of schema
|
||||
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
|
||||
} else { // get the schema with version
|
||||
|
@ -413,7 +419,7 @@ static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock,
|
|||
}
|
||||
|
||||
_exit:
|
||||
if (lock) taosRUnLockLatch(&(pDTable->latch));
|
||||
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
|
||||
return pSchema;
|
||||
}
|
||||
|
||||
|
@ -433,6 +439,18 @@ static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable, bool cacheLastRow) {
|
||||
if (cacheLastRow) {
|
||||
if (pTable->lastRow == NULL) {
|
||||
return TSKEY_INITIAL_VAL;
|
||||
} else {
|
||||
return dataRowKey(pTable->lastRow);
|
||||
}
|
||||
} else {
|
||||
return pTable->lastKey;
|
||||
}
|
||||
}
|
||||
|
||||
// ------------------ tsdbBuffer.c
|
||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||
|
||||
|
|
|
@ -220,7 +220,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
SCommitIter *pIter = iters + tid;
|
||||
if (pIter->pTable == NULL) continue;
|
||||
|
||||
taosRLockLatch(&(pIter->pTable->latch));
|
||||
TSDB_RLOCK_TABLE(pIter->pTable);
|
||||
|
||||
if (tsdbSetHelperTable(pHelper, pIter->pTable, pRepo) < 0) goto _err;
|
||||
|
||||
|
@ -231,7 +231,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
}
|
||||
|
||||
if (tsdbCommitTableData(pHelper, pIter, pDataCols, maxKey) < 0) {
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
tsdbError("vgId:%d failed to write data of table %s tid %d uid %" PRIu64 " since %s", REPO_ID(pRepo),
|
||||
TABLE_CHAR_NAME(pIter->pTable), TABLE_TID(pIter->pTable), TABLE_UID(pIter->pTable),
|
||||
tstrerror(terrno));
|
||||
|
@ -239,7 +239,7 @@ static int tsdbCommitToFile(STsdbRepo *pRepo, int fid, SCommitIter *iters, SRWHe
|
|||
}
|
||||
}
|
||||
|
||||
taosRUnLockLatch(&(pIter->pTable->latch));
|
||||
TSDB_RUNLOCK_TABLE(pIter->pTable);
|
||||
|
||||
// Move the last block to the new .l file if neccessary
|
||||
if (tsdbMoveLastBlockIfNeccessary(pHelper) < 0) {
|
||||
|
|
|
@ -77,9 +77,9 @@ int32_t tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg) {
|
|||
|
||||
tsdbDebug(
|
||||
"vgId:%d tsdb env create succeed! cacheBlockSize %d totalBlocks %d daysPerFile %d keep "
|
||||
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d",
|
||||
"%d minRowsPerFileBlock %d maxRowsPerFileBlock %d precision %d compression %d update %d cacheLastRow %d",
|
||||
pCfg->tsdbId, pCfg->cacheBlockSize, pCfg->totalBlocks, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock,
|
||||
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression);
|
||||
pCfg->maxRowsPerFileBlock, pCfg->precision, pCfg->compression, pCfg->update, pCfg->cacheLastRow);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -475,6 +475,9 @@ static int32_t tsdbCheckAndSetDefaultCfg(STsdbCfg *pCfg) {
|
|||
// update check
|
||||
if (pCfg->update != 0) pCfg->update = 1;
|
||||
|
||||
// update cacheLastRow
|
||||
if (pCfg->cacheLastRow != 0) pCfg->cacheLastRow = 1;
|
||||
|
||||
return 0;
|
||||
|
||||
_err:
|
||||
|
@ -692,10 +695,12 @@ static void tsdbFreeRepo(STsdbRepo *pRepo) {
|
|||
}
|
||||
}
|
||||
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
||||
static int tsdbRestoreInfo(STsdbRepo *pRepo) { // TODO
|
||||
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||
STsdbFileH *pFileH = pRepo->tsdbFileH;
|
||||
SFileGroup *pFGroup = NULL;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
SCompBlock *pBlock = NULL;
|
||||
|
||||
SFileGroupIter iter;
|
||||
SRWHelper rhelper = {0};
|
||||
|
@ -713,7 +718,33 @@ static int tsdbRestoreInfo(STsdbRepo *pRepo) {
|
|||
if (tsdbSetHelperTable(&rhelper, pTable, pRepo) < 0) goto _err;
|
||||
SCompIdx *pIdx = &(rhelper.curCompIdx);
|
||||
|
||||
if (pIdx->offset > 0 && pTable->lastKey < pIdx->maxKey) pTable->lastKey = pIdx->maxKey;
|
||||
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow);
|
||||
if (pIdx->offset > 0 && lastKey < pIdx->maxKey) {
|
||||
if (pCfg->cacheLastRow) { // load the block of data
|
||||
if (tsdbLoadCompInfo(&rhelper, NULL) < 0) goto _err;
|
||||
|
||||
pBlock = rhelper.pCompInfo->blocks + pIdx->numOfBlocks - 1;
|
||||
if (tsdbLoadBlockData(&rhelper, pBlock, NULL) < 0) goto _err;
|
||||
|
||||
// construct the data row
|
||||
ASSERT(pTable->lastRow == NULL);
|
||||
STSchema *pSchema = tsdbGetTableSchema(pTable);
|
||||
pTable->lastRow = taosTMalloc(schemaTLen(pSchema));
|
||||
if (pTable->lastRow == NULL) {
|
||||
goto _err;
|
||||
}
|
||||
|
||||
tdInitDataRow(pTable->lastRow, pSchema);
|
||||
for (int icol = 0; icol < schemaNCols(pSchema); icol++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, icol);
|
||||
SDataCol *pDataCol = rhelper.pDataCols[0]->cols + icol;
|
||||
tdAppendColVal(pTable->lastRow, tdGetColDataOfRow(pDataCol, pBlock->numOfRows - 1), pCol->type, pCol->bytes,
|
||||
pCol->offset);
|
||||
}
|
||||
} else {
|
||||
pTable->lastKey = pIdx->maxKey;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -800,6 +831,7 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg) {
|
|||
tlen += taosEncodeFixedI8(buf, pCfg->precision);
|
||||
tlen += taosEncodeFixedI8(buf, pCfg->compression);
|
||||
tlen += taosEncodeFixedI8(buf, pCfg->update);
|
||||
tlen += taosEncodeFixedI8(buf, pCfg->cacheLastRow);
|
||||
|
||||
return tlen;
|
||||
}
|
||||
|
@ -817,6 +849,7 @@ static void *tsdbDecodeCfg(void *buf, STsdbCfg *pCfg) {
|
|||
buf = taosDecodeFixedI8(buf, &(pCfg->precision));
|
||||
buf = taosDecodeFixedI8(buf, &(pCfg->compression));
|
||||
buf = taosDecodeFixedI8(buf, &(pCfg->update));
|
||||
buf = taosDecodeFixedI8(buf, &(pCfg->cacheLastRow));
|
||||
|
||||
return buf;
|
||||
}
|
||||
|
|
|
@ -35,6 +35,7 @@ static int tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter, SSubmitBlk **pPB
|
|||
static int tsdbCheckTableSchema(STsdbRepo *pRepo, SSubmitBlk *pBlock, STable *pTable);
|
||||
static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **rows, int rowCounter);
|
||||
static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter);
|
||||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row);
|
||||
|
||||
static FORCE_INLINE int tsdbCheckRowRange(STsdbRepo *pRepo, STable *pTable, SDataRow row, TSKEY minKey, TSKEY maxKey,
|
||||
TSKEY now);
|
||||
|
@ -663,9 +664,10 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SDataRow row, STable *pTable, void
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (key > TABLE_LASTKEY(pTable)) {
|
||||
TSKEY lastKey = tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow);
|
||||
if (key > lastKey) {
|
||||
tsdbTrace("vgId:%d skip to delete row key %" PRId64 " which is larger than table lastKey %" PRId64,
|
||||
REPO_ID(pRepo), key, TABLE_LASTKEY(pTable));
|
||||
REPO_ID(pRepo), key, lastKey);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
@ -846,8 +848,10 @@ static int tsdbInsertDataToTableImpl(STsdbRepo *pRepo, STable *pTable, void **ro
|
|||
if (pTableData->keyLast < dataRowKey(rows[rowCounter - 1])) pTableData->keyLast = dataRowKey(rows[rowCounter - 1]);
|
||||
pTableData->numOfRows += dsize;
|
||||
|
||||
// TODO: impl delete row thing
|
||||
if (TABLE_LASTKEY(pTable) < dataRowKey(rows[rowCounter-1])) TABLE_LASTKEY(pTable) = dataRowKey(rows[rowCounter-1]);
|
||||
// update table latest info
|
||||
if (tsdbUpdateTableLatestInfo(pRepo, pTable, rows[rowCounter - 1]) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -890,3 +894,35 @@ static void tsdbFreeRows(STsdbRepo *pRepo, void **rows, int rowCounter) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
static int tsdbUpdateTableLatestInfo(STsdbRepo *pRepo, STable *pTable, SDataRow row) {
|
||||
STsdbCfg *pCfg = &pRepo->config;
|
||||
|
||||
if (tsdbGetTableLastKeyImpl(pTable, pCfg->cacheLastRow) < dataRowKey(row)) {
|
||||
if (pCfg->cacheLastRow) {
|
||||
SDataRow nrow = pTable->lastRow;
|
||||
if (taosTSizeof(nrow) < dataRowLen(row)) {
|
||||
SDataRow orow = nrow;
|
||||
nrow = taosTMalloc(dataRowLen(row));
|
||||
if (nrow == NULL) {
|
||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
dataRowCpy(nrow, row);
|
||||
TSDB_WLOCK_TABLE(pTable);
|
||||
pTable->lastRow = nrow;
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
taosTZfree(orow);
|
||||
} else {
|
||||
TSDB_WLOCK_TABLE(pTable);
|
||||
dataRowCpy(nrow, row);
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
}
|
||||
} else {
|
||||
pTable->lastKey = dataRowKey(row);
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -377,11 +377,11 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
|||
|
||||
// Chage in memory
|
||||
if (pNewSchema != NULL) { // change super table tag schema
|
||||
taosWLockLatch(&(pTable->pSuper->latch));
|
||||
TSDB_WLOCK_TABLE(pTable->pSuper);
|
||||
STSchema *pOldSchema = pTable->pSuper->tagSchema;
|
||||
pTable->pSuper->tagSchema = pNewSchema;
|
||||
tdFreeSchema(pOldSchema);
|
||||
taosWUnLockLatch(&(pTable->pSuper->latch));
|
||||
TSDB_WUNLOCK_TABLE(pTable->pSuper);
|
||||
}
|
||||
|
||||
bool isChangeIndexCol = (pMsg->colId == colColId(schemaColAt(pTable->pSuper->tagSchema, 0)));
|
||||
|
@ -392,9 +392,9 @@ int tsdbUpdateTableTagValue(TSDB_REPO_T *repo, SUpdateTableTagValMsg *pMsg) {
|
|||
tsdbWLockRepoMeta(pRepo);
|
||||
tsdbRemoveTableFromIndex(pMeta, pTable);
|
||||
}
|
||||
taosWLockLatch(&(pTable->latch));
|
||||
TSDB_WLOCK_TABLE(pTable);
|
||||
tdSetKVRowDataOfCol(&(pTable->tagVal), pMsg->colId, pMsg->type, POINTER_SHIFT(pMsg->data, pMsg->schemaLen));
|
||||
taosWUnLockLatch(&(pTable->latch));
|
||||
TSDB_WUNLOCK_TABLE(pTable);
|
||||
if (isChangeIndexCol) {
|
||||
tsdbAddTableIntoIndex(pMeta, pTable, false);
|
||||
tsdbUnlockRepoMeta(pRepo);
|
||||
|
@ -587,7 +587,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
|
|||
STable *pCTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||
ASSERT(schemaVersion(pSchema) > schemaVersion(pCTable->schema[pCTable->numOfSchemas - 1]));
|
||||
|
||||
taosWLockLatch(&(pCTable->latch));
|
||||
TSDB_WLOCK_TABLE(pCTable);
|
||||
if (pCTable->numOfSchemas < TSDB_MAX_TABLE_SCHEMAS) {
|
||||
pCTable->schema[pCTable->numOfSchemas++] = pSchema;
|
||||
} else {
|
||||
|
@ -599,7 +599,7 @@ void tsdbUpdateTableSchema(STsdbRepo *pRepo, STable *pTable, STSchema *pSchema,
|
|||
|
||||
if (schemaNCols(pSchema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pSchema);
|
||||
if (schemaTLen(pSchema) > pMeta->maxRowBytes) pMeta->maxRowBytes = schemaTLen(pSchema);
|
||||
taosWUnLockLatch(&(pCTable->latch));
|
||||
TSDB_WUNLOCK_TABLE(pCTable);
|
||||
|
||||
if (insertAct) {
|
||||
int tlen = tsdbGetTableEncodeSize(TSDB_UPDATE_META, pCTable);
|
||||
|
@ -663,7 +663,7 @@ static STable *tsdbNewTable() {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
// pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
|
||||
return pTable;
|
||||
}
|
||||
|
@ -782,6 +782,13 @@ static void tsdbFreeTable(STable *pTable) {
|
|||
|
||||
static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, bool lock) {
|
||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
||||
STsdbCfg * pCfg = &(pRepo->config);
|
||||
|
||||
if (pCfg->cacheLastRow) {
|
||||
pTable->lastRow = NULL;
|
||||
} else {
|
||||
pTable->lastKey = TSKEY_INITIAL_VAL;
|
||||
}
|
||||
|
||||
if (lock && tsdbWLockRepoMeta(pRepo) < 0) {
|
||||
tsdbError("vgId:%d failed to add table %s to meta since %s", REPO_ID(pRepo), TABLE_CHAR_NAME(pTable),
|
||||
|
|
Loading…
Reference in New Issue