Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
72e525082f
|
@ -1134,14 +1134,16 @@ void tFreeSMAlterStbRsp(SMAlterStbRsp* pRsp);
|
|||
int32_t tSerializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||||
int32_t tDeserializeSTableMetaRsp(void* buf, int32_t bufLen, STableMetaRsp* pRsp);
|
||||
void tFreeSTableMetaRsp(STableMetaRsp* pRsp);
|
||||
void tFreeSTableIndexRsp(void *info);
|
||||
|
||||
typedef struct {
|
||||
SArray* pArray; // Array of STableMetaRsp
|
||||
} STableMetaBatchRsp;
|
||||
SArray* pMetaRsp; // Array of STableMetaRsp
|
||||
SArray* pIndexRsp; // Array of STableIndexRsp;
|
||||
} SSTbHbRsp;
|
||||
|
||||
int32_t tSerializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
|
||||
int32_t tDeserializeSTableMetaBatchRsp(void* buf, int32_t bufLen, STableMetaBatchRsp* pRsp);
|
||||
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp* pRsp);
|
||||
int32_t tSerializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
|
||||
int32_t tDeserializeSSTbHbRsp(void* buf, int32_t bufLen, SSTbHbRsp* pRsp);
|
||||
void tFreeSSTbHbRsp(SSTbHbRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
int32_t numOfTables;
|
||||
|
@ -2502,7 +2504,11 @@ typedef struct {
|
|||
} STableIndexInfo;
|
||||
|
||||
typedef struct {
|
||||
SArray* pIndex;
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
uint64_t suid;
|
||||
int32_t version;
|
||||
SArray* pIndex;
|
||||
} STableIndexRsp;
|
||||
|
||||
int32_t tSerializeSTableIndexRsp(void* buf, int32_t bufLen, const STableIndexRsp* pRsp);
|
||||
|
|
|
@ -98,14 +98,15 @@ typedef struct SCatalogCfg {
|
|||
uint32_t stbRentSec;
|
||||
} SCatalogCfg;
|
||||
|
||||
typedef struct SSTableMetaVersion {
|
||||
typedef struct SSTableVersion {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
uint64_t dbId;
|
||||
uint64_t suid;
|
||||
int16_t sversion;
|
||||
int16_t tversion;
|
||||
} SSTableMetaVersion;
|
||||
int32_t smaVer;
|
||||
} SSTableVersion;
|
||||
|
||||
typedef struct SDbVgVersion {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
|
@ -267,7 +268,7 @@ int32_t catalogAsyncGetAllMeta(SCatalog* pCtg, SRequestConnInfo* pConn, uint64_t
|
|||
|
||||
int32_t catalogGetQnodeList(SCatalog* pCatalog, SRequestConnInfo* pConn, SArray* pQnodeList);
|
||||
|
||||
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableMetaVersion** stables, uint32_t* num);
|
||||
int32_t catalogGetExpiredSTables(SCatalog* pCatalog, SSTableVersion **stables, uint32_t *num);
|
||||
|
||||
int32_t catalogGetExpiredDBs(SCatalog* pCatalog, SDbVgVersion** dbs, uint32_t* num);
|
||||
|
||||
|
@ -279,6 +280,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
|
|||
|
||||
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
|
||||
|
||||
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp);
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo* pConn, const char* funcName, SFuncInfo* pInfo);
|
||||
|
||||
int32_t catalogChkAuth(SCatalog* pCtg, SRequestConnInfo* pConn, const char* user, const char* dbFName, AUTH_TYPE type, bool *pass);
|
||||
|
|
|
@ -99,15 +99,15 @@ static int32_t hbProcessDBInfoRsp(void *value, int32_t valueLen, struct SCatalog
|
|||
static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalog *pCatalog) {
|
||||
int32_t code = 0;
|
||||
|
||||
STableMetaBatchRsp batchMetaRsp = {0};
|
||||
if (tDeserializeSTableMetaBatchRsp(value, valueLen, &batchMetaRsp) != 0) {
|
||||
SSTbHbRsp hbRsp = {0};
|
||||
if (tDeserializeSSTbHbRsp(value, valueLen, &hbRsp) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t numOfBatchs = taosArrayGetSize(batchMetaRsp.pArray);
|
||||
for (int32_t i = 0; i < numOfBatchs; ++i) {
|
||||
STableMetaRsp *rsp = taosArrayGet(batchMetaRsp.pArray, i);
|
||||
int32_t numOfMeta = taosArrayGetSize(hbRsp.pMetaRsp);
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
STableMetaRsp *rsp = taosArrayGet(hbRsp.pMetaRsp, i);
|
||||
|
||||
if (rsp->numOfColumns < 0) {
|
||||
tscDebug("hb remove stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
|
@ -116,7 +116,7 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
tscDebug("hb update stb, db:%s, stb:%s", rsp->dbFName, rsp->stbName);
|
||||
if (rsp->pSchemas[0].colId != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
tscError("invalid colId[%" PRIi16 "] for the first column in table meta rsp msg", rsp->pSchemas[0].colId);
|
||||
tFreeSTableMetaBatchRsp(&batchMetaRsp);
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
return TSDB_CODE_TSC_INVALID_VALUE;
|
||||
}
|
||||
|
||||
|
@ -124,7 +124,17 @@ static int32_t hbProcessStbInfoRsp(void *value, int32_t valueLen, struct SCatalo
|
|||
}
|
||||
}
|
||||
|
||||
tFreeSTableMetaBatchRsp(&batchMetaRsp);
|
||||
int32_t numOfIndex = taosArrayGetSize(hbRsp.pIndexRsp);
|
||||
for (int32_t i = 0; i < numOfIndex; ++i) {
|
||||
STableIndexRsp *rsp = taosArrayGet(hbRsp.pIndexRsp, i);
|
||||
|
||||
catalogUpdateTableIndex(pCatalog, rsp);
|
||||
}
|
||||
|
||||
taosArrayDestroy(hbRsp.pIndexRsp);
|
||||
hbRsp.pIndexRsp = NULL;
|
||||
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -455,7 +465,7 @@ int32_t hbGetExpiredDBInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SCl
|
|||
}
|
||||
|
||||
int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SClientHbReq *req) {
|
||||
SSTableMetaVersion *stbs = NULL;
|
||||
SSTableVersion *stbs = NULL;
|
||||
uint32_t stbNum = 0;
|
||||
int32_t code = 0;
|
||||
|
||||
|
@ -469,15 +479,16 @@ int32_t hbGetExpiredStbInfo(SClientHbKey *connKey, struct SCatalog *pCatalog, SC
|
|||
}
|
||||
|
||||
for (int32_t i = 0; i < stbNum; ++i) {
|
||||
SSTableMetaVersion *stb = &stbs[i];
|
||||
SSTableVersion *stb = &stbs[i];
|
||||
stb->suid = htobe64(stb->suid);
|
||||
stb->sversion = htons(stb->sversion);
|
||||
stb->tversion = htons(stb->tversion);
|
||||
stb->smaVer = htonl(stb->smaVer);
|
||||
}
|
||||
|
||||
SKv kv = {
|
||||
.key = HEARTBEAT_KEY_STBINFO,
|
||||
.valueLen = sizeof(SSTableMetaVersion) * stbNum,
|
||||
.valueLen = sizeof(SSTableVersion) * stbNum,
|
||||
.value = stbs,
|
||||
};
|
||||
|
||||
|
|
|
@ -2438,6 +2438,10 @@ int32_t tSerializeSTableIndexRsp(void *buf, int32_t bufLen, const STableIndexRsp
|
|||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pRsp->tbName) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pRsp->suid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pRsp->version) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pRsp->pIndex);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
|
@ -2472,6 +2476,10 @@ int32_t tDeserializeSTableIndexRsp(void *buf, int32_t bufLen, STableIndexRsp *pR
|
|||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pRsp->tbName) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pRsp->dbFName) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &pRsp->suid) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &pRsp->version) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
|
@ -2631,18 +2639,35 @@ int32_t tSerializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp)
|
|||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tSerializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) {
|
||||
int32_t tSerializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
|
||||
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
|
||||
if (tEncodeI32(&encoder, numOfBatch) < 0) return -1;
|
||||
for (int32_t i = 0; i < numOfBatch; ++i) {
|
||||
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
|
||||
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
|
||||
if (tEncodeI32(&encoder, numOfMeta) < 0) return -1;
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
|
||||
if (tEncodeSTableMetaRsp(&encoder, pMetaRsp) < 0) return -1;
|
||||
}
|
||||
|
||||
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
|
||||
if (tEncodeI32(&encoder, numOfIndex) < 0) return -1;
|
||||
for (int32_t i = 0; i < numOfIndex; ++i) {
|
||||
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
|
||||
if (tEncodeCStr(&encoder, pIndexRsp->tbName) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pIndexRsp->dbFName) < 0) return -1;
|
||||
if (tEncodeU64(&encoder, pIndexRsp->suid) < 0) return -1;
|
||||
if (tEncodeI32(&encoder, pIndexRsp->version) < 0) return -1;
|
||||
int32_t num = taosArrayGetSize(pIndexRsp->pIndex);
|
||||
if (tEncodeI32(&encoder, num) < 0) return -1;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STableIndexInfo *pInfo = (STableIndexInfo *)taosArrayGet(pIndexRsp->pIndex, i);
|
||||
if (tSerializeSTableIndexInfo(&encoder, pInfo) < 0) return -1;
|
||||
}
|
||||
}
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
|
@ -2662,26 +2687,58 @@ int32_t tDeserializeSTableMetaRsp(void *buf, int32_t bufLen, STableMetaRsp *pRsp
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatchRsp *pRsp) {
|
||||
int32_t tDeserializeSSTbHbRsp(void *buf, int32_t bufLen, SSTbHbRsp *pRsp) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
|
||||
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
|
||||
if (tDecodeI32(&decoder, &numOfBatch) < 0) return -1;
|
||||
|
||||
pRsp->pArray = taosArrayInit(numOfBatch, sizeof(STableMetaRsp));
|
||||
if (pRsp->pArray == NULL) {
|
||||
int32_t numOfMeta = 0;
|
||||
if (tDecodeI32(&decoder, &numOfMeta) < 0) return -1;
|
||||
pRsp->pMetaRsp = taosArrayInit(numOfMeta, sizeof(STableMetaRsp));
|
||||
if (pRsp->pMetaRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfBatch; ++i) {
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
STableMetaRsp tableMetaRsp = {0};
|
||||
if (tDecodeSTableMetaRsp(&decoder, &tableMetaRsp) < 0) return -1;
|
||||
taosArrayPush(pRsp->pArray, &tableMetaRsp);
|
||||
taosArrayPush(pRsp->pMetaRsp, &tableMetaRsp);
|
||||
}
|
||||
|
||||
int32_t numOfIndex = 0;
|
||||
if (tDecodeI32(&decoder, &numOfIndex) < 0) return -1;
|
||||
|
||||
pRsp->pIndexRsp = taosArrayInit(numOfIndex, sizeof(STableIndexRsp));
|
||||
if (pRsp->pIndexRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfIndex; ++i) {
|
||||
STableIndexRsp tableIndexRsp = {0};
|
||||
if (tDecodeCStrTo(&decoder, tableIndexRsp.tbName) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, tableIndexRsp.dbFName) < 0) return -1;
|
||||
if (tDecodeU64(&decoder, &tableIndexRsp.suid) < 0) return -1;
|
||||
if (tDecodeI32(&decoder, &tableIndexRsp.version) < 0) return -1;
|
||||
int32_t num = 0;
|
||||
if (tDecodeI32(&decoder, &num) < 0) return -1;
|
||||
if (num > 0) {
|
||||
tableIndexRsp.pIndex = taosArrayInit(num, sizeof(STableIndexInfo));
|
||||
if (NULL == tableIndexRsp.pIndex) return -1;
|
||||
STableIndexInfo info;
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
if (tDeserializeSTableIndexInfo(&decoder, &info) < 0) return -1;
|
||||
if (NULL == taosArrayPush(tableIndexRsp.pIndex, &info)) {
|
||||
taosMemoryFree(info.expr);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
taosArrayPush(pRsp->pIndexRsp, &tableIndexRsp);
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
@ -2690,14 +2747,32 @@ int32_t tDeserializeSTableMetaBatchRsp(void *buf, int32_t bufLen, STableMetaBatc
|
|||
|
||||
void tFreeSTableMetaRsp(STableMetaRsp *pRsp) { taosMemoryFreeClear(pRsp->pSchemas); }
|
||||
|
||||
void tFreeSTableMetaBatchRsp(STableMetaBatchRsp *pRsp) {
|
||||
int32_t numOfBatch = taosArrayGetSize(pRsp->pArray);
|
||||
for (int32_t i = 0; i < numOfBatch; ++i) {
|
||||
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pArray, i);
|
||||
void tFreeSTableIndexRsp(void *info) {
|
||||
if (NULL == info) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableIndexRsp *pInfo = (STableIndexRsp *)info;
|
||||
|
||||
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
|
||||
}
|
||||
|
||||
void tFreeSSTbHbRsp(SSTbHbRsp *pRsp) {
|
||||
int32_t numOfMeta = taosArrayGetSize(pRsp->pMetaRsp);
|
||||
for (int32_t i = 0; i < numOfMeta; ++i) {
|
||||
STableMetaRsp *pMetaRsp = taosArrayGet(pRsp->pMetaRsp, i);
|
||||
tFreeSTableMetaRsp(pMetaRsp);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRsp->pArray);
|
||||
taosArrayDestroy(pRsp->pMetaRsp);
|
||||
|
||||
int32_t numOfIndex = taosArrayGetSize(pRsp->pIndexRsp);
|
||||
for (int32_t i = 0; i < numOfIndex; ++i) {
|
||||
STableIndexRsp *pIndexRsp = taosArrayGet(pRsp->pIndexRsp, i);
|
||||
tFreeSTableIndexRsp(pIndexRsp);
|
||||
}
|
||||
|
||||
taosArrayDestroy(pRsp->pIndexRsp);
|
||||
}
|
||||
|
||||
int32_t tSerializeSShowRsp(void *buf, int32_t bufLen, SShowRsp *pRsp) {
|
||||
|
|
|
@ -124,7 +124,7 @@ static int32_t dmProcessCreateNodeReq(EDndNodeType ntype, SRpcMsg *pMsg) {
|
|||
} else {
|
||||
dInfo("node:%s, has been created", pWrapper->name);
|
||||
code = dmOpenNode(pWrapper);
|
||||
if (code != 0) {
|
||||
if (code == 0) {
|
||||
code = dmStartNode(pWrapper);
|
||||
}
|
||||
pWrapper->deployed = true;
|
||||
|
|
|
@ -26,6 +26,7 @@ int32_t mndInitSma(SMnode *pMnode);
|
|||
void mndCleanupSma(SMnode *pMnode);
|
||||
SSmaObj *mndAcquireSma(SMnode *pMnode, char *smaName);
|
||||
void mndReleaseSma(SMnode *pMnode, SSmaObj *pSma);
|
||||
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ void mndCleanupStb(SMnode *pMnode);
|
|||
SStbObj *mndAcquireStb(SMnode *pMnode, char *stbName);
|
||||
void mndReleaseStb(SMnode *pMnode, SStbObj *pStb);
|
||||
SSdbRaw *mndStbActionEncode(SStbObj *pStb);
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbs, int32_t numOfStbs, void **ppRsp,
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbs, int32_t numOfStbs, void **ppRsp,
|
||||
int32_t *pRspLen);
|
||||
int32_t mndGetNumOfStbs(SMnode *pMnode, char *dbName, int32_t *pNumOfStbs);
|
||||
|
||||
|
|
|
@ -1366,7 +1366,7 @@ char *buildRetension(SArray *pRetension) {
|
|||
}
|
||||
|
||||
static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, int32_t rows, int64_t numOfTables,
|
||||
bool sysDb) {
|
||||
bool sysDb, ESdbStatus objStatus) {
|
||||
int32_t cols = 0;
|
||||
|
||||
int32_t bytes = pShow->pMeta->pSchemas[cols].bytes;
|
||||
|
@ -1379,6 +1379,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
|
|||
}
|
||||
|
||||
char *status = "ready";
|
||||
if (objStatus == SDB_STATUS_CREATING) status = "creating";
|
||||
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
|
||||
char statusB[24] = {0};
|
||||
STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
|
||||
|
||||
|
@ -1503,8 +1505,8 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
|
|||
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
colDataAppend(pColInfo, rows, (const char *)statusB, false);
|
||||
|
||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
|
||||
// pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
|
||||
// colDataAppend(pColInfo, rows, (const char *)&pDb->cfg.schemaless, false);
|
||||
|
||||
char *p = buildRetension(pDb->cfg.pRetensions);
|
||||
|
||||
|
@ -1548,29 +1550,30 @@ static bool mndGetTablesOfDbFp(SMnode *pMnode, void *pObj, void *p1, void *p2, v
|
|||
}
|
||||
|
||||
static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rowsCapacity) {
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SDbObj *pDb = NULL;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfRows = 0;
|
||||
SDbObj *pDb = NULL;
|
||||
ESdbStatus objStatus = 0;
|
||||
|
||||
// Append the information_schema database into the result.
|
||||
if (!pShow->sysDbRsp) {
|
||||
SDbObj infoschemaDb = {0};
|
||||
setInformationSchemaDbCfg(&infoschemaDb);
|
||||
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true);
|
||||
dumpDbInfoData(pBlock, &infoschemaDb, pShow, numOfRows, 14, true, 0);
|
||||
|
||||
numOfRows += 1;
|
||||
|
||||
SDbObj perfschemaDb = {0};
|
||||
setPerfSchemaDbCfg(&perfschemaDb);
|
||||
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true);
|
||||
dumpDbInfoData(pBlock, &perfschemaDb, pShow, numOfRows, 3, true, 0);
|
||||
|
||||
numOfRows += 1;
|
||||
pShow->sysDbRsp = true;
|
||||
}
|
||||
|
||||
while (numOfRows < rowsCapacity) {
|
||||
pShow->pIter = sdbFetch(pSdb, SDB_DB, pShow->pIter, (void **)&pDb);
|
||||
pShow->pIter = sdbFetchAll(pSdb, SDB_DB, pShow->pIter, (void **)&pDb, &objStatus);
|
||||
if (pShow->pIter == NULL) {
|
||||
break;
|
||||
}
|
||||
|
@ -1578,7 +1581,7 @@ static int32_t mndRetrieveDbs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBloc
|
|||
int32_t numOfTables = 0;
|
||||
sdbTraverse(pSdb, SDB_VGROUP, mndGetTablesOfDbFp, &numOfTables, NULL, NULL);
|
||||
|
||||
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false);
|
||||
dumpDbInfoData(pBlock, pDb, pShow, numOfRows, numOfTables, false, objStatus);
|
||||
numOfRows++;
|
||||
sdbRelease(pSdb, pDb);
|
||||
}
|
||||
|
|
|
@ -659,7 +659,7 @@ static int32_t mndRetrieveMnodes(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
|
|||
int32_t numOfRows = 0;
|
||||
int32_t cols = 0;
|
||||
SMnodeObj *pObj = NULL;
|
||||
ESdbStatus objStatus;
|
||||
ESdbStatus objStatus = 0;
|
||||
char *pWrite;
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
|
||||
|
|
|
@ -433,7 +433,7 @@ static int32_t mndProcessQueryHeartBeat(SMnode *pMnode, SRpcMsg *pMsg, SClientHb
|
|||
case HEARTBEAT_KEY_STBINFO: {
|
||||
void *rspMsg = NULL;
|
||||
int32_t rspLen = 0;
|
||||
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableMetaVersion), &rspMsg, &rspLen);
|
||||
mndValidateStbInfo(pMnode, kv->value, kv->valueLen / sizeof(SSTableVersion), &rspMsg, &rspLen);
|
||||
if (rspMsg && rspLen > 0) {
|
||||
SKv kv1 = {.key = HEARTBEAT_KEY_STBINFO, .valueLen = rspLen, .value = rspMsg};
|
||||
taosArrayPush(hbRsp.info, &kv1);
|
||||
|
|
|
@ -532,6 +532,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
|
|||
SStreamObj streamObj = {0};
|
||||
tstrncpy(streamObj.name, pCreate->name, TSDB_STREAM_FNAME_LEN);
|
||||
tstrncpy(streamObj.sourceDb, pDb->name, TSDB_DB_FNAME_LEN);
|
||||
tstrncpy(streamObj.targetDb, streamObj.sourceDb, TSDB_DB_FNAME_LEN);
|
||||
streamObj.createTime = taosGetTimestampMs();
|
||||
streamObj.updateTime = streamObj.createTime;
|
||||
streamObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
|
||||
|
@ -899,18 +900,31 @@ static int32_t mndGetSma(SMnode *pMnode, SUserIndexReq *indexReq, SUserIndexRsp
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndGetTableSma(SMnode *pMnode, STableIndexReq *indexReq, STableIndexRsp *rsp, bool *exist) {
|
||||
int32_t mndGetTableSma(SMnode *pMnode, char *tbFName, STableIndexRsp *rsp, bool *exist) {
|
||||
int32_t code = 0;
|
||||
SSmaObj *pSma = NULL;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
STableIndexInfo info;
|
||||
|
||||
SStbObj* pStb = mndAcquireStb(pMnode, tbFName);
|
||||
if (NULL == pStb) {
|
||||
*exist = false;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
strcpy(rsp->dbFName, pStb->db);
|
||||
strcpy(rsp->tbName, pStb->name + strlen(pStb->db) + 1);
|
||||
rsp->suid = pStb->uid;
|
||||
rsp->version = pStb->smaVer;
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_SMA, pIter, (void **)&pSma);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pSma->stb[0] != indexReq->tbFName[0] || strcmp(pSma->stb, indexReq->tbFName)) {
|
||||
if (pSma->stb[0] != tbFName[0] || strcmp(pSma->stb, tbFName)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1022,7 +1036,7 @@ static int32_t mndProcessGetTbSmaReq(SRpcMsg *pReq) {
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
code = mndGetTableSma(pMnode, &indexReq, &rsp, &exist);
|
||||
code = mndGetTableSma(pMnode, indexReq.tbFName, &rsp, &exist);
|
||||
if (code) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
|
|
@ -27,6 +27,7 @@
|
|||
#include "mndTrans.h"
|
||||
#include "mndUser.h"
|
||||
#include "mndVgroup.h"
|
||||
#include "mndSma.h"
|
||||
#include "tname.h"
|
||||
|
||||
#define STB_VER_NUMBER 1
|
||||
|
@ -1271,7 +1272,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp, int32_t *smaVer) {
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN] = {0};
|
||||
snprintf(tbFName, sizeof(tbFName), "%s.%s", dbFName, tbName);
|
||||
|
||||
|
@ -1288,6 +1289,10 @@ static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (smaVer) {
|
||||
*smaVer = pStb->smaVer;
|
||||
}
|
||||
|
||||
int32_t code = mndBuildStbSchemaImp(pDb, pStb, tbName, pRsp);
|
||||
mndReleaseDb(pMnode, pDb);
|
||||
mndReleaseStb(pMnode, pStb);
|
||||
|
@ -1634,7 +1639,7 @@ static int32_t mndProcessTableMetaReq(SRpcMsg *pReq) {
|
|||
}
|
||||
} else {
|
||||
mDebug("stb:%s.%s, start to retrieve meta", infoReq.dbFName, infoReq.tbName);
|
||||
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp) != 0) {
|
||||
if (mndBuildStbSchema(pMnode, infoReq.dbFName, infoReq.tbName, &metaRsp, NULL) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
@ -1667,51 +1672,86 @@ _OVER:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableMetaVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
|
||||
int32_t mndValidateStbInfo(SMnode *pMnode, SSTableVersion *pStbVersions, int32_t numOfStbs, void **ppRsp,
|
||||
int32_t *pRspLen) {
|
||||
STableMetaBatchRsp batchMetaRsp = {0};
|
||||
batchMetaRsp.pArray = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
|
||||
if (batchMetaRsp.pArray == NULL) {
|
||||
SSTbHbRsp hbRsp = {0};
|
||||
hbRsp.pMetaRsp = taosArrayInit(numOfStbs, sizeof(STableMetaRsp));
|
||||
if (hbRsp.pMetaRsp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
hbRsp.pIndexRsp = taosArrayInit(numOfStbs, sizeof(STableIndexRsp));
|
||||
if (NULL == hbRsp.pIndexRsp) {
|
||||
taosArrayDestroy(hbRsp.pMetaRsp);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < numOfStbs; ++i) {
|
||||
SSTableMetaVersion *pStbVersion = &pStbVersions[i];
|
||||
SSTableVersion *pStbVersion = &pStbVersions[i];
|
||||
pStbVersion->suid = be64toh(pStbVersion->suid);
|
||||
pStbVersion->sversion = ntohs(pStbVersion->sversion);
|
||||
pStbVersion->tversion = ntohs(pStbVersion->tversion);
|
||||
pStbVersion->smaVer = ntohl(pStbVersion->smaVer);
|
||||
|
||||
STableMetaRsp metaRsp = {0};
|
||||
int32_t smaVer = 0;
|
||||
mDebug("stb:%s.%s, start to retrieve meta", pStbVersion->dbFName, pStbVersion->stbName);
|
||||
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp) != 0) {
|
||||
if (mndBuildStbSchema(pMnode, pStbVersion->dbFName, pStbVersion->stbName, &metaRsp, &smaVer) != 0) {
|
||||
metaRsp.numOfColumns = -1;
|
||||
metaRsp.suid = pStbVersion->suid;
|
||||
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (pStbVersion->sversion != metaRsp.sversion || pStbVersion->tversion != metaRsp.tversion) {
|
||||
taosArrayPush(batchMetaRsp.pArray, &metaRsp);
|
||||
taosArrayPush(hbRsp.pMetaRsp, &metaRsp);
|
||||
} else {
|
||||
tFreeSTableMetaRsp(&metaRsp);
|
||||
}
|
||||
|
||||
if (pStbVersion->smaVer && pStbVersion->smaVer != smaVer) {
|
||||
bool exist = false;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
STableIndexRsp indexRsp = {0};
|
||||
indexRsp.pIndex = taosArrayInit(10, sizeof(STableIndexInfo));
|
||||
if (NULL == indexRsp.pIndex) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
sprintf(tbFName, "%s.%s", pStbVersion->dbFName, pStbVersion->stbName);
|
||||
int32_t code = mndGetTableSma(pMnode, tbFName, &indexRsp, &exist);
|
||||
if (code || !exist) {
|
||||
indexRsp.suid = pStbVersion->suid;
|
||||
indexRsp.version = -1;
|
||||
indexRsp.pIndex = NULL;
|
||||
}
|
||||
|
||||
strcpy(indexRsp.dbFName, pStbVersion->dbFName);
|
||||
strcpy(indexRsp.tbName, pStbVersion->stbName);
|
||||
|
||||
taosArrayPush(hbRsp.pIndexRsp, &indexRsp);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t rspLen = tSerializeSTableMetaBatchRsp(NULL, 0, &batchMetaRsp);
|
||||
int32_t rspLen = tSerializeSSTbHbRsp(NULL, 0, &hbRsp);
|
||||
if (rspLen < 0) {
|
||||
tFreeSTableMetaBatchRsp(&batchMetaRsp);
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
}
|
||||
|
||||
void *pRsp = taosMemoryMalloc(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
tFreeSTableMetaBatchRsp(&batchMetaRsp);
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
tSerializeSTableMetaBatchRsp(pRsp, rspLen, &batchMetaRsp);
|
||||
tFreeSTableMetaBatchRsp(&batchMetaRsp);
|
||||
tSerializeSSTbHbRsp(pRsp, rspLen, &hbRsp);
|
||||
tFreeSSTbHbRsp(&hbRsp);
|
||||
*ppRsp = pRsp;
|
||||
*pRspLen = rspLen;
|
||||
return 0;
|
||||
|
|
|
@ -663,12 +663,23 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
|
|||
|
||||
void * entryKey = NULL, *entryVal = NULL;
|
||||
int32_t nEntryKey, nEntryVal;
|
||||
bool first = true;
|
||||
while (1) {
|
||||
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, (const void **)&entryVal, &nEntryVal);
|
||||
if (valid < 0) {
|
||||
break;
|
||||
}
|
||||
STagIdxKey *p = entryKey;
|
||||
if (p->type != pCursor->type) {
|
||||
if (first) {
|
||||
valid = param->reverse ? tdbTbcMoveToPrev(pCursor->pCur) : tdbTbcMoveToNext(pCursor->pCur);
|
||||
if (valid < 0) break;
|
||||
continue;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
first = false;
|
||||
if (p != NULL) {
|
||||
int32_t cmp = (*param->filterFunc)(p->data, pKey->data, pKey->type);
|
||||
if (cmp == 0) {
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "sma.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
#define SMA_STORAGE_MINUTES_MAX 86400
|
||||
#define SMA_STORAGE_MINUTES_DAY 1440
|
||||
#define SMA_STORAGE_SPLIT_FACTOR 14400 // least records in tsma file
|
||||
|
||||
/**
|
||||
* @brief Judge the tsma file split days
|
||||
*
|
||||
* @param pCfg
|
||||
* @param pCont
|
||||
* @param contLen
|
||||
* @param days unit is minute
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaGetDaysImpl(SVnodeCfg *pCfg, void *pCont, uint32_t contLen, int32_t *days) {
|
||||
SDecoder coder = {0};
|
||||
tDecoderInit(&coder, pCont, contLen);
|
||||
|
||||
STSma tsma = {0};
|
||||
if (tDecodeSVCreateTSmaReq(&coder, &tsma) < 0) {
|
||||
terrno = TSDB_CODE_MSG_DECODE_ERROR;
|
||||
goto _err;
|
||||
}
|
||||
STsdbCfg *pTsdbCfg = &pCfg->tsdbCfg;
|
||||
int64_t sInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_SECOND);
|
||||
if (sInterval <= 0) {
|
||||
*days = pTsdbCfg->days;
|
||||
return 0;
|
||||
}
|
||||
int64_t records = pTsdbCfg->days * 60 / sInterval;
|
||||
if (records >= SMA_STORAGE_SPLIT_FACTOR) {
|
||||
*days = pTsdbCfg->days;
|
||||
} else {
|
||||
int64_t mInterval = convertTimeFromPrecisionToUnit(tsma.interval, pTsdbCfg->precision, TIME_UNIT_MINUTE);
|
||||
int64_t daysPerFile = mInterval * SMA_STORAGE_MINUTES_DAY * 2;
|
||||
|
||||
if (daysPerFile > SMA_STORAGE_MINUTES_MAX) {
|
||||
*days = SMA_STORAGE_MINUTES_MAX;
|
||||
} else {
|
||||
*days = (int32_t)daysPerFile;
|
||||
}
|
||||
|
||||
if (*days < pTsdbCfg->days) {
|
||||
*days = pTsdbCfg->days;
|
||||
}
|
||||
}
|
||||
tDecoderClear(&coder);
|
||||
return 0;
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief create tsma meta and result stable
|
||||
*
|
||||
* @param pSma
|
||||
* @param version
|
||||
* @param pMsg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaCreateImpl(SSma *pSma, int64_t version, const char *pMsg) {
|
||||
SSmaCfg *pCfg = (SSmaCfg *)pMsg;
|
||||
|
||||
if (TD_VID(pSma->pVnode) == pCfg->dstVgId) {
|
||||
// create tsma meta in dstVgId
|
||||
if (metaCreateTSma(SMA_META(pSma), version, pCfg) < 0) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
// create stable to save tsma result in dstVgId
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = pCfg->dstTbName;
|
||||
pReq.suid = pCfg->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
|
||||
if (metaCreateSTable(SMA_META(pSma), version, &pReq) < 0) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Insert/Update Time-range-wise SMA data.
|
||||
*
|
||||
* @param pSma
|
||||
* @param msg
|
||||
* @return int32_t
|
||||
*/
|
||||
int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
|
||||
const SArray *pDataBlocks = (const SArray *)msg;
|
||||
// TODO: destroy SSDataBlocks(msg)
|
||||
if (!pDataBlocks) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PTR;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
|
||||
return terrno;
|
||||
}
|
||||
|
||||
if (taosArrayGetSize(pDataBlocks) <= 0) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_PARA;
|
||||
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is empty", SMA_VID(pSma));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
if (tdCheckAndInitSmaEnv(pSma, TSDB_SMA_TYPE_TIME_RANGE) != 0) {
|
||||
terrno = TSDB_CODE_TSMA_INIT_FAILED;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
SSmaEnv *pEnv = SMA_TSMA_ENV(pSma);
|
||||
SSmaStat *pStat = NULL;
|
||||
SSmaStatItem *pItem = NULL;
|
||||
|
||||
if (!pEnv || !(pStat = SMA_ENV_STAT(pEnv))) {
|
||||
terrno = TSDB_CODE_TSMA_INVALID_STAT;
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
|
||||
tdRefSmaStat(pSma, pStat);
|
||||
pItem = &pStat->tsmaStatItem;
|
||||
|
||||
ASSERT(pItem);
|
||||
|
||||
if (!pItem->pTSma) {
|
||||
// cache smaMeta
|
||||
STSma *pTSma = metaGetSmaInfoByIndex(SMA_META(pSma), indexUid);
|
||||
if (!pTSma) {
|
||||
terrno = TSDB_CODE_TSMA_NO_INDEX_IN_META;
|
||||
smaWarn("vgId:%d, tsma insert for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno));
|
||||
return TSDB_CODE_FAILED;
|
||||
}
|
||||
pItem->pTSma = pTSma;
|
||||
}
|
||||
|
||||
STSma *pTSma = pItem->pTSma;
|
||||
|
||||
ASSERT(pTSma->indexUid == indexUid);
|
||||
|
||||
SMetaReader mr = {0};
|
||||
|
||||
const char *dbName = "testDb";
|
||||
if (metaGetTableEntryByName(&mr, dbName) != 0) {
|
||||
smaDebug("vgId:%d, tsma no table testTb exists for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), indexUid, tstrerror(terrno));
|
||||
SVCreateStbReq pReq = {0};
|
||||
pReq.name = dbName;
|
||||
pReq.suid = pTSma->dstTbUid;
|
||||
pReq.schemaRow = pCfg->schemaRow;
|
||||
pReq.schemaTag = pCfg->schemaTag;
|
||||
}
|
||||
|
||||
SSubmitReq *pSubmitReq = NULL;
|
||||
buildSubmitReqFromDataBlock(&pSubmitReq, (const SArray *)msg, NULL, pItem->pTSma->dstVgId,
|
||||
pItem->pTSma->dstTbUid);
|
||||
|
||||
tdUnRefSmaStat(pSma, pStat);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
|
@ -78,6 +78,8 @@ _err:
|
|||
}
|
||||
|
||||
int32_t tsdbCommit(STsdb *pTsdb) {
|
||||
if (!pTsdb) return 0;
|
||||
|
||||
int32_t code = 0;
|
||||
SCommitter commith;
|
||||
SMemTable *pMemTable = pTsdb->mem;
|
||||
|
|
|
@ -55,6 +55,7 @@ extern const char *TSDB_LEVEL_DNAME[];
|
|||
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
||||
static const char *tsdbTxnFname[] = {"current.t", "current"};
|
||||
#define TSDB_MAX_FSETS(keep, days) ((keep) / (days) + 3)
|
||||
#define TSDB_MAX_INIT_FSETS (365000)
|
||||
|
||||
static int tsdbComparFidFSet(const void *arg1, const void *arg2);
|
||||
static void tsdbResetFSStatus(SFSStatus *pStatus);
|
||||
|
@ -245,6 +246,10 @@ STsdbFS *tsdbNewFS(const STsdbKeepCfg *pCfg) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
if (maxFSet > TSDB_MAX_INIT_FSETS) {
|
||||
maxFSet = TSDB_MAX_INIT_FSETS;
|
||||
}
|
||||
|
||||
pfs->cstatus = tsdbNewFSStatus(maxFSet);
|
||||
if (pfs->cstatus == NULL) {
|
||||
tsdbFreeFS(pfs);
|
||||
|
|
|
@ -57,6 +57,8 @@ enum {
|
|||
CTG_OP_DROP_TB_META,
|
||||
CTG_OP_UPDATE_USER,
|
||||
CTG_OP_UPDATE_VG_EPSET,
|
||||
CTG_OP_UPDATE_TB_INDEX,
|
||||
CTG_OP_DROP_TB_INDEX,
|
||||
CTG_OP_MAX
|
||||
};
|
||||
|
||||
|
@ -128,25 +130,33 @@ typedef struct SCtgUserCtx {
|
|||
SUserAuthInfo user;
|
||||
} SCtgUserCtx;
|
||||
|
||||
typedef struct SCtgTbMetaCache {
|
||||
SRWLatch stbLock;
|
||||
SRWLatch metaLock; // RC between cache destroy and all other operations
|
||||
SHashObj *metaCache; //key:tbname, value:STableMeta
|
||||
SHashObj *stbCache; //key:suid, value:STableMeta*
|
||||
} SCtgTbMetaCache;
|
||||
typedef STableIndexRsp STableIndex;
|
||||
|
||||
typedef struct SCtgTbCache {
|
||||
SRWLatch metaLock;
|
||||
STableMeta *pMeta;
|
||||
SRWLatch indexLock;
|
||||
STableIndex *pIndex;
|
||||
} SCtgTbCache;
|
||||
|
||||
typedef struct SCtgVgCache {
|
||||
SRWLatch vgLock;
|
||||
SDBVgInfo *vgInfo;
|
||||
} SCtgVgCache;
|
||||
|
||||
typedef struct SCtgDBCache {
|
||||
SRWLatch vgLock;
|
||||
SRWLatch dbLock; // RC between destroy tbCache/stbCache and all reads
|
||||
uint64_t dbId;
|
||||
int8_t deleted;
|
||||
SDBVgInfo *vgInfo;
|
||||
SCtgTbMetaCache tbCache;
|
||||
SCtgVgCache vgCache;
|
||||
SHashObj *tbCache; // key:tbname, value:SCtgTbCache
|
||||
SHashObj *stbCache; // key:suid, value:STableMeta*
|
||||
} SCtgDBCache;
|
||||
|
||||
typedef struct SCtgRentSlot {
|
||||
SRWLatch lock;
|
||||
bool needSort;
|
||||
SArray *meta; // element is SDbVgVersion or SSTableMetaVersion
|
||||
SArray *meta; // element is SDbVgVersion or SSTableVersion
|
||||
} SCtgRentSlot;
|
||||
|
||||
typedef struct SCtgRentMgmt {
|
||||
|
@ -245,8 +255,10 @@ typedef struct SCtgCacheStat {
|
|||
uint64_t userNum;
|
||||
uint64_t vgHitNum;
|
||||
uint64_t vgMissNum;
|
||||
uint64_t tblHitNum;
|
||||
uint64_t tblMissNum;
|
||||
uint64_t tbMetaHitNum;
|
||||
uint64_t tbMetaMissNum;
|
||||
uint64_t tbIndexHitNum;
|
||||
uint64_t tbIndexMissNum;
|
||||
uint64_t userHitNum;
|
||||
uint64_t userMissNum;
|
||||
} SCtgCacheStat;
|
||||
|
@ -268,10 +280,10 @@ typedef struct SCtgUpdateVgMsg {
|
|||
SDBVgInfo* dbInfo;
|
||||
} SCtgUpdateVgMsg;
|
||||
|
||||
typedef struct SCtgUpdateTblMsg {
|
||||
SCatalog* pCtg;
|
||||
STableMetaOutput* output;
|
||||
} SCtgUpdateTblMsg;
|
||||
typedef struct SCtgUpdateTbMetaMsg {
|
||||
SCatalog* pCtg;
|
||||
STableMetaOutput* pMeta;
|
||||
} SCtgUpdateTbMetaMsg;
|
||||
|
||||
typedef struct SCtgDropDBMsg {
|
||||
SCatalog* pCtg;
|
||||
|
@ -305,6 +317,17 @@ typedef struct SCtgUpdateUserMsg {
|
|||
SGetUserAuthRsp userAuth;
|
||||
} SCtgUpdateUserMsg;
|
||||
|
||||
typedef struct SCtgUpdateTbIndexMsg {
|
||||
SCatalog* pCtg;
|
||||
STableIndex* pIndex;
|
||||
} SCtgUpdateTbIndexMsg;
|
||||
|
||||
typedef struct SCtgDropTbIndexMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} SCtgDropTbIndexMsg;
|
||||
|
||||
typedef struct SCtgUpdateEpsetMsg {
|
||||
SCatalog* pCtg;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
|
@ -465,12 +488,11 @@ int32_t ctgOpUpdateUser(SCtgCacheOperation *action);
|
|||
int32_t ctgOpUpdateEpset(SCtgCacheOperation *operation);
|
||||
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
|
||||
void ctgReleaseDBCache(SCatalog *pCtg, SCtgDBCache *dbCache);
|
||||
void ctgReleaseVgInfo(SCtgDBCache *dbCache);
|
||||
int32_t ctgAcquireVgInfoFromCache(SCatalog* pCtg, const char *dbFName, SCtgDBCache **pCache);
|
||||
void ctgRUnlockVgInfo(SCtgDBCache *dbCache);
|
||||
int32_t ctgTbMetaExistInCache(SCatalog* pCtg, char *dbFName, char* tbName, int32_t *exist);
|
||||
int32_t ctgReadTbMetaFromCache(SCatalog* pCtg, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, const SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
|
||||
int32_t ctgChkAuthFromCache(SCatalog* pCtg, const char* user, const char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
|
||||
int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver, int32_t *tver, int32_t *tbType, uint64_t *suid, char *stbName);
|
||||
int32_t ctgChkAuthFromCache(SCatalog* pCtg, char* user, char* dbFName, AUTH_TYPE type, bool *inCache, bool *pass);
|
||||
int32_t ctgDropDbCacheEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId);
|
||||
int32_t ctgDropDbVgroupEnqueue(SCatalog* pCtg, const char *dbFName, bool syncReq);
|
||||
int32_t ctgDropStbMetaEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId, const char *stbName, uint64_t suid, bool syncReq);
|
||||
|
@ -479,12 +501,18 @@ int32_t ctgUpdateVgroupEnqueue(SCatalog* pCtg, const char *dbFName, int64_t dbId
|
|||
int32_t ctgUpdateTbMetaEnqueue(SCatalog* pCtg, STableMetaOutput *output, bool syncReq);
|
||||
int32_t ctgUpdateUserEnqueue(SCatalog* pCtg, SGetUserAuthRsp *pAuth, bool syncReq);
|
||||
int32_t ctgUpdateVgEpsetEnqueue(SCatalog* pCtg, char *dbFName, int32_t vgId, SEpSet* pEpSet);
|
||||
int32_t ctgUpdateTbIndexEnqueue(SCatalog* pCtg, STableIndex **pIndex, bool syncOp);
|
||||
int32_t ctgMetaRentInit(SCtgRentMgmt *mgmt, uint32_t rentSec, int8_t type);
|
||||
int32_t ctgMetaRentAdd(SCtgRentMgmt *mgmt, void *meta, int64_t id, int32_t size);
|
||||
int32_t ctgMetaRentGet(SCtgRentMgmt *mgmt, void **res, uint32_t *num, int32_t size);
|
||||
int32_t ctgUpdateTbMetaToCache(SCatalog* pCtg, STableMetaOutput* pOut, bool syncReq);
|
||||
int32_t ctgStartUpdateThread();
|
||||
int32_t ctgRelaunchGetTbMetaTask(SCtgTask *pTask);
|
||||
void ctgReleaseVgInfoToCache(SCatalog* pCtg, SCtgDBCache *dbCache);
|
||||
int32_t ctgReadTbIndexFromCache(SCatalog* pCtg, SName* pTableName, SArray** pRes);
|
||||
int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
|
||||
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
|
||||
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
|
||||
|
||||
|
||||
|
||||
|
@ -493,7 +521,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SBuildU
|
|||
int32_t ctgGetQnodeListFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SArray *out, SCtgTask* pTask);
|
||||
int32_t ctgGetDBCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *dbFName, SDbCfgInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *indexName, SIndexInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName* name, SArray** out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask);
|
||||
int32_t ctgGetUdfInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *funcName, SFuncInfo *out, SCtgTask* pTask);
|
||||
int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const char *user, SGetUserAuthRsp *out, SCtgTask* pTask);
|
||||
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask);
|
||||
|
@ -521,6 +549,8 @@ void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
|
|||
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
|
||||
char *ctgTaskTypeStr(CTG_TASK_TYPE type);
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
|
||||
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
|
||||
void ctgFreeSTableIndex(void *info);
|
||||
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
|
|
|
@ -96,8 +96,7 @@ int32_t ctgRefreshDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
|
|||
if (NULL != dbCache) {
|
||||
input.dbId = dbCache->dbId;
|
||||
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
|
@ -350,7 +349,7 @@ int32_t ctgChkAuth(SCatalog* pCtg, SRequestConnInfo *pConn, const char* user, co
|
|||
|
||||
*pass = false;
|
||||
|
||||
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, user, dbFName, type, &inCache, pass));
|
||||
CTG_ERR_RET(ctgChkAuthFromCache(pCtg, (char*)user, (char*)dbFName, type, &inCache, pass));
|
||||
|
||||
if (inCache) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -382,6 +381,45 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pRes) {
|
||||
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
|
||||
if (*pRes) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableIndex *pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
|
||||
if (NULL == pIndex) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
int32_t code = ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pIndex, NULL);
|
||||
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
|
||||
code = 0;
|
||||
goto _return;
|
||||
}
|
||||
CTG_ERR_JRET(code);
|
||||
|
||||
SArray* pInfo = NULL;
|
||||
CTG_ERR_JRET(ctgCloneTableIndex(pIndex->pIndex, &pInfo));
|
||||
|
||||
*pRes = pInfo;
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
||||
_return:
|
||||
|
||||
tFreeSTableIndexRsp(pIndex);
|
||||
taosMemoryFree(pIndex);
|
||||
|
||||
taosArrayDestroyEx(*pRes, tFreeSTableIndexInfo);
|
||||
*pRes = NULL;
|
||||
|
||||
CTG_RET(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) {
|
||||
STableMeta *tbMeta = NULL;
|
||||
int32_t code = 0;
|
||||
|
@ -404,7 +442,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
|
|||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
|
||||
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgInfo->vgHash;
|
||||
vgHash = dbCache->vgCache.vgInfo->vgHash;
|
||||
} else {
|
||||
vgHash = vgInfo->vgHash;
|
||||
}
|
||||
|
@ -442,7 +480,7 @@ int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTabl
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
|
@ -631,12 +669,11 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
|
|||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
*version = dbCache->vgInfo->vgVersion;
|
||||
*version = dbCache->vgCache.vgInfo->vgVersion;
|
||||
*dbId = dbCache->dbId;
|
||||
*tableNum = dbCache->vgInfo->numOfTable;
|
||||
*tableNum = dbCache->vgCache.vgInfo->numOfTable;
|
||||
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
|
||||
ctgDebug("Got db vgVersion from cache, dbFName:%s, vgVersion:%d", dbFName, *version);
|
||||
|
||||
|
@ -661,7 +698,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
|
|||
SDBVgInfo *vgInfo = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, dbFName, &dbCache, &vgInfo));
|
||||
if (dbCache) {
|
||||
vgHash = dbCache->vgInfo->vgHash;
|
||||
vgHash = dbCache->vgCache.vgInfo->vgHash;
|
||||
} else {
|
||||
vgHash = vgInfo->vgHash;
|
||||
}
|
||||
|
@ -674,7 +711,7 @@ int32_t catalogGetDBVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char*
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
|
@ -741,6 +778,30 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp *pRsp) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
int32_t code = 0;
|
||||
|
||||
if (NULL == pCtg || NULL == pRsp) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
STableIndex* pIndex = taosMemoryCalloc(1, sizeof(STableIndex));
|
||||
if (NULL == pIndex) {
|
||||
CTG_API_LEAVE(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
memcpy(pIndex, pRsp, sizeof(STableIndex));
|
||||
|
||||
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pCtg, &pIndex, false));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
|
||||
int32_t catalogRemoveTableMeta(SCatalog* pCtg, SName* pTableName) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
|
@ -932,12 +993,12 @@ int32_t catalogGetTableHashVgroup(SCatalog *pCtg, SRequestConnInfo *pConn, const
|
|||
SDBVgInfo *vgInfo = NULL;
|
||||
CTG_ERR_JRET(ctgGetDBVgInfo(pCtg, pConn, db, &dbCache, &vgInfo));
|
||||
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgInfo, pTableName, pVgroup));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, vgInfo ? vgInfo : dbCache->vgCache.vgInfo, pTableName, pVgroup));
|
||||
|
||||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
|
@ -1060,14 +1121,14 @@ _return:
|
|||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableMetaVersion** stables, uint32_t* num) {
|
||||
int32_t catalogGetExpiredSTables(SCatalog* pCtg, SSTableVersion **stables, uint32_t *num) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == stables || NULL == num) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void**)stables, num, sizeof(SSTableMetaVersion)));
|
||||
CTG_API_LEAVE(ctgMetaRentGet(&pCtg->stbRent, (void **)stables, num, sizeof(SSTableVersion)));
|
||||
}
|
||||
|
||||
int32_t catalogGetExpiredDBs(SCatalog* pCtg, SDbVgVersion** dbs, uint32_t* num) {
|
||||
|
@ -1138,7 +1199,12 @@ int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo *pConn, const SNam
|
|||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbIndexFromMnode(pCtg, pConn, (SName*)pTableName, pRes, NULL));
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgGetTbIndex(pCtg, pConn, (SName*)pTableName, pRes));
|
||||
|
||||
_return:
|
||||
|
||||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) {
|
||||
|
|
|
@ -344,6 +344,11 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, SCtgJob *pJob, const SCatalogReq* p
|
|||
}
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < pJob->tbIndexNum; ++i) {
|
||||
SName* name = taosArrayGet(pReq->pTableIndex, i);
|
||||
ctgDropTbIndexEnqueue(pCtg, name, true);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -680,15 +685,14 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
||||
|
||||
ctx->vgId = vgInfo.vgId;
|
||||
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
|
||||
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
} else {
|
||||
SBuildUseDBInput input = {0};
|
||||
|
||||
|
@ -786,7 +790,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgRUnlockVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
|
@ -866,9 +870,15 @@ _return:
|
|||
|
||||
int32_t ctgHandleGetTbIndexRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *pMsg, int32_t rspCode) {
|
||||
int32_t code = 0;
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||
|
||||
TSWAP(pTask->res, pTask->msgCtx.out);
|
||||
STableIndex* pOut = (STableIndex*)pTask->msgCtx.out;
|
||||
SArray* pInfo = NULL;
|
||||
CTG_ERR_JRET(ctgCloneTableIndex(pOut->pIndex, &pInfo));
|
||||
pTask->res = pInfo;
|
||||
|
||||
SCtgTbIndexCtx* ctx = pTask->taskCtx;
|
||||
CTG_ERR_JRET(ctgUpdateTbIndexEnqueue(pTask->pJob->pCtg, (STableIndex**)&pTask->msgCtx.out, false));
|
||||
|
||||
_return:
|
||||
if (TSDB_CODE_MND_DB_INDEX_NOT_EXIST == code) {
|
||||
|
@ -1008,7 +1018,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
|||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, &dbCache));
|
||||
if (dbCache) {
|
||||
SVgroupInfo vgInfo = {0};
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, ctx->pName, &vgInfo));
|
||||
CTG_ERR_RET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
|
||||
|
||||
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
|
||||
|
||||
|
@ -1026,8 +1036,7 @@ int32_t ctgAsyncRefreshTbMeta(SCtgTask *pTask) {
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1057,7 +1066,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
|
|||
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgInfo->vgHash, (SArray**)&pTask->res));
|
||||
CTG_ERR_JRET(ctgGenerateVgList(pCtg, dbCache->vgCache.vgInfo->vgHash, (SArray**)&pTask->res));
|
||||
|
||||
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
||||
} else {
|
||||
|
@ -1072,8 +1081,7 @@ int32_t ctgLaunchGetDbVgTask(SCtgTask *pTask) {
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1092,7 +1100,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
|
|||
if (NULL == pTask->res) {
|
||||
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
|
||||
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, pCtx->pName, (SVgroupInfo*)pTask->res));
|
||||
|
||||
CTG_ERR_JRET(ctgHandleTaskEnd(pTask, 0));
|
||||
} else {
|
||||
|
@ -1107,8 +1115,7 @@ int32_t ctgLaunchGetTbHashTask(SCtgTask *pTask) {
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
@ -1119,6 +1126,15 @@ int32_t ctgLaunchGetTbIndexTask(SCtgTask *pTask) {
|
|||
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||
SCtgTbIndexCtx* pCtx = (SCtgTbIndexCtx*)pTask->taskCtx;
|
||||
SArray* pRes = NULL;
|
||||
|
||||
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pCtx->pName, &pRes));
|
||||
if (pRes) {
|
||||
pTask->res = pRes;
|
||||
|
||||
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgGetTbIndexFromMnode(pCtg, pConn, pCtx->pName, NULL, pTask));
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1157,9 +1173,9 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
|
|||
SDbInfo* pInfo = (SDbInfo*)pTask->res;
|
||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, pCtx->dbFName, &dbCache));
|
||||
if (NULL != dbCache) {
|
||||
pInfo->vgVer = dbCache->vgInfo->vgVersion;
|
||||
pInfo->vgVer = dbCache->vgCache.vgInfo->vgVersion;
|
||||
pInfo->dbId = dbCache->dbId;
|
||||
pInfo->tbNum = dbCache->vgInfo->numOfTable;
|
||||
pInfo->tbNum = dbCache->vgCache.vgInfo->numOfTable;
|
||||
} else {
|
||||
pInfo->vgVer = CTG_DEFAULT_INVALID_VERSION;
|
||||
}
|
||||
|
@ -1169,8 +1185,7 @@ int32_t ctgLaunchGetDbInfoTask(SCtgTask *pTask) {
|
|||
_return:
|
||||
|
||||
if (dbCache) {
|
||||
ctgReleaseVgInfo(dbCache);
|
||||
ctgReleaseDBCache(pCtg, dbCache);
|
||||
ctgReleaseVgInfoToCache(pCtg, dbCache);
|
||||
}
|
||||
|
||||
CTG_RET(code);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -19,7 +19,7 @@
|
|||
#include "catalogInt.h"
|
||||
|
||||
extern SCatalogMgmt gCtgMgmt;
|
||||
SCtgDebug gCTGDebug = {.apiEnable = true};
|
||||
SCtgDebug gCTGDebug = {0};
|
||||
|
||||
void ctgdUserCallback(SMetaData* pResult, void* param, int32_t code) {
|
||||
ASSERT(*(int32_t*)param == 1);
|
||||
|
@ -266,11 +266,11 @@ int32_t ctgdGetStatNum(char *option, void *res) {
|
|||
}
|
||||
|
||||
int32_t ctgdGetTbMetaNum(SCtgDBCache *dbCache) {
|
||||
return dbCache->tbCache.metaCache ? (int32_t)taosHashGetSize(dbCache->tbCache.metaCache) : 0;
|
||||
return dbCache->tbCache ? (int32_t)taosHashGetSize(dbCache->tbCache) : 0;
|
||||
}
|
||||
|
||||
int32_t ctgdGetStbNum(SCtgDBCache *dbCache) {
|
||||
return dbCache->tbCache.stbCache ? (int32_t)taosHashGetSize(dbCache->tbCache.stbCache) : 0;
|
||||
return dbCache->stbCache ? (int32_t)taosHashGetSize(dbCache->stbCache) : 0;
|
||||
}
|
||||
|
||||
int32_t ctgdGetRentNum(SCtgRentMgmt *rent) {
|
||||
|
@ -363,17 +363,17 @@ void ctgdShowDBCache(SCatalog* pCtg, SHashObj *dbHash) {
|
|||
|
||||
dbFName = taosHashGetKey(pIter, &len);
|
||||
|
||||
int32_t metaNum = dbCache->tbCache.metaCache ? taosHashGetSize(dbCache->tbCache.metaCache) : 0;
|
||||
int32_t stbNum = dbCache->tbCache.stbCache ? taosHashGetSize(dbCache->tbCache.stbCache) : 0;
|
||||
int32_t metaNum = dbCache->tbCache ? taosHashGetSize(dbCache->tbCache) : 0;
|
||||
int32_t stbNum = dbCache->stbCache ? taosHashGetSize(dbCache->stbCache) : 0;
|
||||
int32_t vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||
int32_t hashMethod = -1;
|
||||
int32_t vgNum = 0;
|
||||
|
||||
if (dbCache->vgInfo) {
|
||||
vgVersion = dbCache->vgInfo->vgVersion;
|
||||
hashMethod = dbCache->vgInfo->hashMethod;
|
||||
if (dbCache->vgInfo->vgHash) {
|
||||
vgNum = taosHashGetSize(dbCache->vgInfo->vgHash);
|
||||
if (dbCache->vgCache.vgInfo) {
|
||||
vgVersion = dbCache->vgCache.vgInfo->vgVersion;
|
||||
hashMethod = dbCache->vgCache.vgInfo->hashMethod;
|
||||
if (dbCache->vgCache.vgInfo->vgHash) {
|
||||
vgNum = taosHashGetSize(dbCache->vgCache.vgInfo->vgHash);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -431,7 +431,7 @@ int32_t ctgGetIndexInfoFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, SArray** out, SCtgTask* pTask) {
|
||||
int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *name, STableIndex* out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_GET_TABLE_INDEX;
|
||||
|
@ -448,10 +448,11 @@ int32_t ctgGetTbIndexFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, SName *n
|
|||
}
|
||||
|
||||
if (pTask) {
|
||||
void* pOut = taosMemoryCalloc(1, POINTER_BYTES);
|
||||
void* pOut = taosMemoryCalloc(1, sizeof(STableIndex));
|
||||
if (NULL == pOut) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, pOut, (char*)tbFName));
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
|
|
|
@ -44,6 +44,16 @@ char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
|
|||
}
|
||||
}
|
||||
|
||||
void ctgFreeSTableIndex(void *info) {
|
||||
if (NULL == info) {
|
||||
return;
|
||||
}
|
||||
|
||||
STableIndex *pInfo = (STableIndex *)info;
|
||||
|
||||
taosArrayDestroyEx(pInfo->pIndex, tFreeSTableIndexInfo);
|
||||
}
|
||||
|
||||
void ctgFreeSMetaData(SMetaData* pData) {
|
||||
taosArrayDestroy(pData->pTableMeta);
|
||||
pData->pTableMeta = NULL;
|
||||
|
@ -110,25 +120,39 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
|||
taosMemoryFreeClear(mgmt->slots);
|
||||
}
|
||||
|
||||
|
||||
void ctgFreeTbMetaCache(SCtgTbMetaCache *cache) {
|
||||
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
||||
if (cache->stbCache) {
|
||||
int32_t stblNum = taosHashGetSize(cache->stbCache);
|
||||
taosHashCleanup(cache->stbCache);
|
||||
cache->stbCache = NULL;
|
||||
CTG_CACHE_STAT_DEC(stblNum, stblNum);
|
||||
void ctgFreeStbMetaCache(SCtgDBCache *dbCache) {
|
||||
if (NULL == dbCache->stbCache) {
|
||||
return;
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &cache->stbLock);
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &cache->metaLock);
|
||||
if (cache->metaCache) {
|
||||
int32_t tblNum = taosHashGetSize(cache->metaCache);
|
||||
taosHashCleanup(cache->metaCache);
|
||||
cache->metaCache = NULL;
|
||||
CTG_CACHE_STAT_DEC(tblNum, tblNum);
|
||||
int32_t stblNum = taosHashGetSize(dbCache->stbCache);
|
||||
taosHashCleanup(dbCache->stbCache);
|
||||
dbCache->stbCache = NULL;
|
||||
CTG_CACHE_STAT_DEC(stblNum, stblNum);
|
||||
}
|
||||
|
||||
void ctgFreeTbCacheImpl(SCtgTbCache *pCache) {
|
||||
taosMemoryFreeClear(pCache->pMeta);
|
||||
if (pCache->pIndex) {
|
||||
taosArrayDestroyEx(pCache->pIndex->pIndex, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(pCache->pIndex);
|
||||
}
|
||||
CTG_UNLOCK(CTG_WRITE, &cache->metaLock);
|
||||
}
|
||||
|
||||
void ctgFreeTbCache(SCtgDBCache *dbCache) {
|
||||
if (NULL == dbCache->tbCache) {
|
||||
return;
|
||||
}
|
||||
|
||||
int32_t tblNum = taosHashGetSize(dbCache->tbCache);
|
||||
SCtgTbCache *pCache = taosHashIterate(dbCache->tbCache, NULL);
|
||||
while (NULL != pCache) {
|
||||
ctgFreeTbCacheImpl(pCache);
|
||||
pCache = taosHashIterate(dbCache->tbCache, pCache);
|
||||
}
|
||||
taosHashCleanup(dbCache->tbCache);
|
||||
dbCache->tbCache = NULL;
|
||||
CTG_CACHE_STAT_DEC(tblNum, tblNum);
|
||||
}
|
||||
|
||||
void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
|
||||
|
@ -144,16 +168,18 @@ void ctgFreeVgInfo(SDBVgInfo *vgInfo) {
|
|||
taosMemoryFreeClear(vgInfo);
|
||||
}
|
||||
|
||||
void ctgFreeVgInfoCache(SCtgDBCache *dbCache) {
|
||||
ctgFreeVgInfo(dbCache->vgCache.vgInfo);
|
||||
}
|
||||
|
||||
void ctgFreeDbCache(SCtgDBCache *dbCache) {
|
||||
if (NULL == dbCache) {
|
||||
return;
|
||||
}
|
||||
|
||||
CTG_LOCK(CTG_WRITE, &dbCache->vgLock);
|
||||
ctgFreeVgInfo (dbCache->vgInfo);
|
||||
CTG_UNLOCK(CTG_WRITE, &dbCache->vgLock);
|
||||
|
||||
ctgFreeTbMetaCache(&dbCache->tbCache);
|
||||
ctgFreeVgInfoCache(dbCache);
|
||||
ctgFreeStbMetaCache(dbCache);
|
||||
ctgFreeTbCache(dbCache);
|
||||
}
|
||||
|
||||
|
||||
|
@ -167,16 +193,13 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
void *pIter = taosHashIterate(pCtg->dbCache, NULL);
|
||||
while (pIter) {
|
||||
SCtgDBCache *dbCache = pIter;
|
||||
|
||||
atomic_store_8(&dbCache->deleted, 1);
|
||||
|
||||
ctgFreeDbCache(dbCache);
|
||||
|
||||
pIter = taosHashIterate(pCtg->dbCache, pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(pCtg->dbCache);
|
||||
|
||||
CTG_CACHE_STAT_DEC(dbNum, dbNum);
|
||||
}
|
||||
|
||||
|
@ -186,14 +209,12 @@ void ctgFreeHandle(SCatalog* pCtg) {
|
|||
void *pIter = taosHashIterate(pCtg->userCache, NULL);
|
||||
while (pIter) {
|
||||
SCtgUserAuth *userCache = pIter;
|
||||
|
||||
ctgFreeSCtgUserAuth(userCache);
|
||||
|
||||
pIter = taosHashIterate(pCtg->userCache, pIter);
|
||||
}
|
||||
|
||||
taosHashCleanup(pCtg->userCache);
|
||||
|
||||
CTG_CACHE_STAT_DEC(userNum, userNum);
|
||||
}
|
||||
|
||||
|
@ -252,9 +273,9 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
|
|||
break;
|
||||
}
|
||||
case TDMT_MND_GET_TABLE_INDEX: {
|
||||
SArray** pOut = (SArray**)pCtx->out;
|
||||
STableIndex* pOut = (STableIndex*)pCtx->out;
|
||||
if (pOut) {
|
||||
taosArrayDestroyEx(*pOut, tFreeSTableIndexInfo);
|
||||
taosArrayDestroyEx(pOut->pIndex, tFreeSTableIndexInfo);
|
||||
taosMemoryFreeClear(pCtx->out);
|
||||
}
|
||||
break;
|
||||
|
@ -535,9 +556,9 @@ int32_t ctgGetVgInfoFromHashValue(SCatalog *pCtg, SDBVgInfo *dbInfo, const SName
|
|||
}
|
||||
|
||||
int32_t ctgStbVersionSearchCompare(const void* key1, const void* key2) {
|
||||
if (*(uint64_t *)key1 < ((SSTableMetaVersion*)key2)->suid) {
|
||||
if (*(uint64_t *)key1 < ((SSTableVersion*)key2)->suid) {
|
||||
return -1;
|
||||
} else if (*(uint64_t *)key1 > ((SSTableMetaVersion*)key2)->suid) {
|
||||
} else if (*(uint64_t *)key1 > ((SSTableVersion*)key2)->suid) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -555,9 +576,9 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2) {
|
|||
}
|
||||
|
||||
int32_t ctgStbVersionSortCompare(const void* key1, const void* key2) {
|
||||
if (((SSTableMetaVersion*)key1)->suid < ((SSTableMetaVersion*)key2)->suid) {
|
||||
if (((SSTableVersion*)key1)->suid < ((SSTableVersion*)key2)->suid) {
|
||||
return -1;
|
||||
} else if (((SSTableMetaVersion*)key1)->suid > ((SSTableMetaVersion*)key2)->suid) {
|
||||
} else if (((SSTableVersion*)key1)->suid > ((SSTableVersion*)key2)->suid) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
|
@ -640,6 +661,27 @@ int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
|
||||
if (NULL == pIndex) {
|
||||
*pRes = NULL;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t num = taosArrayGetSize(pIndex);
|
||||
*pRes = taosArrayInit(num, sizeof(STableIndexInfo));
|
||||
if (NULL == *pRes) {
|
||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < num; ++i) {
|
||||
STableIndexInfo *pInfo = taosArrayGet(pIndex, i);
|
||||
taosArrayPush(*pRes, pInfo);
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
|
||||
if (msgType == TDMT_VND_TABLE_META) {
|
||||
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
|
||||
|
|
|
@ -895,9 +895,9 @@ void *ctgTestSetCtableMetaThread(void *param) {
|
|||
output = (STableMetaOutput *)taosMemoryMalloc(sizeof(STableMetaOutput));
|
||||
ctgTestBuildCTableMetaOutput(output);
|
||||
|
||||
SCtgUpdateTblMsg *msg = (SCtgUpdateTblMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTblMsg));
|
||||
SCtgUpdateTbMetaMsg *msg = (SCtgUpdateTbMetaMsg *)taosMemoryMalloc(sizeof(SCtgUpdateTbMetaMsg));
|
||||
msg->pCtg = pCtg;
|
||||
msg->output = output;
|
||||
msg->pMeta = output;
|
||||
operation.data = msg;
|
||||
|
||||
code = ctgOpUpdateTbMeta(&operation);
|
||||
|
@ -989,7 +989,7 @@ TEST(tableMeta, normalTable) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
SDbVgVersion *dbs = NULL;
|
||||
SSTableMetaVersion *stb = NULL;
|
||||
SSTableVersion *stb = NULL;
|
||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||
int32_t i = 0;
|
||||
while (i < 5) {
|
||||
|
@ -1098,7 +1098,7 @@ TEST(tableMeta, childTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
SDbVgVersion *dbs = NULL;
|
||||
SSTableMetaVersion *stb = NULL;
|
||||
SSTableVersion *stb = NULL;
|
||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||
int32_t i = 0;
|
||||
while (i < 5) {
|
||||
|
@ -1220,7 +1220,7 @@ TEST(tableMeta, superTableCase) {
|
|||
ASSERT_EQ(tableMeta->tableInfo.rowSize, 12);
|
||||
|
||||
SDbVgVersion *dbs = NULL;
|
||||
SSTableMetaVersion *stb = NULL;
|
||||
SSTableVersion *stb = NULL;
|
||||
uint32_t dbNum = 0, stbNum = 0, allDbNum = 0, allStbNum = 0;
|
||||
int32_t i = 0;
|
||||
while (i < 5) {
|
||||
|
@ -2299,7 +2299,7 @@ TEST(rentTest, allRent) {
|
|||
SArray *vgList = NULL;
|
||||
ctgTestStop = false;
|
||||
SDbVgVersion *dbs = NULL;
|
||||
SSTableMetaVersion *stable = NULL;
|
||||
SSTableVersion *stable = NULL;
|
||||
uint32_t num = 0;
|
||||
|
||||
ctgTestInitLogFile();
|
||||
|
|
|
@ -97,7 +97,14 @@ static int32_t sifGetOperParamNum(EOperatorType ty) {
|
|||
}
|
||||
return 2;
|
||||
}
|
||||
static int32_t sifValidateColumn(SColumnNode *cn) {
|
||||
static int32_t sifValidOp(EOperatorType ty) {
|
||||
if ((ty >= OP_TYPE_ADD && ty <= OP_TYPE_BIT_OR) || (ty == OP_TYPE_IN || ty == OP_TYPE_NOT_IN) ||
|
||||
(ty == OP_TYPE_LIKE || ty == OP_TYPE_NOT_LIKE || ty == OP_TYPE_MATCH || ty == OP_TYPE_NMATCH)) {
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
static int32_t sifValidColumn(SColumnNode *cn) {
|
||||
// add more check
|
||||
if (cn == NULL) {
|
||||
return TSDB_CODE_QRY_INVALID_INPUT;
|
||||
|
@ -176,6 +183,7 @@ static int32_t sifInitJsonParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
|||
memcpy(param->colName, r->literal, strlen(r->literal));
|
||||
// sprintf(param->colName, "%s_%s", l->colName, r->literal);
|
||||
param->colValType = r->typeData;
|
||||
param->status = SFLT_COARSE_INDEX;
|
||||
return 0;
|
||||
// memcpy(param->colName, l->colName, sizeof(l->colName));
|
||||
}
|
||||
|
@ -197,7 +205,7 @@ static int32_t sifInitParam(SNode *node, SIFParam *param, SIFCtx *ctx) {
|
|||
case QUERY_NODE_COLUMN: {
|
||||
SColumnNode *cn = (SColumnNode *)node;
|
||||
/*only support tag column*/
|
||||
SIF_ERR_RET(sifValidateColumn(cn));
|
||||
SIF_ERR_RET(sifValidColumn(cn));
|
||||
|
||||
param->colId = cn->colId;
|
||||
param->colValType = cn->node.resType.type;
|
||||
|
@ -247,11 +255,13 @@ static int32_t sifInitOperParams(SIFParam **params, SOperatorNode *node, SIFCtx
|
|||
return code;
|
||||
}
|
||||
SIFParam *paramList = taosMemoryCalloc(nParam, sizeof(SIFParam));
|
||||
|
||||
if (NULL == paramList) {
|
||||
SIF_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||
}
|
||||
|
||||
if (nodeType(node->pLeft) == QUERY_NODE_OPERATOR) {
|
||||
if (nodeType(node->pLeft) == QUERY_NODE_OPERATOR &&
|
||||
(((SOperatorNode *)(node->pLeft))->opType == OP_TYPE_JSON_GET_VALUE)) {
|
||||
SNode *interNode = (node->pLeft);
|
||||
SIF_ERR_JRET(sifInitJsonParam(interNode, ¶mList[0], ctx));
|
||||
if (nParam > 1) {
|
||||
|
@ -505,6 +515,11 @@ static int32_t sifGetOperFn(int32_t funcId, sif_func_t *func, SIdxFltStatus *sta
|
|||
|
||||
static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
||||
int32_t code = 0;
|
||||
if (sifValidOp(node->opType) < 0) {
|
||||
output->status = SFLT_NOT_INDEX;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t nParam = sifGetOperParamNum(node->opType);
|
||||
if (nParam <= 1) {
|
||||
output->status = SFLT_NOT_INDEX;
|
||||
|
@ -513,9 +528,15 @@ static int32_t sifExecOper(SOperatorNode *node, SIFCtx *ctx, SIFParam *output) {
|
|||
if (node->opType == OP_TYPE_JSON_GET_VALUE) {
|
||||
return code;
|
||||
}
|
||||
SIFParam *params = NULL;
|
||||
|
||||
SIFParam *params = NULL;
|
||||
SIF_ERR_RET(sifInitOperParams(¶ms, node, ctx));
|
||||
|
||||
if (params[0].status == SFLT_NOT_INDEX || (nParam > 1 && params[1].status == SFLT_NOT_INDEX)) {
|
||||
output->status = SFLT_NOT_INDEX;
|
||||
return code;
|
||||
}
|
||||
|
||||
// ugly code, refactor later
|
||||
output->arg = ctx->arg;
|
||||
sif_func_t operFn = sifNullFunc;
|
||||
|
|
|
@ -484,14 +484,12 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
|
|||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
STableIndexRsp out = {0};
|
||||
if (tDeserializeSTableIndexRsp(msg, msgSize, &out) != 0) {
|
||||
STableIndexRsp *out = (STableIndexRsp*)output;
|
||||
if (tDeserializeSTableIndexRsp(msg, msgSize, out) != 0) {
|
||||
qError("tDeserializeSTableIndexRsp failed, msgSize:%d", msgSize);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
*(void **)output = out.pIndex;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -837,7 +837,6 @@ void qwProcessHbTimerEvent(void *param, void *tmrId) {
|
|||
SQWorker *mgmt = qwAcquire(refId);
|
||||
if (NULL == mgmt) {
|
||||
QW_DLOG("qwAcquire %" PRIx64 "failed", refId);
|
||||
taosMemoryFree(param);
|
||||
return;
|
||||
}
|
||||
|
||||
|
|
|
@ -1476,6 +1476,11 @@ void filterDumpInfoToString(SFilterInfo *info, const char *msg, int32_t options)
|
|||
for (uint32_t i = 0; i < info->fields[FLD_TYPE_VALUE].num; ++i) {
|
||||
SFilterField *field = &info->fields[FLD_TYPE_VALUE].fields[i];
|
||||
if (field->desc) {
|
||||
if (QUERY_NODE_VALUE != nodeType(field->desc)) {
|
||||
qDebug("VAL%d => [type:not value node][val:NIL]", i); //TODO
|
||||
continue;
|
||||
}
|
||||
|
||||
SValueNode *var = (SValueNode *)field->desc;
|
||||
SDataType *dType = &var->node.resType;
|
||||
if (dType->type == TSDB_DATA_TYPE_VALUE_ARRAY) {
|
||||
|
|
|
@ -1658,14 +1658,24 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {
|
|||
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
syncRpcMsgLog2((char*)"==syncNodeEqHeartbeatTimer==", &rpcMsg);
|
||||
if (pSyncNode->FpEqMsg != NULL) {
|
||||
pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
int32_t code = pSyncNode->FpEqMsg(pSyncNode->msgcb, &rpcMsg);
|
||||
if (code != 0) {
|
||||
sError("vgId:%d sync enqueue timer msg error, code:%d", pSyncNode->vgId, code);
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
return;
|
||||
}
|
||||
|
||||
} else {
|
||||
sTrace("syncNodeEqHeartbeatTimer pSyncNode->FpEqMsg is NULL");
|
||||
}
|
||||
syncTimeoutDestroy(pSyncMsg);
|
||||
|
||||
if (gSyncEnv != NULL) {
|
||||
taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
|
||||
&pSyncNode->pHeartbeatTimer);
|
||||
} else {
|
||||
sError("sync env is already stop");
|
||||
}
|
||||
} else {
|
||||
sTrace("==syncNodeEqHeartbeatTimer== heartbeatTimerLogicClock:%" PRIu64 ", heartbeatTimerLogicClockUser:%" PRIu64
|
||||
"",
|
||||
|
|
|
@ -458,6 +458,9 @@ static void taosArrayInsertSort(SArray* pArray, __ext_compar_fn_t fn, const void
|
|||
}
|
||||
|
||||
SArray* taosArrayDeepCopy(const SArray* pSrc, FCopy deepCopy) {
|
||||
if (NULL == pSrc) {
|
||||
return NULL;
|
||||
}
|
||||
ASSERT(pSrc->elemSize == sizeof(void*));
|
||||
SArray* pArray = taosArrayInit(pSrc->size, sizeof(void*));
|
||||
for (int32_t i = 0; i < pSrc->size; i++) {
|
||||
|
|
|
@ -58,7 +58,7 @@
|
|||
# ---- mnode
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
#./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic4.sim
|
||||
./test.sh -f tsim/mnode/basic5.sim
|
||||
|
||||
|
|
|
@ -73,5 +73,80 @@ if $rows != 6 then
|
|||
endi
|
||||
|
||||
|
||||
print ========== prepare stbBin and ctbBin
|
||||
sql create table db.stbBin (ts timestamp, c1 int, c2 binary(4)) tags(t1 binary(16))
|
||||
|
||||
|
||||
sql create table db.ctbBin using db.stbBin tags("a")
|
||||
sql insert into db.ctbBin values(now, 1, "2")
|
||||
|
||||
sql create table db.ctbBin1 using db.stbBin tags("b")
|
||||
sql insert into db.ctbBin1 values(now, 2, "2")
|
||||
|
||||
sql create table db.ctbBin2 using db.stbBin tags("c")
|
||||
sql insert into db.ctbBin2 values(now, 3, "2")
|
||||
|
||||
sql create table db.ctbBin3 using db.stbBin tags("d")
|
||||
sql insert into db.ctbBin3 values(now, 4, "2")
|
||||
|
||||
sql select * from db.stbBin where t1 = "a"
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from db.stbBin where t1 < "a"
|
||||
if $rows != 0 then
|
||||
return -=1
|
||||
endi
|
||||
|
||||
sql select * from db.stbBin where t1 < "b"
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from db.stbBin where t1 between "a" and "e"
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
print ========== prepare stbNc and ctbNc
|
||||
sql create table db.stbNc (ts timestamp, c1 int, c2 binary(4)) tags(t1 nchar(16))
|
||||
|
||||
|
||||
sql create table db.ctbNc using db.stbNc tags("a")
|
||||
sql insert into db.ctbNc values(now, 1, "2")
|
||||
|
||||
sql create table db.ctbNc1 using db.stbNc tags("b")
|
||||
sql insert into db.ctbNc1 values(now, 2, "2")
|
||||
|
||||
sql create table db.ctbNc2 using db.stbNc tags("c")
|
||||
sql insert into db.ctbNc2 values(now, 3, "2")
|
||||
|
||||
sql create table db.ctbNc3 using db.stbNc tags("d")
|
||||
sql insert into db.ctbNc3 values(now, 4, "2")
|
||||
|
||||
sql select * from db.stbNc where t1 = "a"
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from db.stbNc where t1 < "a"
|
||||
if $rows != 0 then
|
||||
return -=1
|
||||
endi
|
||||
|
||||
sql select * from db.stbNc where t1 < "b"
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
||||
sql select * from db.stbNc where t1 between "a" and "e"
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
|
|
Loading…
Reference in New Issue