[TD-2378]<enhance>: reduce table meta memory consumption.
This commit is contained in:
parent
b64c30cfdb
commit
47fa1deaab
|
@ -494,7 +494,8 @@ static FORCE_INLINE void tscGetResultColumnChr(SSqlRes* pRes, SFieldInfo* pField
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
extern SHashObj *tscHashMap;
|
extern int32_t sentinel;
|
||||||
|
extern SHashObj *tscVgroupMap;
|
||||||
extern SHashObj *tscTableMetaInfo;
|
extern SHashObj *tscTableMetaInfo;
|
||||||
|
|
||||||
extern int tscObjRef;
|
extern int tscObjRef;
|
||||||
|
|
|
@ -121,7 +121,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||||
int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
|
int32_t vgId = pTableMetaInfo->pTableMeta->vgId;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||||
taosHashGetClone(tscHashMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
assert(vgroupInfo.numOfEps > 0 && vgroupInfo.vgId > 0);
|
||||||
|
|
||||||
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
tscDebug("before: Endpoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||||
|
@ -133,7 +133,7 @@ static void tscUpdateVgroupInfo(SSqlObj *pObj, SRpcEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
tscDebug("after: EndPoint in use:%d, numOfEps:%d", vgroupInfo.inUse, vgroupInfo.numOfEps);
|
||||||
taosHashPut(tscHashMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
void tscProcessHeartBeatRsp(void *param, TAOS_RES *tres, int code) {
|
||||||
|
@ -553,7 +553,7 @@ int tscBuildSubmitMsg(SSqlObj *pSql, SSqlInfo *pInfo) {
|
||||||
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
pSql->cmd.msgType = TSDB_MSG_TYPE_SUBMIT;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {0};
|
SNewVgroupInfo vgroupInfo = {0};
|
||||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
|
|
||||||
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit,
|
tscDebug("%p build submit msg, vgId:%d numOfTables:%d numberOfEP:%d", pSql, pTableMeta->vgId, pSql->cmd.numOfTablesInSubmit,
|
||||||
|
@ -618,7 +618,7 @@ static char *doSerializeTableInfo(SQueryTableMsg* pQueryMsg, SSqlObj *pSql, char
|
||||||
vgId = pTableMeta->vgId;
|
vgId = pTableMeta->vgId;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {0};
|
SNewVgroupInfo vgroupInfo = {0};
|
||||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1459,7 +1459,7 @@ int tscBuildUpdateTagMsg(SSqlObj* pSql, SSqlInfo *pInfo) {
|
||||||
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
STableMeta *pTableMeta = tscGetMetaInfo(pQueryInfo, 0)->pTableMeta;
|
||||||
|
|
||||||
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
SNewVgroupInfo vgroupInfo = {.vgId = -1};
|
||||||
taosHashGetClone(tscHashMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &pTableMeta->vgId, sizeof(pTableMeta->vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
assert(vgroupInfo.vgId > 0);
|
assert(vgroupInfo.vgId > 0);
|
||||||
|
|
||||||
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
tscDumpEpSetFromVgroupInfo(&pSql->epSet, &vgroupInfo);
|
||||||
|
@ -1849,13 +1849,13 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
|
||||||
// update the vgroupInfo if needed
|
// update the vgroupInfo if needed
|
||||||
int32_t vgId = pTableMeta->vgId;
|
int32_t vgId = pTableMeta->vgId;
|
||||||
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
SNewVgroupInfo vgroupInfo = {.inUse = -1};
|
||||||
taosHashGetClone(tscHashMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
taosHashGetClone(tscVgroupMap, &vgId, sizeof(vgId), NULL, &vgroupInfo, sizeof(SNewVgroupInfo));
|
||||||
|
|
||||||
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) ||
|
if (((vgroupInfo.inUse >= 0) && !vgroupInfoIdentical(&vgroupInfo, &pMetaMsg->vgroup)) ||
|
||||||
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
(vgroupInfo.inUse < 0)) { // vgroup info exists, compare with it
|
||||||
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
vgroupInfo = createNewVgroupInfo(&pMetaMsg->vgroup);
|
||||||
taosHashPut(tscHashMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
taosHashPut(tscVgroupMap, &vgId, sizeof(vgId), &vgroupInfo, sizeof(vgroupInfo));
|
||||||
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscHashMap));
|
tscDebug("add new VgroupInfo, vgId:%d, total:%d", vgId, (int32_t) taosHashGetSize(tscVgroupMap));
|
||||||
}
|
}
|
||||||
|
|
||||||
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
|
tscDebug("%p recv table meta, uid:%"PRId64 ", tid:%d, name:%s", pSql, pTableMeta->id.uid, pTableMeta->id.tid, pTableMetaInfo->name);
|
||||||
|
|
|
@ -31,15 +31,20 @@
|
||||||
#include "tlocale.h"
|
#include "tlocale.h"
|
||||||
|
|
||||||
// global, not configurable
|
// global, not configurable
|
||||||
SHashObj *tscHashMap; // hash map to keep the global vgroup info
|
#define TSC_VAR_NOT_RELEASE 1
|
||||||
SHashObj *tscTableMetaInfo; // table meta info
|
#define TSC_VAR_RELEASED 0
|
||||||
int tscObjRef = -1;
|
|
||||||
void *tscTmr;
|
|
||||||
void *tscQhandle;
|
|
||||||
void *tscCheckDiskUsageTmr;
|
|
||||||
int tscRefId = -1;
|
|
||||||
int tscNumOfObj = 0; // number of sqlObj in current process.
|
|
||||||
|
|
||||||
|
int32_t sentinel = TSC_VAR_NOT_RELEASE;
|
||||||
|
|
||||||
|
SHashObj *tscVgroupMap; // hash map to keep the global vgroup info
|
||||||
|
SHashObj *tscTableMetaInfo; // table meta info
|
||||||
|
int32_t tscObjRef = -1;
|
||||||
|
void *tscTmr;
|
||||||
|
void *tscQhandle;
|
||||||
|
int32_t tscRefId = -1;
|
||||||
|
int32_t tscNumOfObj = 0; // number of sqlObj in current process.
|
||||||
|
|
||||||
|
static void *tscCheckDiskUsageTmr;
|
||||||
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
static pthread_once_t tscinit = PTHREAD_ONCE_INIT;
|
||||||
|
|
||||||
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
|
void tscCheckDiskUsage(void *UNUSED_PARAM(para), void* UNUSED_PARAM(param)) {
|
||||||
|
@ -131,7 +136,7 @@ void taos_init_imp(void) {
|
||||||
|
|
||||||
if (tscTableMetaInfo == NULL) {
|
if (tscTableMetaInfo == NULL) {
|
||||||
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
|
tscObjRef = taosOpenRef(40960, tscFreeRegisteredSqlObj);
|
||||||
tscHashMap = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
tscVgroupMap = taosHashInit(256, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
tscTableMetaInfo = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_ENTRY_LOCK);
|
||||||
tscDebug("TableMeta:%p", tscTableMetaInfo);
|
tscDebug("TableMeta:%p", tscTableMetaInfo);
|
||||||
}
|
}
|
||||||
|
@ -151,30 +156,38 @@ void taos_init() { pthread_once(&tscinit, taos_init_imp); }
|
||||||
void taos_cleanup(void) {
|
void taos_cleanup(void) {
|
||||||
tscDebug("start to cleanup client environment");
|
tscDebug("start to cleanup client environment");
|
||||||
|
|
||||||
void* m = tscTableMetaInfo;
|
if (atomic_val_compare_exchange_32(&sentinel, TSC_VAR_NOT_RELEASE, TSC_VAR_RELEASED) != TSC_VAR_NOT_RELEASE) {
|
||||||
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTableMetaInfo, m, 0) == m) {
|
return;
|
||||||
taosCacheCleanup(m);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int refId = atomic_exchange_32(&tscObjRef, -1);
|
taosHashCleanup(tscTableMetaInfo);
|
||||||
if (refId != -1) {
|
tscTableMetaInfo = NULL;
|
||||||
taosCloseRef(refId);
|
|
||||||
}
|
|
||||||
|
|
||||||
m = tscQhandle;
|
taosHashCleanup(tscVgroupMap);
|
||||||
if (m != NULL && atomic_val_compare_exchange_ptr(&tscQhandle, m, 0) == m) {
|
tscVgroupMap = NULL;
|
||||||
taosCleanUpScheduler(m);
|
|
||||||
}
|
int32_t id = tscObjRef;
|
||||||
|
tscObjRef = -1;
|
||||||
|
taosCloseRef(id);
|
||||||
|
|
||||||
|
void* p = tscQhandle;
|
||||||
|
tscQhandle = NULL;
|
||||||
|
taosCleanUpScheduler(p);
|
||||||
|
|
||||||
|
id = tscRefId;
|
||||||
|
tscRefId = -1;
|
||||||
|
taosCloseRef(id);
|
||||||
|
|
||||||
taosCloseRef(tscRefId);
|
|
||||||
taosCleanupKeywordsTable();
|
taosCleanupKeywordsTable();
|
||||||
taosCloseLog();
|
taosCloseLog();
|
||||||
if (tscEmbedded == 0) rpcCleanup();
|
|
||||||
|
|
||||||
m = tscTmr;
|
if (tscEmbedded == 0) {
|
||||||
if (m != NULL && atomic_val_compare_exchange_ptr(&tscTmr, m, 0) == m) {
|
rpcCleanup();
|
||||||
taosTmrCleanUp(m);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
p = tscTmr;
|
||||||
|
tscTmr = NULL;
|
||||||
|
taosTmrCleanUp(p);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
static int taos_options_imp(TSDB_OPTION option, const char *pStr) {
|
||||||
|
|
|
@ -883,6 +883,7 @@ void tscCloseTscObj(void *param) {
|
||||||
rpcClose(pObj->pDnodeConn);
|
rpcClose(pObj->pDnodeConn);
|
||||||
pObj->pDnodeConn = NULL;
|
pObj->pDnodeConn = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
tfree(pObj->tscCorMgmtEpSet);
|
tfree(pObj->tscCorMgmtEpSet);
|
||||||
pthread_mutex_destroy(&pObj->mutex);
|
pthread_mutex_destroy(&pObj->mutex);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue