Merge branch '3.0' into feature/stream

This commit is contained in:
Liu Jicong 2022-06-14 15:14:47 +08:00
commit acda336f4e
29 changed files with 1253 additions and 549 deletions

View File

@ -1134,14 +1134,16 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp); int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
void tFreeSTableMetaRsp(STableMetaRsp* pRsp); void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
void tFreeSTableIndexRsp(void *info);
typedef struct { typedef struct {
SArray* pArray; // Array of STableMetaRsp SArray* pMetaRsp; // Array of STableMetaRsp
} STableMetaBatchRsp; SArray* pIndexRsp; // Array of STableIndexRsp;
} SSTbHbRsp;
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp); int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp); void tFreeSSTbHbRsp(SSTbHbRsp* pRsp);
typedef struct { typedef struct {
int32_t numOfTables; int32_t numOfTables;
@ -2525,6 +2527,10 @@ typedef struct {
} STableIndexInfo; } STableIndexInfo;
typedef struct { typedef struct {
char tbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
uint64_t suid;
int32_t version;
SArray* pIndex; SArray* pIndex;
} STableIndexRsp; } STableIndexRsp;

View File

@ -98,14 +98,15 @@ typedef struct SCatalogCfg {
uint32_t stbRentSec; uint32_t stbRentSec;
} SCatalogCfg; } SCatalogCfg;
typedef struct SSTableMetaVersion { typedef struct SSTableVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
char stbName[TSDB_TABLE_NAME_LEN]; char stbName[TSDB_TABLE_NAME_LEN];
uint64_t dbId; uint64_t dbId;
uint64_t suid; uint64_t suid;
int16_t sversion; int16_t sversion;
int16_t tversion; int16_t tversion;
} SSTableMetaVersion; int32_t smaVer;
} SSTableVersion;
typedef struct SDbVgVersion { typedef struct SDbVgVersion {
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
@ -267,7 +268,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList); int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion** stables, uint32_t* num); int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion **stables, uint32_t *num);
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num); int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
@ -279,6 +280,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes); int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp);
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo); int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass); int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);

View File

@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) { static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
int32_t code = 0; int32_t code = 0;
STableMetaBatchRsp batchMetaRsp = {0}; SSTbHbRsp hbRsp = {0};
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) { if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray); int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
for (int32_t i = 0; i < numOfBatchs; ++i) { for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i); STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
if (rsp->numOfColumns < 0) { if (rsp->numOfColumns < 0) {
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName); tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) { if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId); tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_TSC_INVALID_VALUE; return TSDB_CODE_TSC_INVALID_VALUE;
} }
@ -124,7 +124,17 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
} }
} }
tFreeSTableMetaBatchRsp(&batchMetaRsp); int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
catalogUpdateTableIndex(pCatalog, rsp);
}
taosArrayDestroy(hbRsp.pIndexRsp);
hbRsp.pIndexRsp = NULL;
tFreeSSTbHbRsp(&hbRsp);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -455,7 +465,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
} }
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) { int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
SSTableMetaVersion *stbs = NULL; SSTableVersion *stbs = NULL;
uint32_t stbNum = 0; uint32_t stbNum = 0;
int32_t code = 0; int32_t code = 0;
@ -469,15 +479,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
} }
for (int32_t i = 0; i < stbNum; ++i) { for (int32_t i = 0; i < stbNum; ++i) {
SSTableMetaVersion *stb = &stbs[i]; SSTableVersion *stb = &stbs[i];
stb->suid = htobe64(stb->suid); stb->suid = htobe64(stb->suid);
stb->sversion = htons(stb->sversion); stb->sversion = htons(stb->sversion);
stb->tversion = htons(stb->tversion); stb->tversion = htons(stb->tversion);
stb->smaVer = htonl(stb->smaVer);
} }
SKv kv = { SKv kv = {
.key = HEARTBEAT_KEY_STBINFO, .key = HEARTBEAT_KEY_STBINFO,
.valueLen = sizeof(SSTableMetaVersion) * stbNum, .valueLen = sizeof(SSTableVersion) * stbNum,
.value = stbs, .value = stbs,
}; };

View File

