feat: support view meta updating

This commit is contained in:
dapan1121 2023-10-13 16:29:34 +08:00
parent 7c67f23469
commit c0a4328803
12 changed files with 404 additions and 131 deletions

View File

@ -1791,6 +1791,15 @@ int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
void tFreeSSTbHbRsp(SSTbHbRsp* pRsp);
typedef struct {
SArray* pViewRsp; // Array of SViewMetaRsp;
} SViewHbRsp;
int32_t tSerializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
int32_t tDeserializeSViewHbRsp(void* buf, int32_t bufLen, SViewHbRsp* pRsp);
void tFreeSViewHbRsp(SViewHbRsp* pRsp);
typedef struct {
int32_t numOfTables;
int32_t numOfVgroup;

View File

@ -119,6 +119,7 @@ typedef struct SMetaData {
typedef struct SCatalogCfg {
uint32_t maxTblCacheNum;
uint32_t maxViewCacheNum;
uint32_t maxDBCacheNum;
uint32_t maxUserCacheNum;
uint32_t dbRentSec;
@ -365,7 +366,13 @@ SMetaData* catalogCloneMetaData(SMetaData* pData);
void catalogFreeMetaData(SMetaData* pData);
int32_t catalogRemoveViewMeta(SCatalog* pCtg, SName* pViewName);
int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId);
int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer);
int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg);
int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg);
int32_t ctgdEnableDebug(char* option, bool enable);

View File

