fix: compile issues

This commit is contained in:
dapan1121 2024-07-18 14:13:29 +08:00
parent 123b0e9378
commit 5cd03bb2ce
2 changed files with 431 additions and 304 deletions

View File

@ -220,7 +220,7 @@ void syslog(int unused, const char *format, ...);
// Linux, length of name must <= 16 (the last '\0' included)
#define setThreadName(name) \
do { \
prctl(PR_SET_NAME, (name)); \
(void)prctl(PR_SET_NAME, (name)); \
} while (0)
#define getThreadName(name) \
do { \

View File

@ -1469,6 +1469,165 @@ _return:
CTG_RET(code);
}
int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (NULL == op) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
op->opId = CTG_OP_UPDATE_TB_TSMA;
op->syncOp = syncOp;
SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->pTsma = *pTsma;
msg->dbTsmaVersion = tsmaVersion;
msg->dbId = (*pTsma)->dbId;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
*pTsma = NULL;
return TSDB_CODE_SUCCESS;
_return:
CTG_RET(code);
}
int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation* op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (NULL == op) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
op->opId = CTG_OP_DROP_TB_TSMA;
op->syncOp = syncOp;
SCtgDropTbTSMAMsg* msg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (!msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->dbId = pTsma->dbId;
msg->tbId = pTsma->suid;
msg->tsmaId = pTsma->tsmaId;
tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN);
tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN);
tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN);
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
CTG_RET(code);
}
static int32_t createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const STSMACache* pCache, bool syncOp, SCtgCacheOperation** ppOp) {
SCtgCacheOperation* pOp = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (NULL == pOp) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation));
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
SCtgDropTbTSMAMsg* pMsg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (NULL == pMsg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg));
taosMemoryFree(pOp);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
pOp->opId = CTG_OP_DROP_TB_TSMA;
pOp->syncOp = syncOp;
pMsg->pCtg = pCtg;
pMsg->dbId = pCache->dbId;
pMsg->tbId = pCache->suid;
pMsg->tsmaId = pCache->tsmaId;
pMsg->dropAllForTb = true;
tstrncpy(pMsg->tsmaName, pCache->name, TSDB_TABLE_NAME_LEN);
tstrncpy(pMsg->dbFName, pCache->dbFName, TSDB_DB_FNAME_LEN);
tstrncpy(pMsg->tbName, pCache->tb, TSDB_TABLE_NAME_LEN);
pOp->data = pMsg;
*ppOp = pOp;
return TSDB_CODE_SUCCESS;
}
int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
ctgDebug("drop tsma meta for tb: %s.%s", pName->dbname, pName->tname);
int32_t code = 0;
SCtgDBCache *pDbCache = NULL;
SCtgCacheOperation *pOp = NULL;
char dbFName[TSDB_DB_FNAME_LEN];
SCtgTSMACache *pCtgCache = NULL;
(void)tNameGetFullDbName(pName, dbFName);
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
if (NULL == pDbCache || !pDbCache->tsmaCache) {
goto _return;
}
pCtgCache = taosHashAcquire(pDbCache->tsmaCache, pName->tname, strlen(pName->tname));
if (!pCtgCache) {
goto _return;
}
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
if (!pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) {
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
goto _return;
}
STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
if (NULL == pCache) {
ctgError("fail to get the 0th STSMACache, total:%d", (int32_t)pCtgCache->pTsmas->size);
code = TSDB_CODE_CTG_INTERNAL_ERROR;
}
if (TSDB_CODE_SUCCESS == code) {
code = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp, &pOp);
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
CTG_ERR_JRET(code);
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
return TSDB_CODE_SUCCESS;
_return:
if (pCtgCache) {
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
}
if (pOp) {
taosMemoryFree(pOp->data);
taosMemoryFree(pOp);
}
CTG_RET(code);
}
int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
int32_t code = 0;
@ -1888,6 +2047,103 @@ int32_t ctgVgInfoIdComp(void const *lp, void const *rp) {
return 0;
}
int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName,
STSMACache **ppTsmaCache) {
if (NULL == dbCache->tsmaCache) {
ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
STSMACache *pTsmaCache = *ppTsmaCache;
int32_t code = TSDB_CODE_SUCCESS;
SCtgTSMACache* pCache = taosHashGet(dbCache->tsmaCache, tbName, strlen(tbName));
if (!pCache) {
SCtgTSMACache cache = {0};
cache.pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES));
if (NULL == cache.pTsmas) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(cache.pTsmas, &pTsmaCache)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (taosHashPut(dbCache->tsmaCache, tbName, strlen(tbName), &cache, sizeof(cache))) {
ctgError("taosHashPut new tsmacache for tb: %s.%s failed", dbFName, tbName);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
(void)atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(STSMACache) + ctgGetTbTSMACacheSize(pTsmaCache));
CTG_DB_NUM_INC(CTG_CI_TBL_TSMA);
ctgDebug("tb %s tsma updated to cache, name: %s", tbName, pTsmaCache->name);
CTG_ERR_JRET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache));
*ppTsmaCache = NULL;
goto _return;
}
CTG_LOCK(CTG_WRITE, &pCache->tsmaLock);
if (pCache->pTsmas) {
uint64_t cacheSize = 0;
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i);
if (NULL == pInfo) {
ctgError("fail to get the %dth STableTSMAInfo, total:%d", i, (int32_t)pCache->pTsmas->size);
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
}
if (pInfo->tsmaId == pTsmaCache->tsmaId) {
ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64
"rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64,
pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs,
pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration);
cacheSize = ctgGetTbTSMACacheSize(pInfo);
taosArrayRemove(pCache->pTsmas, i);
(void)atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
tFreeTableTSMAInfo(pInfo);
taosMemoryFreeClear(pInfo);
break;
}
}
} else {
pCache->pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES));
if (!pCache->pTsmas) {
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
// push the new cache
if (NULL == taosArrayPush(pCache->pTsmas, &pTsmaCache)) {
code = TSDB_CODE_OUT_OF_MEMORY;
} else {
*ppTsmaCache = NULL;
(void)atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbTSMACacheSize(pTsmaCache));
CTG_ERR_RET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache));
ctgDebug("table %s tsma updated to cache, tsma: %s", tbName, pTsmaCache->name);
}
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
_return:
CTG_RET(code);
}
int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateVgMsg *msg = operation->data;
@ -2563,7 +2819,7 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) {
ctgError("view %s not exist in cache, dbFName:%s", msg->viewName, msg->dbFName);
CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR);
} else {
atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName));
(void)atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName));
CTG_DB_NUM_DEC(CTG_CI_VIEW);
}
@ -2658,6 +2914,150 @@ _return:
CTG_RET(code);
}
int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropTbTSMAMsg * msg = operation->data;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache || !dbCache->tsmaCache || (msg->dbId != dbCache->dbId && msg->dbId != 0)) {
goto _return;
}
SCtgTSMACache* pCtgCache = taosHashGet(dbCache->tsmaCache, msg->tbName, strlen(msg->tbName));
if (!pCtgCache || !pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) {
goto _return;
}
uint64_t cacheSize = 0;
STSMACache *pCache = NULL;
if (msg->dropAllForTb) {
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (NULL == pCache) {
ctgError("fail to the %dth tsma in pTsmas, total:%d", i, (int32_t)pCtgCache->pTsmas->size);
continue;
}
cacheSize += ctgGetTbTSMACacheSize(pCache);
(void)ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
}
taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo);
pCtgCache->pTsmas = NULL;
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
(void)taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
} else {
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
if (NULL == pCache) {
ctgError("fail to the 0th tsma in pTsmas, total:%d", (int32_t)pCtgCache->pTsmas->size);
code = TSDB_CODE_CTG_INTERNAL_ERROR;
} else {
if (msg->tbId != 0 && pCache->suid != msg->tbId) {
// table id mismatch, skip drops
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
goto _return;
}
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (NULL == pCache) {
ctgError("fail to the %dth tsma in pTsmas, total:%d", i, (int32_t)pCtgCache->pTsmas->size);
code = TSDB_CODE_CTG_INTERNAL_ERROR;
continue;
}
if (pCache->tsmaId != msg->tsmaId) {
continue;
}
cacheSize = ctgGetTbTSMACacheSize(pCache);
(void)ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
taosArrayRemove(pCtgCache->pTsmas, i);
tFreeAndClearTableTSMAInfo(pCache);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
break;
}
}
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
}
(void)atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTbTSMAMsg *msg = operation->data;
SCatalog * pCtg = msg->pCtg;
STableTSMAInfo * pTsmaInfo = msg->pTsma;
SCtgDBCache * dbCache = NULL;
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache));
CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo));
if (dbCache && msg->dbTsmaVersion > 0) {
dbCache->tsmaVersion = msg->dbTsmaVersion;
SDbCacheInfo cacheInfo = {0};
cacheInfo.dbId = dbCache->dbId;
if (dbCache->cfgCache.cfgInfo) {
cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN);
}
if (dbCache->vgCache.vgInfo) {
cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion;
cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable;
cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs;
}
cacheInfo.tsmaVersion = dbCache->tsmaVersion;
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
}
_return:
if (pTsmaInfo) {
tFreeTableTSMAInfo(pTsmaInfo);
taosMemoryFreeClear(pTsmaInfo);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}
void ctgFreeCacheOperationData(SCtgCacheOperation *op) {
if (NULL == op || NULL == op->data) {
return;
@ -2825,7 +3225,7 @@ int32_t ctgStartUpdateThread() {
CTG_ERR_RET(terrno);
}
taosThreadAttrDestroy(&thAttr);
(void)taosThreadAttrDestroy(&thAttr);
_return:
@ -3538,7 +3938,7 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
bool found = false;
STSMACache * pTsmaOut = NULL;
tNameGetFullDbName(pTsmaName, dbFName);
(void)tNameGetFullDbName(pTsmaName, dbFName);
CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache));
if (!pDbCache) {
@ -3550,10 +3950,16 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
while (pIter && !found) {
SCtgTSMACache* pCtgCache = pIter;
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
int32_t size = pCtgCache ? (pCtgCache->pTsmas ? pCtgCache->pTsmas->size : 0) : 0;
for (int32_t i = 0; i < size; ++i) {
STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (NULL == pCache) {
ctgError("fail to the %dth tsma in pTsmas, total:%d", i, size);
code = TSDB_CODE_CTG_INTERNAL_ERROR;
break;
}
if (memcmp(pCache->name, pTsmaName->tname, TSDB_TABLE_NAME_LEN) == 0) {
found = true;
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1);
@ -3562,323 +3968,44 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam
}
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
if (TSDB_CODE_SUCCESS != code) {
break;
}
pIter = taosHashIterate(pDbCache->tsmaCache, pIter);
}
taosHashCancelIterate(pDbCache->tsmaCache, pIter);
if (found && code == TSDB_CODE_SUCCESS) {
res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp));
if (!res.pRes) {
tFreeAndClearTableTSMAInfo(pTsmaOut);
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
STableTSMAInfoRsp* pRsp = res.pRes;
pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES);
if (!pRsp->pTsmas) {
tFreeAndClearTableTSMAInfo(pTsmaOut);
CTG_RET(TSDB_CODE_OUT_OF_MEMORY);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
taosArrayPush(pRsp->pTsmas, &pTsmaOut);
taosArrayPush(pCtx->pResList, &res);
if (NULL == taosArrayPush(pRsp->pTsmas, &pTsmaOut)) {
tFreeAndClearTableTSMAInfo(pTsmaOut);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
if (NULL == taosArrayPush(pCtx->pResList, &res)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
_return:
ctgReleaseDBCache(pCtg, pDbCache);
CTG_RET(code);
}
int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_UPDATE_TB_TSMA;
op->syncOp = syncOp;
SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg));
if (NULL == msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->pTsma = *pTsma;
msg->dbTsmaVersion = tsmaVersion;
msg->dbId = (*pTsma)->dbId;
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
*pTsma = NULL;
return TSDB_CODE_SUCCESS;
_return:
CTG_RET(code);
}
int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation* op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (!op) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
op->opId = CTG_OP_DROP_TB_TSMA;
op->syncOp = syncOp;
SCtgDropTbTSMAMsg* msg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (!msg) {
ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg));
taosMemoryFree(op);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
msg->pCtg = pCtg;
msg->dbId = pTsma->dbId;
msg->tbId = pTsma->suid;
msg->tsmaId = pTsma->tsmaId;
tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN);
tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN);
tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN);
op->data = msg;
CTG_ERR_JRET(ctgEnqueue(pCtg, op));
return TSDB_CODE_SUCCESS;
_return:
CTG_RET(code);
}
static SCtgCacheOperation* createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const STSMACache* pCache, bool syncOp) {
SCtgCacheOperation* pOp = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
if (!pOp) return NULL;
SCtgDropTbTSMAMsg* pMsg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg));
if (!pMsg) {
taosMemoryFree(pOp);
return NULL;
}
pOp->opId = CTG_OP_DROP_TB_TSMA;
pOp->syncOp = syncOp;
pMsg->pCtg = pCtg;
pMsg->dbId = pCache->dbId;
pMsg->tbId = pCache->suid;
pMsg->tsmaId = pCache->tsmaId;
pMsg->dropAllForTb = true;
tstrncpy(pMsg->tsmaName, pCache->name, TSDB_TABLE_NAME_LEN);
tstrncpy(pMsg->dbFName, pCache->dbFName, TSDB_DB_FNAME_LEN);
tstrncpy(pMsg->tbName, pCache->tb, TSDB_TABLE_NAME_LEN);
pOp->data = pMsg;
return pOp;
}
int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) {
ctgDebug("drop tsma meta for tb: %s.%s", pName->dbname, pName->tname);
int32_t code = 0;
SCtgDBCache *pDbCache = NULL;
SCtgCacheOperation *pOp = NULL;
char dbFName[TSDB_DB_FNAME_LEN];
SCtgTSMACache *pCtgCache = NULL;
tNameGetFullDbName(pName, dbFName);
CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache));
if (NULL == pDbCache || !pDbCache->tsmaCache) {
goto _return;
}
pCtgCache = taosHashAcquire(pDbCache->tsmaCache, pName->tname, strlen(pName->tname));
if (!pCtgCache) goto _return;
CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock);
if (!pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) {
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
goto _return;
}
STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
pOp = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp);
if (!pOp) {
code = TSDB_CODE_OUT_OF_MEMORY;
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
goto _return;
}
CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock);
CTG_ERR_JRET(ctgEnqueue(pCtg, pOp));
taosHashRelease(pDbCache->tsmaCache, pCtgCache);
return TSDB_CODE_SUCCESS;
_return:
if (pCtgCache) taosHashRelease(pDbCache->tsmaCache, pCtgCache);
if (pOp) {
taosMemoryFree(pOp->data);
taosMemoryFree(pOp);
}
CTG_RET(code);
}
int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName,
STSMACache **ppTsmaCache) {
if (NULL == dbCache->tsmaCache) {
ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId);
CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED);
}
STSMACache *pTsmaCache = *ppTsmaCache;
int32_t code = TSDB_CODE_SUCCESS;
SCtgTSMACache* pCache = taosHashGet(dbCache->tsmaCache, tbName, strlen(tbName));
if (!pCache) {
SCtgTSMACache cache = {0};
cache.pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES));
if (!cache.pTsmas) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
taosArrayPush(cache.pTsmas, &pTsmaCache);
if (taosHashPut(dbCache->tsmaCache, tbName, strlen(tbName), &cache, sizeof(cache))) {
ctgError("taosHashPut new tsmacache for tb: %s.%s failed", dbFName, tbName);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(STSMACache) + ctgGetTbTSMACacheSize(pTsmaCache));
CTG_DB_NUM_INC(CTG_CI_TBL_TSMA);
ctgDebug("tb %s tsma updated to cache, name: %s", tbName, pTsmaCache->name);
CTG_ERR_JRET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache));
*ppTsmaCache = NULL;
goto _return;
}
CTG_LOCK(CTG_WRITE, &pCache->tsmaLock);
if (pCache->pTsmas) {
uint64_t cacheSize = 0;
for (int32_t i = 0; i < pCache->pTsmas->size; ++i) {
STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i);
if (pInfo->tsmaId == pTsmaCache->tsmaId) {
ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64
"rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64,
pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs,
pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration);
cacheSize = ctgGetTbTSMACacheSize(pInfo);
taosArrayRemove(pCache->pTsmas, i);
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
tFreeTableTSMAInfo(pInfo);
taosMemoryFreeClear(pInfo);
break;
}
}
} else {
pCache->pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES));
if (!pCache->pTsmas) {
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
}
// push the new cache
taosArrayPush(pCache->pTsmas, &pTsmaCache);
*ppTsmaCache = NULL;
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbTSMACacheSize(pTsmaCache));
CTG_ERR_RET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache));
CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock);
ctgDebug("table %s tsma updated to cache, tsma: %s", tbName, pTsmaCache->name);
_return:
CTG_RET(code);
}
int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgDropTbTSMAMsg * msg = operation->data;
SCatalog *pCtg = msg->pCtg;
SCtgDBCache *dbCache = NULL;
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache));
if (NULL == dbCache || !dbCache->tsmaCache || (msg->dbId != dbCache->dbId && msg->dbId != 0)) {
goto _return;
}
SCtgTSMACache* pCtgCache = taosHashGet(dbCache->tsmaCache, msg->tbName, strlen(msg->tbName));
if (!pCtgCache || !pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) {
goto _return;
}
uint64_t cacheSize = 0;
STSMACache *pCache = NULL;
if (msg->dropAllForTb) {
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
cacheSize += ctgGetTbTSMACacheSize(pCache);
ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
}
taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo);
pCtgCache->pTsmas = NULL;
ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName);
taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN);
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
} else {
CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock);
pCache = taosArrayGetP(pCtgCache->pTsmas, 0);
if (msg->tbId != 0 && pCache->suid != msg->tbId) {
// table id mismatch, skip drops
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
goto _return;
}
for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) {
pCache = taosArrayGetP(pCtgCache->pTsmas, i);
if (pCache->tsmaId != msg->tsmaId) {
continue;
}
cacheSize = ctgGetTbTSMACacheSize(pCache);
ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare);
taosArrayRemove(pCtgCache->pTsmas, i);
tFreeAndClearTableTSMAInfo(pCache);
CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA);
break;
}
CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock);
}
atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize);
_return:
taosMemoryFreeClear(msg);
CTG_RET(code);
}
int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) {
int32_t code = 0;
SCtgUpdateTbTSMAMsg *msg = operation->data;
SCatalog * pCtg = msg->pCtg;
STableTSMAInfo * pTsmaInfo = msg->pTsma;
SCtgDBCache * dbCache = NULL;
if (pCtg->stopUpdate) {
goto _return;
}
CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache));
CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo));
if (dbCache && msg->dbTsmaVersion > 0) {
dbCache->tsmaVersion = msg->dbTsmaVersion;
SDbCacheInfo cacheInfo = {0};
cacheInfo.dbId = dbCache->dbId;
if (dbCache->cfgCache.cfgInfo) {
cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion;
tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN);
}
if (dbCache->vgCache.vgInfo) {
cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion;
cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable;
cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs;
}
cacheInfo.tsmaVersion = dbCache->tsmaVersion;
CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo),
ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare));
}
_return:
if (pTsmaInfo) {
tFreeTableTSMAInfo(pTsmaInfo);
taosMemoryFreeClear(pTsmaInfo);
}
taosMemoryFreeClear(msg);
CTG_RET(code);
}