Merge pull request #21056 from taosdata/feat/TD-23693
feat: subtable level privilege
This commit is contained in:
commit
05dc0fd6b1
|
@ -689,6 +689,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
int32_t tSerializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
||||||
int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
int32_t tDeserializeSAlterUserReq(void* buf, int32_t bufLen, SAlterUserReq* pReq);
|
||||||
|
void tFreeSAlterUserReq(SAlterUserReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char user[TSDB_USER_LEN];
|
char user[TSDB_USER_LEN];
|
||||||
|
|
|
@ -82,6 +82,7 @@ typedef struct SCatalogReq {
|
||||||
SArray* pUser; // element is SUserAuthInfo
|
SArray* pUser; // element is SUserAuthInfo
|
||||||
SArray* pTableIndex; // element is SNAME
|
SArray* pTableIndex; // element is SNAME
|
||||||
SArray* pTableCfg; // element is SNAME
|
SArray* pTableCfg; // element is SNAME
|
||||||
|
SArray* pTableTag; // element is SNAME
|
||||||
bool qNodeRequired; // valid qnode
|
bool qNodeRequired; // valid qnode
|
||||||
bool dNodeRequired; // valid dnode
|
bool dNodeRequired; // valid dnode
|
||||||
bool svrVerRequired;
|
bool svrVerRequired;
|
||||||
|
@ -105,6 +106,7 @@ typedef struct SMetaData {
|
||||||
SArray* pUser; // pRes = SUserAuthRes*
|
SArray* pUser; // pRes = SUserAuthRes*
|
||||||
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
SArray* pQnodeList; // pRes = SArray<SQueryNodeLoad>*
|
||||||
SArray* pTableCfg; // pRes = STableCfg*
|
SArray* pTableCfg; // pRes = STableCfg*
|
||||||
|
SArray* pTableTag; // pRes = SArray<STagVal>*
|
||||||
SArray* pDnodeList; // pRes = SArray<SEpSet>*
|
SArray* pDnodeList; // pRes = SArray<SEpSet>*
|
||||||
SMetaRes* pSvrVer; // pRes = char*
|
SMetaRes* pSvrVer; // pRes = char*
|
||||||
} SMetaData;
|
} SMetaData;
|
||||||
|
@ -312,6 +314,8 @@ int32_t catalogGetIndexMeta(SCatalog* pCtg, SRequestConnInfo* pConn, const char*
|
||||||
|
|
||||||
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
|
int32_t catalogGetTableIndex(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
|
||||||
|
|
||||||
|
int32_t catalogGetTableTag(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes);
|
||||||
|
|
||||||
int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** pCfg);
|
int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** pCfg);
|
||||||
|
|
||||||
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp* pRsp);
|
int32_t catalogUpdateTableIndex(SCatalog* pCtg, STableIndexRsp* pRsp);
|
||||||
|
|
|
@ -379,6 +379,8 @@ typedef struct SVnodeModifyOpStmt {
|
||||||
SName usingTableName;
|
SName usingTableName;
|
||||||
const char* pBoundCols;
|
const char* pBoundCols;
|
||||||
struct STableMeta* pTableMeta;
|
struct STableMeta* pTableMeta;
|
||||||
|
SNode* pTagCond;
|
||||||
|
SArray* pTableTag;
|
||||||
SHashObj* pVgroupsHashObj;
|
SHashObj* pVgroupsHashObj;
|
||||||
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
|
SHashObj* pTableBlockHashObj; // SHashObj<tuid, STableDataCxt*>
|
||||||
SHashObj* pSubTableHashObj;
|
SHashObj* pSubTableHashObj;
|
||||||
|
|
|
@ -359,11 +359,11 @@ int taos_print_row(char *str, TAOS_ROW row, TAOS_FIELD *fields, int num_fields)
|
||||||
case TSDB_DATA_TYPE_NCHAR: {
|
case TSDB_DATA_TYPE_NCHAR: {
|
||||||
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
|
int32_t charLen = varDataLen((char *)row[i] - VARSTR_HEADER_SIZE);
|
||||||
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
|
if (fields[i].type == TSDB_DATA_TYPE_BINARY) {
|
||||||
if(ASSERT(charLen <= fields[i].bytes && charLen >= 0)){
|
if (ASSERT(charLen <= fields[i].bytes && charLen >= 0)) {
|
||||||
tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
|
tscError("taos_print_row error binary. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if(ASSERT(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0)){
|
if (ASSERT(charLen <= fields[i].bytes * TSDB_NCHAR_SIZE && charLen >= 0)) {
|
||||||
tscError("taos_print_row error. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
|
tscError("taos_print_row error. charLen:%d, fields[i].bytes:%d", charLen, fields[i].bytes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -705,16 +705,16 @@ int taos_get_current_db(TAOS *taos, char *database, int len, int *required) {
|
||||||
|
|
||||||
int code = TSDB_CODE_SUCCESS;
|
int code = TSDB_CODE_SUCCESS;
|
||||||
taosThreadMutexLock(&pTscObj->mutex);
|
taosThreadMutexLock(&pTscObj->mutex);
|
||||||
if(database == NULL || len <= 0){
|
if (database == NULL || len <= 0) {
|
||||||
if(required != NULL) *required = strlen(pTscObj->db) + 1;
|
if (required != NULL) *required = strlen(pTscObj->db) + 1;
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
code = -1;
|
code = -1;
|
||||||
}else if(len < strlen(pTscObj->db) + 1){
|
} else if (len < strlen(pTscObj->db) + 1) {
|
||||||
tstrncpy(database, pTscObj->db, len);
|
tstrncpy(database, pTscObj->db, len);
|
||||||
if(required) *required = strlen(pTscObj->db) + 1;
|
if (required) *required = strlen(pTscObj->db) + 1;
|
||||||
terrno = TSDB_CODE_INVALID_PARA;
|
terrno = TSDB_CODE_INVALID_PARA;
|
||||||
code = -1;
|
code = -1;
|
||||||
}else{
|
} else {
|
||||||
strcpy(database, pTscObj->db);
|
strcpy(database, pTscObj->db);
|
||||||
code = 0;
|
code = 0;
|
||||||
}
|
}
|
||||||
|
@ -741,6 +741,7 @@ static void destoryCatalogReq(SCatalogReq *pCatalogReq) {
|
||||||
taosArrayDestroy(pCatalogReq->pUser);
|
taosArrayDestroy(pCatalogReq->pUser);
|
||||||
taosArrayDestroy(pCatalogReq->pTableIndex);
|
taosArrayDestroy(pCatalogReq->pTableIndex);
|
||||||
taosArrayDestroy(pCatalogReq->pTableCfg);
|
taosArrayDestroy(pCatalogReq->pTableCfg);
|
||||||
|
taosArrayDestroy(pCatalogReq->pTableTag);
|
||||||
taosMemoryFree(pCatalogReq);
|
taosMemoryFree(pCatalogReq);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -975,8 +976,10 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
pRequest->stmtType = pRequest->pQuery->pRoot->type;
|
pRequest->stmtType = pRequest->pQuery->pRoot->type;
|
||||||
phaseAsyncQuery(pWrapper);
|
code = phaseAsyncQuery(pWrapper);
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
tscError("0x%" PRIx64 " error happens, code:%d - %s, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code),
|
||||||
pRequest->requestId);
|
pRequest->requestId);
|
||||||
destorySqlCallbackWrapper(pWrapper);
|
destorySqlCallbackWrapper(pWrapper);
|
||||||
|
@ -1042,11 +1045,11 @@ static void fetchCallback(void *pResult, void *param, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
if(ASSERT(res != NULL && fp != NULL)){
|
if (ASSERT(res != NULL && fp != NULL)) {
|
||||||
tscError("taos_fetch_rows_a invalid paras");
|
tscError("taos_fetch_rows_a invalid paras");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(ASSERT(TD_RES_QUERY(res))){
|
if (ASSERT(TD_RES_QUERY(res))) {
|
||||||
tscError("taos_fetch_rows_a res is NULL");
|
tscError("taos_fetch_rows_a res is NULL");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1092,11 +1095,11 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
if(ASSERT(res != NULL && fp != NULL)){
|
if (ASSERT(res != NULL && fp != NULL)) {
|
||||||
tscError("taos_fetch_rows_a invalid paras");
|
tscError("taos_fetch_rows_a invalid paras");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if(ASSERT(TD_RES_QUERY(res))){
|
if (ASSERT(TD_RES_QUERY(res))) {
|
||||||
tscError("taos_fetch_rows_a res is NULL");
|
tscError("taos_fetch_rows_a res is NULL");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1111,11 +1114,11 @@ void taos_fetch_raw_block_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
const void *taos_get_raw_block(TAOS_RES *res) {
|
const void *taos_get_raw_block(TAOS_RES *res) {
|
||||||
if(ASSERT(res != NULL)){
|
if (ASSERT(res != NULL)) {
|
||||||
tscError("taos_fetch_rows_a invalid paras");
|
tscError("taos_fetch_rows_a invalid paras");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
if(ASSERT(TD_RES_QUERY(res))){
|
if (ASSERT(TD_RES_QUERY(res))) {
|
||||||
tscError("taos_fetch_rows_a res is NULL");
|
tscError("taos_fetch_rows_a res is NULL");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1273,7 +1276,6 @@ _return:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
int taos_load_table_info(TAOS *taos, const char *tableNameList) {
|
||||||
if (NULL == taos) {
|
if (NULL == taos) {
|
||||||
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
terrno = TSDB_CODE_TSC_DISCONNECTED;
|
||||||
|
|
|
@ -1409,6 +1409,8 @@ int32_t tDeserializeSAlterUserReq(void *buf, int32_t bufLen, SAlterUserReq *pReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void tFreeSAlterUserReq(SAlterUserReq *pReq) { taosMemoryFreeClear(pReq->tagCond); }
|
||||||
|
|
||||||
int32_t tSerializeSGetUserAuthReq(void *buf, int32_t bufLen, SGetUserAuthReq *pReq) {
|
int32_t tSerializeSGetUserAuthReq(void *buf, int32_t bufLen, SGetUserAuthReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
@ -1635,6 +1637,7 @@ int32_t tDeserializeSGetUserAuthRspImpl(SDecoder *pDecoder, SGetUserAuthRsp *pRs
|
||||||
int32_t ref = 0;
|
int32_t ref = 0;
|
||||||
if (tDecodeI32(pDecoder, &ref) < 0) return -1;
|
if (tDecodeI32(pDecoder, &ref) < 0) return -1;
|
||||||
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
|
taosHashPut(pRsp->useDbs, key, strlen(key), &ref, sizeof(ref));
|
||||||
|
taosMemoryFree(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1831,7 +1834,6 @@ int32_t tSerializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pReq
|
||||||
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->pComment) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
|
if (tEncodeI8(&encoder, pReq->orReplace) < 0) return -1;
|
||||||
|
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
@ -1876,7 +1878,6 @@ int32_t tDeserializeSCreateFuncReq(void *buf, int32_t bufLen, SCreateFuncReq *pR
|
||||||
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->pComment) < 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (!tDecodeIsEnd(&decoder)) {
|
if (!tDecodeIsEnd(&decoder)) {
|
||||||
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
|
if (tDecodeI8(&decoder, &pReq->orReplace) < 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
|
@ -2053,12 +2054,12 @@ int32_t tDeserializeSRetrieveFuncRsp(void *buf, int32_t bufLen, SRetrieveFuncRsp
|
||||||
if (pRsp->pFuncExtraInfos == NULL) return -1;
|
if (pRsp->pFuncExtraInfos == NULL) return -1;
|
||||||
if (tDecodeIsEnd(&decoder)) {
|
if (tDecodeIsEnd(&decoder)) {
|
||||||
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
|
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
|
||||||
SFuncExtraInfo extraInfo = { 0 };
|
SFuncExtraInfo extraInfo = {0};
|
||||||
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
|
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
|
for (int32_t i = 0; i < pRsp->numOfFuncs; ++i) {
|
||||||
SFuncExtraInfo extraInfo = { 0 };
|
SFuncExtraInfo extraInfo = {0};
|
||||||
if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1;
|
if (tDecodeI32(&decoder, &extraInfo.funcVersion) < 0) return -1;
|
||||||
if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1;
|
if (tDecodeI64(&decoder, &extraInfo.funcCreatedTime) < 0) return -1;
|
||||||
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
|
taosArrayPush(pRsp->pFuncExtraInfos, &extraInfo);
|
||||||
|
|
|
@ -390,6 +390,7 @@ static SSdbRow *mndUserActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT32(pRaw, dataPos, &ref, _OVER);
|
SDB_GET_INT32(pRaw, dataPos, &ref, _OVER);
|
||||||
|
|
||||||
taosHashPut(pUser->useDbs, key, keyLen, &ref, sizeof(ref));
|
taosHashPut(pUser->useDbs, key, keyLen, &ref, sizeof(ref));
|
||||||
|
taosMemoryFree(key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -956,6 +957,7 @@ _OVER:
|
||||||
mError("user:%s, failed to alter since %s", alterReq.user, terrstr());
|
mError("user:%s, failed to alter since %s", alterReq.user, terrstr());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tFreeSAlterUserReq(&alterReq);
|
||||||
mndReleaseUser(pMnode, pOperUser);
|
mndReleaseUser(pMnode, pOperUser);
|
||||||
mndReleaseUser(pMnode, pUser);
|
mndReleaseUser(pMnode, pUser);
|
||||||
mndUserFreeObj(&newUser);
|
mndUserFreeObj(&newUser);
|
||||||
|
|
|
@ -58,6 +58,7 @@ typedef enum {
|
||||||
CTG_CI_OTHERTABLE_META,
|
CTG_CI_OTHERTABLE_META,
|
||||||
CTG_CI_TBL_SMA,
|
CTG_CI_TBL_SMA,
|
||||||
CTG_CI_TBL_CFG,
|
CTG_CI_TBL_CFG,
|
||||||
|
CTG_CI_TBL_TAG,
|
||||||
CTG_CI_INDEX_INFO,
|
CTG_CI_INDEX_INFO,
|
||||||
CTG_CI_USER,
|
CTG_CI_USER,
|
||||||
CTG_CI_UDF,
|
CTG_CI_UDF,
|
||||||
|
@ -110,6 +111,7 @@ typedef enum {
|
||||||
CTG_TASK_GET_SVR_VER,
|
CTG_TASK_GET_SVR_VER,
|
||||||
CTG_TASK_GET_TB_META_BATCH,
|
CTG_TASK_GET_TB_META_BATCH,
|
||||||
CTG_TASK_GET_TB_HASH_BATCH,
|
CTG_TASK_GET_TB_HASH_BATCH,
|
||||||
|
CTG_TASK_GET_TB_TAG,
|
||||||
} CTG_TASK_TYPE;
|
} CTG_TASK_TYPE;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -152,6 +154,11 @@ typedef struct SCtgTbCacheInfo {
|
||||||
int32_t tbType;
|
int32_t tbType;
|
||||||
} SCtgTbCacheInfo;
|
} SCtgTbCacheInfo;
|
||||||
|
|
||||||
|
typedef struct SCtgTbMetaParam {
|
||||||
|
SName* pName;
|
||||||
|
int32_t flag;
|
||||||
|
} SCtgTbMetaParam;
|
||||||
|
|
||||||
typedef struct SCtgTbMetaCtx {
|
typedef struct SCtgTbMetaCtx {
|
||||||
SCtgTbCacheInfo tbInfo;
|
SCtgTbCacheInfo tbInfo;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
|
@ -186,6 +193,11 @@ typedef struct SCtgTbCfgCtx {
|
||||||
SVgroupInfo* pVgInfo;
|
SVgroupInfo* pVgInfo;
|
||||||
} SCtgTbCfgCtx;
|
} SCtgTbCfgCtx;
|
||||||
|
|
||||||
|
typedef struct SCtgTbTagCtx {
|
||||||
|
SName* pName;
|
||||||
|
SVgroupInfo* pVgInfo;
|
||||||
|
} SCtgTbTagCtx;
|
||||||
|
|
||||||
typedef struct SCtgDbVgCtx {
|
typedef struct SCtgDbVgCtx {
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
} SCtgDbVgCtx;
|
} SCtgDbVgCtx;
|
||||||
|
@ -304,6 +316,7 @@ typedef struct SCtgJob {
|
||||||
catalogCallback userFp;
|
catalogCallback userFp;
|
||||||
int32_t tbMetaNum;
|
int32_t tbMetaNum;
|
||||||
int32_t tbHashNum;
|
int32_t tbHashNum;
|
||||||
|
int32_t tbTagNum;
|
||||||
int32_t dbVgNum;
|
int32_t dbVgNum;
|
||||||
int32_t udfNum;
|
int32_t udfNum;
|
||||||
int32_t qnodeNum;
|
int32_t qnodeNum;
|
||||||
|
@ -346,6 +359,7 @@ typedef struct SCtgSubRes {
|
||||||
|
|
||||||
struct SCtgTask {
|
struct SCtgTask {
|
||||||
CTG_TASK_TYPE type;
|
CTG_TASK_TYPE type;
|
||||||
|
bool subTask;
|
||||||
int32_t taskId;
|
int32_t taskId;
|
||||||
SCtgJob* pJob;
|
SCtgJob* pJob;
|
||||||
void* taskCtx;
|
void* taskCtx;
|
||||||
|
@ -623,6 +637,7 @@ typedef struct SCtgCacheItemInfo {
|
||||||
#define CTG_FLAG_SYS_DB 0x8
|
#define CTG_FLAG_SYS_DB 0x8
|
||||||
#define CTG_FLAG_FORCE_UPDATE 0x10
|
#define CTG_FLAG_FORCE_UPDATE 0x10
|
||||||
#define CTG_FLAG_ONLY_CACHE 0x20
|
#define CTG_FLAG_ONLY_CACHE 0x20
|
||||||
|
#define CTG_FLAG_SYNC_OP 0x40
|
||||||
|
|
||||||
#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v))
|
#define CTG_FLAG_SET(_flag, _v) ((_flag) |= (_v))
|
||||||
|
|
||||||
|
@ -925,6 +940,10 @@ void ctgReleaseVgMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCach
|
||||||
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache);
|
void ctgReleaseTbMetaToCache(SCatalog* pCtg, SCtgDBCache* dbCache, SCtgTbCache* pCache);
|
||||||
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
|
void ctgGetGlobalCacheStat(SCtgCacheStat* pStat);
|
||||||
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
|
int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res);
|
||||||
|
int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx, STableMeta** pTableMeta);
|
||||||
|
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName);
|
||||||
|
int32_t ctgGetTbTagCb(SCtgTask* pTask);
|
||||||
|
int32_t ctgGetUserCb(SCtgTask* pTask);
|
||||||
|
|
||||||
extern SCatalogMgmt gCtgMgmt;
|
extern SCatalogMgmt gCtgMgmt;
|
||||||
extern SCtgDebug gCTGDebug;
|
extern SCtgDebug gCTGDebug;
|
||||||
|
|
|
@ -208,7 +208,7 @@ int32_t ctgGetTbMeta(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgTbMetaCtx* ctx
|
||||||
}
|
}
|
||||||
|
|
||||||
while (true) {
|
while (true) {
|
||||||
CTG_ERR_JRET(ctgRefreshTbMeta(pCtg, pConn, ctx, &output, false));
|
CTG_ERR_JRET(ctgRefreshTbMeta(pCtg, pConn, ctx, &output, ctx->flag & CTG_FLAG_SYNC_OP));
|
||||||
|
|
||||||
if (CTG_IS_META_TABLE(output->metaType)) {
|
if (CTG_IS_META_TABLE(output->metaType)) {
|
||||||
*pTableMeta = output->tbMeta;
|
*pTableMeta = output->tbMeta;
|
||||||
|
@ -429,6 +429,48 @@ int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName,
|
||||||
CTG_RET(TSDB_CODE_SUCCESS);
|
CTG_RET(TSDB_CODE_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetTbTag(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** pRes) {
|
||||||
|
SVgroupInfo vgroupInfo = {0};
|
||||||
|
STableCfg* pCfg = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
CTG_ERR_RET(ctgGetTbHashVgroup(pCtg, pConn, pTableName, &vgroupInfo, NULL));
|
||||||
|
CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, &pCfg, NULL));
|
||||||
|
|
||||||
|
if (NULL == pCfg->pTags || pCfg->tagsLen <= 0) {
|
||||||
|
ctgError("invalid tag in tbCfg rsp, pTags:%p, len:%d", pCfg->pTags, pCfg->tagsLen);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTagVals = NULL;
|
||||||
|
STag* pTag = (STag*)pCfg->pTags;
|
||||||
|
|
||||||
|
if (tTagIsJson(pTag)) {
|
||||||
|
pTagVals = taosArrayInit(1, sizeof(STagVal));
|
||||||
|
if (NULL == pTagVals) {
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* pJson = parseTagDatatoJson(pTag);
|
||||||
|
STagVal tagVal;
|
||||||
|
tagVal.cid = 0;
|
||||||
|
tagVal.type = TSDB_DATA_TYPE_JSON;
|
||||||
|
tagVal.pData = pJson;
|
||||||
|
tagVal.nData = strlen(pJson);
|
||||||
|
taosArrayPush(pTagVals, &tagVal);
|
||||||
|
} else {
|
||||||
|
CTG_ERR_JRET(tTagToValArray((const STag*)pCfg->pTags, &pTagVals));
|
||||||
|
}
|
||||||
|
|
||||||
|
*pRes = pTagVals;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
tFreeSTableCfgRsp((STableCfgRsp*)pCfg);
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** pVgList) {
|
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo* pConn, SName* pTableName, SArray** pVgList) {
|
||||||
STableMeta* tbMeta = NULL;
|
STableMeta* tbMeta = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -1414,6 +1456,21 @@ _return:
|
||||||
CTG_API_LEAVE(code);
|
CTG_API_LEAVE(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t catalogGetTableTag(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, SArray** pRes) {
|
||||||
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
if (NULL == pCtg || NULL == pConn || NULL == pTableName || NULL == pRes) {
|
||||||
|
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t code = 0;
|
||||||
|
CTG_ERR_JRET(ctgGetTbTag(pCtg, pConn, (SName*)pTableName, pRes));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
CTG_API_LEAVE(code);
|
||||||
|
}
|
||||||
|
|
||||||
int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** pCfg) {
|
int32_t catalogRefreshGetTableCfg(SCatalog* pCtg, SRequestConnInfo* pConn, const SName* pTableName, STableCfg** pCfg) {
|
||||||
CTG_API_ENTER();
|
CTG_API_ENTER();
|
||||||
|
|
||||||
|
|
|
@ -21,7 +21,8 @@
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
SName* name = (SName*)param;
|
SCtgTbMetaParam* pParam = (SCtgTbMetaParam*)param;
|
||||||
|
SName* name = pParam->pName;
|
||||||
SCtgTask task = {0};
|
SCtgTask task = {0};
|
||||||
|
|
||||||
task.type = CTG_TASK_GET_TB_META;
|
task.type = CTG_TASK_GET_TB_META;
|
||||||
|
@ -41,7 +42,7 @@ int32_t ctgInitGetTbMetaTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(ctx->pName, name, sizeof(*name));
|
memcpy(ctx->pName, name, sizeof(*name));
|
||||||
ctx->flag = CTG_FLAG_UNKNOWN_STB;
|
ctx->flag = pParam->flag | CTG_FLAG_UNKNOWN_STB;
|
||||||
|
|
||||||
taosArrayPush(pJob->pTasks, &task);
|
taosArrayPush(pJob->pTasks, &task);
|
||||||
|
|
||||||
|
@ -386,6 +387,37 @@ int32_t ctgInitGetTbCfgTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgInitGetTbTagTask(SCtgJob* pJob, int32_t taskIdx, void* param) {
|
||||||
|
SName* name = (SName*)param;
|
||||||
|
SCtgTask task = {0};
|
||||||
|
|
||||||
|
task.type = CTG_TASK_GET_TB_TAG;
|
||||||
|
task.taskId = taskIdx;
|
||||||
|
task.pJob = pJob;
|
||||||
|
|
||||||
|
task.taskCtx = taosMemoryCalloc(1, sizeof(SCtgTbTagCtx));
|
||||||
|
if (NULL == task.taskCtx) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgTbTagCtx* ctx = task.taskCtx;
|
||||||
|
ctx->pName = taosMemoryMalloc(sizeof(*name));
|
||||||
|
if (NULL == ctx->pName) {
|
||||||
|
taosMemoryFree(task.taskCtx);
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
memcpy(ctx->pName, name, sizeof(*name));
|
||||||
|
|
||||||
|
taosArrayPush(pJob->pTasks, &task);
|
||||||
|
|
||||||
|
qDebug("QID:0x%" PRIx64 " the %dth task type %s initialized, tbName:%s", pJob->queryId, taskIdx,
|
||||||
|
ctgTaskTypeStr(task.type), name->tname);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
|
int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, const SCatalogReq* pReq) {
|
||||||
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
SHashObj* pDb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
SHashObj* pTb = taosHashInit(taskNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
|
||||||
|
@ -437,6 +469,15 @@ int32_t ctgHandleForceUpdate(SCatalog* pCtg, int32_t taskNum, SCtgJob* pJob, con
|
||||||
char dbFName[TSDB_DB_FNAME_LEN];
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
tNameGetFullDbName(name, dbFName);
|
tNameGetFullDbName(name, dbFName);
|
||||||
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||||
|
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pJob->tbTagNum; ++i) {
|
||||||
|
SName* name = taosArrayGet(pReq->pTableTag, i);
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
tNameGetFullDbName(name, dbFName);
|
||||||
|
taosHashPut(pDb, dbFName, strlen(dbFName), dbFName, TSDB_DB_FNAME_LEN);
|
||||||
|
taosHashPut(pTb, name, sizeof(SName), name, sizeof(SName));
|
||||||
}
|
}
|
||||||
|
|
||||||
char* dbFName = taosHashIterate(pDb, NULL);
|
char* dbFName = taosHashIterate(pDb, NULL);
|
||||||
|
@ -505,9 +546,10 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
|
int32_t dbInfoNum = (int32_t)taosArrayGetSize(pReq->pDbInfo);
|
||||||
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
int32_t tbIndexNum = (int32_t)taosArrayGetSize(pReq->pTableIndex);
|
||||||
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
int32_t tbCfgNum = (int32_t)taosArrayGetSize(pReq->pTableCfg);
|
||||||
|
int32_t tbTagNum = (int32_t)taosArrayGetSize(pReq->pTableTag);
|
||||||
|
|
||||||
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
int32_t taskNum = tbMetaNum + dbVgNum + udfNum + tbHashNum + qnodeNum + dnodeNum + svrVerNum + dbCfgNum + indexNum +
|
||||||
userNum + dbInfoNum + tbIndexNum + tbCfgNum;
|
userNum + dbInfoNum + tbIndexNum + tbCfgNum + tbTagNum;
|
||||||
|
|
||||||
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
*job = taosMemoryCalloc(1, sizeof(SCtgJob));
|
||||||
if (NULL == *job) {
|
if (NULL == *job) {
|
||||||
|
@ -537,6 +579,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
pJob->tbIndexNum = tbIndexNum;
|
pJob->tbIndexNum = tbIndexNum;
|
||||||
pJob->tbCfgNum = tbCfgNum;
|
pJob->tbCfgNum = tbCfgNum;
|
||||||
pJob->svrVerNum = svrVerNum;
|
pJob->svrVerNum = svrVerNum;
|
||||||
|
pJob->tbTagNum = tbTagNum;
|
||||||
|
|
||||||
#if CTG_BATCH_FETCH
|
#if CTG_BATCH_FETCH
|
||||||
pJob->pBatchs =
|
pJob->pBatchs =
|
||||||
|
@ -604,6 +647,12 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo* pConn, SCtgJob** job, const
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL));
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_CFG, name, NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < tbTagNum; ++i) {
|
||||||
|
SName* name = taosArrayGet(pReq->pTableTag, i);
|
||||||
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_TB_TAG, name, NULL));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < indexNum; ++i) {
|
for (int32_t i = 0; i < indexNum; ++i) {
|
||||||
char* indexName = taosArrayGet(pReq->pIndex, i);
|
char* indexName = taosArrayGet(pReq->pIndex, i);
|
||||||
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX_INFO, indexName, NULL));
|
CTG_ERR_JRET(ctgInitTask(pJob, CTG_TASK_GET_INDEX_INFO, indexName, NULL));
|
||||||
|
@ -650,6 +699,10 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pTableMeta) {
|
if (NULL == pJob->jobRes.pTableMeta) {
|
||||||
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
|
pJob->jobRes.pTableMeta = taosArrayInit(pJob->tbMetaNum, sizeof(SMetaRes));
|
||||||
|
@ -665,6 +718,10 @@ int32_t ctgDumpTbMetaRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
|
||||||
pJob->jobRes.pTableMeta = pTask->res;
|
pJob->jobRes.pTableMeta = pTask->res;
|
||||||
|
@ -673,6 +730,10 @@ int32_t ctgDumpTbMetasRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pDbVgroup) {
|
if (NULL == pJob->jobRes.pDbVgroup) {
|
||||||
pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, sizeof(SMetaRes));
|
pJob->jobRes.pDbVgroup = taosArrayInit(pJob->dbVgNum, sizeof(SMetaRes));
|
||||||
|
@ -688,6 +749,10 @@ int32_t ctgDumpDbVgRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pTableHash) {
|
if (NULL == pJob->jobRes.pTableHash) {
|
||||||
pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
|
pJob->jobRes.pTableHash = taosArrayInit(pJob->tbHashNum, sizeof(SMetaRes));
|
||||||
|
@ -703,6 +768,10 @@ int32_t ctgDumpTbHashRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbHashsRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbHashsRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
|
||||||
pJob->jobRes.pTableHash = pTask->res;
|
pJob->jobRes.pTableHash = pTask->res;
|
||||||
|
@ -711,9 +780,17 @@ int32_t ctgDumpTbHashsRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pTableIndex) {
|
if (NULL == pJob->jobRes.pTableIndex) {
|
||||||
pJob->jobRes.pTableIndex = taosArrayInit(pJob->tbIndexNum, sizeof(SMetaRes));
|
SArray* pRes = taosArrayInit(pJob->tbIndexNum, sizeof(SMetaRes));
|
||||||
|
if (atomic_val_compare_exchange_ptr(&pJob->jobRes.pTableIndex, NULL, pRes)) {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pJob->jobRes.pTableIndex) {
|
if (NULL == pJob->jobRes.pTableIndex) {
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -726,9 +803,17 @@ int32_t ctgDumpTbIndexRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpTbCfgRes(SCtgTask* pTask) {
|
int32_t ctgDumpTbCfgRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pTableCfg) {
|
if (NULL == pJob->jobRes.pTableCfg) {
|
||||||
pJob->jobRes.pTableCfg = taosArrayInit(pJob->tbCfgNum, sizeof(SMetaRes));
|
SArray* pRes = taosArrayInit(pJob->tbCfgNum, sizeof(SMetaRes));
|
||||||
|
if (atomic_val_compare_exchange_ptr(&pJob->jobRes.pTableCfg, NULL, pRes)) {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
}
|
||||||
|
|
||||||
if (NULL == pJob->jobRes.pTableCfg) {
|
if (NULL == pJob->jobRes.pTableCfg) {
|
||||||
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -740,7 +825,35 @@ int32_t ctgDumpTbCfgRes(SCtgTask* pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgDumpTbTagRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
if (NULL == pJob->jobRes.pTableTag) {
|
||||||
|
SArray* pRes = taosArrayInit(pJob->tbTagNum, sizeof(SMetaRes));
|
||||||
|
if (atomic_val_compare_exchange_ptr(&pJob->jobRes.pTableTag, NULL, pRes)) {
|
||||||
|
taosArrayDestroy(pRes);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pJob->jobRes.pTableTag) {
|
||||||
|
CTG_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaRes res = {.code = pTask->code, .pRes = pTask->res};
|
||||||
|
taosArrayPush(pJob->jobRes.pTableTag, &res);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pIndex) {
|
if (NULL == pJob->jobRes.pIndex) {
|
||||||
pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SMetaRes));
|
pJob->jobRes.pIndex = taosArrayInit(pJob->indexNum, sizeof(SMetaRes));
|
||||||
|
@ -756,6 +869,10 @@ int32_t ctgDumpIndexRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
|
int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pQnodeList) {
|
if (NULL == pJob->jobRes.pQnodeList) {
|
||||||
pJob->jobRes.pQnodeList = taosArrayInit(1, sizeof(SMetaRes));
|
pJob->jobRes.pQnodeList = taosArrayInit(1, sizeof(SMetaRes));
|
||||||
|
@ -771,6 +888,10 @@ int32_t ctgDumpQnodeRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
|
int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pDnodeList) {
|
if (NULL == pJob->jobRes.pDnodeList) {
|
||||||
pJob->jobRes.pDnodeList = taosArrayInit(1, sizeof(SMetaRes));
|
pJob->jobRes.pDnodeList = taosArrayInit(1, sizeof(SMetaRes));
|
||||||
|
@ -786,6 +907,10 @@ int32_t ctgDumpDnodeRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
|
int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pDbCfg) {
|
if (NULL == pJob->jobRes.pDbCfg) {
|
||||||
pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SMetaRes));
|
pJob->jobRes.pDbCfg = taosArrayInit(pJob->dbCfgNum, sizeof(SMetaRes));
|
||||||
|
@ -801,6 +926,10 @@ int32_t ctgDumpDbCfgRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
|
int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pDbInfo) {
|
if (NULL == pJob->jobRes.pDbInfo) {
|
||||||
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SMetaRes));
|
pJob->jobRes.pDbInfo = taosArrayInit(pJob->dbInfoNum, sizeof(SMetaRes));
|
||||||
|
@ -816,6 +945,10 @@ int32_t ctgDumpDbInfoRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpUdfRes(SCtgTask* pTask) {
|
int32_t ctgDumpUdfRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pUdfList) {
|
if (NULL == pJob->jobRes.pUdfList) {
|
||||||
pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SMetaRes));
|
pJob->jobRes.pUdfList = taosArrayInit(pJob->udfNum, sizeof(SMetaRes));
|
||||||
|
@ -831,6 +964,10 @@ int32_t ctgDumpUdfRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pUser) {
|
if (NULL == pJob->jobRes.pUser) {
|
||||||
pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(SMetaRes));
|
pJob->jobRes.pUser = taosArrayInit(pJob->userNum, sizeof(SMetaRes));
|
||||||
|
@ -846,6 +983,10 @@ int32_t ctgDumpUserRes(SCtgTask* pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t ctgDumpSvrVer(SCtgTask* pTask) {
|
int32_t ctgDumpSvrVer(SCtgTask* pTask) {
|
||||||
|
if (pTask->subTask) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
SCtgJob* pJob = pTask->pJob;
|
SCtgJob* pJob = pTask->pJob;
|
||||||
if (NULL == pJob->jobRes.pSvrVer) {
|
if (NULL == pJob->jobRes.pSvrVer) {
|
||||||
pJob->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes));
|
pJob->jobRes.pSvrVer = taosMemoryCalloc(1, sizeof(SMetaRes));
|
||||||
|
@ -1075,7 +1216,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf
|
||||||
|
|
||||||
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
STableMetaOutput* pOut = (STableMetaOutput*)pMsgCtx->out;
|
||||||
|
|
||||||
ctgUpdateTbMetaToCache(pCtg, pOut, false);
|
ctgUpdateTbMetaToCache(pCtg, pOut, flag & CTG_FLAG_SYNC_OP);
|
||||||
|
|
||||||
if (CTG_IS_META_BOTH(pOut->metaType)) {
|
if (CTG_IS_META_BOTH(pOut->metaType)) {
|
||||||
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
memcpy(pOut->tbMeta, &pOut->ctbMeta, sizeof(pOut->ctbMeta));
|
||||||
|
@ -1473,6 +1614,49 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgHandleGetTbTagRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCtgTask* pTask = tReq->pTask;
|
||||||
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
|
CTG_ERR_JRET(ctgProcessRspMsg(&pTask->msgCtx.out, reqType, pMsg->pData, pMsg->len, rspCode, pTask->msgCtx.target));
|
||||||
|
|
||||||
|
STableCfgRsp* pRsp = (STableCfgRsp*)pTask->msgCtx.out;
|
||||||
|
if (NULL == pRsp->pTags || pRsp->tagsLen <= 0) {
|
||||||
|
ctgError("invalid tag in tbCfg rsp, pTags:%p, len:%d", pRsp->pTags, pRsp->tagsLen);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_INVALID_MSG);
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTagVals = NULL;
|
||||||
|
STag* pTag = (STag*)pRsp->pTags;
|
||||||
|
|
||||||
|
if (tTagIsJson(pTag)) {
|
||||||
|
pTagVals = taosArrayInit(1, sizeof(STagVal));
|
||||||
|
if (NULL == pTagVals) {
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* pJson = parseTagDatatoJson(pTag);
|
||||||
|
STagVal tagVal;
|
||||||
|
tagVal.cid = 0;
|
||||||
|
tagVal.type = TSDB_DATA_TYPE_JSON;
|
||||||
|
tagVal.pData = pJson;
|
||||||
|
tagVal.nData = strlen(pJson);
|
||||||
|
taosArrayPush(pTagVals, &tagVal);
|
||||||
|
} else {
|
||||||
|
CTG_ERR_JRET(tTagToValArray((const STag*)pRsp->pTags, &pTagVals));
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->res = pTagVals;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
ctgHandleTaskEnd(pTask, code);
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
int32_t ctgHandleGetDbCfgRsp(SCtgTaskReq* tReq, int32_t reqType, const SDataBuf* pMsg, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
SCtgTask* pTask = tReq->pTask;
|
SCtgTask* pTask = tReq->pTask;
|
||||||
|
@ -1905,7 +2089,10 @@ int32_t ctgLaunchGetTbCfgTask(SCtgTask* pTask) {
|
||||||
if (pCtx->tbType <= 0) {
|
if (pCtx->tbType <= 0) {
|
||||||
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
CTG_ERR_JRET(ctgReadTbTypeFromCache(pCtg, dbFName, pCtx->pName->tname, &pCtx->tbType));
|
||||||
if (pCtx->tbType <= 0) {
|
if (pCtx->tbType <= 0) {
|
||||||
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, pCtx->pName));
|
SCtgTbMetaParam param;
|
||||||
|
param.pName = pCtx->pName;
|
||||||
|
param.flag = 0;
|
||||||
|
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, ¶m));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1935,6 +2122,45 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int32_t ctgLaunchGetTbTagTask(SCtgTask* pTask) {
|
||||||
|
int32_t code = 0;
|
||||||
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
SCtgTbTagCtx* pCtx = (SCtgTbTagCtx*)pTask->taskCtx;
|
||||||
|
SArray* pRes = NULL;
|
||||||
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
|
tNameGetFullDbName(pCtx->pName, dbFName);
|
||||||
|
SCtgJob* pJob = pTask->pJob;
|
||||||
|
SCtgMsgCtx* pMsgCtx = CTG_GET_TASK_MSGCTX(pTask, -1);
|
||||||
|
if (NULL == pMsgCtx->pBatchs) {
|
||||||
|
pMsgCtx->pBatchs = pJob->pBatchs;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (NULL == pCtx->pVgInfo) {
|
||||||
|
CTG_ERR_JRET(ctgGetTbHashVgroupFromCache(pCtg, pCtx->pName, &pCtx->pVgInfo));
|
||||||
|
if (NULL == pCtx->pVgInfo) {
|
||||||
|
CTG_ERR_JRET(ctgLaunchSubTask(pTask, CTG_TASK_GET_DB_VGROUP, ctgGetTbTagCb, dbFName));
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_CACHE_NHIT_INC(CTG_CI_TBL_TAG, 1);
|
||||||
|
|
||||||
|
CTG_ERR_JRET(ctgGetTableCfgFromVnode(pCtg, pConn, pCtx->pName, pCtx->pVgInfo, NULL, pTask));
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
if (CTG_TASK_LAUNCHED == pTask->status) {
|
||||||
|
ctgHandleTaskEnd(pTask, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(code);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
|
int32_t ctgLaunchGetQnodeTask(SCtgTask* pTask) {
|
||||||
SCatalog* pCtg = pTask->pJob->pCtg;
|
SCatalog* pCtg = pTask->pJob->pCtg;
|
||||||
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
SRequestConnInfo* pConn = &pTask->pJob->conn;
|
||||||
|
@ -2077,6 +2303,8 @@ int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
|
||||||
if (inCache) {
|
if (inCache) {
|
||||||
pTask->res = rsp.pRawRes;
|
pTask->res = rsp.pRawRes;
|
||||||
|
|
||||||
|
ctgTaskDebug("Final res got, pass:%d, pCond:%p", rsp.pRawRes->pass, rsp.pRawRes->pCond);
|
||||||
|
|
||||||
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
CTG_ERR_RET(ctgHandleTaskEnd(pTask, 0));
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2084,7 +2312,10 @@ int32_t ctgLaunchGetUserTask(SCtgTask* pTask) {
|
||||||
taosMemoryFreeClear(rsp.pRawRes);
|
taosMemoryFreeClear(rsp.pRawRes);
|
||||||
|
|
||||||
if (rsp.metaNotExists) {
|
if (rsp.metaNotExists) {
|
||||||
CTG_ERR_RET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetTbCfgCb, &pCtx->user.tbName));
|
SCtgTbMetaParam param;
|
||||||
|
param.pName = &pCtx->user.tbName;
|
||||||
|
param.flag = CTG_FLAG_SYNC_OP;
|
||||||
|
CTG_ERR_RET(ctgLaunchSubTask(pTask, CTG_TASK_GET_TB_META, ctgGetUserCb, ¶m));
|
||||||
} else {
|
} else {
|
||||||
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask));
|
CTG_ERR_RET(ctgGetUserDbAuthFromMnode(pCtg, pConn, pCtx->user.user, NULL, pTask));
|
||||||
}
|
}
|
||||||
|
@ -2138,6 +2369,27 @@ _return:
|
||||||
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetTbTagCb(SCtgTask* pTask) {
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
CTG_ERR_JRET(pTask->subRes.code);
|
||||||
|
|
||||||
|
SCtgTbTagCtx* pCtx = (SCtgTbTagCtx*)pTask->taskCtx;
|
||||||
|
SDBVgInfo* pDb = (SDBVgInfo*)pTask->subRes.res;
|
||||||
|
|
||||||
|
if (NULL == pCtx->pVgInfo) {
|
||||||
|
pCtx->pVgInfo = taosMemoryCalloc(1, sizeof(SVgroupInfo));
|
||||||
|
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pTask->pJob->pCtg, pDb, pCtx->pName, pCtx->pVgInfo));
|
||||||
|
}
|
||||||
|
|
||||||
|
CTG_RET(ctgLaunchGetTbTagTask(pTask));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
CTG_RET(ctgHandleTaskEnd(pTask, pTask->subRes.code));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t ctgGetUserCb(SCtgTask* pTask) {
|
int32_t ctgGetUserCb(SCtgTask* pTask) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -2162,8 +2414,12 @@ int32_t ctgCompDbVgTasks(SCtgTask* pTask, void* param, bool* equal) {
|
||||||
|
|
||||||
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
|
int32_t ctgCompTbMetaTasks(SCtgTask* pTask, void* param, bool* equal) {
|
||||||
SCtgTbMetaCtx* ctx = pTask->taskCtx;
|
SCtgTbMetaCtx* ctx = pTask->taskCtx;
|
||||||
|
SCtgTbMetaParam* pParam = (SCtgTbMetaParam*)param;
|
||||||
|
|
||||||
*equal = tNameTbNameEqual(ctx->pName, (SName*)param);
|
*equal = tNameTbNameEqual(ctx->pName, (SName*)pParam->pName);
|
||||||
|
if (*equal) {
|
||||||
|
ctx->flag |= pParam->flag;
|
||||||
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -2197,6 +2453,7 @@ SCtgAsyncFps gCtgAsyncFps[] = {
|
||||||
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
|
{ctgInitGetSvrVerTask, ctgLaunchGetSvrVerTask, ctgHandleGetSvrVerRsp, ctgDumpSvrVer, NULL, NULL},
|
||||||
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
|
{ctgInitGetTbMetasTask, ctgLaunchGetTbMetasTask, ctgHandleGetTbMetasRsp, ctgDumpTbMetasRes, NULL, NULL},
|
||||||
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
|
{ctgInitGetTbHashsTask, ctgLaunchGetTbHashsTask, ctgHandleGetTbHashsRsp, ctgDumpTbHashsRes, NULL, NULL},
|
||||||
|
{ctgInitGetTbTagTask, ctgLaunchGetTbTagTask, ctgHandleGetTbTagRsp, ctgDumpTbTagRes, NULL, NULL},
|
||||||
};
|
};
|
||||||
|
|
||||||
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
|
int32_t ctgMakeAsyncRes(SCtgJob* pJob) {
|
||||||
|
@ -2284,6 +2541,9 @@ int32_t ctgLaunchSubTask(SCtgTask* pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
|
||||||
}
|
}
|
||||||
|
|
||||||
SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId);
|
SCtgTask* pSub = taosArrayGet(pJob->pTasks, subTaskId);
|
||||||
|
if (newTask) {
|
||||||
|
pSub->subTask = true;
|
||||||
|
}
|
||||||
|
|
||||||
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
|
||||||
|
|
||||||
|
|
|
@ -703,7 +703,31 @@ _return:
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t ctgGetCachedStbNameFromSuid(SCatalog* pCtg, char* dbFName, uint64_t suid, char **stbName) {
|
||||||
|
*stbName = NULL;
|
||||||
|
|
||||||
|
SCtgDBCache *dbCache = NULL;
|
||||||
|
ctgAcquireDBCache(pCtg, dbFName, &dbCache);
|
||||||
|
if (NULL == dbCache) {
|
||||||
|
ctgDebug("db %s not in cache", dbFName);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *stb = taosHashAcquire(dbCache->stbCache, &suid, sizeof(suid));
|
||||||
|
if (NULL == stb) {
|
||||||
|
ctgDebug("stb 0x%" PRIx64 " not in cache, dbFName:%s", suid, dbFName);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
*stbName = taosStrdup(stb);
|
||||||
|
|
||||||
|
taosHashRelease(dbCache->stbCache, stb);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
|
int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache, SCtgAuthRsp *pRes) {
|
||||||
|
int32_t code = 0;
|
||||||
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
|
if (IS_SYS_DBNAME(pReq->tbName.dbname)) {
|
||||||
*inCache = true;
|
*inCache = true;
|
||||||
pRes->pRawRes->pass = true;
|
pRes->pRawRes->pass = true;
|
||||||
|
@ -728,7 +752,7 @@ int32_t ctgChkAuthFromCache(SCatalog *pCtg, SUserAuthInfo *pReq, bool *inCache,
|
||||||
|
|
||||||
CTG_LOCK(CTG_READ, &pUser->lock);
|
CTG_LOCK(CTG_READ, &pUser->lock);
|
||||||
memcpy(&req.authInfo, &pUser->userAuth, sizeof(pUser->userAuth));
|
memcpy(&req.authInfo, &pUser->userAuth, sizeof(pUser->userAuth));
|
||||||
int32_t code = ctgChkSetAuthRes(pCtg, &req, pRes);
|
code = ctgChkSetAuthRes(pCtg, &req, pRes);
|
||||||
CTG_UNLOCK(CTG_READ, &pUser->lock);
|
CTG_UNLOCK(CTG_READ, &pUser->lock);
|
||||||
CTG_ERR_JRET(code);
|
CTG_ERR_JRET(code);
|
||||||
|
|
||||||
|
@ -742,8 +766,9 @@ _return:
|
||||||
|
|
||||||
*inCache = false;
|
*inCache = false;
|
||||||
CTG_CACHE_NHIT_INC(CTG_CI_USER, 1);
|
CTG_CACHE_NHIT_INC(CTG_CI_USER, 1);
|
||||||
|
ctgDebug("Get user from cache failed, user:%s, metaNotExists:%d, code:%d", pReq->user, pRes->metaNotExists, code);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void ctgDequeue(SCtgCacheOperation **op) {
|
void ctgDequeue(SCtgCacheOperation **op) {
|
||||||
|
|
|
@ -170,6 +170,9 @@ void ctgFreeSMetaData(SMetaData* pData) {
|
||||||
taosArrayDestroy(pData->pTableCfg);
|
taosArrayDestroy(pData->pTableCfg);
|
||||||
pData->pTableCfg = NULL;
|
pData->pTableCfg = NULL;
|
||||||
|
|
||||||
|
taosArrayDestroy(pData->pTableTag);
|
||||||
|
pData->pTableTag = NULL;
|
||||||
|
|
||||||
taosMemoryFreeClear(pData->pSvrVer);
|
taosMemoryFreeClear(pData->pSvrVer);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -486,6 +489,18 @@ void ctgFreeBatchHash(void* hash) {
|
||||||
taosMemoryFreeClear(pRes->pRes);
|
taosMemoryFreeClear(pRes->pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ctgFreeJsonTagVal(void* val) {
|
||||||
|
if (NULL == val) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
STagVal* pVal = (STagVal*)val;
|
||||||
|
|
||||||
|
if (TSDB_DATA_TYPE_JSON == pVal->type) {
|
||||||
|
taosMemoryFree(pVal->pData);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
|
void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case CTG_TASK_GET_QNODE:
|
case CTG_TASK_GET_QNODE:
|
||||||
|
@ -516,16 +531,32 @@ void ctgFreeTaskRes(CTG_TASK_TYPE type, void** pRes) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CTG_TASK_GET_USER: {
|
||||||
|
if (*pRes) {
|
||||||
|
SUserAuthRes* pAuth = (SUserAuthRes*)*pRes;
|
||||||
|
nodesDestroyNode(pAuth->pCond);
|
||||||
|
taosMemoryFreeClear(*pRes);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
case CTG_TASK_GET_TB_HASH:
|
case CTG_TASK_GET_TB_HASH:
|
||||||
case CTG_TASK_GET_DB_INFO:
|
case CTG_TASK_GET_DB_INFO:
|
||||||
case CTG_TASK_GET_INDEX_INFO:
|
case CTG_TASK_GET_INDEX_INFO:
|
||||||
case CTG_TASK_GET_UDF:
|
case CTG_TASK_GET_UDF:
|
||||||
case CTG_TASK_GET_USER:
|
|
||||||
case CTG_TASK_GET_SVR_VER:
|
case CTG_TASK_GET_SVR_VER:
|
||||||
case CTG_TASK_GET_TB_META: {
|
case CTG_TASK_GET_TB_META: {
|
||||||
taosMemoryFreeClear(*pRes);
|
taosMemoryFreeClear(*pRes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CTG_TASK_GET_TB_TAG: {
|
||||||
|
if (1 == taosArrayGetSize(*pRes)) {
|
||||||
|
taosArrayDestroyEx(*pRes, ctgFreeJsonTagVal);
|
||||||
|
} else {
|
||||||
|
taosArrayDestroy(*pRes);
|
||||||
|
}
|
||||||
|
*pRes = NULL;
|
||||||
|
break;
|
||||||
|
}
|
||||||
case CTG_TASK_GET_TB_META_BATCH: {
|
case CTG_TASK_GET_TB_META_BATCH: {
|
||||||
SArray* pArray = (SArray*)*pRes;
|
SArray* pArray = (SArray*)*pRes;
|
||||||
int32_t num = taosArrayGetSize(pArray);
|
int32_t num = taosArrayGetSize(pArray);
|
||||||
|
@ -679,6 +710,13 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
|
||||||
taosMemoryFreeClear(pTask->taskCtx);
|
taosMemoryFreeClear(pTask->taskCtx);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
case CTG_TASK_GET_TB_TAG: {
|
||||||
|
SCtgTbTagCtx* taskCtx = (SCtgTbTagCtx*)pTask->taskCtx;
|
||||||
|
taosMemoryFreeClear(taskCtx->pName);
|
||||||
|
taosMemoryFreeClear(taskCtx->pVgInfo);
|
||||||
|
taosMemoryFreeClear(taskCtx);
|
||||||
|
break;
|
||||||
|
}
|
||||||
case CTG_TASK_GET_DB_VGROUP:
|
case CTG_TASK_GET_DB_VGROUP:
|
||||||
case CTG_TASK_GET_DB_CFG:
|
case CTG_TASK_GET_DB_CFG:
|
||||||
case CTG_TASK_GET_DB_INFO:
|
case CTG_TASK_GET_DB_INFO:
|
||||||
|
@ -1336,57 +1374,75 @@ int32_t ctgChkSetTbAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
|
||||||
STableMeta* pMeta = NULL;
|
STableMeta* pMeta = NULL;
|
||||||
SGetUserAuthRsp* pInfo = &req->authInfo;
|
SGetUserAuthRsp* pInfo = &req->authInfo;
|
||||||
SHashObj* pTbs = (AUTH_TYPE_READ == req->singleType) ? pInfo->readTbs : pInfo->writeTbs;
|
SHashObj* pTbs = (AUTH_TYPE_READ == req->singleType) ? pInfo->readTbs : pInfo->writeTbs;
|
||||||
|
char* stbName = NULL;
|
||||||
|
|
||||||
char tbFullName[TSDB_TABLE_FNAME_LEN];
|
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||||
tNameExtractFullName(&req->pRawReq->tbName, tbFullName);
|
char dbFName[TSDB_DB_FNAME_LEN];
|
||||||
char* pCond = taosHashGet(pTbs, tbFullName, strlen(tbFullName));
|
tNameExtractFullName(&req->pRawReq->tbName, tbFName);
|
||||||
|
tNameGetFullDbName(&req->pRawReq->tbName, dbFName);
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
taosMemoryFreeClear(pMeta);
|
||||||
|
|
||||||
|
char* pCond = taosHashGet(pTbs, tbFName, strlen(tbFName));
|
||||||
if (pCond) {
|
if (pCond) {
|
||||||
if (strlen(pCond) > 1) {
|
if (strlen(pCond) > 1) {
|
||||||
CTG_ERR_RET(nodesStringToNode(pCond, &res->pRawRes->pCond));
|
CTG_ERR_JRET(nodesStringToNode(pCond, &res->pRawRes->pCond));
|
||||||
}
|
}
|
||||||
|
|
||||||
res->pRawRes->pass = true;
|
res->pRawRes->pass = true;
|
||||||
return TSDB_CODE_SUCCESS;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (stbName) {
|
||||||
res->pRawRes->pass = false;
|
res->pRawRes->pass = false;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
// CTG_ERR_RET(catalogGetCachedTableMeta(pCtg, &req->pRawReq->tbName, &pMeta));
|
CTG_ERR_JRET(catalogGetCachedTableMeta(pCtg, &req->pRawReq->tbName, &pMeta));
|
||||||
// if (NULL == pMeta) {
|
if (NULL == pMeta) {
|
||||||
// if (req->onlyCache) {
|
if (req->onlyCache) {
|
||||||
// res->metaNotExists = true;
|
res->metaNotExists = true;
|
||||||
// ctgDebug("db %s tb %s meta not in cache for auth", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
ctgDebug("db %s tb %s meta not in cache for auth", req->pRawReq->tbName.dbname, req->pRawReq->tbName.tname);
|
||||||
// return TSDB_CODE_SUCCESS;
|
goto _return;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// CTG_ERR_RET(catalogGetTableMeta(pCtg, req->pConn, &req->pRawReq->tbName, &pMeta));
|
SCtgTbMetaCtx ctx = {0};
|
||||||
// }
|
ctx.pName = (SName*)&req->pRawReq->tbName;
|
||||||
|
ctx.flag = CTG_FLAG_UNKNOWN_STB | CTG_FLAG_SYNC_OP;
|
||||||
|
|
||||||
// if (TSDB_SUPER_TABLE == pMeta->tableType || TSDB_NORMAL_TABLE == pMeta->tableType) {
|
CTG_ERR_JRET(ctgGetTbMeta(pCtg, req->pConn, &ctx, &pMeta));
|
||||||
// res->pRawRes->pass = false;
|
}
|
||||||
// goto _return;
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if (TSDB_CHILD_TABLE == pMeta->tableType) {
|
if (TSDB_SUPER_TABLE == pMeta->tableType || TSDB_NORMAL_TABLE == pMeta->tableType) {
|
||||||
// res->pRawRes->pass = true;
|
res->pRawRes->pass = false;
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
// /*
|
if (TSDB_CHILD_TABLE == pMeta->tableType) {
|
||||||
// char stbName[TSDB_TABLE_NAME_LEN] = {0};
|
CTG_ERR_JRET(ctgGetCachedStbNameFromSuid(pCtg, dbFName, pMeta->suid, &stbName));
|
||||||
// CTG_ERR_JRET(ctgGetCachedStbNameFromSuid(pCtg, pMeta->suid, stbName));
|
if (NULL == stbName) {
|
||||||
// if (0 == stbName[0]) {
|
if (req->onlyCache) {
|
||||||
// if (req->onlyCache) {
|
res->metaNotExists = true;
|
||||||
// res->notExists = true;
|
ctgDebug("suid %" PRIu64 " name not in cache for auth", pMeta->suid);
|
||||||
// return TSDB_CODE_SUCCESS;
|
goto _return;
|
||||||
// }
|
}
|
||||||
|
|
||||||
// CTG_ERR_RET(catalogRefreshTableMeta(pCtg, req->pConn, &req->pRawReq->tbName, 0));
|
continue;
|
||||||
// }
|
}
|
||||||
// */
|
|
||||||
// }
|
sprintf(tbFName, "%s.%s", dbFName, stbName);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
ctgError("Invalid table type %d for %s", pMeta->tableType, tbFName);
|
||||||
|
CTG_ERR_JRET(TSDB_CODE_INVALID_PARA);
|
||||||
|
}
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
taosMemoryFree(pMeta);
|
taosMemoryFree(pMeta);
|
||||||
|
taosMemoryFree(stbName);
|
||||||
|
|
||||||
CTG_RET(code);
|
CTG_RET(code);
|
||||||
}
|
}
|
||||||
|
@ -1423,7 +1479,7 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
|
||||||
if (pInfo->readTbs && taosHashGetSize(pInfo->readTbs) > 0) {
|
if (pInfo->readTbs && taosHashGetSize(pInfo->readTbs) > 0) {
|
||||||
req->singleType = AUTH_TYPE_READ;
|
req->singleType = AUTH_TYPE_READ;
|
||||||
CTG_ERR_RET(ctgChkSetTbAuthRes(pCtg, req, res));
|
CTG_ERR_RET(ctgChkSetTbAuthRes(pCtg, req, res));
|
||||||
if (pRes->pass) {
|
if (pRes->pass || res->metaNotExists) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1439,7 +1495,7 @@ int32_t ctgChkSetAuthRes(SCatalog* pCtg, SCtgAuthReq* req, SCtgAuthRsp* res) {
|
||||||
if (pInfo->writeTbs && taosHashGetSize(pInfo->writeTbs) > 0) {
|
if (pInfo->writeTbs && taosHashGetSize(pInfo->writeTbs) > 0) {
|
||||||
req->singleType = AUTH_TYPE_WRITE;
|
req->singleType = AUTH_TYPE_WRITE;
|
||||||
CTG_ERR_RET(ctgChkSetTbAuthRes(pCtg, req, res));
|
CTG_ERR_RET(ctgChkSetTbAuthRes(pCtg, req, res));
|
||||||
if (pRes->pass) {
|
if (pRes->pass || res->metaNotExists) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -827,6 +827,8 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pNode;
|
SVnodeModifyOpStmt* pStmt = (SVnodeModifyOpStmt*)pNode;
|
||||||
destroyVgDataBlockArray(pStmt->pDataBlocks);
|
destroyVgDataBlockArray(pStmt->pDataBlocks);
|
||||||
taosMemoryFreeClear(pStmt->pTableMeta);
|
taosMemoryFreeClear(pStmt->pTableMeta);
|
||||||
|
nodesDestroyNode(pStmt->pTagCond);
|
||||||
|
taosArrayDestroy(pStmt->pTableTag);
|
||||||
taosHashCleanup(pStmt->pVgroupsHashObj);
|
taosHashCleanup(pStmt->pVgroupsHashObj);
|
||||||
taosHashCleanup(pStmt->pSubTableHashObj);
|
taosHashCleanup(pStmt->pSubTableHashObj);
|
||||||
taosHashCleanup(pStmt->pTableNameHashObj);
|
taosHashCleanup(pStmt->pTableNameHashObj);
|
||||||
|
@ -953,8 +955,12 @@ void nodesDestroyNode(SNode* pNode) {
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_SPLIT_VGROUP_STMT: // no pointer field
|
case QUERY_NODE_SPLIT_VGROUP_STMT: // no pointer field
|
||||||
case QUERY_NODE_SYNCDB_STMT: // no pointer field
|
case QUERY_NODE_SYNCDB_STMT: // no pointer field
|
||||||
case QUERY_NODE_GRANT_STMT: // no pointer field
|
break;
|
||||||
case QUERY_NODE_REVOKE_STMT: // no pointer field
|
case QUERY_NODE_GRANT_STMT:
|
||||||
|
nodesDestroyNode(((SGrantStmt*)pNode)->pTagCond);
|
||||||
|
break;
|
||||||
|
case QUERY_NODE_REVOKE_STMT:
|
||||||
|
nodesDestroyNode(((SRevokeStmt*)pNode)->pTagCond);
|
||||||
break;
|
break;
|
||||||
case QUERY_NODE_SHOW_DNODES_STMT:
|
case QUERY_NODE_SHOW_DNODES_STMT:
|
||||||
case QUERY_NODE_SHOW_MNODES_STMT:
|
case QUERY_NODE_SHOW_MNODES_STMT:
|
||||||
|
|
|
@ -70,7 +70,7 @@ static EDealRes authSubquery(SAuthCxt* pCxt, SNode* pStmt) {
|
||||||
return TSDB_CODE_SUCCESS == authQuery(pCxt, pStmt) ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
return TSDB_CODE_SUCCESS == authQuery(pCxt, pStmt) ? DEAL_RES_CONTINUE : DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mergeStableTagCond(SNode** pWhere, SNode** pTagCond) {
|
static int32_t mergeStableTagCond(SNode** pWhere, SNode* pTagCond) {
|
||||||
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
SLogicConditionNode* pLogicCond = (SLogicConditionNode*)nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
||||||
if (NULL == pLogicCond) {
|
if (NULL == pLogicCond) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -78,7 +78,7 @@ static int32_t mergeStableTagCond(SNode** pWhere, SNode** pTagCond) {
|
||||||
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
pLogicCond->node.resType.type = TSDB_DATA_TYPE_BOOL;
|
||||||
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
pLogicCond->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BOOL].bytes;
|
||||||
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
pLogicCond->condType = LOGIC_COND_TYPE_AND;
|
||||||
int32_t code = nodesListMakeStrictAppend(&pLogicCond->pParameterList, *pTagCond);
|
int32_t code = nodesListMakeStrictAppend(&pLogicCond->pParameterList, pTagCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pWhere);
|
code = nodesListMakeAppend(&pLogicCond->pParameterList, *pWhere);
|
||||||
}
|
}
|
||||||
|
@ -106,7 +106,7 @@ static int32_t appendStableTagCond(SNode** pWhere, SNode* pTagCond) {
|
||||||
return nodesListStrictAppend(((SLogicConditionNode*)*pWhere)->pParameterList, pTagCondCopy);
|
return nodesListStrictAppend(((SLogicConditionNode*)*pWhere)->pParameterList, pTagCondCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
return mergeStableTagCond(pWhere, &pTagCondCopy);
|
return mergeStableTagCond(pWhere, pTagCondCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes authSelectImpl(SNode* pNode, void* pContext) {
|
static EDealRes authSelectImpl(SNode* pNode, void* pContext) {
|
||||||
|
|
|
@ -53,6 +53,7 @@ typedef struct SInsertParseContext {
|
||||||
bool missCache;
|
bool missCache;
|
||||||
bool usingDuplicateTable;
|
bool usingDuplicateTable;
|
||||||
bool forceUpdate;
|
bool forceUpdate;
|
||||||
|
bool needTableTagVal;
|
||||||
} SInsertParseContext;
|
} SInsertParseContext;
|
||||||
|
|
||||||
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
typedef int32_t (*_row_append_fn_t)(SMsgBuf* pMsgBuf, const void* value, int32_t len, void* param);
|
||||||
|
@ -577,28 +578,39 @@ static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
|
||||||
if (NULL == pValue) {
|
if (NULL == pValue) {
|
||||||
return TSDB_CODE_OUT_OF_MEMORY;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
pValue->node.resType.type = pVal->type;
|
|
||||||
|
pValue->node.resType = ((SColumnNode*)*pNode)->node.resType;
|
||||||
|
nodesDestroyNode(*pNode);
|
||||||
|
*pNode = (SNode*)pValue;
|
||||||
|
|
||||||
switch (pVal->type) {
|
switch (pVal->type) {
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
pValue->datum.b = *(int8_t*)(&pVal->i64);
|
pValue->datum.b = *(int8_t*)(&pVal->i64);
|
||||||
|
*(bool*)&pValue->typeData = pValue->datum.b;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TINYINT:
|
case TSDB_DATA_TYPE_TINYINT:
|
||||||
pValue->datum.i = *(int8_t*)(&pVal->i64);
|
pValue->datum.i = *(int8_t*)(&pVal->i64);
|
||||||
|
*(int8_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_SMALLINT:
|
case TSDB_DATA_TYPE_SMALLINT:
|
||||||
pValue->datum.i = *(int16_t*)(&pVal->i64);
|
pValue->datum.i = *(int16_t*)(&pVal->i64);
|
||||||
|
*(int16_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_INT:
|
case TSDB_DATA_TYPE_INT:
|
||||||
pValue->datum.i = *(int32_t*)(&pVal->i64);
|
pValue->datum.i = *(int32_t*)(&pVal->i64);
|
||||||
|
*(int32_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
pValue->datum.i = pVal->i64;
|
pValue->datum.i = pVal->i64;
|
||||||
|
pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_FLOAT:
|
case TSDB_DATA_TYPE_FLOAT:
|
||||||
pValue->datum.d = *(float*)(&pVal->i64);
|
pValue->datum.d = *(float*)(&pVal->i64);
|
||||||
|
*(float*)&pValue->typeData = pValue->datum.d;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_DOUBLE:
|
case TSDB_DATA_TYPE_DOUBLE:
|
||||||
pValue->datum.d = *(double*)(&pVal->i64);
|
pValue->datum.d = *(double*)(&pVal->i64);
|
||||||
|
*(double*)&pValue->typeData = pValue->datum.d;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_VARCHAR:
|
case TSDB_DATA_TYPE_VARCHAR:
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
@ -611,18 +623,23 @@ static int32_t rewriteTagCondColumnImpl(STagVal* pVal, SNode** pNode) {
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
pValue->datum.i = pVal->i64;
|
pValue->datum.i = pVal->i64;
|
||||||
|
pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UTINYINT:
|
case TSDB_DATA_TYPE_UTINYINT:
|
||||||
pValue->datum.i = *(uint8_t*)(&pVal->i64);
|
pValue->datum.i = *(uint8_t*)(&pVal->i64);
|
||||||
|
*(uint8_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_USMALLINT:
|
case TSDB_DATA_TYPE_USMALLINT:
|
||||||
pValue->datum.i = *(uint16_t*)(&pVal->i64);
|
pValue->datum.i = *(uint16_t*)(&pVal->i64);
|
||||||
|
*(uint16_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UINT:
|
case TSDB_DATA_TYPE_UINT:
|
||||||
pValue->datum.i = *(uint32_t*)(&pVal->i64);
|
pValue->datum.i = *(uint32_t*)(&pVal->i64);
|
||||||
|
*(uint32_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_UBIGINT:
|
case TSDB_DATA_TYPE_UBIGINT:
|
||||||
pValue->datum.i = *(uint64_t*)(&pVal->i64);
|
pValue->datum.i = *(uint64_t*)(&pVal->i64);
|
||||||
|
*(uint64_t*)&pValue->typeData = pValue->datum.i;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON:
|
||||||
case TSDB_DATA_TYPE_VARBINARY:
|
case TSDB_DATA_TYPE_VARBINARY:
|
||||||
|
@ -667,16 +684,15 @@ static int32_t checkTagCondResult(SNode* pResult) {
|
||||||
: TSDB_CODE_PAR_PERMISSION_DENIED;
|
: TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode* pCond) {
|
static int32_t checkSubtablePrivilege(SArray* pTagVals, SArray* pTagName, SNode** pCond) {
|
||||||
int32_t code = setTagVal(pTagVals, pTagName, pCond);
|
int32_t code = setTagVal(pTagVals, pTagName, *pCond);
|
||||||
SNode* pNew = NULL;
|
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = scalarCalculateConstants(pCond, &pNew);
|
code = scalarCalculateConstants(*pCond, pCond);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = checkTagCondResult(pNew);
|
code = checkTagCondResult(*pCond);
|
||||||
}
|
}
|
||||||
nodesDestroyNode(pNew);
|
NODES_DESTORY_NODE(*pCond);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -716,6 +732,10 @@ static int32_t parseTagsClauseImpl(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pStmt->pTagCond) {
|
||||||
|
code = checkSubtablePrivilege(pTagVals, pTagName, &pStmt->pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !isJson) {
|
if (TSDB_CODE_SUCCESS == code && !isParseBindParam && !isJson) {
|
||||||
code = tTagNew(pTagVals, 1, false, &pTag);
|
code = tTagNew(pTagVals, 1, false, &pTag);
|
||||||
}
|
}
|
||||||
|
@ -843,7 +863,7 @@ static void setUserAuthInfo(SParseContext* pCxt, SName* pTbName, SUserAuthInfo*
|
||||||
pInfo->type = AUTH_TYPE_WRITE;
|
pInfo->type = AUTH_TYPE_WRITE;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache) {
|
static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
SUserAuthInfo authInfo = {0};
|
SUserAuthInfo authInfo = {0};
|
||||||
setUserAuthInfo(pCxt, pTbName, &authInfo);
|
setUserAuthInfo(pCxt, pTbName, &authInfo);
|
||||||
|
@ -863,11 +883,28 @@ static int32_t checkAuth(SParseContext* pCxt, SName* pTbName, bool* pMissCache)
|
||||||
*pMissCache = true;
|
*pMissCache = true;
|
||||||
} else if (!authRes.pass) {
|
} else if (!authRes.pass) {
|
||||||
code = TSDB_CODE_PAR_PERMISSION_DENIED;
|
code = TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||||
|
} else if (NULL != authRes.pCond) {
|
||||||
|
*pTagCond = authRes.pCond;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t checkAuthForTable(SParseContext* pCxt, SName* pTbName, bool* pMissCache, bool* pNeedTableTagVal) {
|
||||||
|
SNode* pTagCond = NULL;
|
||||||
|
int32_t code = checkAuth(pCxt, pTbName, pMissCache, &pTagCond);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pNeedTableTagVal = ((*pMissCache) || (NULL != pTagCond));
|
||||||
|
*pMissCache = (NULL != pTagCond);
|
||||||
|
}
|
||||||
|
nodesDestroyNode(pTagCond);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkAuthForStable(SParseContext* pCxt, SName* pTbName, bool* pMissCache, SNode** pTagCond) {
|
||||||
|
return checkAuth(pCxt, pTbName, pMissCache, pTagCond);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta,
|
static int32_t getTableMeta(SInsertParseContext* pCxt, SName* pTbName, bool isStb, STableMeta** pTableMeta,
|
||||||
bool* pMissCache) {
|
bool* pMissCache) {
|
||||||
SParseContext* pComCxt = pCxt->pComCxt;
|
SParseContext* pComCxt = pCxt->pComCxt;
|
||||||
|
@ -970,7 +1007,7 @@ static int32_t getTargetTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStm
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
int32_t code = checkAuthForTable(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache, &pCxt->needTableTagVal);
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
code = getTableMetaAndVgroup(pCxt, pStmt, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
|
@ -993,7 +1030,7 @@ static int32_t getUsingTableSchema(SInsertParseContext* pCxt, SVnodeModifyOpStmt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t code = checkAuth(pCxt->pComCxt, &pStmt->targetTableName, &pCxt->missCache);
|
int32_t code = checkAuthForStable(pCxt->pComCxt, &pStmt->usingTableName, &pCxt->missCache, &pStmt->pTagCond);
|
||||||
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
if (TSDB_CODE_SUCCESS == code && !pCxt->missCache) {
|
||||||
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
|
code = getTableMeta(pCxt, &pStmt->usingTableName, true, &pStmt->pTableMeta, &pCxt->missCache);
|
||||||
}
|
}
|
||||||
|
@ -1606,6 +1643,8 @@ static int32_t parseInsertTableClauseBottom(SInsertParseContext* pCxt, SVnodeMod
|
||||||
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
static void resetEnvPreTable(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt) {
|
||||||
insDestroyBoundColInfo(&pCxt->tags);
|
insDestroyBoundColInfo(&pCxt->tags);
|
||||||
taosMemoryFreeClear(pStmt->pTableMeta);
|
taosMemoryFreeClear(pStmt->pTableMeta);
|
||||||
|
nodesDestroyNode(pStmt->pTagCond);
|
||||||
|
taosArrayDestroy(pStmt->pTableTag);
|
||||||
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
|
tdDestroySVCreateTbReq(pStmt->pCreateTblReq);
|
||||||
taosMemoryFreeClear(pStmt->pCreateTblReq);
|
taosMemoryFreeClear(pStmt->pCreateTblReq);
|
||||||
pCxt->missCache = false;
|
pCxt->missCache = false;
|
||||||
|
@ -1780,14 +1819,18 @@ static int32_t createInsertQuery(SInsertParseContext* pCxt, SQuery** pOutput) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t checkAuthFromMetaData(const SArray* pUsers) {
|
static int32_t checkAuthFromMetaData(const SArray* pUsers, SNode** pTagCond) {
|
||||||
if (1 != taosArrayGetSize(pUsers)) {
|
if (1 != taosArrayGetSize(pUsers)) {
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMetaRes* pRes = taosArrayGet(pUsers, 0);
|
SMetaRes* pRes = taosArrayGet(pUsers, 0);
|
||||||
if (TSDB_CODE_SUCCESS == pRes->code) {
|
if (TSDB_CODE_SUCCESS == pRes->code) {
|
||||||
return (*(bool*)pRes->pRes) ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
|
SUserAuthRes* pAuth = pRes->pRes;
|
||||||
|
if (NULL != pAuth->pCond) {
|
||||||
|
*pTagCond = nodesCloneNode(pAuth->pCond);
|
||||||
|
}
|
||||||
|
return pAuth->pass ? TSDB_CODE_SUCCESS : TSDB_CODE_PAR_PERMISSION_DENIED;
|
||||||
}
|
}
|
||||||
return pRes->code;
|
return pRes->code;
|
||||||
}
|
}
|
||||||
|
@ -1826,9 +1869,40 @@ static int32_t getTableVgroupFromMetaData(const SArray* pTables, SVnodeModifyOpS
|
||||||
sizeof(SVgroupInfo));
|
sizeof(SVgroupInfo));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildTagNameFromMeta(STableMeta* pMeta, SArray** pTagName) {
|
||||||
|
*pTagName = taosArrayInit(pMeta->tableInfo.numOfTags, TSDB_COL_NAME_LEN);
|
||||||
|
if (NULL == *pTagName) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
SSchema* pSchema = getTableTagSchema(pMeta);
|
||||||
|
for (int32_t i = 0; i < pMeta->tableInfo.numOfTags; ++i) {
|
||||||
|
taosArrayPush(*pTagName, pSchema[i].name);
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t checkSubtablePrivilegeForTable(const SArray* pTables, SVnodeModifyOpStmt* pStmt) {
|
||||||
|
if (1 != taosArrayGetSize(pTables)) {
|
||||||
|
return TSDB_CODE_FAILED;
|
||||||
|
}
|
||||||
|
|
||||||
|
SMetaRes* pRes = taosArrayGet(pTables, 0);
|
||||||
|
if (TSDB_CODE_SUCCESS != pRes->code) {
|
||||||
|
return pRes->code;
|
||||||
|
}
|
||||||
|
|
||||||
|
SArray* pTagName = NULL;
|
||||||
|
int32_t code = buildTagNameFromMeta(pStmt->pTableMeta, &pTagName);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = checkSubtablePrivilege((SArray*)pRes->pRes, pTagName, &pStmt->pTagCond);
|
||||||
|
}
|
||||||
|
taosArrayDestroy(pTagName);
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
|
static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMetaData* pMetaData,
|
||||||
SVnodeModifyOpStmt* pStmt, bool isStb) {
|
SVnodeModifyOpStmt* pStmt, bool isStb) {
|
||||||
int32_t code = checkAuthFromMetaData(pMetaData->pUser);
|
int32_t code = checkAuthFromMetaData(pMetaData->pUser, &pStmt->pTagCond);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
|
code = getTableMetaFromMetaData(pMetaData->pTableMeta, &pStmt->pTableMeta);
|
||||||
}
|
}
|
||||||
|
@ -1841,6 +1915,9 @@ static int32_t getTableSchemaFromMetaData(SInsertParseContext* pCxt, const SMeta
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
|
code = getTableVgroupFromMetaData(pMetaData->pTableHash, pStmt, isStb);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && !isStb && NULL != pStmt->pTagCond) {
|
||||||
|
code = checkSubtablePrivilegeForTable(pMetaData->pTableTag, pStmt);
|
||||||
|
}
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1860,6 +1937,8 @@ static void clearCatalogReq(SCatalogReq* pCatalogReq) {
|
||||||
pCatalogReq->pTableHash = NULL;
|
pCatalogReq->pTableHash = NULL;
|
||||||
taosArrayDestroy(pCatalogReq->pUser);
|
taosArrayDestroy(pCatalogReq->pUser);
|
||||||
pCatalogReq->pUser = NULL;
|
pCatalogReq->pUser = NULL;
|
||||||
|
taosArrayDestroy(pCatalogReq->pTableTag);
|
||||||
|
pCatalogReq->pTableTag = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
static int32_t setVnodeModifOpStmt(SInsertParseContext* pCxt, SCatalogReq* pCatalogReq, const SMetaData* pMetaData,
|
||||||
|
@ -2033,8 +2112,15 @@ static int32_t buildInsertUserAuthReq(const char* pUser, SName* pName, SArray**
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t buildInsertTableTagReq(SName* pName, SArray** pTables) { return buildInsertTableReq(pName, pTables); }
|
||||||
|
|
||||||
static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SCatalogReq* pCatalogReq) {
|
static int32_t buildInsertCatalogReq(SInsertParseContext* pCxt, SVnodeModifyOpStmt* pStmt, SCatalogReq* pCatalogReq) {
|
||||||
int32_t code = buildInsertUserAuthReq(pCxt->pComCxt->pUser, &pStmt->targetTableName, &pCatalogReq->pUser);
|
int32_t code = buildInsertUserAuthReq(
|
||||||
|
pCxt->pComCxt->pUser, (0 == pStmt->usingTableName.type ? &pStmt->targetTableName : &pStmt->usingTableName),
|
||||||
|
&pCatalogReq->pUser);
|
||||||
|
if (TSDB_CODE_SUCCESS == code && pCxt->needTableTagVal) {
|
||||||
|
code = buildInsertTableTagReq(&pStmt->targetTableName, &pCatalogReq->pTableTag);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
if (0 == pStmt->usingTableName.type) {
|
if (0 == pStmt->usingTableName.type) {
|
||||||
code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
|
code = buildInsertDbReq(&pStmt->targetTableName, &pCatalogReq->pTableMeta);
|
||||||
|
|
|
@ -1310,7 +1310,8 @@ static EDealRes translateOperator(STranslateContext* pCxt, SOperatorNode* pOp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static EDealRes haveVectorFunction(SNode* pNode, void* pContext) {
|
static EDealRes haveVectorFunction(SNode* pNode, void* pContext) {
|
||||||
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) || isInterpPseudoColumnFunc(pNode)) {
|
if (isAggFunc(pNode) || isIndefiniteRowsFunc(pNode) || isWindowPseudoColumnFunc(pNode) ||
|
||||||
|
isInterpPseudoColumnFunc(pNode)) {
|
||||||
*((bool*)pContext) = true;
|
*((bool*)pContext) = true;
|
||||||
return DEAL_RES_END;
|
return DEAL_RES_END;
|
||||||
}
|
}
|
||||||
|
@ -6642,6 +6643,7 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
|
code = buildCmdMsg(pCxt, TDMT_MND_ALTER_USER, (FSerializeFunc)tSerializeSAlterUserReq, &req);
|
||||||
}
|
}
|
||||||
|
tFreeSAlterUserReq(&req);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -53,6 +53,7 @@ int32_t sclCreateColumnInfoData(SDataType *pType, int32_t numOfRows, SScalarPara
|
||||||
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
|
int32_t code = colInfoDataEnsureCapacity(pColumnData, numOfRows, true);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
colDataDestroy(pColumnData);
|
||||||
taosMemoryFree(pColumnData);
|
taosMemoryFree(pColumnData);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
@ -1061,17 +1062,20 @@ int32_t sclConvertOpValueNodeTs(SOperatorNode *node, SScalarCtx *ctx) {
|
||||||
|
|
||||||
if (node->pLeft && SCL_IS_VAR_VALUE_NODE(node->pLeft)) {
|
if (node->pLeft && SCL_IS_VAR_VALUE_NODE(node->pLeft)) {
|
||||||
if (node->pRight && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pRight)->resType.type)) {
|
if (node->pRight && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pRight)->resType.type)) {
|
||||||
SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode*)node->pLeft));
|
SCL_ERR_JRET(
|
||||||
|
sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode *)node->pLeft));
|
||||||
}
|
}
|
||||||
} else if (node->pRight && SCL_IS_NOTNULL_CONST_NODE(node->pRight)) {
|
} else if (node->pRight && SCL_IS_NOTNULL_CONST_NODE(node->pRight)) {
|
||||||
if (node->pLeft && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pLeft)->resType.type)) {
|
if (node->pLeft && (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pLeft)->resType.type)) {
|
||||||
if (SCL_IS_VAR_VALUE_NODE(node->pRight)) {
|
if (SCL_IS_VAR_VALUE_NODE(node->pRight)) {
|
||||||
SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight), (SValueNode*)node->pRight));
|
SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, node->pRight),
|
||||||
|
(SValueNode *)node->pRight));
|
||||||
} else if (QUERY_NODE_NODE_LIST == node->pRight->type) {
|
} else if (QUERY_NODE_NODE_LIST == node->pRight->type) {
|
||||||
SNode* pNode;
|
SNode *pNode;
|
||||||
FOREACH(pNode, ((SNodeListNode*)node->pRight)->pNodeList) {
|
FOREACH(pNode, ((SNodeListNode *)node->pRight)->pNodeList) {
|
||||||
if (SCL_IS_VAR_VALUE_NODE(pNode)) {
|
if (SCL_IS_VAR_VALUE_NODE(pNode)) {
|
||||||
SCL_ERR_JRET(sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, pNode), (SValueNode*)pNode));
|
SCL_ERR_JRET(
|
||||||
|
sclConvertToTsValueNode(sclGetOpValueNodeTsPrecision(node->pLeft, pNode), (SValueNode *)pNode));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1086,8 +1090,6 @@ _return:
|
||||||
return DEAL_RES_ERROR;
|
return DEAL_RES_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
|
int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -1096,19 +1098,20 @@ int32_t sclConvertCaseWhenValueNodeTs(SCaseWhenNode *node, SScalarCtx *ctx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SCL_IS_VAR_VALUE_NODE(node->pCase)) {
|
if (SCL_IS_VAR_VALUE_NODE(node->pCase)) {
|
||||||
SNode* pNode;
|
SNode *pNode;
|
||||||
FOREACH(pNode, node->pWhenThenList) {
|
FOREACH(pNode, node->pWhenThenList) {
|
||||||
SExprNode *pExpr = (SExprNode *)((SWhenThenNode *)pNode)->pWhen;
|
SExprNode *pExpr = (SExprNode *)((SWhenThenNode *)pNode)->pWhen;
|
||||||
if (TSDB_DATA_TYPE_TIMESTAMP == pExpr->resType.type) {
|
if (TSDB_DATA_TYPE_TIMESTAMP == pExpr->resType.type) {
|
||||||
SCL_ERR_JRET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode*)node->pCase));
|
SCL_ERR_JRET(sclConvertToTsValueNode(pExpr->resType.precision, (SValueNode *)node->pCase));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else if (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pCase)->resType.type) {
|
} else if (TSDB_DATA_TYPE_TIMESTAMP == ((SExprNode *)node->pCase)->resType.type) {
|
||||||
SNode* pNode;
|
SNode *pNode;
|
||||||
FOREACH(pNode, node->pWhenThenList) {
|
FOREACH(pNode, node->pWhenThenList) {
|
||||||
if (SCL_IS_VAR_VALUE_NODE(((SWhenThenNode *)pNode)->pWhen)) {
|
if (SCL_IS_VAR_VALUE_NODE(((SWhenThenNode *)pNode)->pWhen)) {
|
||||||
SCL_ERR_JRET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision, (SValueNode*)((SWhenThenNode *)pNode)->pWhen));
|
SCL_ERR_JRET(sclConvertToTsValueNode(((SExprNode *)node->pCase)->resType.precision,
|
||||||
|
(SValueNode *)((SWhenThenNode *)pNode)->pWhen));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1271,7 +1274,6 @@ EDealRes sclRewriteLogic(SNode **pNode, SScalarCtx *ctx) {
|
||||||
return DEAL_RES_CONTINUE;
|
return DEAL_RES_CONTINUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
|
EDealRes sclRewriteOperator(SNode **pNode, SScalarCtx *ctx) {
|
||||||
SOperatorNode *node = (SOperatorNode *)*pNode;
|
SOperatorNode *node = (SOperatorNode *)*pNode;
|
||||||
|
|
||||||
|
|
|
@ -732,6 +732,7 @@
|
||||||
,,y,script,./test.sh -f tsim/user/privilege_db.sim
|
,,y,script,./test.sh -f tsim/user/privilege_db.sim
|
||||||
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
|
,,y,script,./test.sh -f tsim/user/privilege_sysinfo.sim
|
||||||
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
|
,,y,script,./test.sh -f tsim/user/privilege_topic.sim
|
||||||
|
,,y,script,./test.sh -f tsim/user/privilege_table.sim
|
||||||
,,y,script,./test.sh -f tsim/db/alter_option.sim
|
,,y,script,./test.sh -f tsim/db/alter_option.sim
|
||||||
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
|
,,y,script,./test.sh -f tsim/db/alter_replica_31.sim
|
||||||
,,y,script,./test.sh -f tsim/db/basic1.sim
|
,,y,script,./test.sh -f tsim/db/basic1.sim
|
||||||
|
|
|
@ -0,0 +1,302 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
print =============== init env
|
||||||
|
sql drop database if exists test;
|
||||||
|
sql create database test vgroups 1;
|
||||||
|
sql use test;
|
||||||
|
sql create stable st1(ts timestamp, i int) tags(id int, loc varchar(20));
|
||||||
|
sql create table st1s1 using st1 tags(1, 'beijing');
|
||||||
|
sql create table st1s2 using st1 tags(2, 'shanghai');
|
||||||
|
sql insert into st1s1 values(now, 1) st1s2 values(now, 2);
|
||||||
|
sql create stable st2(ts timestamp, i int) tags(id int, loc varchar(20));
|
||||||
|
sql create table st2s1 using st2 tags(1, 'beijing');
|
||||||
|
sql create table st2s2 using st2 tags(2, 'shanghai');
|
||||||
|
sql insert into st2s1 values(now, 1) st2s2 values(now, 2);
|
||||||
|
sql create user wxy pass 'taosdata';
|
||||||
|
|
||||||
|
print =============== case 1: database unauthorized and table unauthorized
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select * from test.st1;
|
||||||
|
sql_error insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
|
||||||
|
print =============== case 2: database unauthorized and table read privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql grant read on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 3: database unauthorized and table read privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 from wxy;
|
||||||
|
sql grant read on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 1 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st1s1 values(now, 10);
|
||||||
|
sql_error insert into test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 4: database unauthorized and table write privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 with id = 1 from wxy;
|
||||||
|
sql grant write on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select tbname, * from test.st1;
|
||||||
|
sql insert into test.st1s1 values(now, 10);
|
||||||
|
sql insert into test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 5: database unauthorized and table write privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke write on test.st1 from wxy;
|
||||||
|
sql grant write on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select tbname, * from test.st1;
|
||||||
|
sql insert into test.st1s1 values(now, 10);
|
||||||
|
sql insert into test.st1s3 using test.st1 tags(1, 'dachang') values(now, 100);
|
||||||
|
sql_error insert into test.st1s2 values(now, 20);
|
||||||
|
sql_error insert into test.st1s4 using test.st1 tags(3, 'dachang') values(now, 300);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 6: database read privilege and table unauthorized
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke write on test.st1 with id = 1 from wxy;
|
||||||
|
sql grant read on test.* to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 6 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql select * from test.st2;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 7: database read privilege and table read privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql grant read on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 6 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql select * from test.st2;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 8: database read privilege and table read privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 from wxy;
|
||||||
|
sql grant read on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 4 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql select * from test.st2;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 9: database read privilege and table write privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 with id = 1 from wxy;
|
||||||
|
sql grant write on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 6 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql select * from test.st2;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 10: database read privilege and table write privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke write on test.st1 from wxy;
|
||||||
|
sql grant write on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 8 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql insert into test.st1s1 values(now, 10);
|
||||||
|
sql_error insert into test.st1s2 values(now, 20);
|
||||||
|
sql select * from test.st2;
|
||||||
|
if $rows != 2 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql_error insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 11: database write privilege and table unauthorized
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.* from wxy;
|
||||||
|
sql revoke write on test.st1 with id = 1 from wxy;
|
||||||
|
sql grant write on test.* to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select * from test.st1;
|
||||||
|
sql insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 12: database write privilege and table read privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql grant read on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 11 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 13: database write privilege and table read privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 from wxy;
|
||||||
|
sql grant read on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql select * from test.st1;
|
||||||
|
if $rows != 8 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
sql insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 14: database write privilege and table write privilege
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke read on test.st1 with id = 1 from wxy;
|
||||||
|
sql grant write on test.st1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select * from test.st1;
|
||||||
|
sql insert into test.st1s1 values(now, 10) test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
print =============== case 15: database write privilege and table write privilege with condition
|
||||||
|
sql close
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
sql revoke write on test.st1 from wxy;
|
||||||
|
sql grant write on test.st1 with id = 1 to wxy;
|
||||||
|
|
||||||
|
sql close
|
||||||
|
sql connect wxy
|
||||||
|
|
||||||
|
sql reset query cache;
|
||||||
|
sql_error select * from test.st1;
|
||||||
|
sql insert into test.st1s1 values(now, 10);
|
||||||
|
sql_error insert into test.st1s2 values(now, 20);
|
||||||
|
sql_error select * from test.st2;
|
||||||
|
sql insert into test.st2s1 values(now, 10) test.st2s2 values(now, 20);
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
|
@ -3,6 +3,7 @@
|
||||||
./test.sh -f tsim/user/privilege_db.sim
|
./test.sh -f tsim/user/privilege_db.sim
|
||||||
./test.sh -f tsim/user/privilege_sysinfo.sim
|
./test.sh -f tsim/user/privilege_sysinfo.sim
|
||||||
./test.sh -f tsim/user/privilege_topic.sim
|
./test.sh -f tsim/user/privilege_topic.sim
|
||||||
|
./test.sh -f tsim/user/privilege_table.sim
|
||||||
./test.sh -f tsim/db/alter_option.sim
|
./test.sh -f tsim/db/alter_option.sim
|
||||||
rem ./test.sh -f tsim/db/alter_replica_13.sim
|
rem ./test.sh -f tsim/db/alter_replica_13.sim
|
||||||
./test.sh -f tsim/db/alter_replica_31.sim
|
./test.sh -f tsim/db/alter_replica_31.sim
|
||||||
|
|
Loading…
Reference in New Issue