@ -53,7 +53,7 @@ static int32_t hbProcessUserAuthInfoRsp(void *value, int32_t valueLen, struct SC
int32_t numOfBatchs = taosArrayGetSize(batchRsp.pArray);
for (int32_t i = 0; i < numOfBatchs; ++i) {
SGetUserAuthRsp *rsp = taosArrayGet(batchRsp.pArray, i);
tscDebug("hb user auth rsp, user:%s, version:%d", rsp->user, rsp->version);
tscDebug("hb to update user auth, user:%s, version:%d", rsp->user, rsp->version);
catalogUpdateUserAuthInfo(pCatalog, rsp);
}
@ -205,6 +205,7 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
rsp->useDbRsp->db, rsp->useDbRsp->vgVersion, rsp->useDbRsp->stateTs, rsp->useDbRsp->uid);
if (rsp->useDbRsp->vgVersion < 0) {
tscDebug("hb to remove db, db:%s", rsp->useDbRsp->db);
code = catalogRemoveDB(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid);
} else {
SDBVgInfo *vgInfo = NULL;
@ -213,6 +214,8 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
goto _return;
}
tscDebug("hb to update db vgInfo, db:%s", rsp->useDbRsp->db);
catalogUpdateDBVgInfo(pCatalog, rsp->useDbRsp->db, rsp->useDbRsp->uid, vgInfo);
if (IS_SYS_DBNAME(rsp->useDbRsp->db)) {
@ -253,10 +256,10 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
if (rsp->numOfColumns < 0) {
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
tscDebug("hb to remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
catalogRemoveStbMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->stbName, rsp->suid);
} else {
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
tscDebug("hb to update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSSTbHbRsp(&hbRsp);
@ -281,6 +284,96 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
return TSDB_CODE_SUCCESS;
}
static int32_t hbProcessDynViewRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
return catalogUpdateDynViewVer(pCatalog, (SDynViewVersion*)value);
}
static int32_t hbProcessViewInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0;
SViewHbRsp hbRsp = {0};
if (tDeserializeSViewHbRsp(value, valueLen, &hbRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG;
return -1;
}
int32_t numOfMeta = taosArrayGetSize(hbRsp.pViewRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp *rsp = taosArrayGet(hbRsp.pViewRsp, i);
if (rsp->numOfCols < 0) {
tscDebug("hb to remove view, db:%s, view:%s", rsp->dbFName, rsp->name);
catalogRemoveViewMeta(pCatalog, rsp->dbFName, rsp->dbId, rsp->name, rsp->viewId);
} else {
tscDebug("hb to update view, db:%s, view:%s", rsp->dbFName, rsp->name);
catalogUpdateViewMeta(pCatalog, rsp);
}
}
tFreeSViewHbRsp(&hbRsp);
return TSDB_CODE_SUCCESS;
}
static void hbProcessQueryRspKvs(int32_t kvNum, SArray* pKvs, struct SCatalog *pCatalog, SAppHbMgr *pAppHbMgr) {
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pKvs, i);
switch (kv->key) {
case HEARTBEAT_KEY_USER_AUTHINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr);
break;
}
case HEARTBEAT_KEY_DBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_STBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
#ifdef TD_ENTERPRISE
case HEARTBEAT_KEY_DYN_VIEW: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid dyn view info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessDynViewRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_VIEWINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid view info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
hbProcessViewInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
#endif
default:
tscError("invalid hb key type:%d", kv->key);
break;
}
}
}
static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
SClientHbReq *pReq = taosHashAcquire(pAppHbMgr->activeInfo, &pRsp->connKey, sizeof(SClientHbKey));
if (NULL == pReq) {
@ -338,66 +431,16 @@ static int32_t hbQueryHbRspHandle(SAppHbMgr *pAppHbMgr, SClientHbRsp *pRsp) {
tscDebug("hb got %d rsp kv", kvNum);
for (int32_t i = 0; i < kvNum; ++i) {
SKv *kv = taosArrayGet(pRsp->info, i);
switch (kv->key) {
case HEARTBEAT_KEY_USER_AUTHINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb user auth info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
hbProcessUserAuthInfoRsp(kv->value, kv->valueLen, pCatalog, pAppHbMgr);
break;
}
case HEARTBEAT_KEY_DBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb db info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
hbProcessDBInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
case HEARTBEAT_KEY_STBINFO: {
if (kv->valueLen <= 0 || NULL == kv->value) {
tscError("invalid hb stb info, len:%d, value:%p", kv->valueLen, kv->value);
break;
}
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
break;
}
hbProcessStbInfoRsp(kv->value, kv->valueLen, pCatalog);
break;
}
default:
tscError("invalid hb key type:%d", kv->key);
break;
if (kvNum > 0) {
struct SCatalog *pCatalog = NULL;
int32_t code = catalogGetHandle(pReq->clusterId, &pCatalog);
if (code != TSDB_CODE_SUCCESS) {
tscWarn("catalogGetHandle failed, clusterId:%" PRIx64 ", error:%s", pReq->clusterId, tstrerror(code));
} else {
hbProcessQueryRspKvs(kvNum, pRsp->info, pCatalog, pAppHbMgr);
}
}
taosHashRelease(pAppHbMgr->activeInfo, pReq);
return TSDB_CODE_SUCCESS;
@ -783,6 +826,7 @@ int32_t hbGetExpiredViewInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, S
for (int32_t i = 0; i < viewNum; ++i) {
SViewVersion *view = &views[i];
view->dbId = htobe64(view->dbId);
view->viewId = htobe64(view->viewId);
view->version = htonl(view->version);
}

View File

@ -8621,24 +8621,31 @@ int32_t tDeserializeSViewMetaReq(void* buf, int32_t bufLen, SViewMetaReq* pReq)
return 0;
}
static int32_t tEncodeSViewMetaRsp(SEncoder *pEncoder, const SViewMetaRsp *pRsp) {
if (tEncodeCStr(pEncoder, pRsp->name) < 0) return -1;
if (tEncodeCStr(pEncoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->dbId) < 0) return -1;
if (tEncodeU64(pEncoder, pRsp->viewId) < 0) return -1;
if (tEncodeCStr(pEncoder, pRsp->querySql) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->type) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->version) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->numOfCols) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
if (tEncodeSSchema(pEncoder, pSchema) < 0) return -1;
}
return 0;
}
int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->name) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->dbId) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->viewId) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->querySql) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->precision) < 0) return -1;
if (tEncodeI8(&encoder, pRsp->type) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->numOfCols) < 0) return -1;
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema *pSchema = &pRsp->pSchema[i];
if (tEncodeSSchema(&encoder, pSchema) < 0) return -1;
}
if (tEncodeSViewMetaRsp(&encoder, pRsp) < 0) return -1;
tEndEncode(&encoder);
@ -8647,20 +8654,16 @@ int32_t tSerializeSViewMetaRsp(void* buf, int32_t bufLen, const SViewMetaRsp* pR
return tlen;
}
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->name) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->dbId) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->viewId) < 0) return -1;
if (tDecodeCStrAlloc(&decoder, &pRsp->querySql) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(&decoder, &pRsp->type) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->numOfCols) < 0) return -1;
static int32_t tDecodeSViewMetaRsp(SDecoder *pDecoder, SViewMetaRsp *pRsp) {
if (tDecodeCStrTo(pDecoder, pRsp->name) < 0) return -1;
if (tDecodeCStrTo(pDecoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->dbId) < 0) return -1;
if (tDecodeU64(pDecoder, &pRsp->viewId) < 0) return -1;
if (tDecodeCStrAlloc(pDecoder, &pRsp->querySql) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->precision) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->type) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->version) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->numOfCols) < 0) return -1;
if (pRsp->numOfCols > 0) {
pRsp->pSchema = taosMemoryCalloc(pRsp->numOfCols, sizeof(SSchema));
if (pRsp->pSchema == NULL) {
@ -8670,10 +8673,20 @@ int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp)
for (int32_t i = 0; i < pRsp->numOfCols; ++i) {
SSchema* pSchema = pRsp->pSchema + i;
if (tDecodeSSchema(&decoder, pSchema) < 0) return -1;
if (tDecodeSSchema(pDecoder, pSchema) < 0) return -1;
}
}
return 0;
}
int32_t tDeserializeSViewMetaRsp(void* buf, int32_t bufLen, SViewMetaRsp* pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeSViewMetaRsp(&decoder, pRsp) < 0) return -1;
tEndDecode(&decoder);
tDecoderClear(&decoder);
@ -8689,4 +8702,62 @@ void tFreeSViewMetaRsp(SViewMetaRsp* pRsp) {
taosMemoryFree(pRsp->pSchema);
}
int32_t tSerializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) {
SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfMeta = taosArrayGetSize(pRsp->pViewRsp);
if (tEncodeI32(&encoder, numOfMeta) < 0) return -1;
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp *pMetaRsp = taosArrayGet(pRsp->pViewRsp, i);
if (tEncodeSViewMetaRsp(&encoder, pMetaRsp) < 0) return -1;
}
tEndEncode(&encoder);
int32_t tlen = encoder.pos;
tEncoderClear(&encoder);
return tlen;
}
int32_t tDeserializeSViewHbRsp(void *buf, int32_t bufLen, SViewHbRsp *pRsp) {
SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfMeta = 0;
if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1;
pRsp->pViewRsp = taosArrayInit(numOfMeta, sizeof(SViewMetaRsp));
if (pRsp->pViewRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp metaRsp = {0};
if (tDecodeSViewMetaRsp(&decoder, &metaRsp) < 0) return -1;
taosArrayPush(pRsp->pViewRsp, &metaRsp);
}
tEndDecode(&decoder);
tDecoderClear(&decoder);
return 0;
}
void tFreeSViewHbRsp(SViewHbRsp *pRsp) {
int32_t numOfMeta = taosArrayGetSize(pRsp->pViewRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
SViewMetaRsp *pMetaRsp = taosArrayGet(pRsp->pViewRsp, i);
tFreeSViewMetaRsp(pMetaRsp);
}
taosArrayDestroy(pRsp->pViewRsp);
}

