diff --git a/include/os/osDef.h b/include/os/osDef.h index 335b151cac..1052722492 100644 --- a/include/os/osDef.h +++ b/include/os/osDef.h @@ -220,7 +220,7 @@ void syslog(int unused, const char *format, ...); // Linux, length of name must <= 16 (the last '\0' included) #define setThreadName(name) \ do { \ - prctl(PR_SET_NAME, (name)); \ + (void)prctl(PR_SET_NAME, (name)); \ } while (0) #define getThreadName(name) \ do { \ diff --git a/source/libs/catalog/src/ctgCache.c b/source/libs/catalog/src/ctgCache.c index da7411921a..565dcd1739 100644 --- a/source/libs/catalog/src/ctgCache.c +++ b/source/libs/catalog/src/ctgCache.c @@ -1469,6 +1469,165 @@ _return: CTG_RET(code); } +int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + if (NULL == op) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + op->opId = CTG_OP_UPDATE_TB_TSMA; + op->syncOp = syncOp; + + SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg)); + if (NULL == msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg)); + taosMemoryFree(op); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + msg->pCtg = pCtg; + msg->pTsma = *pTsma; + msg->dbTsmaVersion = tsmaVersion; + msg->dbId = (*pTsma)->dbId; + + op->data = msg; + + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); + + *pTsma = NULL; + return TSDB_CODE_SUCCESS; + +_return: + + CTG_RET(code); +} + +int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp) { + int32_t code = 0; + SCtgCacheOperation* op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + if (NULL == op) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + op->opId = CTG_OP_DROP_TB_TSMA; + op->syncOp = syncOp; + + SCtgDropTbTSMAMsg* msg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg)); + if (!msg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg)); + taosMemoryFree(op); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + msg->pCtg = pCtg; + msg->dbId = pTsma->dbId; + msg->tbId = pTsma->suid; + msg->tsmaId = pTsma->tsmaId; + tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN); + tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN); + tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN); + + op->data = msg; + CTG_ERR_JRET(ctgEnqueue(pCtg, op)); + + return TSDB_CODE_SUCCESS; + +_return: + + CTG_RET(code); +} + + +static int32_t createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const STSMACache* pCache, bool syncOp, SCtgCacheOperation** ppOp) { + SCtgCacheOperation* pOp = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); + if (NULL == pOp) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgCacheOperation)); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + SCtgDropTbTSMAMsg* pMsg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg)); + if (NULL == pMsg) { + ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg)); + taosMemoryFree(pOp); + CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); + } + + pOp->opId = CTG_OP_DROP_TB_TSMA; + pOp->syncOp = syncOp; + pMsg->pCtg = pCtg; + pMsg->dbId = pCache->dbId; + pMsg->tbId = pCache->suid; + pMsg->tsmaId = pCache->tsmaId; + pMsg->dropAllForTb = true; + tstrncpy(pMsg->tsmaName, pCache->name, TSDB_TABLE_NAME_LEN); + tstrncpy(pMsg->dbFName, pCache->dbFName, TSDB_DB_FNAME_LEN); + tstrncpy(pMsg->tbName, pCache->tb, TSDB_TABLE_NAME_LEN); + pOp->data = pMsg; + + *ppOp = pOp; + + return TSDB_CODE_SUCCESS; +} + +int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { + ctgDebug("drop tsma meta for tb: %s.%s", pName->dbname, pName->tname); + + int32_t code = 0; + SCtgDBCache *pDbCache = NULL; + SCtgCacheOperation *pOp = NULL; + char dbFName[TSDB_DB_FNAME_LEN]; + SCtgTSMACache *pCtgCache = NULL; + (void)tNameGetFullDbName(pName, dbFName); + + CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache)); + if (NULL == pDbCache || !pDbCache->tsmaCache) { + goto _return; + } + + pCtgCache = taosHashAcquire(pDbCache->tsmaCache, pName->tname, strlen(pName->tname)); + if (!pCtgCache) { + goto _return; + } + + CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); + if (!pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) { + CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); + goto _return; + } + + STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0); + if (NULL == pCache) { + ctgError("fail to get the 0th STSMACache, total:%d", (int32_t)pCtgCache->pTsmas->size); + code = TSDB_CODE_CTG_INTERNAL_ERROR; + } + if (TSDB_CODE_SUCCESS == code) { + code = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp, &pOp); + } + CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); + + CTG_ERR_JRET(code); + + CTG_ERR_JRET(ctgEnqueue(pCtg, pOp)); + taosHashRelease(pDbCache->tsmaCache, pCtgCache); + + return TSDB_CODE_SUCCESS; + +_return: + + if (pCtgCache) { + taosHashRelease(pDbCache->tsmaCache, pCtgCache); + } + if (pOp) { + taosMemoryFree(pOp->data); + taosMemoryFree(pOp); + } + + CTG_RET(code); +} + int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) { int32_t code = 0; @@ -1888,6 +2047,103 @@ int32_t ctgVgInfoIdComp(void const *lp, void const *rp) { return 0; } + + + +int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, + STSMACache **ppTsmaCache) { + if (NULL == dbCache->tsmaCache) { + ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); + CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); + } + + STSMACache *pTsmaCache = *ppTsmaCache; + int32_t code = TSDB_CODE_SUCCESS; + + SCtgTSMACache* pCache = taosHashGet(dbCache->tsmaCache, tbName, strlen(tbName)); + if (!pCache) { + SCtgTSMACache cache = {0}; + cache.pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES)); + if (NULL == cache.pTsmas) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + if (NULL == taosArrayPush(cache.pTsmas, &pTsmaCache)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + if (taosHashPut(dbCache->tsmaCache, tbName, strlen(tbName), &cache, sizeof(cache))) { + ctgError("taosHashPut new tsmacache for tb: %s.%s failed", dbFName, tbName); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + (void)atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(STSMACache) + ctgGetTbTSMACacheSize(pTsmaCache)); + + CTG_DB_NUM_INC(CTG_CI_TBL_TSMA); + ctgDebug("tb %s tsma updated to cache, name: %s", tbName, pTsmaCache->name); + + CTG_ERR_JRET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache)); + *ppTsmaCache = NULL; + + goto _return; + } + + CTG_LOCK(CTG_WRITE, &pCache->tsmaLock); + + if (pCache->pTsmas) { + uint64_t cacheSize = 0; + for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { + STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i); + if (NULL == pInfo) { + ctgError("fail to get the %dth STableTSMAInfo, total:%d", i, (int32_t)pCache->pTsmas->size); + CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + } + + if (pInfo->tsmaId == pTsmaCache->tsmaId) { + ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64 + "rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64, + pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs, + pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration); + + cacheSize = ctgGetTbTSMACacheSize(pInfo); + taosArrayRemove(pCache->pTsmas, i); + (void)atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); + + tFreeTableTSMAInfo(pInfo); + taosMemoryFreeClear(pInfo); + + break; + } + } + } else { + pCache->pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES)); + if (!pCache->pTsmas) { + CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + } + + // push the new cache + if (NULL == taosArrayPush(pCache->pTsmas, &pTsmaCache)) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + *ppTsmaCache = NULL; + + (void)atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbTSMACacheSize(pTsmaCache)); + + CTG_ERR_RET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache)); + + ctgDebug("table %s tsma updated to cache, tsma: %s", tbName, pTsmaCache->name); + } + + CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock); + +_return: + + CTG_RET(code); +} + + int32_t ctgOpUpdateVgroup(SCtgCacheOperation *operation) { int32_t code = 0; SCtgUpdateVgMsg *msg = operation->data; @@ -2563,7 +2819,7 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) { ctgError("view %s not exist in cache, dbFName:%s", msg->viewName, msg->dbFName); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } else { - atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName)); + (void)atomic_sub_fetch_64(&dbCache->dbCacheSize, sizeof(SCtgViewCache) + strlen(msg->viewName)); CTG_DB_NUM_DEC(CTG_CI_VIEW); } @@ -2658,6 +2914,150 @@ _return: CTG_RET(code); } + +int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgDropTbTSMAMsg * msg = operation->data; + SCatalog *pCtg = msg->pCtg; + SCtgDBCache *dbCache = NULL; + + if (pCtg->stopUpdate) { + goto _return; + } + + CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); + if (NULL == dbCache || !dbCache->tsmaCache || (msg->dbId != dbCache->dbId && msg->dbId != 0)) { + goto _return; + } + + SCtgTSMACache* pCtgCache = taosHashGet(dbCache->tsmaCache, msg->tbName, strlen(msg->tbName)); + if (!pCtgCache || !pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) { + goto _return; + } + + uint64_t cacheSize = 0; + STSMACache *pCache = NULL; + if (msg->dropAllForTb) { + CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock); + + for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { + pCache = taosArrayGetP(pCtgCache->pTsmas, i); + if (NULL == pCache) { + ctgError("fail to the %dth tsma in pTsmas, total:%d", i, (int32_t)pCtgCache->pTsmas->size); + continue; + } + + cacheSize += ctgGetTbTSMACacheSize(pCache); + (void)ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); + + CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); + } + + taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo); + pCtgCache->pTsmas = NULL; + + ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName); + (void)taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN); + + CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); + } else { + CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock); + + pCache = taosArrayGetP(pCtgCache->pTsmas, 0); + if (NULL == pCache) { + ctgError("fail to the 0th tsma in pTsmas, total:%d", (int32_t)pCtgCache->pTsmas->size); + code = TSDB_CODE_CTG_INTERNAL_ERROR; + } else { + if (msg->tbId != 0 && pCache->suid != msg->tbId) { + // table id mismatch, skip drops + CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); + goto _return; + } + + for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { + pCache = taosArrayGetP(pCtgCache->pTsmas, i); + if (NULL == pCache) { + ctgError("fail to the %dth tsma in pTsmas, total:%d", i, (int32_t)pCtgCache->pTsmas->size); + code = TSDB_CODE_CTG_INTERNAL_ERROR; + continue; + } + + if (pCache->tsmaId != msg->tsmaId) { + continue; + } + + cacheSize = ctgGetTbTSMACacheSize(pCache); + (void)ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); + + taosArrayRemove(pCtgCache->pTsmas, i); + tFreeAndClearTableTSMAInfo(pCache); + + CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); + + break; + } + } + + CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); + } + + (void)atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); + +_return: + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + +int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) { + int32_t code = 0; + SCtgUpdateTbTSMAMsg *msg = operation->data; + SCatalog * pCtg = msg->pCtg; + STableTSMAInfo * pTsmaInfo = msg->pTsma; + SCtgDBCache * dbCache = NULL; + + if (pCtg->stopUpdate) { + goto _return; + } + + CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache)); + CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo)); + + if (dbCache && msg->dbTsmaVersion > 0) { + dbCache->tsmaVersion = msg->dbTsmaVersion; + SDbCacheInfo cacheInfo = {0}; + cacheInfo.dbId = dbCache->dbId; + + if (dbCache->cfgCache.cfgInfo) { + cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion; + tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN); + } + + if (dbCache->vgCache.vgInfo) { + cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion; + cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable; + cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs; + } + + cacheInfo.tsmaVersion = dbCache->tsmaVersion; + CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo), + ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare)); + } + +_return: + + if (pTsmaInfo) { + tFreeTableTSMAInfo(pTsmaInfo); + taosMemoryFreeClear(pTsmaInfo); + } + + taosMemoryFreeClear(msg); + + CTG_RET(code); +} + + void ctgFreeCacheOperationData(SCtgCacheOperation *op) { if (NULL == op || NULL == op->data) { return; @@ -2825,7 +3225,7 @@ int32_t ctgStartUpdateThread() { CTG_ERR_RET(terrno); } - taosThreadAttrDestroy(&thAttr); + (void)taosThreadAttrDestroy(&thAttr); _return: @@ -3538,7 +3938,7 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam bool found = false; STSMACache * pTsmaOut = NULL; - tNameGetFullDbName(pTsmaName, dbFName); + (void)tNameGetFullDbName(pTsmaName, dbFName); CTG_ERR_RET(ctgAcquireDBCache(pCtg, dbFName, &pDbCache)); if (!pDbCache) { @@ -3550,10 +3950,16 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam while (pIter && !found) { SCtgTSMACache* pCtgCache = pIter; + CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); int32_t size = pCtgCache ? (pCtgCache->pTsmas ? pCtgCache->pTsmas->size : 0) : 0; for (int32_t i = 0; i < size; ++i) { STSMACache* pCache = taosArrayGetP(pCtgCache->pTsmas, i); + if (NULL == pCache) { + ctgError("fail to the %dth tsma in pTsmas, total:%d", i, size); + code = TSDB_CODE_CTG_INTERNAL_ERROR; + break; + } if (memcmp(pCache->name, pTsmaName->tname, TSDB_TABLE_NAME_LEN) == 0) { found = true; CTG_CACHE_NHIT_INC(CTG_CI_TBL_TSMA, 1); @@ -3562,323 +3968,44 @@ int32_t ctgGetTSMAFromCache(SCatalog* pCtg, SCtgTbTSMACtx* pCtx, SName* pTsmaNam } } CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); + + if (TSDB_CODE_SUCCESS != code) { + break; + } + pIter = taosHashIterate(pDbCache->tsmaCache, pIter); } + taosHashCancelIterate(pDbCache->tsmaCache, pIter); + if (found && code == TSDB_CODE_SUCCESS) { res.pRes = taosMemoryCalloc(1, sizeof(STableTSMAInfoRsp)); if (!res.pRes) { tFreeAndClearTableTSMAInfo(pTsmaOut); - CTG_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } + STableTSMAInfoRsp* pRsp = res.pRes; pRsp->pTsmas = taosArrayInit(1, POINTER_BYTES); if (!pRsp->pTsmas) { tFreeAndClearTableTSMAInfo(pTsmaOut); - CTG_RET(TSDB_CODE_OUT_OF_MEMORY); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); } - taosArrayPush(pRsp->pTsmas, &pTsmaOut); - taosArrayPush(pCtx->pResList, &res); + if (NULL == taosArrayPush(pRsp->pTsmas, &pTsmaOut)) { + tFreeAndClearTableTSMAInfo(pTsmaOut); + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } + + if (NULL == taosArrayPush(pCtx->pResList, &res)) { + CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); + } } +_return: + ctgReleaseDBCache(pCtg, pDbCache); + CTG_RET(code); } -int32_t ctgUpdateTbTSMAEnqueue(SCatalog *pCtg, STSMACache **pTsma, int32_t tsmaVersion, bool syncOp) { - int32_t code = 0; - SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); - op->opId = CTG_OP_UPDATE_TB_TSMA; - op->syncOp = syncOp; - - SCtgUpdateTbTSMAMsg *msg = taosMemoryMalloc(sizeof(SCtgUpdateTbTSMAMsg)); - if (NULL == msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgUpdateTbTSMAMsg)); - taosMemoryFree(op); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - msg->pCtg = pCtg; - msg->pTsma = *pTsma; - msg->dbTsmaVersion = tsmaVersion; - msg->dbId = (*pTsma)->dbId; - - op->data = msg; - - CTG_ERR_JRET(ctgEnqueue(pCtg, op)); - - *pTsma = NULL; - return TSDB_CODE_SUCCESS; - -_return: - CTG_RET(code); -} - -int32_t ctgDropTbTSMAEnqueue(SCatalog* pCtg, const STSMACache* pTsma, bool syncOp) { - int32_t code = 0; - SCtgCacheOperation* op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); - if (!op) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - - op->opId = CTG_OP_DROP_TB_TSMA; - op->syncOp = syncOp; - - SCtgDropTbTSMAMsg* msg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg)); - if (!msg) { - ctgError("malloc %d failed", (int32_t)sizeof(SCtgDropTbTSMAMsg)); - taosMemoryFree(op); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - - msg->pCtg = pCtg; - msg->dbId = pTsma->dbId; - msg->tbId = pTsma->suid; - msg->tsmaId = pTsma->tsmaId; - tstrncpy(msg->dbFName, pTsma->dbFName, TSDB_DB_FNAME_LEN); - tstrncpy(msg->tbName, pTsma->tb, TSDB_TABLE_NAME_LEN); - tstrncpy(msg->tsmaName, pTsma->name, TSDB_TABLE_NAME_LEN); - - op->data = msg; - CTG_ERR_JRET(ctgEnqueue(pCtg, op)); - return TSDB_CODE_SUCCESS; -_return: - CTG_RET(code); -} - -static SCtgCacheOperation* createDropAllTbTsmaCtgCacheOp(SCatalog* pCtg, const STSMACache* pCache, bool syncOp) { - SCtgCacheOperation* pOp = taosMemoryCalloc(1, sizeof(SCtgCacheOperation)); - if (!pOp) return NULL; - - SCtgDropTbTSMAMsg* pMsg = taosMemoryCalloc(1, sizeof(SCtgDropTbTSMAMsg)); - if (!pMsg) { - taosMemoryFree(pOp); - return NULL; - } - pOp->opId = CTG_OP_DROP_TB_TSMA; - pOp->syncOp = syncOp; - pMsg->pCtg = pCtg; - pMsg->dbId = pCache->dbId; - pMsg->tbId = pCache->suid; - pMsg->tsmaId = pCache->tsmaId; - pMsg->dropAllForTb = true; - tstrncpy(pMsg->tsmaName, pCache->name, TSDB_TABLE_NAME_LEN); - tstrncpy(pMsg->dbFName, pCache->dbFName, TSDB_DB_FNAME_LEN); - tstrncpy(pMsg->tbName, pCache->tb, TSDB_TABLE_NAME_LEN); - pOp->data = pMsg; - return pOp; -} - -int32_t ctgDropTSMAForTbEnqueue(SCatalog *pCtg, SName *pName, bool syncOp) { - ctgDebug("drop tsma meta for tb: %s.%s", pName->dbname, pName->tname); - int32_t code = 0; - SCtgDBCache *pDbCache = NULL; - SCtgCacheOperation *pOp = NULL; - char dbFName[TSDB_DB_FNAME_LEN]; - SCtgTSMACache *pCtgCache = NULL; - tNameGetFullDbName(pName, dbFName); - CTG_ERR_JRET(ctgGetDBCache(pCtg, dbFName, &pDbCache)); - if (NULL == pDbCache || !pDbCache->tsmaCache) { - goto _return; - } - - pCtgCache = taosHashAcquire(pDbCache->tsmaCache, pName->tname, strlen(pName->tname)); - if (!pCtgCache) goto _return; - - CTG_LOCK(CTG_READ, &pCtgCache->tsmaLock); - if (!pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) { - CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); - goto _return; - } - STSMACache *pCache = taosArrayGetP(pCtgCache->pTsmas, 0); - pOp = createDropAllTbTsmaCtgCacheOp(pCtg, pCache, syncOp); - if (!pOp) { - code = TSDB_CODE_OUT_OF_MEMORY; - CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); - goto _return; - } - CTG_UNLOCK(CTG_READ, &pCtgCache->tsmaLock); - CTG_ERR_JRET(ctgEnqueue(pCtg, pOp)); - taosHashRelease(pDbCache->tsmaCache, pCtgCache); - return TSDB_CODE_SUCCESS; - -_return: - if (pCtgCache) taosHashRelease(pDbCache->tsmaCache, pCtgCache); - if (pOp) { - taosMemoryFree(pOp->data); - taosMemoryFree(pOp); - } - CTG_RET(code); -} - -int32_t ctgWriteTbTSMAToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFName, char *tbName, - STSMACache **ppTsmaCache) { - if (NULL == dbCache->tsmaCache) { - ctgError("db is dropping, dbId:0x%" PRIx64, dbCache->dbId); - CTG_ERR_RET(TSDB_CODE_CTG_DB_DROPPED); - } - - STSMACache *pTsmaCache = *ppTsmaCache; - int32_t code = TSDB_CODE_SUCCESS; - - SCtgTSMACache* pCache = taosHashGet(dbCache->tsmaCache, tbName, strlen(tbName)); - if (!pCache) { - SCtgTSMACache cache = {0}; - cache.pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES)); - if (!cache.pTsmas) CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - taosArrayPush(cache.pTsmas, &pTsmaCache); - if (taosHashPut(dbCache->tsmaCache, tbName, strlen(tbName), &cache, sizeof(cache))) { - ctgError("taosHashPut new tsmacache for tb: %s.%s failed", dbFName, tbName); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - atomic_add_fetch_64(&dbCache->dbCacheSize, strlen(tbName) + sizeof(STSMACache) + ctgGetTbTSMACacheSize(pTsmaCache)); - CTG_DB_NUM_INC(CTG_CI_TBL_TSMA); - ctgDebug("tb %s tsma updated to cache, name: %s", tbName, pTsmaCache->name); - CTG_ERR_JRET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache)); - *ppTsmaCache = NULL; - goto _return; - } - - CTG_LOCK(CTG_WRITE, &pCache->tsmaLock); - - if (pCache->pTsmas) { - uint64_t cacheSize = 0; - for (int32_t i = 0; i < pCache->pTsmas->size; ++i) { - STableTSMAInfo* pInfo = taosArrayGetP(pCache->pTsmas, i); - if (pInfo->tsmaId == pTsmaCache->tsmaId) { - ctgDebug("tsma: %s removed from cache, history from %d to %d, reqTs from %" PRId64 " to %" PRId64 - "rspTs from %" PRId64 " to %" PRId64 " delay from %" PRId64 " to %" PRId64, - pInfo->name, pInfo->fillHistoryFinished, pTsmaCache->fillHistoryFinished, pInfo->reqTs, - pTsmaCache->reqTs, pInfo->rspTs, pTsmaCache->rspTs, pInfo->delayDuration, pTsmaCache->delayDuration); - cacheSize = ctgGetTbTSMACacheSize(pInfo); - taosArrayRemove(pCache->pTsmas, i); - atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); - tFreeTableTSMAInfo(pInfo); - taosMemoryFreeClear(pInfo); - break; - } - } - } else { - pCache->pTsmas = taosArrayInit(4, sizeof(POINTER_BYTES)); - if (!pCache->pTsmas) { - CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock); - CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); - } - } - // push the new cache - taosArrayPush(pCache->pTsmas, &pTsmaCache); - *ppTsmaCache = NULL; - - atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetTbTSMACacheSize(pTsmaCache)); - CTG_ERR_RET(ctgUpdateRentTSMAVersion(pCtg, dbFName, pTsmaCache)); - CTG_UNLOCK(CTG_WRITE, &pCache->tsmaLock); - ctgDebug("table %s tsma updated to cache, tsma: %s", tbName, pTsmaCache->name); -_return: - CTG_RET(code); -} - -int32_t ctgOpDropTbTSMA(SCtgCacheOperation *operation) { - int32_t code = 0; - SCtgDropTbTSMAMsg * msg = operation->data; - SCatalog *pCtg = msg->pCtg; - SCtgDBCache *dbCache = NULL; - - if (pCtg->stopUpdate) { - goto _return; - } - - CTG_ERR_JRET(ctgGetDBCache(pCtg, msg->dbFName, &dbCache)); - if (NULL == dbCache || !dbCache->tsmaCache || (msg->dbId != dbCache->dbId && msg->dbId != 0)) { - goto _return; - } - - SCtgTSMACache* pCtgCache = taosHashGet(dbCache->tsmaCache, msg->tbName, strlen(msg->tbName)); - if (!pCtgCache || !pCtgCache->pTsmas || pCtgCache->pTsmas->size == 0) { - goto _return; - } - - uint64_t cacheSize = 0; - STSMACache *pCache = NULL; - if (msg->dropAllForTb) { - CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock); - for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { - pCache = taosArrayGetP(pCtgCache->pTsmas, i); - cacheSize += ctgGetTbTSMACacheSize(pCache); - ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); - CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); - } - taosArrayDestroyP(pCtgCache->pTsmas, tFreeAndClearTableTSMAInfo); - pCtgCache->pTsmas = NULL; - ctgDebug("all tsmas for table dropped: %s.%s", msg->dbFName, msg->tbName); - taosHashRemove(dbCache->tsmaCache, msg->tbName, TSDB_TABLE_NAME_LEN); - CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); - } else { - CTG_LOCK(CTG_WRITE, &pCtgCache->tsmaLock); - pCache = taosArrayGetP(pCtgCache->pTsmas, 0); - if (msg->tbId != 0 && pCache->suid != msg->tbId) { - // table id mismatch, skip drops - CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); - goto _return; - } - for (int32_t i = 0; i < pCtgCache->pTsmas->size; ++i) { - pCache = taosArrayGetP(pCtgCache->pTsmas, i); - if (pCache->tsmaId != msg->tsmaId) { - continue; - } - cacheSize = ctgGetTbTSMACacheSize(pCache); - ctgMetaRentRemove(&msg->pCtg->tsmaRent, pCache->tsmaId, ctgTSMAVersionSearchCompare, ctgTSMAVersionSearchCompare); - taosArrayRemove(pCtgCache->pTsmas, i); - tFreeAndClearTableTSMAInfo(pCache); - CTG_DB_NUM_DEC(CTG_CI_TBL_TSMA); - break; - } - CTG_UNLOCK(CTG_WRITE, &pCtgCache->tsmaLock); - } - atomic_sub_fetch_64(&dbCache->dbCacheSize, cacheSize); - -_return: - - taosMemoryFreeClear(msg); - CTG_RET(code); -} - -int32_t ctgOpUpdateTbTSMA(SCtgCacheOperation *operation) { - int32_t code = 0; - SCtgUpdateTbTSMAMsg *msg = operation->data; - SCatalog * pCtg = msg->pCtg; - STableTSMAInfo * pTsmaInfo = msg->pTsma; - SCtgDBCache * dbCache = NULL; - - if (pCtg->stopUpdate) { - goto _return; - } - - CTG_ERR_JRET(ctgGetAddDBCache(pCtg, pTsmaInfo->dbFName, pTsmaInfo->dbId, &dbCache)); - CTG_ERR_JRET(ctgWriteTbTSMAToCache(pCtg, dbCache, pTsmaInfo->dbFName, pTsmaInfo->tb, &pTsmaInfo)); - if (dbCache && msg->dbTsmaVersion > 0) { - dbCache->tsmaVersion = msg->dbTsmaVersion; - SDbCacheInfo cacheInfo = {0}; - cacheInfo.dbId = dbCache->dbId; - if (dbCache->cfgCache.cfgInfo) { - cacheInfo.cfgVersion = dbCache->cfgCache.cfgInfo->cfgVersion; - tstrncpy(cacheInfo.dbFName, dbCache->cfgCache.cfgInfo->db, TSDB_DB_FNAME_LEN); - } - if (dbCache->vgCache.vgInfo) { - cacheInfo.vgVersion = dbCache->vgCache.vgInfo->vgVersion; - cacheInfo.numOfTable = dbCache->vgCache.vgInfo->numOfTable; - cacheInfo.stateTs = dbCache->vgCache.vgInfo->stateTs; - } - cacheInfo.tsmaVersion = dbCache->tsmaVersion; - CTG_ERR_JRET(ctgMetaRentUpdate(&msg->pCtg->dbRent, &cacheInfo, cacheInfo.dbId, sizeof(SDbCacheInfo), - ctgDbCacheInfoSortCompare, ctgDbCacheInfoSearchCompare)); - } - -_return: - - if (pTsmaInfo) { - tFreeTableTSMAInfo(pTsmaInfo); - taosMemoryFreeClear(pTsmaInfo); - } - - taosMemoryFreeClear(msg); - CTG_RET(code); -}