commit
2d9f2b8915
|
@ -53,13 +53,15 @@ typedef struct SCatalogCfg {
|
|||
} SCatalogCfg;
|
||||
|
||||
typedef struct SSTableMetaVersion {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
uint64_t suid;
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
} SSTableMetaVersion;
|
||||
|
||||
typedef struct SDbVgVersion {
|
||||
char dbName[TSDB_DB_FNAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
int64_t dbId;
|
||||
int32_t vgVersion;
|
||||
} SDbVgVersion;
|
||||
|
@ -101,6 +103,8 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
|
||||
int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t dbId);
|
||||
|
||||
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid);
|
||||
|
||||
/**
|
||||
* Get a table's meta data.
|
||||
* @param pCatalog (input, got with catalogGetHandle)
|
||||
|
|
|
@ -85,6 +85,75 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t msgLen = 0;
|
||||
int32_t code = 0;
|
||||
int32_t schemaNum = 0;
|
||||
|
||||
while (msgLen < valueLen) {
|
||||
STableMetaRsp *rsp = (STableMetaRsp *)((char *)value + msgLen);
|
||||
|
||||
rsp->numOfColumns = ntohl(rsp->numOfColumns);
|
||||
rsp->suid = be64toh(rsp->suid);
|
||||
|
||||
if (rsp->numOfColumns < 0) {
|
||||
schemaNum = 0;
|
||||
|
||||
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
|
||||
code = catalogRemoveSTableMeta(pCatalog, rsp->dbFName, rsp->stbName, rsp->suid);
|
||||
} else {
|
||||
rsp->numOfTags = ntohl(rsp->numOfTags);
|
||||
|
||||
schemaNum = rsp->numOfColumns + rsp->numOfTags;
|
||||
/*
|
||||
rsp->vgNum = ntohl(rsp->vgNum);
|
||||
rsp->uid = be64toh(rsp->uid);
|
||||
|
||||
SDBVgroupInfo vgInfo = {0};
|
||||
vgInfo.dbId = rsp->uid;
|
||||
vgInfo.vgVersion = rsp->vgVersion;
|
||||
vgInfo.hashMethod = rsp->hashMethod;
|
||||
vgInfo.vgHash = taosHashInit(rsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||
if (NULL == vgInfo.vgHash) {
|
||||
tscError("hash init[%d] failed", rsp->vgNum);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < rsp->vgNum; ++i) {
|
||||
rsp->vgroupInfo[i].vgId = ntohl(rsp->vgroupInfo[i].vgId);
|
||||
rsp->vgroupInfo[i].hashBegin = ntohl(rsp->vgroupInfo[i].hashBegin);
|
||||
rsp->vgroupInfo[i].hashEnd = ntohl(rsp->vgroupInfo[i].hashEnd);
|
||||
|
||||
for (int32_t n = 0; n < rsp->vgroupInfo[i].epset.numOfEps; ++n) {
|
||||
rsp->vgroupInfo[i].epset.eps[n].port = ntohs(rsp->vgroupInfo[i].epset.eps[n].port);
|
||||
}
|
||||
|
||||
if (0 != taosHashPut(vgInfo.vgHash, &rsp->vgroupInfo[i].vgId, sizeof(rsp->vgroupInfo[i].vgId), &rsp->vgroupInfo[i], sizeof(rsp->vgroupInfo[i]))) {
|
||||
tscError("hash push failed, errno:%d", errno);
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogUpdateDBVgroup(pCatalog, rsp->db, &vgInfo);
|
||||
if (code) {
|
||||
taosHashCleanup(vgInfo.vgHash);
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
if (code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
msgLen += sizeof(STableMetaRsp) + schemaNum * sizeof(SSchema);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRsp) {
|
||||
SHbConnInfo * info = taosHashGet(pAppHbMgr->connInfo, &pRsp->connKey, sizeof(SClientHbKey));
|
||||
if (NULL == info) {
|
||||
|
@ -117,9 +186,24 @@ static int32_t hbQueryHbRspHandle(struct SAppHbMgr *pAppHbMgr, SClientHbRsp* pRs
|
|||
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_STBINFO:
|
||||
case HEARTBEAT_KEY_STBINFO:{
|
||||
if (kv->valueLen <= 0 || NULL == kv->value) {
|
||||
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
|
||||
break;
|
||||
}
|
||||
|
||||
int64_t *clusterId = (int64_t *)info->param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
||||
int32_t code = catalogGetHandle(*clusterId, &pCatalog);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", *clusterId, tstrerror(code));
|
||||
break;
|
||||
}
|
||||
|
||||
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
tscError("invalid hb key type:%d", kv->key);
|
||||
break;
|
||||
|
@ -152,7 +236,7 @@ static int32_t hbMqAsyncCallBack(void* param, const SDataBuf* pMsg, int32_t code
|
|||
tfree(param);
|
||||
|
||||
if (rspNum) {
|
||||
tscDebug("hb got %d rsp, %d empty rsp prior", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
|
||||
tscDebug("hb got %d rsp, %d empty rsp received before", rspNum, atomic_val_compare_exchange_32(&emptyRspNum, emptyRspNum, 0));
|
||||
} else {
|
||||
atomic_add_fetch_32(&emptyRspNum, 1);
|
||||
}
|
||||
|
@ -199,6 +283,37 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||
SSTableMetaVersion *stbs = NULL;
|
||||
uint32_t stbNum = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
code = catalogGetExpiredSTables(pCatalog, &stbs, &stbNum);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
if (stbNum <= 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < stbNum; ++i) {
|
||||
SSTableMetaVersion *stb = &stbs[i];
|
||||
stb->suid = htobe64(stb->suid);
|
||||
stb->sversion = htons(stb->sversion);
|
||||
stb->tversion = htons(stb->tversion);
|
||||
}
|
||||
|
||||
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = sizeof(SSTableMetaVersion) * stbNum, .value = stbs};
|
||||
|
||||
tscDebug("hb got %d expired stb, valueLen:%d", stbNum, kv.valueLen);
|
||||
|
||||
taosHashPut(req->info, &kv.key, sizeof(kv.key), &kv, sizeof(kv));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req) {
|
||||
int64_t *clusterId = (int64_t *)param;
|
||||
struct SCatalog *pCatalog = NULL;
|
||||
|
@ -214,6 +329,11 @@ int32_t hbQueryHbReqHandle(SClientHbKey *connKey, void* param, SClientHbReq *req
|
|||
return code;
|
||||
}
|
||||
|
||||
code = hbGetExpiredStbInfo(connKey, pCatalog, req);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -384,7 +504,6 @@ static void hbStopThread() {
|
|||
}
|
||||
|
||||
SAppHbMgr* appHbMgrInit(SAppInstInfo* pAppInstInfo, char *key) {
|
||||
return NULL;
|
||||
hbMgrInit();
|
||||
SAppHbMgr* pAppHbMgr = malloc(sizeof(SAppHbMgr));
|
||||
if (pAppHbMgr == NULL) {
|
||||
|
@ -442,7 +561,6 @@ void appHbMgrCleanup(void) {
|
|||
}
|
||||
|
||||
int hbMgrInit() {
|
||||
return 0;
|
||||
// init once
|
||||
int8_t old = atomic_val_compare_exchange_8(&clientHbMgr.inited, 0, 1);
|
||||
if (old == 1) return 0;
|
||||
|
@ -499,7 +617,6 @@ int hbRegisterConnImpl(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, SHbConnInfo *
|
|||
}
|
||||
|
||||
int hbRegisterConn(SAppHbMgr* pAppHbMgr, int32_t connId, int64_t clusterId, int32_t hbType) {
|
||||
return 0;
|
||||
SClientHbKey connKey = {.connId = connId, .hbType = HEARTBEAT_TYPE_QUERY};
|
||||
SHbConnInfo info = {0};
|
||||
|
||||
|
|
|
@ -109,7 +109,7 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass,
|
|||
SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo));
|
||||
p->mgmtEp = epSet;
|
||||
p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores);
|
||||
/*p->pAppHbMgr = appHbMgrInit(p, key);*/
|
||||
p->pAppHbMgr = appHbMgrInit(p, key);
|
||||
taosHashPut(appInfo.pInstMap, key, strlen(key), &p, POINTER_BYTES);
|
||||
|
||||
pInst = &p;
|
||||
|
|
|
@ -76,7 +76,7 @@ int processConnectRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
|||
|
||||
pTscObj->connType = HEARTBEAT_TYPE_QUERY;
|
||||
|
||||
/*hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);*/
|
||||
hbRegisterConn(pTscObj->pAppInfo->pAppHbMgr, pConnect->connId, pConnect->clusterId, HEARTBEAT_TYPE_QUERY);
|
||||
|
||||
// pRequest->body.resInfo.pRspMsg = pMsg->pData;
|
||||
tscDebug("0x%" PRIx64 " clusterId:%" PRId64 ", totalConn:%" PRId64, pRequest->requestId, pConnect->clusterId,
|
||||
|
|
|
@ -28,6 +28,9 @@ void mndCleanupStb(SMnode *pMnode);
|
|||
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
||||
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
||||
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen);
|
||||
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -907,9 +907,9 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
|
|||
|
||||
len = 0;
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, db->dbName);
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, db->dbFName);
|
||||
if (pDb == NULL) {
|
||||
mInfo("db %s not exist", db->dbName);
|
||||
mInfo("db %s not exist", db->dbFName);
|
||||
|
||||
len = sizeof(SUseDbRsp);
|
||||
} else if (pDb->uid != db->dbId || db->vgVersion < pDb->vgVersion) {
|
||||
|
@ -929,7 +929,7 @@ int32_t mndValidateDBInfo(SMnode *pMnode, SDbVgVersion *dbs, int32_t num, void *
|
|||
}
|
||||
|
||||
pRsp = (SUseDbRsp *)((char *)buf + bufOffset);
|
||||
memcpy(pRsp->db, db->dbName, TSDB_DB_FNAME_LEN);
|
||||
memcpy(pRsp->db, db->dbFName, TSDB_DB_FNAME_LEN);
|
||||
if (pDb) {
|
||||
int32_t vgNum = 0;
|
||||
mndBuildDBVgroupInfo(pDb, pMnode, pRsp->vgroupInfo, &vgNum);
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
#include "mndProfile.h"
|
||||
//#include "mndConsumer.h"
|
||||
#include "mndDb.h"
|
||||
#include "mndStb.h"
|
||||
#include "mndMnode.h"
|
||||
#include "mndShow.h"
|
||||
//#include "mndTopic.h"
|
||||
|
@ -376,9 +377,16 @@ static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) {
|
|||
}
|
||||
break;
|
||||
}
|
||||
case HEARTBEAT_KEY_STBINFO:
|
||||
|
||||
case HEARTBEAT_KEY_STBINFO: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateStbInfo(pMnode, (SSTableMetaVersion *)kv->value, kv->valueLen/sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv);
|
||||
}
|
||||
break;
|
||||
}
|
||||
default:
|
||||
mError("invalid kv key:%d", kv->key);
|
||||
hbRsp.status = TSDB_CODE_MND_APP_ERROR;
|
||||
|
|
|
@ -728,7 +728,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
|
||||
mDebug("stb:%s, start to retrieve meta", tbFName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDbByStb(pMnode, tbFName);
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, pInfo->dbFName);
|
||||
if (pDb == NULL) {
|
||||
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
|
||||
mError("stb:%s, failed to retrieve meta since %s", tbFName, terrstr());
|
||||
|
@ -788,6 +788,109 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *stbs, int32_t num, void **rsp, int32_t *rspLen) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t bufSize = num * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
void *buf = malloc(bufSize);
|
||||
int32_t len = 0;
|
||||
int32_t contLen = 0;
|
||||
STableMetaRsp *pRsp = NULL;
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
SSTableMetaVersion *stb = &stbs[i];
|
||||
stb->suid = be64toh(stb->suid);
|
||||
stb->sversion = ntohs(stb->sversion);
|
||||
stb->tversion = ntohs(stb->tversion);
|
||||
|
||||
if ((contLen + sizeof(STableMetaRsp)) > bufSize) {
|
||||
bufSize = contLen + (num -i) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
buf = realloc(buf, bufSize);
|
||||
}
|
||||
|
||||
pRsp = (STableMetaRsp *)((char *)buf + contLen);
|
||||
|
||||
strcpy(pRsp->dbFName, stb->dbFName);
|
||||
strcpy(pRsp->tbName, stb->stbName);
|
||||
strcpy(pRsp->stbName, stb->stbName);
|
||||
|
||||
mDebug("start to retrieve meta, db:%s, stb:%s", stb->dbFName, stb->stbName);
|
||||
|
||||
SDbObj *pDb = mndAcquireDb(pMnode, stb->dbFName);
|
||||
if (pDb == NULL) {
|
||||
pRsp->numOfColumns = -1;
|
||||
pRsp->suid = htobe64(stb->suid);
|
||||
contLen += sizeof(STableMetaRsp);
|
||||
mWarn("db:%s, failed to require db since %s", stb->dbFName, terrstr());
|
||||
continue;
|
||||
}
|
||||
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", stb->dbFName, stb->stbName);
|
||||
|
||||
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (pStb == NULL) {
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
pRsp->numOfColumns = -1;
|
||||
pRsp->suid = htobe64(stb->suid);
|
||||
contLen += sizeof(STableMetaRsp);
|
||||
mWarn("stb:%s, failed to get meta since %s", tbFName, terrstr());
|
||||
continue;
|
||||
}
|
||||
|
||||
taosRLockLatch(&pStb->lock);
|
||||
|
||||
if (stb->suid == pStb->uid && stb->sversion == pStb->version) {
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
continue;
|
||||
}
|
||||
|
||||
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
|
||||
int32_t len = totalCols * sizeof(SSchema);
|
||||
|
||||
contLen += sizeof(STableMetaRsp) + len;
|
||||
|
||||
if (contLen > bufSize) {
|
||||
bufSize = contLen + (num -i - 1) * (sizeof(STableMetaRsp) + 4 * sizeof(SSchema));
|
||||
buf = realloc(buf, bufSize);
|
||||
}
|
||||
|
||||
pRsp->numOfTags = htonl(pStb->numOfTags);
|
||||
pRsp->numOfColumns = htonl(pStb->numOfColumns);
|
||||
pRsp->precision = pDb->cfg.precision;
|
||||
pRsp->tableType = TSDB_SUPER_TABLE;
|
||||
pRsp->update = pDb->cfg.update;
|
||||
pRsp->sversion = htonl(pStb->version);
|
||||
pRsp->suid = htobe64(pStb->uid);
|
||||
pRsp->tuid = htobe64(pStb->uid);
|
||||
|
||||
for (int32_t i = 0; i < totalCols; ++i) {
|
||||
SSchema *pSchema = &pRsp->pSchema[i];
|
||||
SSchema *pSrcSchema = &pStb->pSchema[i];
|
||||
memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN);
|
||||
pSchema->type = pSrcSchema->type;
|
||||
pSchema->colId = htonl(pSrcSchema->colId);
|
||||
pSchema->bytes = htonl(pSrcSchema->bytes);
|
||||
}
|
||||
taosRUnLockLatch(&pStb->lock);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
}
|
||||
|
||||
if (contLen > 0) {
|
||||
*rsp = buf;
|
||||
*rspLen = contLen;
|
||||
} else {
|
||||
*rsp = NULL;
|
||||
tfree(buf);
|
||||
*rspLen = 0;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
|
|
|
@ -27,7 +27,7 @@ extern "C" {
|
|||
#define CTG_DEFAULT_CACHE_CLUSTER_NUMBER 6
|
||||
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
|
||||
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
|
||||
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 100000
|
||||
#define CTG_DEFAULT_CACHE_TABLEMETA_NUMBER 10000
|
||||
#define CTG_DEFAULT_RENT_SECOND 10
|
||||
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
|
||||
|
||||
|
|
|
@ -527,9 +527,9 @@ int32_t ctgGetVgInfoFromHashValue(struct SCatalog *pCatalog, SDBVgroupInfo *dbIn
|
|||
}
|
||||
|
||||
int32_t ctgSTableVersionCompare(const void* key1, const void* key2) {
|
||||
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
|
||||
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
|
||||
return -1;
|
||||
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
|
||||
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -557,7 +557,7 @@ int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type) {
|
|||
mgmt->slots = calloc(1, msgSize);
|
||||
if (NULL == mgmt->slots) {
|
||||
qError("calloc %d failed", (int32_t)msgSize);
|
||||
return TSDB_CODE_CTG_MEM_ERROR;
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||
}
|
||||
|
||||
qDebug("meta rent initialized, type:%d, slotNum:%d", type, mgmt->slotNum);
|
||||
|
@ -825,6 +825,8 @@ int32_t ctgUpdateTableMetaCache(struct SCatalog *pCatalog, STableMetaOutput *out
|
|||
if (TSDB_SUPER_TABLE == output->tbMeta->tableType) {
|
||||
bool newAdded = false;
|
||||
SSTableMetaVersion metaRent = {.suid = output->tbMeta->suid, .sversion = output->tbMeta->sversion, .tversion = output->tbMeta->tversion};
|
||||
strcpy(metaRent.dbFName, output->dbFName);
|
||||
strcpy(metaRent.stbName, output->tbName);
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||
if (taosHashPut(dbCache->tbCache.cache, output->tbName, strlen(output->tbName), output->tbMeta, tbSize) != 0) {
|
||||
|
@ -951,6 +953,39 @@ int32_t ctgValidateAndRemoveDb(struct SCatalog* pCatalog, const char* dbName, ui
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgValidateAndRemoveStbMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid, bool *removed) {
|
||||
*removed = false;
|
||||
|
||||
SCtgDBCache *dbCache = (SCtgDBCache *)taosHashAcquire(pCatalog->dbCache, dbName, strlen(dbName));
|
||||
if (NULL == dbCache) {
|
||||
ctgInfo("db not exist in dbCache, may be removed, db:%s", dbName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||
if (taosHashRemove(dbCache->tbCache.stbCache, &suid, sizeof(suid))) {
|
||||
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||
ctgInfo("stb not exist in stbCache, may be removed, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
if (taosHashRemove(dbCache->tbCache.cache, stbName, strlen(stbName))) {
|
||||
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||
ctgError("stb not exist in cache, db:%s, stb:%s, suid:%"PRIx64, dbName, stbName, suid);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR);
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &dbCache->tbCache.stbLock);
|
||||
|
||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||
|
||||
*removed = true;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
int32_t ctgRenewTableMetaImpl(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, int32_t isSTable) {
|
||||
if (NULL == pCatalog || NULL == pTransporter || NULL == pMgmtEps || NULL == pTableName) {
|
||||
|
@ -1065,7 +1100,7 @@ int32_t ctgGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMg
|
|||
|
||||
int32_t catalogInit(SCatalogCfg *cfg) {
|
||||
if (ctgMgmt.pCluster) {
|
||||
qError("catalog already init");
|
||||
qError("catalog already initialized");
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
|
@ -1111,7 +1146,7 @@ int32_t catalogGetHandle(uint64_t clusterId, struct SCatalog** catalogHandle) {
|
|||
}
|
||||
|
||||
if (NULL == ctgMgmt.pCluster) {
|
||||
qError("cluster cache are not ready, clusterId:%"PRIx64, clusterId);
|
||||
qError("catalog cluster cache are not ready, clusterId:%"PRIx64, clusterId);
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_NOT_READY);
|
||||
}
|
||||
|
||||
|
@ -1312,7 +1347,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
ctgMetaRentRemove(&pCatalog->dbRent, dbCache->vgInfo->dbId, ctgDbVgVersionCompare);
|
||||
newAdded = true;
|
||||
} else if (dbInfo->vgVersion <= dbCache->vgInfo->vgVersion) {
|
||||
ctgInfo("db vgVersion is not new, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
|
||||
ctgInfo("db vgVersion is old, db:%s, vgVersion:%d, current:%d", dbName, dbInfo->vgVersion, dbCache->vgInfo->vgVersion);
|
||||
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
||||
taosHashRelease(pCatalog->dbCache, dbCache);
|
||||
|
||||
|
@ -1345,7 +1380,7 @@ int32_t catalogUpdateDBVgroup(struct SCatalog* pCatalog, const char* dbName, SDB
|
|||
|
||||
dbInfo = NULL;
|
||||
|
||||
strncpy(vgVersion.dbName, dbName, sizeof(vgVersion.dbName));
|
||||
strncpy(vgVersion.dbFName, dbName, sizeof(vgVersion.dbFName));
|
||||
|
||||
if (newAdded) {
|
||||
CTG_ERR_JRET(ctgMetaRentAdd(&pCatalog->dbRent, &vgVersion, vgVersion.dbId, sizeof(SDbVgVersion)));
|
||||
|
@ -1394,6 +1429,32 @@ int32_t catalogRemoveDB(struct SCatalog* pCatalog, const char* dbName, uint64_t
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t catalogRemoveSTableMeta(struct SCatalog* pCatalog, const char* dbName, const char* stbName, uint64_t suid) {
|
||||
int32_t code = 0;
|
||||
bool removed = false;
|
||||
|
||||
if (NULL == pCatalog || NULL == dbName || NULL == stbName) {
|
||||
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
if (NULL == pCatalog->dbCache) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgValidateAndRemoveStbMeta(pCatalog, dbName, stbName, suid, &removed));
|
||||
if (!removed) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
ctgInfo("stb removed from cache, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
|
||||
|
||||
CTG_ERR_RET(ctgMetaRentRemove(&pCatalog->stbRent, suid, ctgSTableVersionCompare));
|
||||
|
||||
ctgDebug("stb removed from rent, db:%s, stbName:%s, suid:%"PRIx64, dbName, stbName, suid);
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogGetTableMeta(struct SCatalog* pCatalog, void *pTransporter, const SEpSet* pMgmtEps, const SName* pTableName, STableMeta** pTableMeta) {
|
||||
return ctgGetTableMeta(pCatalog, pTransporter, pMgmtEps, pTableName, false, pTableMeta, -1);
|
||||
|
|
|
@ -748,7 +748,7 @@ TEST(tableMeta, normalTable) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -844,7 +844,7 @@ TEST(tableMeta, childTableCase) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -945,7 +945,8 @@ TEST(tableMeta, superTableCase) {
|
|||
}
|
||||
|
||||
if (stbNum) {
|
||||
printf("got expired stb,suid:%" PRId64 "\n", stb->suid);
|
||||
printf("got expired stb,suid:%" PRId64 ",dbFName:%s, stbName:%s\n", stb->suid, stb->dbFName, stb->stbName);
|
||||
|
||||
free(stb);
|
||||
stb = NULL;
|
||||
} else {
|
||||
|
@ -1289,7 +1290,7 @@ TEST(rentTest, allRent) {
|
|||
printf("%d - expired stableNum:%d\n", i, num);
|
||||
if (stable) {
|
||||
for (int32_t n = 0; n < num; ++n) {
|
||||
printf("suid:%" PRId64 ", sversion:%d, tversion:%d\n", stable[n].suid, stable[n].sversion, stable[n].tversion);
|
||||
printf("suid:%" PRId64 ", dbFName:%s, stbName:%s, sversion:%d, tversion:%d\n", stable[n].suid, stable[n].dbFName, stable[n].stbName, stable[n].sversion, stable[n].tversion);
|
||||
}
|
||||
free(stable);
|
||||
stable = NULL;
|
||||
|
|
Loading…
Reference in New Issue