View File

@ -48,7 +48,7 @@ int32_t mndProcessDropViewReqImpl(SCMDropViewReq* pDropView, SRpcMsg *pReq);
int32_t mndProcessViewMetaReqImpl(SViewMetaReq* pMetaReq, SRpcMsg *pReq);
int32_t mndRetrieveViewImpl(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
int32_t mndDropViewByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
void mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pDynViewVer, bool *needCheck);
int32_t mndValidateDynViewVersion(SMnode *pMnode, SDynViewVersion* pReqVer, bool *needCheck, SDynViewVersion** ppRspVer);
int32_t mndValidateViewInfo(SMnode *pMnode, SViewVersion *pViewVersions, int32_t numOfViews, void **ppRsp,
int32_t *pRspLen);

View File

@ -539,10 +539,17 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
SKv* pKv = taosHashGet(pHbReq->info, &key, sizeof(key));
if (NULL != pKv) {
pDynViewVer = pKv->value;
mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck);
mTrace("recv view dyn ver, bootTs:%" PRId64 ", ver:%" PRIu64, pDynViewVer->svrBootTs, pDynViewVer->dynViewVer);
SDynViewVersion* pRspVer = NULL;
if (0 != mndValidateDynViewVersion(pMnode, pDynViewVer, &needCheck, &pRspVer)) {
return -1;
}
if (needCheck) {
SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pDynViewVer};
SKv kv1 = {.key = HEARTBEAT_KEY_DYN_VIEW, .valueLen = sizeof(*pDynViewVer), .value = pRspVer};
taosArrayPush(hbRsp.info, &kv1);
mTrace("need to check view ver, lastest bootTs:%" PRId64 ", ver:%" PRIu64, pRspVer->svrBootTs, pRspVer->dynViewVer);
}
}
#endif