@ -2438,6 +2438,10 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pRsp->pIndex); int32_t num = taosArrayGetSize(pRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1; if (tEncodeI32(&encoder, num) < 0) return -1;
if (num > 0) { if (num > 0) {
@ -2472,6 +2476,10 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
int32_t num = 0; int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1; if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) { if (num > 0) {
@ -2631,18 +2639,35 @@ int32_t tSerializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp)
return tlen; return tlen;
} }
int32_t tSerializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) { int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
SEncoder encoder = {0}; SEncoder encoder = {0};
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1; if (tEncodeI32(&encoder, numOfMeta) < 0) return -1;
for (int32_t i = 0; i < numOfBatch; ++i) { for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i); STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
if (tEncodeSTableMetaRsp(&encoder, pMetaRsp) < 0) return -1; if (tEncodeSTableMetaRsp(&encoder, pMetaRsp) < 0) return -1;
} }
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
if (tEncodeI32(&encoder, numOfIndex) < 0) return -1;
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
if (tEncodeCStr(&encoder, pIndexRsp->tbName) < 0) return -1;
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
if (tEncodeI32(&encoder, num) < 0) return -1;
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i);
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
}
}
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
@ -2662,26 +2687,58 @@ int32_t tDeserializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp
return 0; return 0;
} }
int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) { int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
SDecoder decoder = {0}; SDecoder decoder = {0};
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); int32_t numOfMeta = 0;
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1; if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1;
pRsp->pMetaRsp = taosArrayInit(numOfMeta, sizeof(STableMetaRsp));
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(STableMetaRsp)); if (pRsp->pMetaRsp == NULL) {
if (pRsp->pArray == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int32_t i = 0; i < numOfBatch; ++i) { for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp tableMetaRsp = {0}; STableMetaRsp tableMetaRsp = {0};
if (tDecodeSTableMetaRsp(&decoder, &tableMetaRsp) < 0) return -1; if (tDecodeSTableMetaRsp(&decoder, &tableMetaRsp) < 0) return -1;
taosArrayPush(pRsp->pArray, &tableMetaRsp); taosArrayPush(pRsp->pMetaRsp, &tableMetaRsp);
} }
int32_t numOfIndex = 0;
if (tDecodeI32(&decoder, &numOfIndex) < 0) return -1;
pRsp->pIndexRsp = taosArrayInit(numOfIndex, sizeof(STableIndexRsp));
if (pRsp->pIndexRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp tableIndexRsp = {0};
if (tDecodeCStrTo(&decoder, tableIndexRsp.tbName) < 0) return -1;
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
int32_t num = 0;
if (tDecodeI32(&decoder, &num) < 0) return -1;
if (num > 0) {
tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == tableIndexRsp.pIndex) return -1;
STableIndexInfo info;
for (int32_t i = 0; i < num; ++i) {
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) {
taosMemoryFree(info.expr);
return -1;
}
}
}
taosArrayPush(pRsp->pIndexRsp, &tableIndexRsp);
}
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -2690,14 +2747,32 @@ int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatc
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); } void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp *pRsp) { void tFreeSTableIndexRsp(void *info) {
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray); if (NULL == info) {
for (int32_t i = 0; i < numOfBatch; ++i) { return;
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i); }
STableIndexRsp *pInfo = (STableIndexRsp *)info;
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}
void tFreeSSTbHbRsp(SSTbHbRsp *pRsp) {
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
for (int32_t i = 0; i < numOfMeta; ++i) {
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
tFreeSTableMetaRsp(pMetaRsp); tFreeSTableMetaRsp(pMetaRsp);
} }
taosArrayDestroy(pRsp->pArray); taosArrayDestroy(pRsp->pMetaRsp);
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
for (int32_t i = 0; i < numOfIndex; ++i) {
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
tFreeSTableIndexRsp(pIndexRsp);
}
taosArrayDestroy(pRsp->pIndexRsp);
} }
int32_t tSerializeSShowRsp(void *buf, int32_t bufLen, SShowRsp *pRsp) { int32_t tSerializeSShowRsp(void *buf, int32_t bufLen, SShowRsp *pRsp) {

View File

@ -124,7 +124,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
} else { } else {
dInfo("node:%s, has been created", pWrapper->name); dInfo("node:%s, has been created", pWrapper->name);
code = dmOpenNode(pWrapper); code = dmOpenNode(pWrapper);
if (code != 0) { if (code == 0) {
code = dmStartNode(pWrapper); code = dmStartNode(pWrapper);
} }
pWrapper->deployed = true; pWrapper->deployed = true;

View File

@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode);
void mndCleanupSma(SMnode *pMnode); void mndCleanupSma(SMnode *pMnode);
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName); SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma); void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -27,7 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName); SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb); void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
SSdbRaw *mndStbActionEncode(SStbObj *pStb); SSdbRaw *mndStbActionEncode(SStbObj *pStb);
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp, int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen); int32_t *pRspLen);
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs); int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);

View File

@ -1366,7 +1366,7 @@ char *buildRetension(SArray *pRetension) {
} }
static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables, static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables,
bool sysDb) { bool sysDb, ESdbStatus objStatus) {
int32_t cols = 0; int32_t cols = 0;
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes; int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
@ -1379,6 +1379,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
} }
char *status = "ready"; char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
char statusB[24] = {0}; char statusB[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status)); STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
@ -1503,8 +1505,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
colDataAppend(pColInfo, rows, (const char *)statusB, false); colDataAppend(pColInfo, rows, (const char *)statusB, false);
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++); // pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false); // colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
char *p = buildRetension(pDb->cfg.pRetensions); char *p = buildRetension(pDb->cfg.pRetensions);
@ -1552,25 +1554,26 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
int32_t numOfRows = 0; int32_t numOfRows = 0;
SDbObj *pDb = NULL; SDbObj *pDb = NULL;
ESdbStatus objStatus = 0;
// Append the information_schema database into the result. // Append the information_schema database into the result.
if (!pShow->sysDbRsp) { if (!pShow->sysDbRsp) {
SDbObj infoschemaDb = {0}; SDbObj infoschemaDb = {0};
setInformationSchemaDbCfg(&infoschemaDb); setInformationSchemaDbCfg(&infoschemaDb);
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true); dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true, 0);
numOfRows += 1; numOfRows += 1;
SDbObj perfschemaDb = {0}; SDbObj perfschemaDb = {0};
setPerfSchemaDbCfg(&perfschemaDb); setPerfSchemaDbCfg(&perfschemaDb);
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true); dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true, 0);
numOfRows += 1; numOfRows += 1;
pShow->sysDbRsp = true; pShow->sysDbRsp = true;
} }
while (numOfRows < rowsCapacity) { while (numOfRows < rowsCapacity) {
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb); pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus);
if (pShow->pIter == NULL) { if (pShow->pIter == NULL) {
break; break;
} }
@ -1578,7 +1581,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
int32_t numOfTables = 0; int32_t numOfTables = 0;
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL); sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false); dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus);
numOfRows++; numOfRows++;
sdbRelease(pSdb, pDb); sdbRelease(pSdb, pDb);
} }

View File

@ -659,7 +659,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
int32_t numOfRows = 0; int32_t numOfRows = 0;
int32_t cols = 0; int32_t cols = 0;
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
ESdbStatus objStatus; ESdbStatus objStatus = 0;
char *pWrite; char *pWrite;
int64_t curMs = taosGetTimestampMs(); int64_t curMs = taosGetTimestampMs();

View File

@ -433,7 +433,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
case HEARTBEAT_KEY_STBINFO: { case HEARTBEAT_KEY_STBINFO: {
void *rspMsg = NULL; void *rspMsg = NULL;
int32_t rspLen = 0; int32_t rspLen = 0;
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen); mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
if (rspMsg && rspLen > 0) { if (rspMsg && rspLen > 0) {
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg}; SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
taosArrayPush(hbRsp.info, &kv1); taosArrayPush(hbRsp.info, &kv1);

View File

@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
SStreamObj streamObj = {0}; SStreamObj streamObj = {0};
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN); tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
streamObj.createTime = taosGetTimestampMs(); streamObj.createTime = taosGetTimestampMs();
streamObj.updateTime = streamObj.createTime; streamObj.updateTime = streamObj.createTime;
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name)); streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
@ -899,18 +900,31 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
return code; return code;
} }
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) { int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
int32_t code = 0; int32_t code = 0;
SSmaObj *pSma = NULL; SSmaObj *pSma = NULL;
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
STableIndexInfo info; STableIndexInfo info;
SStbObj* pStb = mndAcquireStb(pMnode, tbFName);
if (NULL == pStb) {
*exist = false;
return TSDB_CODE_SUCCESS;
}
strcpy(rsp->dbFName, pStb->db);
strcpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1);
rsp->suid = pStb->uid;
rsp->version = pStb->smaVer;
mndReleaseStb(pMnode, pStb);
while (1) { while (1) {
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma); pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) { if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
continue; continue;
} }
@ -1022,7 +1036,7 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
goto _OVER; goto _OVER;
} }
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist); code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
if (code) { if (code) {
goto _OVER; goto _OVER;
} }

