chore: code refactor for time series
This commit is contained in:
parent
6efe3845d4
commit
85226d05a3
|
@ -132,10 +132,10 @@ tb_uid_t metaGetTableEntryUidByName(SMeta *pMeta, const char *name);
|
||||||
int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
|
int32_t metaGetCachedTbGroup(void *pVnode, tb_uid_t suid, const uint8_t *pKey, int32_t keyLen, SArray **pList);
|
||||||
int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
int32_t metaPutTbGroupToCache(void *pVnode, uint64_t suid, const void *pKey, int32_t keyLen, void *pPayload,
|
||||||
int32_t payloadLen);
|
int32_t payloadLen);
|
||||||
bool metaTbInFilterCache(void *pVnode, const void* key, int8_t type);
|
bool metaTbInFilterCache(SMeta *pMeta, const void* key, int8_t type);
|
||||||
int32_t metaPutTbToFilterCache(void *pVnode, const void* key, int8_t type);
|
int32_t metaPutTbToFilterCache(SMeta *pMeta, const void* key, int8_t type);
|
||||||
int32_t metaSizeOfTbFilterCache(void *pVnode, int8_t type);
|
int32_t metaSizeOfTbFilterCache(SMeta *pMeta, int8_t type);
|
||||||
int32_t metaInitTbFilterCache(void *pVnode);
|
int32_t metaInitTbFilterCache(SMeta *pMeta);
|
||||||
|
|
||||||
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols);
|
int32_t metaGetStbStats(void *pVnode, int64_t uid, int64_t *numOfTables, int32_t *numOfCols);
|
||||||
|
|
||||||
|
|
|
@ -907,9 +907,7 @@ int32_t metaTbGroupCacheClear(SMeta* pMeta, uint64_t suid) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool metaTbInFilterCache(void* pVnode, const void* key, int8_t type) {
|
bool metaTbInFilterCache(SMeta *pMeta, const void* key, int8_t type) {
|
||||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
|
||||||
|
|
||||||
if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
|
if (type == 0 && taosHashGet(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t))) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -921,9 +919,7 @@ bool metaTbInFilterCache(void* pVnode, const void* key, int8_t type) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaPutTbToFilterCache(void* pVnode, const void* key, int8_t type) {
|
int32_t metaPutTbToFilterCache(SMeta *pMeta, const void* key, int8_t type) {
|
||||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
|
||||||
|
|
||||||
if (type == 0) {
|
if (type == 0) {
|
||||||
return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
|
return taosHashPut(pMeta->pCache->STbFilterCache.pStb, key, sizeof(tb_uid_t), NULL, 0);
|
||||||
}
|
}
|
||||||
|
@ -935,21 +931,20 @@ int32_t metaPutTbToFilterCache(void* pVnode, const void* key, int8_t type) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaSizeOfTbFilterCache(void* pVnode, int8_t type) {
|
int32_t metaSizeOfTbFilterCache(SMeta *pMeta, int8_t type) {
|
||||||
SMeta* pMeta = ((SVnode*)pVnode)->pMeta;
|
|
||||||
if (type == 0) {
|
if (type == 0) {
|
||||||
return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
|
return taosHashGetSize(pMeta->pCache->STbFilterCache.pStb);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t metaInitTbFilterCache(void* pVnode) {
|
int32_t metaInitTbFilterCache(SMeta* pMeta) {
|
||||||
#ifdef TD_ENTERPRISE
|
#ifdef TD_ENTERPRISE
|
||||||
int32_t tbNum = 0;
|
int32_t tbNum = 0;
|
||||||
const char** pTbArr = NULL;
|
const char** pTbArr = NULL;
|
||||||
const char* dbName = NULL;
|
const char* dbName = NULL;
|
||||||
|
|
||||||
if (!(dbName = strchr(((SVnode*)pVnode)->config.dbname, '.'))) return 0;
|
if (!(dbName = strchr(pMeta->pVnode->config.dbname, '.'))) return 0;
|
||||||
if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
|
if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
|
||||||
tbNum = TK_LOG_STB_NUM;
|
tbNum = TK_LOG_STB_NUM;
|
||||||
pTbArr = (const char**)&tkLogStb;
|
pTbArr = (const char**)&tkLogStb;
|
||||||
|
@ -959,7 +954,7 @@ int32_t metaInitTbFilterCache(void* pVnode) {
|
||||||
}
|
}
|
||||||
if (tbNum && pTbArr) {
|
if (tbNum && pTbArr) {
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
if (metaPutTbToFilterCache(pVnode, pTbArr[i], 1) != 0) {
|
if (metaPutTbToFilterCache(pMeta, pTbArr[i], 1) != 0) {
|
||||||
return terrno ? terrno : -1;
|
return terrno ? terrno : -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,6 +176,10 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (metaInitTbFilterCache(pMeta) != 0) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
|
||||||
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
metaDebug("vgId:%d, meta is opened", TD_VID(pVnode));
|
||||||
|
|
||||||
*ppMeta = pMeta;
|
*ppMeta = pMeta;
|
||||||
|
|
|
@ -396,7 +396,7 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
nStbEntry.stbEntry.schemaTag = pReq->schemaTag;
|
||||||
|
|
||||||
int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
|
int32_t deltaCol = pReq->schemaRow.nCols - oStbEntry.stbEntry.schemaRow.nCols;
|
||||||
bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta->pVnode, pReq->name, 1);
|
bool updStat = deltaCol != 0 && !metaTbInFilterCache(pMeta, pReq->name, 1);
|
||||||
|
|
||||||
metaWLock(pMeta);
|
metaWLock(pMeta);
|
||||||
// compare two entry
|
// compare two entry
|
||||||
|
@ -766,7 +766,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
|
||||||
}
|
}
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
||||||
bool sysTbl = (pReq->type == TSDB_CHILD_TABLE) && metaTbInFilterCache(pMeta->pVnode, pReq->ctb.stbName, 1);
|
bool sysTbl = (pReq->type == TSDB_CHILD_TABLE) && metaTbInFilterCache(pMeta, pReq->ctb.stbName, 1);
|
||||||
|
|
||||||
// build SMetaEntry
|
// build SMetaEntry
|
||||||
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
SVnodeStats *pStats = &pMeta->pVnode->config.vndStats;
|
||||||
|
@ -1110,7 +1110,7 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type, tb_uid_t *p
|
||||||
tDecoderInit(&tdc, tData, tLen);
|
tDecoderInit(&tdc, tData, tLen);
|
||||||
metaDecodeEntry(&tdc, &stbEntry);
|
metaDecodeEntry(&tdc, &stbEntry);
|
||||||
|
|
||||||
if (pSysTbl) *pSysTbl = metaTbInFilterCache(pMeta->pVnode, stbEntry.name, 1) ? 1 : 0;
|
if (pSysTbl) *pSysTbl = metaTbInFilterCache(pMeta, stbEntry.name, 1) ? 1 : 0;
|
||||||
|
|
||||||
SSchema *pTagColumn = NULL;
|
SSchema *pTagColumn = NULL;
|
||||||
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
|
||||||
|
|
|
@ -399,10 +399,6 @@ SVnode *vnodeOpen(const char *path, int32_t diskPrimary, STfs *pTfs, SMsgCb msgC
|
||||||
goto _err;
|
goto _err;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (metaInitTbFilterCache(pVnode) != 0) {
|
|
||||||
goto _err;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
|
if (metaUpgrade(pVnode, &pVnode->pMeta) < 0) {
|
||||||
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
vError("vgId:%d, failed to upgrade meta since %s", TD_VID(pVnode), tstrerror(terrno));
|
||||||
}
|
}
|
||||||
|
|
|
@ -594,7 +594,7 @@ static int32_t vnodeGetTimeSeriesBlackList(SVnode *pVnode) {
|
||||||
const char **pTbArr = NULL;
|
const char **pTbArr = NULL;
|
||||||
const char *dbName = NULL;
|
const char *dbName = NULL;
|
||||||
|
|
||||||
if (!(dbName = strchr(((SVnode *)pVnode)->config.dbname, '.'))) return 0;
|
if (!(dbName = strchr(pVnode->config.dbname, '.'))) return 0;
|
||||||
if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
|
if (0 == strncmp(++dbName, "log", TSDB_DB_NAME_LEN)) {
|
||||||
tbNum = TK_LOG_STB_NUM;
|
tbNum = TK_LOG_STB_NUM;
|
||||||
pTbArr = (const char **)&tkLogStb;
|
pTbArr = (const char **)&tkLogStb;
|
||||||
|
@ -603,15 +603,15 @@ static int32_t vnodeGetTimeSeriesBlackList(SVnode *pVnode) {
|
||||||
pTbArr = (const char **)&tkAuditStb;
|
pTbArr = (const char **)&tkAuditStb;
|
||||||
}
|
}
|
||||||
if (tbNum && pTbArr) {
|
if (tbNum && pTbArr) {
|
||||||
tbSize = metaSizeOfTbFilterCache(pVnode, 0);
|
tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
|
||||||
if (tbSize < tbNum) {
|
if (tbSize < tbNum) {
|
||||||
for (int32_t i = 0; i < tbNum; ++i) {
|
for (int32_t i = 0; i < tbNum; ++i) {
|
||||||
tb_uid_t suid = metaGetTableEntryUidByName(pVnode->pMeta, pTbArr[i]);
|
tb_uid_t suid = metaGetTableEntryUidByName(pVnode->pMeta, pTbArr[i]);
|
||||||
if (suid != 0) {
|
if (suid != 0) {
|
||||||
metaPutTbToFilterCache(pVnode, &suid, 0);
|
metaPutTbToFilterCache(pVnode->pMeta, &suid, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tbSize = metaSizeOfTbFilterCache(pVnode, 0);
|
tbSize = metaSizeOfTbFilterCache(pVnode->pMeta, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -622,7 +622,7 @@ static int32_t vnodeGetTimeSeriesBlackList(SVnode *pVnode) {
|
||||||
static bool vnodeTimeSeriesFilter(void *arg1, void *arg2) {
|
static bool vnodeTimeSeriesFilter(void *arg1, void *arg2) {
|
||||||
SVnode *pVnode = (SVnode *)arg1;
|
SVnode *pVnode = (SVnode *)arg1;
|
||||||
|
|
||||||
if (metaTbInFilterCache(pVnode, arg2, 0)) {
|
if (metaTbInFilterCache(pVnode->pMeta, arg2, 0)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
|
|
Loading…
Reference in New Issue