View File

@ -2000,8 +2000,51 @@ static int32_t mndBuildStbCfgImp(SDbObj *pDb, SStbObj *pStb, const char *tbName,
return 0;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp,
int32_t *smaVer) {
static int32_t mndValidateStbVersion(SMnode *pMnode, SSTableVersion* pStbVer, bool* schema, bool* sma) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", pStbVer->dbFName, pStbVer->stbName);
SDbObj *pDb = mndAcquireDb(pMnode, pStbVer->dbFName);
if (pDb == NULL) {
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
if (pDb->uid != pStbVer->dbId) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_MND_DB_NOT_SELECTED;
return -1;
}
SStbObj *pStb = mndAcquireStb(pMnode, tbFName);
if (pStb == NULL) {
mndReleaseDb(pMnode, pDb);
terrno = TSDB_CODE_PAR_TABLE_NOT_EXIST;
return -1;
}
taosRLockLatch(&pStb->lock);
if (pStbVer->sversion != pStb->colVer || pStbVer->tversion != pStb->tagVer) {
*schema = true;
} else {
*schema = false;
}
if (pStbVer->smaVer && pStbVer->smaVer != pStb->smaVer) {
*sma = true;
} else {
*sma = false;
}
taosRUnLockLatch(&pStb->lock);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
return TSDB_CODE_SUCCESS;
}
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
@ -2018,10 +2061,6 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return -1;
}
if (smaVer) {
*smaVer = pStb->smaVer;
}
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb);
@ -2664,7 +2703,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
}
} else {
mInfo("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp, NULL) != 0) {
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
goto _OVER;
}
}
@ -2775,14 +2814,15 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid);
pStbVersion->sversion = ntohs(pStbVersion->sversion);
pStbVersion->tversion = ntohs(pStbVersion->tversion);
pStbVersion->sversion = ntohl(pStbVersion->sversion);
pStbVersion->tversion = ntohl(pStbVersion->tversion);
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
STableMetaRsp metaRsp = {0};
int32_t smaVer = 0;
mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
bool schema = false;
bool sma = false;
int32_t code = mndValidateStbVersion(pMnode, pStbVersion, &schema, &sma);
if (TSDB_CODE_SUCCESS != code) {
STableMetaRsp metaRsp = {0};
metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid;
tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
@ -2792,13 +2832,23 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t
continue;
}
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
if (schema) {
STableMetaRsp metaRsp = {0};
mInfo("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid;
tstrncpy(metaRsp.dbFName, pStbVersion->dbFName, sizeof(metaRsp.dbFName));
tstrncpy(metaRsp.tbName, pStbVersion->stbName, sizeof(metaRsp.tbName));
tstrncpy(metaRsp.stbName, pStbVersion->stbName, sizeof(metaRsp.stbName));
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
continue;
}
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
} else {
tFreeSTableMetaRsp(&metaRsp);
}
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
if (sma) {
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0};

View File

@ -30,6 +30,7 @@ extern "C" {
#define CTG_DEFAULT_CACHE_VGROUP_NUMBER 100
#define CTG_DEFAULT_CACHE_DB_NUMBER 20
#define CTG_DEFAULT_CACHE_TBLMETA_NUMBER 1000
#define CTG_DEFAULT_CACHE_VIEW_NUMBER 256
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
@ -542,6 +543,7 @@ typedef struct SCtgDropViewMetaMsg {
char dbFName[TSDB_DB_FNAME_LEN];
char viewName[TSDB_VIEW_NAME_LEN];
uint64_t dbId;
uint64_t viewId;
} SCtgDropViewMetaMsg;
@ -928,7 +930,7 @@ int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput* output, bool sy
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp* pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char* dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex** pIndex, bool syncOp);
int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *viewName, bool syncOp);
int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp);
int32_t ctgClearCacheEnqueue(SCatalog* pCtg, bool clearMeta, bool freeCtg, bool stopQueue, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt* mgmt, uint32_t rentSec, int8_t type, int32_t size);
int32_t ctgMetaRentAdd(SCtgRentMgmt* mgmt, void* meta, int64_t id, int32_t size);

