Merge remote-tracking branch 'origin/3.0' into feature/config
This commit is contained in:
commit
e14ef16667
|
@ -167,6 +167,7 @@ typedef struct {
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
int64_t dbId;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
} SBuildUseDBInput;
|
} SBuildUseDBInput;
|
||||||
|
|
||||||
|
@ -563,6 +564,7 @@ int32_t tDeserializeSDropDbRsp(void* buf, int32_t bufLen, SDropDbRsp* pRsp);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char db[TSDB_DB_FNAME_LEN];
|
char db[TSDB_DB_FNAME_LEN];
|
||||||
|
int64_t dbId;
|
||||||
int32_t vgVersion;
|
int32_t vgVersion;
|
||||||
} SUseDbReq;
|
} SUseDbReq;
|
||||||
|
|
||||||
|
|
|
@ -95,7 +95,7 @@ int32_t catalogGetHandle(uint64_t clusterId, SCatalog** catalogHandle);
|
||||||
*/
|
*/
|
||||||
void catalogFreeHandle(SCatalog* pCatalog);
|
void catalogFreeHandle(SCatalog* pCatalog);
|
||||||
|
|
||||||
int32_t catalogGetDBVgVersion(SCatalog* pCatalog, const char* dbName, int32_t* version);
|
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get a DB's all vgroup info.
|
* Get a DB's all vgroup info.
|
||||||
|
|
|
@ -156,6 +156,8 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code);
|
||||||
*/
|
*/
|
||||||
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo);
|
||||||
|
|
||||||
|
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp);
|
||||||
|
|
||||||
void initQueryModuleMsgHandle();
|
void initQueryModuleMsgHandle();
|
||||||
|
|
||||||
const SSchema* tGetTbnameColumnSchema();
|
const SSchema* tGetTbnameColumnSchema();
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
#include "clientInt.h"
|
#include "clientInt.h"
|
||||||
#include "clientLog.h"
|
#include "clientLog.h"
|
||||||
#include "catalog.h"
|
#include "catalog.h"
|
||||||
|
#include "query.h"
|
||||||
|
|
||||||
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
|
int32_t (*handleRequestRspFp[TDMT_MAX])(void*, const SDataBuf* pMsg, int32_t code);
|
||||||
|
|
||||||
|
@ -243,6 +244,23 @@ int32_t processCreateDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SRequestObj* pRequest = param;
|
SRequestObj* pRequest = param;
|
||||||
|
|
||||||
|
if (TSDB_CODE_MND_DB_NOT_EXIST == code) {
|
||||||
|
SUseDbRsp usedbRsp = {0};
|
||||||
|
tDeserializeSUseDbRsp(pMsg->pData, pMsg->len, &usedbRsp);
|
||||||
|
struct SCatalog *pCatalog = NULL;
|
||||||
|
|
||||||
|
if (usedbRsp.vgVersion >= 0) {
|
||||||
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
catalogRemoveDB(pCatalog, usedbRsp.db, usedbRsp.uid);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tFreeSUsedbRsp(&usedbRsp);
|
||||||
|
}
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
free(pMsg->pData);
|
free(pMsg->pData);
|
||||||
setErrno(pRequest, code);
|
setErrno(pRequest, code);
|
||||||
|
@ -256,6 +274,26 @@ int32_t processUseDbRsp(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
|
tNameFromString(&name, usedbRsp.db, T_NAME_ACCT|T_NAME_DB);
|
||||||
|
|
||||||
|
SUseDbOutput output = {0};
|
||||||
|
code = queryBuildUseDbOutput(&output, &usedbRsp);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = code;
|
||||||
|
if (output.dbVgroup) taosHashCleanup(output.dbVgroup->vgHash);
|
||||||
|
tfree(output.dbVgroup);
|
||||||
|
|
||||||
|
tscError("failed to build use db output since %s", terrstr());
|
||||||
|
} else {
|
||||||
|
struct SCatalog *pCatalog = NULL;
|
||||||
|
|
||||||
|
int32_t code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog);
|
||||||
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
tscWarn("catalogGetHandle failed, clusterId:%"PRIx64", error:%s", pRequest->pTscObj->pAppInfo->clusterId, tstrerror(code));
|
||||||
|
} else {
|
||||||
|
catalogUpdateDBVgInfo(pCatalog, output.db, output.dbId, output.dbVgroup);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
tFreeSUsedbRsp(&usedbRsp);
|
tFreeSUsedbRsp(&usedbRsp);
|
||||||
|
|
||||||
char db[TSDB_DB_NAME_LEN] = {0};
|
char db[TSDB_DB_NAME_LEN] = {0};
|
||||||
|
|
|
@ -1421,6 +1421,7 @@ int32_t tSerializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
|
if (tEncodeI64(&encoder, pReq->dbId) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -1435,6 +1436,7 @@ int32_t tDeserializeSUseDbReq(void *buf, int32_t bufLen, SUseDbReq *pReq) {
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
|
||||||
|
if (tDecodeI64(&decoder, &pReq->dbId) < 0) return -1;
|
||||||
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
|
||||||
tEndDecode(&decoder);
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
|
|
@ -940,43 +940,51 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||||
char *p = strchr(usedbReq.db, '.');
|
char *p = strchr(usedbReq.db, '.');
|
||||||
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
|
if (p && 0 == strcmp(p + 1, TSDB_INFORMATION_SCHEMA_DB)) {
|
||||||
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
||||||
|
code = 0;
|
||||||
} else {
|
} else {
|
||||||
pDb = mndAcquireDb(pMnode, usedbReq.db);
|
pDb = mndAcquireDb(pMnode, usedbReq.db);
|
||||||
if (pDb == NULL) {
|
if (pDb == NULL) {
|
||||||
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
terrno = TSDB_CODE_MND_DB_NOT_EXIST;
|
||||||
goto USE_DB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pUser = mndAcquireUser(pMnode, pReq->user);
|
memcpy(usedbRsp.db, usedbReq.db, TSDB_DB_FNAME_LEN);
|
||||||
if (pUser == NULL) {
|
usedbRsp.uid = usedbReq.dbId;
|
||||||
goto USE_DB_OVER;
|
usedbRsp.vgVersion = usedbReq.vgVersion;
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckUseDbAuth(pUser, pDb) != 0) {
|
mError("db:%s, failed to process use db req since %s", usedbReq.db, terrstr());
|
||||||
goto USE_DB_OVER;
|
} else {
|
||||||
}
|
pUser = mndAcquireUser(pMnode, pReq->user);
|
||||||
|
if (pUser == NULL) {
|
||||||
|
goto USE_DB_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
if (mndCheckUseDbAuth(pUser, pDb) != 0) {
|
||||||
if (usedbRsp.pVgroupInfos == NULL) {
|
goto USE_DB_OVER;
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
}
|
||||||
goto USE_DB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (usedbReq.vgVersion < pDb->vgVersion) {
|
usedbRsp.pVgroupInfos = taosArrayInit(pDb->cfg.numOfVgroups, sizeof(SVgroupInfo));
|
||||||
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
if (usedbRsp.pVgroupInfos == NULL) {
|
||||||
}
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
goto USE_DB_OVER;
|
||||||
|
}
|
||||||
|
|
||||||
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
if (usedbReq.vgVersion < pDb->vgVersion || usedbReq.dbId != pDb->uid) {
|
||||||
usedbRsp.uid = pDb->uid;
|
mndBuildDBVgroupInfo(pDb, pMnode, usedbRsp.pVgroupInfos);
|
||||||
usedbRsp.vgVersion = pDb->vgVersion;
|
}
|
||||||
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
|
||||||
usedbRsp.hashMethod = pDb->hashMethod;
|
memcpy(usedbRsp.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
usedbRsp.uid = pDb->uid;
|
||||||
|
usedbRsp.vgVersion = pDb->vgVersion;
|
||||||
|
usedbRsp.vgNum = taosArrayGetSize(usedbRsp.pVgroupInfos);
|
||||||
|
usedbRsp.hashMethod = pDb->hashMethod;
|
||||||
|
code = 0;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp);
|
int32_t contLen = tSerializeSUseDbRsp(NULL, 0, &usedbRsp);
|
||||||
void *pRsp = rpcMallocCont(contLen);
|
void *pRsp = rpcMallocCont(contLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
code = -1;
|
||||||
goto USE_DB_OVER;
|
goto USE_DB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -984,7 +992,6 @@ static int32_t mndProcessUseDbReq(SMnodeMsg *pReq) {
|
||||||
|
|
||||||
pReq->pCont = pRsp;
|
pReq->pCont = pRsp;
|
||||||
pReq->contLen = contLen;
|
pReq->contLen = contLen;
|
||||||
code = 0;
|
|
||||||
|
|
||||||
USE_DB_OVER:
|
USE_DB_OVER:
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
|
|
@ -494,7 +494,7 @@ PROCESS_RPC_END:
|
||||||
if (code == TSDB_CODE_APP_NOT_READY) {
|
if (code == TSDB_CODE_APP_NOT_READY) {
|
||||||
mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
|
mndSendRedirectRsp(pMnode, &pMsg->rpcMsg);
|
||||||
} else if (code != 0) {
|
} else if (code != 0) {
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .code = code};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont, .code = code};
|
||||||
rpcSendResponse(&rpcRsp);
|
rpcSendResponse(&rpcRsp);
|
||||||
} else {
|
} else {
|
||||||
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
|
SRpcMsg rpcRsp = {.handle = pMsg->rpcMsg.handle, .contLen = pMsg->contLen, .pCont = pMsg->pCont};
|
||||||
|
|
|
@ -166,6 +166,28 @@ void ctgDbgShowDBCache(SHashObj *dbHash) {
|
||||||
taosHashGetKey(dbCache, (void **)&dbFName, &len);
|
taosHashGetKey(dbCache, (void **)&dbFName, &len);
|
||||||
|
|
||||||
CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
|
CTG_CACHE_DEBUG("** %dth db [%.*s][%"PRIx64"] **", i, (int32_t)len, dbFName, dbCache->dbId);
|
||||||
|
|
||||||
|
CTG_CACHE_DEBUG("deleted: %d", dbCache->deleted);
|
||||||
|
if (dbCache->vgInfo) {
|
||||||
|
CTG_CACHE_DEBUG("vgVersion: %d", dbCache->vgInfo->vgVersion);
|
||||||
|
CTG_CACHE_DEBUG("hashMethod: %d", dbCache->vgInfo->hashMethod);
|
||||||
|
if (dbCache->vgInfo->vgHash) {
|
||||||
|
CTG_CACHE_DEBUG("vgNum: %d", taosHashGetSize(dbCache->vgInfo->vgHash));
|
||||||
|
//TODO
|
||||||
|
} else {
|
||||||
|
CTG_CACHE_DEBUG("vgHash: %p", dbCache->vgInfo->vgHash);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
CTG_CACHE_DEBUG("vgInfo: %p", dbCache->vgInfo);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dbCache->tbCache.metaCache) {
|
||||||
|
CTG_CACHE_DEBUG("metaNum: %d", taosHashGetSize(dbCache->tbCache.metaCache));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (dbCache->tbCache.stbCache) {
|
||||||
|
CTG_CACHE_DEBUG("stbNum: %d", taosHashGetSize(dbCache->tbCache.stbCache));
|
||||||
|
}
|
||||||
|
|
||||||
pIter = taosHashIterate(dbHash, pIter);
|
pIter = taosHashIterate(dbHash, pIter);
|
||||||
}
|
}
|
||||||
|
@ -242,6 +264,34 @@ void ctgFreeMetaRent(SCtgRentMgmt *mgmt) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgPushRmDBMsgInQueue(SCatalog* pCtg, const char *dbFName, int64_t dbId) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
|
||||||
|
SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
|
||||||
|
if (NULL == msg) {
|
||||||
|
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
|
||||||
|
CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
msg->pCtg = pCtg;
|
||||||
|
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
||||||
|
msg->dbId = dbId;
|
||||||
|
|
||||||
|
action.data = msg;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgPushAction(&action));
|
||||||
|
|
||||||
|
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
tfree(action.data);
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
|
void ctgFreeTableMetaCache(SCtgTbMetaCache *cache) {
|
||||||
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
CTG_LOCK(CTG_WRITE, &cache->stbLock);
|
||||||
if (cache->stbCache) {
|
if (cache->stbCache) {
|
||||||
|
@ -452,12 +502,7 @@ int32_t ctgGetDBVgInfoFromMnode(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtE
|
||||||
|
|
||||||
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
rpcSendRecv(pRpc, (SEpSet*)pMgmtEps, &rpcMsg, &rpcRsp);
|
||||||
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
if (TSDB_CODE_SUCCESS != rpcRsp.code) {
|
||||||
if (CTG_DB_NOT_EXIST(rpcRsp.code)) {
|
ctgError("error rsp for use db, error:%s, db:%s", tstrerror(rpcRsp.code), input->db);
|
||||||
ctgDebug("db not exist in mnode, dbFName:%s", input->db);
|
|
||||||
return rpcRsp.code;
|
|
||||||
}
|
|
||||||
|
|
||||||
ctgError("error rsp for use db, code:%s, db:%s", tstrerror(rpcRsp.code), input->db);
|
|
||||||
CTG_ERR_RET(rpcRsp.code);
|
CTG_ERR_RET(rpcRsp.code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1365,20 +1410,33 @@ int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst) {
|
||||||
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
int32_t ctgGetDBVgInfo(SCatalog* pCtg, void *pRpc, const SEpSet* pMgmtEps, const char* dbFName, bool forceUpdate, SCtgDBCache** dbCache, SDBVgInfo **pInfo) {
|
||||||
bool inCache = false;
|
bool inCache = false;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
if (!forceUpdate) {
|
|
||||||
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
|
CTG_ERR_RET(ctgAcquireVgInfoFromCache(pCtg, dbFName, dbCache, &inCache));
|
||||||
if (inCache) {
|
|
||||||
return TSDB_CODE_SUCCESS;
|
if (inCache && !forceUpdate) {
|
||||||
}
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
SUseDbOutput DbOut = {0};
|
SUseDbOutput DbOut = {0};
|
||||||
SBuildUseDBInput input = {0};
|
SBuildUseDBInput input = {0};
|
||||||
|
|
||||||
tstrncpy(input.db, dbFName, tListLen(input.db));
|
tstrncpy(input.db, dbFName, tListLen(input.db));
|
||||||
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
if (inCache) {
|
||||||
|
input.dbId = (*dbCache)->dbId;
|
||||||
|
input.vgVersion = (*dbCache)->vgInfo->vgVersion;
|
||||||
|
} else {
|
||||||
|
input.vgVersion = CTG_DEFAULT_INVALID_VERSION;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut));
|
code = ctgGetDBVgInfoFromMnode(pCtg, pRpc, pMgmtEps, &input, &DbOut);
|
||||||
|
if (code) {
|
||||||
|
if (CTG_DB_NOT_EXIST(code) && input.vgVersion > CTG_DEFAULT_INVALID_VERSION) {
|
||||||
|
ctgDebug("db no longer exist, dbFName:%s, dbId:%" PRIx64, input.db, input.dbId);
|
||||||
|
ctgPushRmDBMsgInQueue(pCtg, input.db, input.dbId);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_ERR_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
|
CTG_ERR_JRET(ctgCloneVgInfo(DbOut.dbVgroup, pInfo));
|
||||||
|
|
||||||
|
@ -1772,7 +1830,6 @@ int32_t ctgActRemoveTbl(SCtgMetaAction *action) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
void* ctgUpdateThreadFunc(void* param) {
|
void* ctgUpdateThreadFunc(void* param) {
|
||||||
setThreadName("catalog");
|
setThreadName("catalog");
|
||||||
|
|
||||||
|
@ -1964,10 +2021,10 @@ void catalogFreeHandle(SCatalog* pCtg) {
|
||||||
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
|
ctgInfo("handle freed, culsterId:%"PRIx64, clusterId);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version) {
|
int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* version, int64_t* dbId) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
if (NULL == pCtg || NULL == dbFName || NULL == version) {
|
if (NULL == pCtg || NULL == dbFName || NULL == version || NULL == dbId) {
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1994,6 +2051,7 @@ int32_t catalogGetDBVgVersion(SCatalog* pCtg, const char* dbFName, int32_t* vers
|
||||||
}
|
}
|
||||||
|
|
||||||
*version = dbCache->vgInfo->vgVersion;
|
*version = dbCache->vgInfo->vgVersion;
|
||||||
|
*dbId = dbCache->dbId;
|
||||||
|
|
||||||
ctgReleaseVgInfo(dbCache);
|
ctgReleaseVgInfo(dbCache);
|
||||||
ctgReleaseDBCache(pCtg, dbCache);
|
ctgReleaseDBCache(pCtg, dbCache);
|
||||||
|
@ -2099,29 +2157,12 @@ int32_t catalogRemoveDB(SCatalog* pCtg, const char* dbFName, uint64_t dbId) {
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
SCtgMetaAction action= {.act = CTG_ACT_REMOVE_DB};
|
CTG_ERR_JRET(ctgPushRmDBMsgInQueue(pCtg, dbFName, dbId));
|
||||||
SCtgRemoveDBMsg *msg = malloc(sizeof(SCtgRemoveDBMsg));
|
|
||||||
if (NULL == msg) {
|
|
||||||
ctgError("malloc %d failed", (int32_t)sizeof(SCtgRemoveDBMsg));
|
|
||||||
CTG_API_LEAVE(TSDB_CODE_CTG_MEM_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
msg->pCtg = pCtg;
|
|
||||||
strncpy(msg->dbFName, dbFName, sizeof(msg->dbFName));
|
|
||||||
msg->dbId = dbId;
|
|
||||||
|
|
||||||
action.data = msg;
|
|
||||||
|
|
||||||
CTG_ERR_JRET(ctgPushAction(&action));
|
|
||||||
|
|
||||||
ctgDebug("action [%s] added into queue", gCtgAction[action.act].name);
|
|
||||||
|
|
||||||
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
CTG_API_LEAVE(TSDB_CODE_SUCCESS);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
tfree(action.data);
|
|
||||||
|
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -128,6 +128,7 @@ void ctgTestInitLogFile() {
|
||||||
|
|
||||||
tsAsyncLog = 0;
|
tsAsyncLog = 0;
|
||||||
qDebugFlag = 159;
|
qDebugFlag = 159;
|
||||||
|
strcpy(tsLogDir, "/var/log/taos");
|
||||||
|
|
||||||
ctgDbgEnableDebug("api");
|
ctgDbgEnableDebug("api");
|
||||||
|
|
||||||
|
@ -1498,7 +1499,7 @@ TEST(dbVgroup, getSetDbVgroupCase) {
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
|
ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), ctgTestVgNum);
|
||||||
|
|
||||||
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) {
|
while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_RENT_NUM)) {
|
||||||
usleep(10000);
|
usleep(10000);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -762,6 +762,7 @@ SDclStmtInfo* qParserValidateDclSqlNode(SSqlInfo* pInfo, SParseContext* pCtx, ch
|
||||||
|
|
||||||
SUseDbReq usedbReq = {0};
|
SUseDbReq usedbReq = {0};
|
||||||
tNameExtractFullName(&n, usedbReq.db);
|
tNameExtractFullName(&n, usedbReq.db);
|
||||||
|
catalogGetDBVgVersion(pCtx->pCatalog, usedbReq.db, &usedbReq.vgVersion, &usedbReq.dbId);
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
||||||
void* pBuf = malloc(bufLen);
|
void* pBuf = malloc(bufLen);
|
||||||
|
|
|
@ -24,6 +24,32 @@
|
||||||
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
int32_t (*queryBuildMsg[TDMT_MAX])(void *input, char **msg, int32_t msgSize, int32_t *msgLen) = {0};
|
||||||
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
|
int32_t (*queryProcessMsgRsp[TDMT_MAX])(void *output, char *msg, int32_t msgSize) = {0};
|
||||||
|
|
||||||
|
int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
|
||||||
|
memcpy(pOut->db, usedbRsp->db, TSDB_DB_FNAME_LEN);
|
||||||
|
pOut->dbId = usedbRsp->uid;
|
||||||
|
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
|
||||||
|
if (NULL == pOut->dbVgroup) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
pOut->dbVgroup->vgVersion = usedbRsp->vgVersion;
|
||||||
|
pOut->dbVgroup->hashMethod = usedbRsp->hashMethod;
|
||||||
|
pOut->dbVgroup->vgHash =
|
||||||
|
taosHashInit(usedbRsp->vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
|
if (NULL == pOut->dbVgroup->vgHash) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < usedbRsp->vgNum; ++i) {
|
||||||
|
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp->pVgroupInfos, i);
|
||||||
|
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
|
||||||
|
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
SBuildTableMetaInput *pInput = input;
|
SBuildTableMetaInput *pInput = input;
|
||||||
if (NULL == input || NULL == msg || NULL == msgLen) {
|
if (NULL == input || NULL == msg || NULL == msgLen) {
|
||||||
|
@ -57,6 +83,7 @@ int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *ms
|
||||||
strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db));
|
strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db));
|
||||||
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
|
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
|
||||||
usedbReq.vgVersion = pInput->vgVersion;
|
usedbReq.vgVersion = pInput->vgVersion;
|
||||||
|
usedbReq.dbId = pInput->dbId;
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
||||||
void *pBuf = rpcMallocCont(bufLen);
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
@ -90,35 +117,10 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
goto PROCESS_USEDB_OVER;
|
goto PROCESS_USEDB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
|
code = queryBuildUseDbOutput(pOut, &usedbRsp);
|
||||||
pOut->dbId = usedbRsp.uid;
|
|
||||||
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
|
|
||||||
if (NULL == pOut->dbVgroup) {
|
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
goto PROCESS_USEDB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
|
|
||||||
pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
|
|
||||||
pOut->dbVgroup->vgHash =
|
|
||||||
taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
|
||||||
if (NULL == pOut->dbVgroup->vgHash) {
|
|
||||||
tfree(pOut->dbVgroup);
|
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
goto PROCESS_USEDB_OVER;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < usedbRsp.vgNum; ++i) {
|
|
||||||
SVgroupInfo *pVgInfo = taosArrayGet(usedbRsp.pVgroupInfos, i);
|
|
||||||
if (0 != taosHashPut(pOut->dbVgroup->vgHash, &pVgInfo->vgId, sizeof(int32_t), pVgInfo, sizeof(SVgroupInfo))) {
|
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
goto PROCESS_USEDB_OVER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
code = 0;
|
|
||||||
|
|
||||||
PROCESS_USEDB_OVER:
|
PROCESS_USEDB_OVER:
|
||||||
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
if (pOut) {
|
if (pOut) {
|
||||||
if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash);
|
if (pOut->dbVgroup) taosHashCleanup(pOut->dbVgroup->vgHash);
|
||||||
|
|
|
@ -326,7 +326,7 @@ int64_t taosWriteFile(TdFilePtr pFile, const void *buf, int64_t count) {
|
||||||
nleft -= nwritten;
|
nleft -= nwritten;
|
||||||
tbuf += nwritten;
|
tbuf += nwritten;
|
||||||
}
|
}
|
||||||
fsync(pFile->fd);
|
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
2
tests
2
tests
|
@ -1 +1 @@
|
||||||
Subproject commit 904e6f0e152e8fe61edfe0a0a9ae497cfde2a72c
|
Subproject commit 08ed39f0a5fcbbfb5a630b945ab3d1998d4b4136
|
Loading…
Reference in New Issue