Merge pull request #13041 from taosdata/feature/schema.version

feat: add tag version
This commit is contained in:
dapan1121 2022-05-27 09:51:22 +08:00 committed by GitHub
commit f6e6554b2a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 30 additions and 18 deletions

View File

@ -101,6 +101,7 @@ typedef struct SDbVgVersion {
typedef struct STbSVersion { typedef struct STbSVersion {
char* tbFName; char* tbFName;
int32_t sver; int32_t sver;
int32_t tver;
} STbSVersion; } STbSVersion;
typedef struct SUserAuthVersion { typedef struct SUserAuthVersion {

View File

@ -413,7 +413,7 @@ int32_t validateSversion(SRequestObj* pRequest, void* res) {
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbVerInfo* tbInfo = taosArrayGet(pTbArray, i); STbVerInfo* tbInfo = taosArrayGet(pTbArray, i);
STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion}; STbSVersion tbSver = {.tbFName = tbInfo->tbFName, .sver = tbInfo->sversion, .tver = tbInfo->tversion};
taosArrayPush(pArray, &tbSver); taosArrayPush(pArray, &tbSver);
} }
} }
@ -745,12 +745,12 @@ void processMsgFromServer(void* parent, SRpcMsg* pMsg, SEpSet* pEpSet) {
pRequest->metric.rsp = taosGetTimestampUs(); pRequest->metric.rsp = taosGetTimestampUs();
STscObj* pTscObj = pRequest->pTscObj; //STscObj* pTscObj = pRequest->pTscObj;
if (pEpSet) { //if (pEpSet) {
if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) { // if (!isEpsetEqual(&pTscObj->pAppInfo->mgmtEp.epSet, pEpSet)) {
updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet); // updateEpSet_s(&pTscObj->pAppInfo->mgmtEp, pEpSet);
} // }
} //}
/* /*
* There is not response callback function for submit response. * There is not response callback function for submit response.

View File

@ -1503,6 +1503,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
pRsp->precision = pDb->cfg.precision; pRsp->precision = pDb->cfg.precision;
pRsp->tableType = TSDB_SUPER_TABLE; pRsp->tableType = TSDB_SUPER_TABLE;
pRsp->sversion = pStb->colVer; pRsp->sversion = pStb->colVer;
pRsp->tversion = pStb->tagVer;
pRsp->suid = pStb->uid; pRsp->suid = pStb->uid;
pRsp->tuid = pStb->uid; pRsp->tuid = pStb->uid;
@ -1629,7 +1630,7 @@ int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int
metaRsp.suid = pStbVersion->suid; metaRsp.suid = pStbVersion->suid;
} }
if (pStbVersion->sversion != metaRsp.sversion) { if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp); taosArrayPush(batchMetaRsp.pArray, &metaRsp);
} else { } else {
tFreeSTableMetaRsp(&metaRsp); tFreeSTableMetaRsp(&metaRsp);

View File

@ -344,7 +344,7 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
EXPECT_EQ(metaRsp.precision, TSDB_TIME_PRECISION_MILLI); EXPECT_EQ(metaRsp.precision, TSDB_TIME_PRECISION_MILLI);
EXPECT_EQ(metaRsp.tableType, TSDB_SUPER_TABLE); EXPECT_EQ(metaRsp.tableType, TSDB_SUPER_TABLE);
EXPECT_EQ(metaRsp.sversion, 1); EXPECT_EQ(metaRsp.sversion, 1);
EXPECT_EQ(metaRsp.tversion, 0); EXPECT_EQ(metaRsp.tversion, 1);
EXPECT_GT(metaRsp.suid, 0); EXPECT_GT(metaRsp.suid, 0);
EXPECT_GT(metaRsp.tuid, 0); EXPECT_GT(metaRsp.tuid, 0);
EXPECT_EQ(metaRsp.vgId, 0); EXPECT_EQ(metaRsp.vgId, 0);

View File

@ -447,7 +447,7 @@ void ctgReleaseVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist); int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); int32_t ctgPutRmDBToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); int32_t ctgPutRmStbToQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);

View File

@ -812,6 +812,7 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
SName name; SName name;
int32_t sver = 0; int32_t sver = 0;
int32_t tver = 0;
int32_t tbNum = taosArrayGetSize(pTables); int32_t tbNum = taosArrayGetSize(pTables);
for (int32_t i = 0; i < tbNum; ++i) { for (int32_t i = 0; i < tbNum; ++i) {
STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i); STbSVersion* pTb = (STbSVersion*)taosArrayGet(pTables, i);
@ -828,8 +829,8 @@ int32_t catalogChkTbMetaVersion(SCatalog* pCtg, void *pTrans, const SEpSet* pMgm
int32_t tbType = 0; int32_t tbType = 0;
uint64_t suid = 0; uint64_t suid = 0;
char stbName[TSDB_TABLE_FNAME_LEN]; char stbName[TSDB_TABLE_FNAME_LEN];
ctgReadTbSverFromCache(pCtg, &name, &sver, &tbType, &suid, stbName); ctgReadTbVerFromCache(pCtg, &name, &sver, &tver, &tbType, &suid, stbName);
if (sver >= 0 && sver < pTb->sver) { if ((sver >= 0 && sver < pTb->sver) || (tver >= 0 && tver < pTb->tver)) {
switch (tbType) { switch (tbType) {
case TSDB_CHILD_TABLE: { case TSDB_CHILD_TABLE: {
SName stb = name; SName stb = name;

View File

@ -322,9 +322,10 @@ _return:
CTG_RET(code); CTG_RET(code);
} }
int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tbType, uint64_t *suid, int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid,
char *stbName) { char *stbName) {
*sver = -1; *sver = -1;
*tver = -1;
if (NULL == pCtg->dbCache) { if (NULL == pCtg->dbCache) {
ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname); ctgDebug("empty tbmeta cache, tbName:%s", pTableName->tname);
@ -348,6 +349,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
*suid = tbMeta->suid; *suid = tbMeta->suid;
if (*tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
*sver = tbMeta->sversion; *sver = tbMeta->sversion;
*tver = tbMeta->tversion;
} }
} }
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.metaLock);
@ -359,7 +361,7 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
if (*tbType != TSDB_CHILD_TABLE) { if (*tbType != TSDB_CHILD_TABLE) {
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -391,12 +393,13 @@ int32_t ctgReadTbSverFromCache(SCatalog *pCtg, const SName *pTableName, int32_t
stbName[nameLen] = 0; stbName[nameLen] = 0;
*sver = (*stbMeta)->sversion; *sver = (*stbMeta)->sversion;
*tver = (*stbMeta)->tversion;
CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock); CTG_UNLOCK(CTG_READ, &dbCache->tbCache.stbLock);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got sver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tbType, dbFName, pTableName->tname); ctgDebug("Got sver %d tver %d from cache, type:%d, dbFName:%s, tbName:%s", *sver, *tver, *tbType, dbFName, pTableName->tname);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -227,6 +227,7 @@ typedef struct SQWorkerMgmt {
#define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__) #define QW_ELOG(_param, ...) qError("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__) #define QW_DLOG(_param, ...) qDebug("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_TLOG(_param, ...) qTrace("QW:%p " _param, mgmt, __VA_ARGS__)
#define QW_DUMP(_param, ...) \ #define QW_DUMP(_param, ...) \
do { \ do { \

View File

@ -1409,7 +1409,7 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWSchStatus *sch = (SQWSchStatus *)pIter; SQWSchStatus *sch = (SQWSchStatus *)pIter;
if (NULL == sch->hbConnInfo.handle) { if (NULL == sch->hbConnInfo.handle) {
uint64_t *sId = taosHashGetKey(pIter, NULL); uint64_t *sId = taosHashGetKey(pIter, NULL);
QW_DLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId); QW_TLOG("cancel send hb to sch %" PRIx64 " cause of no connection handle", *sId);
pIter = taosHashIterate(mgmt->schHash, pIter); pIter = taosHashIterate(mgmt->schHash, pIter);
continue; continue;
} }

View File

@ -557,7 +557,9 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
param->nodeEpId.nodeId = addr->nodeId; param->nodeEpId.nodeId = addr->nodeId;
memcpy(&param->nodeEpId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp)); SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn);
param->nodeEpId.ep.port = pEp->port;
param->pTrans = pJob->pTrans; param->pTrans = pJob->pTrans;
*pParam = param; *pParam = param;
@ -788,7 +790,10 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
SQueryNodeEpId epId = {0}; SQueryNodeEpId epId = {0};
epId.nodeId = addr->nodeId; epId.nodeId = addr->nodeId;
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
SEp* pEp = SCH_GET_CUR_EP(addr);
strcpy(epId.ep.fqdn, pEp->fqdn);
epId.ep.port = pEp->port;
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId)); SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, &epId, sizeof(SQueryNodeEpId));
if (NULL == hb) { if (NULL == hb) {