View File

@ -666,10 +666,10 @@ _return:
}
int32_t ctgRemoveViewMeta(SCatalog* pCtg, SName* pViewName) {
int32_t ctgRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId) {
int32_t code = 0;
if (NULL == pCtg || NULL == pViewName) {
if (NULL == pCtg || NULL == dbFName || NULL == viewName) {
CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT);
}
@ -677,9 +677,7 @@ int32_t ctgRemoveViewMeta(SCatalog* pCtg, SName* pViewName) {
return TSDB_CODE_SUCCESS;
}
char dbFName[TSDB_DB_FNAME_LEN] = {0};
tNameGetFullDbName(pViewName, dbFName);
CTG_ERR_JRET(ctgDropViewMetaEnqueue(pCtg, dbFName, 0, pViewName->tname, true));
CTG_ERR_JRET(ctgDropViewMetaEnqueue(pCtg, dbFName, 0, viewName, viewId, true));
_return:
@ -757,6 +755,10 @@ int32_t catalogInit(SCatalogCfg* cfg) {
gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
}
if (gCtgMgmt.cfg.maxViewCacheNum == 0) {
gCtgMgmt.cfg.maxViewCacheNum = CTG_DEFAULT_CACHE_VIEW_NUMBER;
}
if (gCtgMgmt.cfg.dbRentSec == 0) {
gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
}
@ -771,6 +773,7 @@ int32_t catalogInit(SCatalogCfg* cfg) {
} else {
gCtgMgmt.cfg.maxDBCacheNum = CTG_DEFAULT_CACHE_DB_NUMBER;
gCtgMgmt.cfg.maxTblCacheNum = CTG_DEFAULT_CACHE_TBLMETA_NUMBER;
gCtgMgmt.cfg.maxViewCacheNum = CTG_DEFAULT_CACHE_VIEW_NUMBER;
gCtgMgmt.cfg.dbRentSec = CTG_DEFAULT_RENT_SECOND;
gCtgMgmt.cfg.stbRentSec = CTG_DEFAULT_RENT_SECOND;
gCtgMgmt.cfg.viewRentSec = CTG_DEFAULT_RENT_SECOND;
@ -1503,7 +1506,9 @@ int32_t catalogGetExpiredViews(SCatalog* pCtg, SViewVersion** views, uint32_t* n
if (NULL == *dynViewVersion) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
**dynViewVersion = pCtg->dynViewVer;
(*dynViewVersion)->svrBootTs = atomic_load_64(&pCtg->dynViewVer.svrBootTs);
(*dynViewVersion)->dynViewVer = atomic_load_64(&pCtg->dynViewVer.dynViewVer);
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->viewRent, (void**)views, num, sizeof(SViewVersion)));
}
@ -1703,10 +1708,58 @@ void catalogFreeMetaData(SMetaData * pData) {
}
int32_t catalogRemoveViewMeta(SCatalog* pCtg, SName* pViewName) {
int32_t catalogRemoveViewMeta(SCatalog* pCtg, const char* dbFName, uint64_t dbId, const char* viewName, uint64_t viewId) {
CTG_API_ENTER();
CTG_API_LEAVE(ctgRemoveViewMeta(pCtg, pViewName));
CTG_API_LEAVE(ctgRemoveViewMeta(pCtg, dbFName, dbId, viewName, viewId));
}
int32_t catalogUpdateDynViewVer(SCatalog* pCtg, SDynViewVersion* pVer) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pVer) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
atomic_store_64(&pCtg->dynViewVer.svrBootTs, pVer->svrBootTs);
atomic_store_64(&pCtg->dynViewVer.dynViewVer, pVer->dynViewVer);
ctgDebug("cluster %" PRIx64 " svrBootTs updated to %" PRId64, pCtg->clusterId, pVer->svrBootTs);
ctgDebug("cluster %" PRIx64 " dynViewVer updated to %" PRId64, pCtg->clusterId, pVer->dynViewVer);
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
}
int32_t catalogUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pMsg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateViewMetaToCache(pCtg, pMsg, true));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogAsyncUpdateViewMeta(SCatalog* pCtg, SViewMetaRsp* pMsg) {
CTG_API_ENTER();
if (NULL == pCtg || NULL == pMsg) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
int32_t code = 0;
CTG_ERR_JRET(ctgUpdateViewMetaToCache(pCtg, pMsg, false));
_return:
CTG_API_LEAVE(code);
}