View File

@ -27,6 +27,7 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#include "mndVgroup.h" #include "mndVgroup.h"
#include "mndSma.h"
#include "tname.h" #include "tname.h"
#define STB_VER_NUMBER 1 #define STB_VER_NUMBER 1
@ -1271,7 +1272,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
return 0; return 0;
} }
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) { static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
char tbFName[TSDB_TABLE_FNAME_LEN] = {0}; char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName); snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
@ -1288,6 +1289,10 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
return -1; return -1;
} }
if (smaVer) {
*smaVer = pStb->smaVer;
}
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp); int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
mndReleaseDb(pMnode, pDb); mndReleaseDb(pMnode, pDb);
mndReleaseStb(pMnode, pStb); mndReleaseStb(pMnode, pStb);
@ -1634,7 +1639,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
} }
} else { } else {
mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName); mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) { if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp, NULL) != 0) {
goto _OVER; goto _OVER;
} }
} }
@ -1667,51 +1672,86 @@ _OVER:
return code; return code;
} }
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int32_t numOfStbs, void **ppRsp, int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
int32_t *pRspLen) { int32_t *pRspLen) {
STableMetaBatchRsp batchMetaRsp = {0}; SSTbHbRsp hbRsp = {0};
batchMetaRsp.pArray = taosArrayInit(numOfStbs, sizeof(STableMetaRsp)); hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
if (batchMetaRsp.pArray == NULL) { if (hbRsp.pMetaRsp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
if (NULL == hbRsp.pIndexRsp) {
taosArrayDestroy(hbRsp.pMetaRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
for (int32_t i = 0; i < numOfStbs; ++i) { for (int32_t i = 0; i < numOfStbs; ++i) {
SSTableMetaVersion *pStbVersion = &pStbVersions[i]; SSTableVersion *pStbVersion = &pStbVersions[i];
pStbVersion->suid = be64toh(pStbVersion->suid); pStbVersion->suid = be64toh(pStbVersion->suid);
pStbVersion->sversion = ntohs(pStbVersion->sversion); pStbVersion->sversion = ntohs(pStbVersion->sversion);
pStbVersion->tversion = ntohs(pStbVersion->tversion); pStbVersion->tversion = ntohs(pStbVersion->tversion);
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
STableMetaRsp metaRsp = {0}; STableMetaRsp metaRsp = {0};
int32_t smaVer = 0;
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName); mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) { if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
metaRsp.numOfColumns = -1; metaRsp.numOfColumns = -1;
metaRsp.suid = pStbVersion->suid; metaRsp.suid = pStbVersion->suid;
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
continue;
} }
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) { if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
taosArrayPush(batchMetaRsp.pArray, &metaRsp); taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
} else { } else {
tFreeSTableMetaRsp(&metaRsp); tFreeSTableMetaRsp(&metaRsp);
} }
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
bool exist = false;
char tbFName[TSDB_TABLE_FNAME_LEN];
STableIndexRsp indexRsp = {0};
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
if (NULL == indexRsp.pIndex) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
} }
int32_t rspLen = tSerializeSTableMetaBatchRsp(NULL, 0, &batchMetaRsp); sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
if (code || !exist) {
indexRsp.suid = pStbVersion->suid;
indexRsp.version = -1;
indexRsp.pIndex = NULL;
}
strcpy(indexRsp.dbFName, pStbVersion->dbFName);
strcpy(indexRsp.tbName, pStbVersion->stbName);
taosArrayPush(hbRsp.pIndexRsp, &indexRsp);
}
}
int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
if (rspLen < 0) { if (rspLen < 0) {
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSSTbHbRsp(&hbRsp);
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
return -1; return -1;
} }
void *pRsp = taosMemoryMalloc(rspLen); void *pRsp = taosMemoryMalloc(rspLen);
if (pRsp == NULL) { if (pRsp == NULL) {
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSSTbHbRsp(&hbRsp);
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
} }
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp); tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
tFreeSTableMetaBatchRsp(&batchMetaRsp); tFreeSSTbHbRsp(&hbRsp);
*ppRsp = pRsp; *ppRsp = pRsp;
*pRspLen = rspLen; *pRspLen = rspLen;
return 0; return 0;

View File