View File

@ -31,7 +31,7 @@ SCtgOperation gCtgCacheOperation[CTG_OP_MAX] = {{CTG_OP_UPDATE_VGROUP, "update v
{CTG_OP_UPDATE_TB_INDEX, "update tbIndex", ctgOpUpdateTbIndex},
{CTG_OP_DROP_TB_INDEX, "drop tbIndex", ctgOpDropTbIndex},
{CTG_OP_UPDATE_VIEW_META, "update viewMeta", ctgOpUpdateViewMeta},
{CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropTbMeta},
{CTG_OP_DROP_VIEW_META, "drop viewMeta", ctgOpDropViewMeta},
{CTG_OP_CLEAR_CACHE, "clear cache", ctgOpClearCache}};
SCtgCacheItemInfo gCtgStatItem[CTG_CI_MAX_VALUE] = {
@ -1310,7 +1310,7 @@ int32_t ctgUpdateViewMetaEnqueue(SCatalog *pCtg, SViewMetaRsp *pRsp, bool syncOp
return TSDB_CODE_SUCCESS;
}
int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId, const char *viewName, bool syncOp) {
int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, uint64_t dbId, const char *viewName, uint64_t viewId, bool syncOp) {
int32_t code = 0;
SCtgCacheOperation *op = taosMemoryCalloc(1, sizeof(SCtgCacheOperation));
op->opId = CTG_OP_DROP_VIEW_META;
@ -1327,6 +1327,7 @@ int32_t ctgDropViewMetaEnqueue(SCatalog *pCtg, const char *dbFName, int64_t dbId
tstrncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
tstrncpy(msg->viewName, viewName, sizeof(msg->viewName));
msg->dbId = dbId;
msg->viewId = viewId;
op->data = msg;
@ -1349,7 +1350,7 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
newDBCache.tbCache = taosHashInit(gCtgMgmt.cfg.maxTblCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.tbCache) {
ctgError("taosHashInit %d metaCache failed", gCtgMgmt.cfg.maxTblCacheNum);
ctgError("taosHashInit %d tbCache failed", gCtgMgmt.cfg.maxTblCacheNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -1360,6 +1361,13 @@ int32_t ctgAddNewDBCache(SCatalog *pCtg, const char *dbFName, uint64_t dbId) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
newDBCache.viewCache = taosHashInit(gCtgMgmt.cfg.maxViewCacheNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY),
true, HASH_ENTRY_LOCK);
if (NULL == newDBCache.viewCache) {
ctgError("taosHashInit %d viewCache failed", gCtgMgmt.cfg.maxViewCacheNum);
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
code = taosHashPut(pCtg->dbCache, dbFName, strlen(dbFName), &newDBCache, sizeof(SCtgDBCache));
if (code) {
if (HASH_NODE_EXIST(code)) {
@ -1635,7 +1643,6 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN
CTG_DB_NUM_INC(CTG_CI_VIEW);
pMeta = NULL;
ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version);
CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, &cache));
@ -1656,12 +1663,12 @@ int32_t ctgWriteViewMetaToCache(SCatalog *pCtg, SCtgDBCache *dbCache, char *dbFN
atomic_add_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pMeta));
pMeta = NULL;
ctgDebug("view %s meta updated to cache, ver:%d", viewName, pMeta->version);
CTG_ERR_RET(ctgUpdateRentViewVersion(pCtg, dbFName, viewName, dbCache->dbId, pMeta->viewId, pCache));
pMeta = NULL;
_return:
if (pMeta) {
@ -2028,7 +2035,7 @@ int32_t ctgOpDropStbMeta(SCtgCacheOperation *operation) {
goto _return;
}
if (msg->dbId && (dbCache->dbId != msg->dbId)) {
if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId already modified, dbFName:%s, current:0x%" PRIx64 ", dbId:0x%" PRIx64 ", stb:%s, suid:0x%" PRIx64,
msg->dbFName, dbCache->dbId, msg->dbId, msg->stbName, msg->suid);
goto _return;
@ -2091,7 +2098,7 @@ int32_t ctgOpDropTbMeta(SCtgCacheOperation *operation) {
goto _return;
}
if (dbCache->dbId != msg->dbId) {
if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, tbName:%s", msg->dbId, dbCache->dbId,
msg->dbFName, msg->tbName);
goto _return;
@ -2370,7 +2377,7 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) {
goto _return;
}
if (0 != msg->dbId && dbCache->dbId != msg->dbId) {
if ((0 != msg->dbId) && (dbCache->dbId != msg->dbId)) {
ctgDebug("dbId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", dbFName:%s, viewName:%s", msg->dbId, dbCache->dbId,
msg->dbFName, msg->viewName);
goto _return;
@ -2383,6 +2390,11 @@ int32_t ctgOpDropViewMeta(SCtgCacheOperation *operation) {
}
int64_t viewId = pViewCache->pMeta->viewId;
if (0 != msg->viewId && viewId != msg->viewId) {
ctgDebug("viewId 0x%" PRIx64 " not match with curId 0x%" PRIx64 ", viewName:%s", msg->viewId, viewId, msg->viewName);
goto _return;
}
atomic_sub_fetch_64(&dbCache->dbCacheSize, ctgGetViewMetaCacheSize(pViewCache->pMeta));
ctgFreeViewCacheImpl(pViewCache, true);

View File

@ -261,12 +261,25 @@ void ctgFreeViewCacheImpl(SCtgViewCache* pCache, bool lock) {
}
}
void ctgFreeViewCache(SCtgDBCache* dbCache) {
if (NULL == dbCache->viewCache) {
return;
}
SCtgViewCache* pCache = taosHashIterate(dbCache->viewCache, NULL);
while (NULL != pCache) {
ctgFreeViewCacheImpl(pCache, false);
pCache = taosHashIterate(dbCache->viewCache, pCache);
}
taosHashCleanup(dbCache->viewCache);
dbCache->viewCache = NULL;
}
void ctgFreeTbCache(SCtgDBCache* dbCache) {
if (NULL == dbCache->tbCache) {
return;
}
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
SCtgTbCache* pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
ctgFreeTbCacheImpl(pCache, false);
@ -288,6 +301,7 @@ void ctgFreeDbCache(SCtgDBCache* dbCache) {
ctgFreeCfgInfoCache(dbCache);
ctgFreeStbMetaCache(dbCache);
ctgFreeTbCache(dbCache);
ctgFreeViewCache(dbCache);
}
void ctgFreeInstDbCache(SHashObj* pDbCache) {

View File

@ -383,7 +383,9 @@ static int32_t collectMetaKeyFromDescribe(SCollectMetaKeyCxt* pCxt, SDescribeStm
int32_t code = catalogRemoveTableMeta(pCxt->pParseCxt->pCatalog, &name);
#ifdef TD_ENTERPRISE
if (TSDB_CODE_SUCCESS == code) {
code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, &name);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, dbFName, 0, pStmt->tableName, 0);
}
#endif
if (TSDB_CODE_SUCCESS == code) {
@ -626,7 +628,9 @@ static int32_t collectMetaKeyFromShowCreateView(SCollectMetaKeyCxt* pCxt, SShowC
SName name = {.type = TSDB_TABLE_NAME_T, .acctId = pCxt->pParseCxt->acctId};
strcpy(name.dbname, pStmt->dbName);
strcpy(name.tname, pStmt->viewName);
int32_t code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, &name);
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(&name, dbFName);
int32_t code = catalogRemoveViewMeta(pCxt->pParseCxt->pCatalog, dbFName, 0, pStmt->viewName, 0);
if (TSDB_CODE_SUCCESS == code) {
code = reserveViewUserAuthInCache(pCxt->pParseCxt->acctId, pCxt->pParseCxt->pUser, pStmt->dbName, pStmt->viewName,
AUTH_TYPE_READ, pCxt->pMetaCache);