@ -663,12 +663,23 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
void * entryKey = NULL, *entryVal = NULL; void * entryKey = NULL, *entryVal = NULL;
int32_t nEntryKey, nEntryVal; int32_t nEntryKey, nEntryVal;
bool first = true;
while (1) { while (1) {
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal); valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
if (valid < 0) { if (valid < 0) {
break; break;
} }
STagIdxKey *p = entryKey; STagIdxKey *p = entryKey;
if (p->type != pCursor->type) {
if (first) {
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
if (valid < 0) break;
continue;
} else {
break;
}
}
first = false;
if (p != NULL) { if (p != NULL) {
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type); int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
if (cmp == 0) { if (cmp == 0) {

View File

@ -57,6 +57,8 @@ enum {
CTG_OP_DROP_TB_META, CTG_OP_DROP_TB_META,
CTG_OP_UPDATE_USER, CTG_OP_UPDATE_USER,
CTG_OP_UPDATE_VG_EPSET, CTG_OP_UPDATE_VG_EPSET,
CTG_OP_UPDATE_TB_INDEX,
CTG_OP_DROP_TB_INDEX,
CTG_OP_MAX CTG_OP_MAX
}; };
@ -128,25 +130,33 @@ typedef struct SCtgUserCtx {
SUserAuthInfo user; SUserAuthInfo user;
} SCtgUserCtx; } SCtgUserCtx;
typedef struct SCtgTbMetaCache { typedef STableIndexRsp STableIndex;
SRWLatch stbLock;
SRWLatch metaLock; // RC between cache destroy and all other operations typedef struct SCtgTbCache {
SHashObj *metaCache; //key:tbname, value:STableMeta SRWLatch metaLock;
SHashObj *stbCache; //key:suid, value:STableMeta* STableMeta *pMeta;
} SCtgTbMetaCache; SRWLatch indexLock;
STableIndex *pIndex;
} SCtgTbCache;
typedef struct SCtgVgCache {
SRWLatch vgLock;
SDBVgInfo *vgInfo;
} SCtgVgCache;
typedef struct SCtgDBCache { typedef struct SCtgDBCache {
SRWLatch vgLock; SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
uint64_t dbId; uint64_t dbId;
int8_t deleted; int8_t deleted;
SDBVgInfo *vgInfo; SCtgVgCache vgCache;
SCtgTbMetaCache tbCache; SHashObj *tbCache; // key:tbname, value:SCtgTbCache
SHashObj *stbCache; // key:suid, value:STableMeta*
} SCtgDBCache; } SCtgDBCache;
typedef struct SCtgRentSlot { typedef struct SCtgRentSlot {
SRWLatch lock; SRWLatch lock;
bool needSort; bool needSort;
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion SArray *meta; // element is SDbVgVersion or SSTableVersion
} SCtgRentSlot; } SCtgRentSlot;
typedef struct SCtgRentMgmt { typedef struct SCtgRentMgmt {
@ -245,8 +255,10 @@ typedef struct SCtgCacheStat {
uint64_t userNum; uint64_t userNum;
uint64_t vgHitNum; uint64_t vgHitNum;
uint64_t vgMissNum; uint64_t vgMissNum;
uint64_t tblHitNum; uint64_t tbMetaHitNum;
uint64_t tblMissNum; uint64_t tbMetaMissNum;
uint64_t tbIndexHitNum;
uint64_t tbIndexMissNum;
uint64_t userHitNum; uint64_t userHitNum;
uint64_t userMissNum; uint64_t userMissNum;
} SCtgCacheStat; } SCtgCacheStat;
@ -268,10 +280,10 @@ typedef struct SCtgUpdateVgMsg {
SDBVgInfo* dbInfo; SDBVgInfo* dbInfo;
} SCtgUpdateVgMsg; } SCtgUpdateVgMsg;
typedef struct SCtgUpdateTblMsg { typedef struct SCtgUpdateTbMetaMsg {
SCatalog* pCtg; SCatalog* pCtg;
STableMetaOutput* output; STableMetaOutput* pMeta;
} SCtgUpdateTblMsg; } SCtgUpdateTbMetaMsg;
typedef struct SCtgDropDBMsg { typedef struct SCtgDropDBMsg {
SCatalog* pCtg; SCatalog* pCtg;
@ -305,6 +317,17 @@ typedef struct SCtgUpdateUserMsg {
SGetUserAuthRsp userAuth; SGetUserAuthRsp userAuth;
} SCtgUpdateUserMsg; } SCtgUpdateUserMsg;
typedef struct SCtgUpdateTbIndexMsg {
SCatalog* pCtg;
STableIndex* pIndex;
} SCtgUpdateTbIndexMsg;
typedef struct SCtgDropTbIndexMsg {
SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
} SCtgDropTbIndexMsg;
typedef struct SCtgUpdateEpsetMsg { typedef struct SCtgUpdateEpsetMsg {
SCatalog* pCtg; SCatalog* pCtg;
char dbFName[TSDB_DB_FNAME_LEN]; char dbFName[TSDB_DB_FNAME_LEN];
@ -465,12 +488,11 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation); int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache); int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache); void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
void ctgReleaseVgInfo(SCtgDBCache *dbCache); void ctgRUnlockVgInfo(SCtgDBCache *dbCache);
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist); int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta); int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName); int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass); int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId); int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq); int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq); int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
@ -479,12 +501,18 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq); int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq); int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet); int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type); int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size); int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size); int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq); int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
int32_t ctgStartUpdateThread(); int32_t ctgStartUpdateThread();
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask); int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache);
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes);
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
@ -493,7 +521,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask); int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask); int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask); int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName* name, SArray** out, SCtgTask* pTask); int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask); int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask); int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask); int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
@ -521,6 +549,8 @@ void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target); int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char *ctgTaskTypeStr(CTG_TASK_TYPE type); char *ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask); int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;

View File

@ -96,8 +96,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
if (NULL != dbCache) { if (NULL != dbCache) {
input.dbId = dbCache->dbId; input.dbId = dbCache->dbId;
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} }
input.vgVersion = CTG_DEFAULT_INVALID_VERSION; input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
@ -350,7 +349,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, co
*pass = false; *pass = false;
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass)); CTG_ERR_RET(ctgChkAuthFromCache(pCtg, (char*)user, (char*)dbFName, type, &inCache, pass));
if (inCache) { if (inCache) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -382,6 +381,45 @@ _return:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pRes) {
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
if (*pRes) {
return TSDB_CODE_SUCCESS;
}
STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
int32_t code = ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pIndex, NULL);
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
code = 0;
goto _return;
}
CTG_ERR_JRET(code);
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pIndex->pIndex, &pInfo));
*pRes = pInfo;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
return TSDB_CODE_SUCCESS;
_return:
tFreeSTableIndexRsp(pIndex);
taosMemoryFree(pIndex);
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
*pRes = NULL;
CTG_RET(code);
}
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) { int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) {
STableMeta *tbMeta = NULL; STableMeta *tbMeta = NULL;
int32_t code = 0; int32_t code = 0;
@ -404,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
if (dbCache) { if (dbCache) {
vgHash = dbCache->vgInfo->vgHash; vgHash = dbCache->vgCache.vgInfo->vgHash;
} else { } else {
vgHash = vgInfo->vgHash; vgHash = vgInfo->vgHash;
} }
@ -442,7 +480,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} }
@ -631,12 +669,11 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
CTG_API_LEAVE(TSDB_CODE_SUCCESS); CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
*version = dbCache->vgInfo->vgVersion; *version = dbCache->vgCache.vgInfo->vgVersion;
*dbId = dbCache->dbId; *dbId = dbCache->dbId;
*tableNum = dbCache->vgInfo->numOfTable; *tableNum = dbCache->vgCache.vgInfo->numOfTable;
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version); ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
@ -661,7 +698,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
SDBVgInfo *vgInfo = NULL; SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo));
if (dbCache) { if (dbCache) {
vgHash = dbCache->vgInfo->vgHash; vgHash = dbCache->vgCache.vgInfo->vgHash;
} else { } else {
vgHash = vgInfo->vgHash; vgHash = vgInfo->vgHash;
} }
@ -674,7 +711,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} }
@ -741,6 +778,30 @@ _return:
CTG_API_LEAVE(code); CTG_API_LEAVE(code);
} }
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp) {
CTG_API_ENTER();
int32_t code = 0;
if (NULL == pCtg || NULL == pRsp) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
}
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pIndex) {
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
}
memcpy(pIndex, pRsp, sizeof(STableIndex));
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
_return:
CTG_API_LEAVE(code);
}
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) { int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
CTG_API_ENTER(); CTG_API_ENTER();
@ -932,12 +993,12 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const
SDBVgInfo *vgInfo = NULL; SDBVgInfo *vgInfo = NULL;
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo)); CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} }
@ -1060,14 +1121,14 @@ _return:
CTG_API_LEAVE(TSDB_CODE_SUCCESS); CTG_API_LEAVE(TSDB_CODE_SUCCESS);
} }
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion** stables, uint32_t* num) { int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
CTG_API_ENTER(); CTG_API_ENTER();
if (NULL == pCtg || NULL == stables || NULL == num) { if (NULL == pCtg || NULL == stables || NULL == num) {
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableMetaVersion))); CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableVersion)));
} }
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) { int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) {
@ -1138,7 +1199,12 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT); CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
} }
CTG_API_LEAVE(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL)); int32_t code = 0;
CTG_ERR_JRET(ctgGetTbIndex(pCtg, pConn, (SName*)pTableName, pRes));
_return:
CTG_API_LEAVE(code);
} }
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) { int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) {

View File

@ -344,6 +344,11 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* p
} }
} }
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
SName* name = taosArrayGet(pReq->pTableIndex, i);
ctgDropTbIndexEnqueue(pCtg, name, true);
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -680,15 +685,14 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctx->vgId = vgInfo.vgId; ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask)); CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} else { } else {
SBuildUseDBInput input = {0}; SBuildUseDBInput input = {0};
@ -786,7 +790,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgRUnlockVgInfo(dbCache);
ctgReleaseDBCache(pCtg, dbCache); ctgReleaseDBCache(pCtg, dbCache);
} }
@ -866,9 +870,15 @@ _return:
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) { int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
int32_t code = 0; int32_t code = 0;
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target)); CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
TSWAP(pTask->res, pTask->msgCtx.out); STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
SArray* pInfo = NULL;
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
pTask->res = pInfo;
SCtgTbIndexCtx* ctx = pTask->taskCtx;
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
_return: _return:
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) { if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
@ -1008,7 +1018,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
if (dbCache) { if (dbCache) {
SVgroupInfo vgInfo = {0}; SVgroupInfo vgInfo = {0};
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo)); CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag); ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
@ -1026,8 +1036,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} }
CTG_RET(code); CTG_RET(code);
@ -1057,7 +1066,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgInfo->vgHash, (SArray**)&pTask->res)); CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else { } else {
@ -1072,8 +1081,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} }
CTG_RET(code); CTG_RET(code);
@ -1092,7 +1100,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
if (NULL == pTask->res) { if (NULL == pTask->res) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res)); CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0)); CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
} else { } else {
@ -1107,8 +1115,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} }
CTG_RET(code); CTG_RET(code);
@ -1119,6 +1126,15 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
SCatalog* pCtg = pTask->pJob->pCtg; SCatalog* pCtg = pTask->pJob->pCtg;
SRequestConnInfo* pConn = &pTask->pJob->conn; SRequestConnInfo* pConn = &pTask->pJob->conn;
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx; SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
SArray* pRes = NULL;
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
if (pRes) {
pTask->res = pRes;
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
return TSDB_CODE_SUCCESS;
}
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask)); CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
@ -1157,9 +1173,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
SDbInfo* pInfo = (SDbInfo*)pTask->res; SDbInfo* pInfo = (SDbInfo*)pTask->res;
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache)); CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
if (NULL != dbCache) { if (NULL != dbCache) {
pInfo->vgVer = dbCache->vgInfo->vgVersion; pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
pInfo->dbId = dbCache->dbId; pInfo->dbId = dbCache->dbId;
pInfo->tbNum = dbCache->vgInfo->numOfTable; pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
} else { } else {
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION; pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
} }
@ -1169,8 +1185,7 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
_return: _return:
if (dbCache) { if (dbCache) {
ctgReleaseVgInfo(dbCache); ctgReleaseVgInfoToCache(pCtg, dbCache);
ctgReleaseDBCache(pCtg, dbCache);
} }
CTG_RET(code); CTG_RET(code);

File diff suppressed because it is too large Load Diff

View File

@ -19,7 +19,7 @@
#include "catalogInt.h" #include "catalogInt.h"
extern SCatalogMgmt gCtgMgmt; extern SCatalogMgmt gCtgMgmt;
SCtgDebug gCTGDebug = {.apiEnable = true}; SCtgDebug gCTGDebug = {0};
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) { void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
ASSERT(*(int32_t*)param == 1); ASSERT(*(int32_t*)param == 1);
@ -266,11 +266,11 @@ int32_t ctgdGetStatNum(char *option, void *res) {
} }
int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) { int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0; return dbCache->tbCache ? (int32_t)taosHashGetSize(dbCache->tbCache) : 0;
} }
int32_t ctgdGetStbNum(SCtgDBCache *dbCache) { int32_t ctgdGetStbNum(SCtgDBCache *dbCache) {
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0; return dbCache->stbCache ? (int32_t)taosHashGetSize(dbCache->stbCache) : 0;
} }
int32_t ctgdGetRentNum(SCtgRentMgmt *rent) { int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
@ -363,17 +363,17 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
dbFName = taosHashGetKey(pIter, &len); dbFName = taosHashGetKey(pIter, &len);
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0; int32_t metaNum = dbCache->tbCache ? taosHashGetSize(dbCache->tbCache) : 0;
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0; int32_t stbNum = dbCache->stbCache ? taosHashGetSize(dbCache->stbCache) : 0;
int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION; int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION;
int32_t hashMethod = -1; int32_t hashMethod = -1;
int32_t vgNum = 0; int32_t vgNum = 0;
if (dbCache->vgInfo) { if (dbCache->vgCache.vgInfo) {
vgVersion = dbCache->vgInfo->vgVersion; vgVersion = dbCache->vgCache.vgInfo->vgVersion;
hashMethod = dbCache->vgInfo->hashMethod; hashMethod = dbCache->vgCache.vgInfo->hashMethod;
if (dbCache->vgInfo->vgHash) { if (dbCache->vgCache.vgInfo->vgHash) {
vgNum = taosHashGetSize(dbCache->vgInfo->vgHash); vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash);
} }
} }

View File

@ -431,7 +431,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, SArray** out, SCtgTask* pTask) { int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
char *msg = NULL; char *msg = NULL;
int32_t msgLen = 0; int32_t msgLen = 0;
int32_t reqType = TDMT_MND_GET_TABLE_INDEX; int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
@ -448,10 +448,11 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
} }
if (pTask) { if (pTask) {
void* pOut = taosMemoryCalloc(1, POINTER_BYTES); void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
if (NULL == pOut) { if (NULL == pOut) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
} }
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName)); CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen)); CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));

View File

@ -44,6 +44,16 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
} }
} }
void ctgFreeSTableIndex(void *info) {
if (NULL == info) {
return;
}
STableIndex *pInfo = (STableIndex *)info;
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
}
void ctgFreeSMetaData(SMetaData* pData) { void ctgFreeSMetaData(SMetaData* pData) {
taosArrayDestroy(pData->pTableMeta); taosArrayDestroy(pData->pTableMeta);
pData->pTableMeta = NULL; pData->pTableMeta = NULL;
@ -110,25 +120,39 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
taosMemoryFreeClear(mgmt->slots); taosMemoryFreeClear(mgmt->slots);
} }
void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->stbCache) {
return;
}
void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) { int32_t stblNum = taosHashGetSize(dbCache->stbCache);
CTG_LOCK(CTG_WRITE, &cache->stbLock); taosHashCleanup(dbCache->stbCache);
if (cache->stbCache) { dbCache->stbCache = NULL;
int32_t stblNum = taosHashGetSize(cache->stbCache);
taosHashCleanup(cache->stbCache);
cache->stbCache = NULL;
CTG_CACHE_STAT_DEC(stblNum, stblNum); CTG_CACHE_STAT_DEC(stblNum, stblNum);
} }
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
CTG_LOCK(CTG_WRITE, &cache->metaLock); void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
if (cache->metaCache) { taosMemoryFreeClear(pCache->pMeta);
int32_t tblNum = taosHashGetSize(cache->metaCache); if (pCache->pIndex) {
taosHashCleanup(cache->metaCache); taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
cache->metaCache = NULL; taosMemoryFreeClear(pCache->pIndex);
CTG_CACHE_STAT_DEC(tblNum, tblNum);
} }
CTG_UNLOCK(CTG_WRITE, &cache->metaLock); }
void ctgFreeTbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache->tbCache) {
return;
}
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
SCtgTbCache *pCache = taosHashIterate(dbCache->tbCache, NULL);
while (NULL != pCache) {
ctgFreeTbCacheImpl(pCache);
pCache = taosHashIterate(dbCache->tbCache, pCache);
}
taosHashCleanup(dbCache->tbCache);
dbCache->tbCache = NULL;
CTG_CACHE_STAT_DEC(tblNum, tblNum);
} }
void ctgFreeVgInfo(SDBVgInfo *vgInfo) { void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
@ -144,16 +168,18 @@ void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
taosMemoryFreeClear(vgInfo); taosMemoryFreeClear(vgInfo);
} }
void ctgFreeVgInfoCache(SCtgDBCache *dbCache) {
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
}
void ctgFreeDbCache(SCtgDBCache *dbCache) { void ctgFreeDbCache(SCtgDBCache *dbCache) {
if (NULL == dbCache) { if (NULL == dbCache) {
return; return;
} }
CTG_LOCK(CTG_WRITE, &dbCache->vgLock); ctgFreeVgInfoCache(dbCache);
ctgFreeVgInfo (dbCache->vgInfo); ctgFreeStbMetaCache(dbCache);
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock); ctgFreeTbCache(dbCache);
ctgFreeTbMetaCache(&dbCache->tbCache);
} }
@ -167,16 +193,13 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->dbCache, NULL); void *pIter = taosHashIterate(pCtg->dbCache, NULL);
while (pIter) { while (pIter) {
SCtgDBCache *dbCache = pIter; SCtgDBCache *dbCache = pIter;
atomic_store_8(&dbCache->deleted, 1); atomic_store_8(&dbCache->deleted, 1);
ctgFreeDbCache(dbCache); ctgFreeDbCache(dbCache);
pIter = taosHashIterate(pCtg->dbCache, pIter); pIter = taosHashIterate(pCtg->dbCache, pIter);
} }
taosHashCleanup(pCtg->dbCache); taosHashCleanup(pCtg->dbCache);
CTG_CACHE_STAT_DEC(dbNum, dbNum); CTG_CACHE_STAT_DEC(dbNum, dbNum);
} }
@ -186,14 +209,12 @@ void ctgFreeHandle(SCatalog* pCtg) {
void *pIter = taosHashIterate(pCtg->userCache, NULL); void *pIter = taosHashIterate(pCtg->userCache, NULL);
while (pIter) { while (pIter) {
SCtgUserAuth *userCache = pIter; SCtgUserAuth *userCache = pIter;
ctgFreeSCtgUserAuth(userCache); ctgFreeSCtgUserAuth(userCache);
pIter = taosHashIterate(pCtg->userCache, pIter); pIter = taosHashIterate(pCtg->userCache, pIter);
} }
taosHashCleanup(pCtg->userCache); taosHashCleanup(pCtg->userCache);
CTG_CACHE_STAT_DEC(userNum, userNum); CTG_CACHE_STAT_DEC(userNum, userNum);
} }
@ -252,9 +273,9 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
break; break;
} }
case TDMT_MND_GET_TABLE_INDEX: { case TDMT_MND_GET_TABLE_INDEX: {
SArray** pOut = (SArray**)pCtx->out; STableIndex* pOut = (STableIndex*)pCtx->out;
if (pOut) { if (pOut) {
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo); taosArrayDestroyEx(pOut->pIndex, tFreeSTableIndexInfo);
taosMemoryFreeClear(pCtx->out); taosMemoryFreeClear(pCtx->out);
} }
break; break;
@ -535,9 +556,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
} }
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) { int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) { if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
return -1; return -1;
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) { } else if (*(uint64_t *)key1 > ((SSTableVersion*)key2)->suid) {
return 1; return 1;
} else { } else {
return 0; return 0;
@ -555,9 +576,9 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
} }
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) { int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) { if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
return -1; return -1;
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) { } else if (((SSTableVersion*)key1)->suid > ((SSTableVersion*)key2)->suid) {
return 1; return 1;
} else { } else {
return 0; return 0;
@ -640,6 +661,27 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
if (NULL == pIndex) {
*pRes = NULL;
return TSDB_CODE_SUCCESS;
}
int32_t num = taosArrayGetSize(pIndex);
*pRes = taosArrayInit(num, sizeof(STableIndexInfo));
if (NULL == *pRes) {
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
for (int32_t i = 0; i < num; ++i) {
STableIndexInfo *pInfo = taosArrayGet(pIndex, i);
taosArrayPush(*pRes, pInfo);
}
return TSDB_CODE_SUCCESS;
}
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) { int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
if (msgType == TDMT_VND_TABLE_META) { if (msgType == TDMT_VND_TABLE_META) {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx; SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;

View File

@ -895,9 +895,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput)); output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput));
ctgTestBuildCTableMetaOutput(output); ctgTestBuildCTableMetaOutput(output);
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg)); SCtgUpdateTbMetaMsg *msg = (SCtgUpdateTbMetaMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
msg->pCtg = pCtg; msg->pCtg = pCtg;
msg->output = output; msg->pMeta = output;
operation.data = msg; operation.data = msg;
code = ctgOpUpdateTbMeta(&operation); code = ctgOpUpdateTbMeta(&operation);
@ -989,7 +989,7 @@ TEST(tableMeta, normalTable) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL; SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0; int32_t i = 0;
while (i < 5) { while (i < 5) {
@ -1098,7 +1098,7 @@ TEST(tableMeta, childTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL; SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0; int32_t i = 0;
while (i < 5) { while (i < 5) {
@ -1220,7 +1220,7 @@ TEST(tableMeta, superTableCase) {
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stb = NULL; SSTableVersion *stb = NULL;
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0; uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
int32_t i = 0; int32_t i = 0;
while (i < 5) { while (i < 5) {
@ -2299,7 +2299,7 @@ TEST(rentTest, allRent) {
SArray *vgList = NULL; SArray *vgList = NULL;
ctgTestStop = false; ctgTestStop = false;
SDbVgVersion *dbs = NULL; SDbVgVersion *dbs = NULL;
SSTableMetaVersion *stable = NULL; SSTableVersion *stable = NULL;
uint32_t num = 0; uint32_t num = 0;
ctgTestInitLogFile(); ctgTestInitLogFile();

View File

@ -97,7 +97,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
} }
return 2; return 2;
} }
static int32_t sifValidateColumn(SColumnNode *cn) { static int32_t sifValidOp(EOperatorType ty) {
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
return -1;
}
return 0;
}
static int32_t sifValidColumn(SColumnNode *cn) {
// add more check // add more check
if (cn == NULL) { if (cn == NULL) {
return TSDB_CODE_QRY_INVALID_INPUT; return TSDB_CODE_QRY_INVALID_INPUT;
@ -176,6 +183,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
memcpy(param->colName, r->literal, strlen(r->literal)); memcpy(param->colName, r->literal, strlen(r->literal));
// sprintf(param->colName, "%s_%s", l->colName, r->literal); // sprintf(param->colName, "%s_%s", l->colName, r->literal);
param->colValType = r->typeData; param->colValType = r->typeData;
param->status = SFLT_COARSE_INDEX;
return 0; return 0;
// memcpy(param->colName, l->colName, sizeof(l->colName)); // memcpy(param->colName, l->colName, sizeof(l->colName));
} }
@ -197,7 +205,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
case QUERY_NODE_COLUMN: { case QUERY_NODE_COLUMN: {
SColumnNode *cn = (SColumnNode *)node; SColumnNode *cn = (SColumnNode *)node;
/*only support tag column*/ /*only support tag column*/
SIF_ERR_RET(sifValidateColumn(cn)); SIF_ERR_RET(sifValidColumn(cn));
param->colId = cn->colId; param->colId = cn->colId;
param->colValType = cn->node.resType.type; param->colValType = cn->node.resType.type;
@ -247,11 +255,13 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
return code; return code;
} }
SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam)); SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam));
if (NULL == paramList) { if (NULL == paramList) {
SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
} }
if (nodeType(node->pLeft) == QUERY_NODE_OPERATOR) { if (nodeType(node->pLeft) == QUERY_NODE_OPERATOR &&
(((SOperatorNode *)(node->pLeft))->opType == OP_TYPE_JSON_GET_VALUE)) {
SNode *interNode = (node->pLeft); SNode *interNode = (node->pLeft);
SIF_ERR_JRET(sifInitJsonParam(interNode, &paramList[0], ctx)); SIF_ERR_JRET(sifInitJsonParam(interNode, &paramList[0], ctx));
if (nParam > 1) { if (nParam > 1) {
@ -505,6 +515,11 @@ static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *sta
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) { static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
int32_t code = 0; int32_t code = 0;
if (sifValidOp(node->opType) < 0) {
output->status = SFLT_NOT_INDEX;
return code;
}
int32_t nParam = sifGetOperParamNum(node->opType); int32_t nParam = sifGetOperParamNum(node->opType);
if (nParam <= 1) { if (nParam <= 1) {
output->status = SFLT_NOT_INDEX; output->status = SFLT_NOT_INDEX;
@ -513,9 +528,15 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
if (node->opType == OP_TYPE_JSON_GET_VALUE) { if (node->opType == OP_TYPE_JSON_GET_VALUE) {
return code; return code;
} }
SIFParam *params = NULL;
SIFParam *params = NULL;
SIF_ERR_RET(sifInitOperParams(&params, node, ctx)); SIF_ERR_RET(sifInitOperParams(&params, node, ctx));
if (params[0].status == SFLT_NOT_INDEX || (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
output->status = SFLT_NOT_INDEX;
return code;
}
// ugly code, refactor later // ugly code, refactor later
output->arg = ctx->arg; output->arg = ctx->arg;
sif_func_t operFn = sifNullFunc; sif_func_t operFn = sifNullFunc;

View File

@ -484,14 +484,12 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
return TSDB_CODE_TSC_INVALID_INPUT; return TSDB_CODE_TSC_INVALID_INPUT;
} }
STableIndexRsp out = {0}; STableIndexRsp *out = (STableIndexRsp*)output;
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) { if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize); qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
return TSDB_CODE_INVALID_MSG; return TSDB_CODE_INVALID_MSG;
} }
*(void **)output = out.pIndex;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }

View File

@ -837,7 +837,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
SQWorker *mgmt = qwAcquire(refId); SQWorker *mgmt = qwAcquire(refId);
if (NULL == mgmt) { if (NULL == mgmt) {
QW_DLOG("qwAcquire %" PRIx64 "failed", refId); QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
taosMemoryFree(param);
return; return;
} }

View File

@ -1476,6 +1476,11 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
for (uint32_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) { for (uint32_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) {
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i]; SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
if (field->desc) { if (field->desc) {
if (QUERY_NODE_VALUE != nodeType(field->desc)) {
qDebug("VAL%d => [type:not value node][val:NIL]", i); //TODO
continue;
}
SValueNode *var = (SValueNode *)field->desc; SValueNode *var = (SValueNode *)field->desc;
SDataType *dType = &var->node.resType; SDataType *dType = &var->node.resType;
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) { if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {

View File

@ -45,13 +45,13 @@ int32_t BUILDIN_CLZL(uint64_t val) {
#else #else
_MyBitScanReverse64(&r, val); _MyBitScanReverse64(&r, val);
#endif #endif
return (int)(r >> 3); return (int)(63 - r);
} }
int32_t BUILDIN_CLZ(uint32_t val) { int32_t BUILDIN_CLZ(uint32_t val) {
unsigned long r = 0; unsigned long r = 0;
_BitScanReverse(&r, val); _BitScanReverse(&r, val);
return (int)(r >> 3); return (int)(31 - r);
} }
int32_t BUILDIN_CTZL(uint64_t val) { int32_t BUILDIN_CTZL(uint64_t val) {
@ -61,13 +61,13 @@ int32_t BUILDIN_CTZL(uint64_t val) {
#else #else
_MyBitScanForward64(&r, val); _MyBitScanForward64(&r, val);
#endif #endif
return (int)(r >> 3); return (int)(r);
} }
int32_t BUILDIN_CTZ(uint32_t val) { int32_t BUILDIN_CTZ(uint32_t val) {
unsigned long r = 0; unsigned long r = 0;
_BitScanForward(&r, val); _BitScanForward(&r, val);
return (int)(r >> 3); return (int)(r);
} }
#endif #endif

View File

@ -458,6 +458,9 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
} }
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) { SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
if (NULL == pSrc) {
return NULL;
}
ASSERT(pSrc->elemSize == sizeof(void*)); ASSERT(pSrc->elemSize == sizeof(void*));
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*)); SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
for (int32_t i = 0; i < pSrc->size; i++) { for (int32_t i = 0; i < pSrc->size; i++) {

View File

@ -58,7 +58,7 @@
# ---- mnode # ---- mnode
./test.sh -f tsim/mnode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim
./test.sh -f tsim/mnode/basic2.sim ./test.sh -f tsim/mnode/basic2.sim
#./test.sh -f tsim/mnode/basic3.sim ./test.sh -f tsim/mnode/basic3.sim
./test.sh -f tsim/mnode/basic4.sim ./test.sh -f tsim/mnode/basic4.sim
./test.sh -f tsim/mnode/basic5.sim ./test.sh -f tsim/mnode/basic5.sim

View File

@ -73,5 +73,80 @@ if $rows != 6 then
endi endi
print ========== prepare stbBin and ctbBin
sql create table db.stbBin (ts timestamp, c1 int, c2 binary(4)) tags(t1 binary(16))
sql create table db.ctbBin using db.stbBin tags("a")
sql insert into db.ctbBin values(now, 1, "2")
sql create table db.ctbBin1 using db.stbBin tags("b")
sql insert into db.ctbBin1 values(now, 2, "2")
sql create table db.ctbBin2 using db.stbBin tags("c")
sql insert into db.ctbBin2 values(now, 3, "2")
sql create table db.ctbBin3 using db.stbBin tags("d")
sql insert into db.ctbBin3 values(now, 4, "2")
sql select * from db.stbBin where t1 = "a"
if $rows != 1 then
return -1
endi
sql select * from db.stbBin where t1 < "a"
if $rows != 0 then
return -=1
endi
sql select * from db.stbBin where t1 < "b"
if $rows != 1 then
return -1
endi
sql select * from db.stbBin where t1 between "a" and "e"
if $rows != 4 then
return -1
endi
print ========== prepare stbNc and ctbNc
sql create table db.stbNc (ts timestamp, c1 int, c2 binary(4)) tags(t1 nchar(16))
sql create table db.ctbNc using db.stbNc tags("a")
sql insert into db.ctbNc values(now, 1, "2")
sql create table db.ctbNc1 using db.stbNc tags("b")
sql insert into db.ctbNc1 values(now, 2, "2")
sql create table db.ctbNc2 using db.stbNc tags("c")
sql insert into db.ctbNc2 values(now, 3, "2")
sql create table db.ctbNc3 using db.stbNc tags("d")
sql insert into db.ctbNc3 values(now, 4, "2")
sql select * from db.stbNc where t1 = "a"
if $rows != 1 then
return -1
endi
sql select * from db.stbNc where t1 < "a"
if $rows != 0 then
return -=1
endi
sql select * from db.stbNc where t1 < "b"
if $rows != 1 then
return -1
endi
sql select * from db.stbNc where t1 between "a" and "e"
if $rows != 4 then
return -1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode1 -s stop -x SIGINT