feat: show create table
This commit is contained in:
parent
2dc7bcbefc
commit
40879a80fe
|
@ -168,7 +168,7 @@ typedef struct {
|
|||
int32_t vgId;
|
||||
char* dbFName;
|
||||
char* tbName;
|
||||
} SBuildTableMetaInput;
|
||||
} SBuildTableInput;
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
|
@ -667,6 +667,12 @@ int32_t tSerializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp
|
|||
|
||||
int32_t tDeserializeSQueryTableRsp(void* buf, int32_t bufLen, SQueryTableRsp* pRsp);
|
||||
|
||||
typedef struct {
|
||||
SMsgHead header;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
} STableCfgReq;
|
||||
|
||||
typedef struct {
|
||||
char tbName[TSDB_TABLE_NAME_LEN];
|
||||
char stbName[TSDB_TABLE_NAME_LEN];
|
||||
|
@ -684,6 +690,15 @@ typedef struct {
|
|||
SSchema* pSchemas;
|
||||
} STableCfg;
|
||||
|
||||
typedef STableCfg STableCfgRsp;
|
||||
|
||||
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
|
||||
int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq);
|
||||
|
||||
int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
|
||||
int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp);
|
||||
|
||||
|
||||
typedef struct {
|
||||
char db[TSDB_DB_FNAME_LEN];
|
||||
int32_t numOfVgroups;
|
||||
|
|
|
@ -131,6 +131,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "drop-topic", NULL, NULL)
|
||||
|
@ -171,6 +172,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_VND_UPDATE_TAG_VAL, "update-tag-val", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
|
||||
|
|
|
@ -1757,6 +1757,61 @@ void tFreeSRetrieveFuncRsp(SRetrieveFuncRsp *pRsp) {
|
|||
taosArrayDestroy(pRsp->pFuncInfos);
|
||||
}
|
||||
|
||||
int32_t tSerializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
if (buf != NULL) {
|
||||
buf = (char *)buf + headLen;
|
||||
bufLen -= headLen;
|
||||
}
|
||||
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->tbName) < 0) return -1;
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
|
||||
if (buf != NULL) {
|
||||
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||
pHead->vgId = htonl(pReq->header.vgId);
|
||||
pHead->contLen = htonl(tlen + headLen);
|
||||
}
|
||||
|
||||
return tlen + headLen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSTableCfgReq(void *buf, int32_t bufLen, STableCfgReq *pReq) {
|
||||
int32_t headLen = sizeof(SMsgHead);
|
||||
|
||||
SMsgHead *pHead = buf;
|
||||
pHead->vgId = pReq->header.vgId;
|
||||
pHead->contLen = pReq->header.contLen;
|
||||
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, (char *)buf + headLen, bufLen - headLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->tbName) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) {
|
||||
|
||||
}
|
||||
|
||||
int32_t tDeserializeSTableCfgRsp(void *buf, int32_t bufLen, STableCfgRsp *pRsp) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
int32_t tSerializeSCreateDbReq(void *buf, int32_t bufLen, SCreateDbReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
|
|
@ -180,6 +180,7 @@ SArray *mmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_STREAM, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -330,6 +330,7 @@ SArray *vmGetMsgHandles() {
|
|||
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_TABLE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
|
||||
|
|
|
@ -62,6 +62,7 @@ int32_t mndInitStb(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_STB_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_DROP_STB_RSP, mndTransProcessRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_META, mndProcessTableMetaReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_TABLE_CFG, mndProcessTableCfgReq);
|
||||
|
||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_STB, mndRetrieveStb);
|
||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STB, mndCancelGetNextStb);
|
||||
|
|
|
@ -124,6 +124,102 @@ _exit:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||
STableCfgReq cfgReq = {0};
|
||||
STableCfgRsp cfgRsp = {0};
|
||||
SMetaReader mer1 = {0};
|
||||
SMetaReader mer2 = {0};
|
||||
char tableFName[TSDB_TABLE_FNAME_LEN];
|
||||
SRpcMsg rpcMsg;
|
||||
int32_t code = 0;
|
||||
int32_t rspLen = 0;
|
||||
void *pRsp = NULL;
|
||||
SSchemaWrapper schema = {0};
|
||||
SSchemaWrapper schemaTag = {0};
|
||||
|
||||
// decode req
|
||||
if (tDeserializeSTableCfgReq(pMsg->pCont, pMsg->contLen, &cfgReq) != 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
strcpy(cfgRsp.tbName, cfgReq.tbName);
|
||||
memcpy(cfgRsp.dbFName, cfgReq.dbFName, sizeof(cfgRsp.dbFName));
|
||||
|
||||
sprintf(tableFName, "%s.%s", cfgReq.dbFName, cfgReq.tbName);
|
||||
code = vnodeValidateTableHash(pVnode, tableFName);
|
||||
if (code) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// query meta
|
||||
metaReaderInit(&mer1, pVnode->pMeta, 0);
|
||||
|
||||
if (metaGetTableEntryByName(&mer1, cfgReq.tbName) < 0) {
|
||||
code = terrno;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cfgRsp.tableType = mer1.me.type;
|
||||
|
||||
if (mer1.me.type == TSDB_SUPER_TABLE) {
|
||||
strcpy(cfgRsp.stbName, mer1.me.name);
|
||||
schema = mer1.me.stbEntry.schemaRow;
|
||||
schemaTag = mer1.me.stbEntry.schemaTag;
|
||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||
metaReaderInit(&mer2, pVnode->pMeta, 0);
|
||||
if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||
|
||||
strcpy(cfgRsp.stbName, mer2.me.name);
|
||||
schema = mer2.me.stbEntry.schemaRow;
|
||||
schemaTag = mer2.me.stbEntry.schemaTag;
|
||||
} else if (mer1.me.type == TSDB_NORMAL_TABLE) {
|
||||
schema = mer1.me.ntbEntry.schemaRow;
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
||||
cfgRsp.numOfTags = schemaTag.nCols;
|
||||
cfgRsp.numOfColumns = schema.nCols;
|
||||
cfgRsp.pSchemas = (SSchema *)taosMemoryMalloc(sizeof(SSchema) * (cfgRsp.numOfColumns + cfgRsp.numOfTags));
|
||||
|
||||
memcpy(cfgRsp.pSchemas, schema.pSchema, sizeof(SSchema) * schema.nCols);
|
||||
if (schemaTag.nCols) {
|
||||
memcpy(cfgRsp.pSchemas + schema.nCols, schemaTag.pSchema, sizeof(SSchema) * schemaTag.nCols);
|
||||
}
|
||||
|
||||
// encode and send response
|
||||
rspLen = tSerializeSTableCfgRsp(NULL, 0, &cfgRsp);
|
||||
if (rspLen < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
pRsp = rpcMallocCont(rspLen);
|
||||
if (pRsp == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _exit;
|
||||
}
|
||||
tSerializeSTableCfgRsp(pRsp, rspLen, &cfgRsp);
|
||||
|
||||
_exit:
|
||||
rpcMsg.info = pMsg->info;
|
||||
rpcMsg.pCont = pRsp;
|
||||
rpcMsg.contLen = rspLen;
|
||||
rpcMsg.code = code;
|
||||
|
||||
if (code) {
|
||||
qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
|
||||
}
|
||||
|
||||
tmsgSendRsp(&rpcMsg);
|
||||
|
||||
taosMemoryFree(cfgRsp.pSchemas);
|
||||
metaReaderClear(&mer2);
|
||||
metaReaderClear(&mer1);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
|
||||
pLoad->vgId = TD_VID(pVnode);
|
||||
pLoad->syncState = syncGetMyRole(pVnode->sync);
|
||||
|
|
|
@ -251,6 +251,8 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
|
|||
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
|
||||
case TDMT_VND_TABLE_META:
|
||||
return vnodeGetTableMeta(pVnode, pMsg);
|
||||
case TDMT_VND_TABLE_CFG:
|
||||
return vnodeGetTableCfg(pVnode, pMsg);
|
||||
case TDMT_VND_CONSUME:
|
||||
return tqProcessPollReq(pVnode->pTq, pMsg, pInfo->workerId);
|
||||
case TDMT_STREAM_TASK_RUN:
|
||||
|
|
|
@ -520,6 +520,7 @@ int32_t ctgDropTbIndexEnqueue(SCatalog* pCtg, SName* pName, bool syncOp);
|
|||
int32_t ctgOpDropTbIndex(SCtgCacheOperation *operation);
|
||||
int32_t ctgOpUpdateTbIndex(SCtgCacheOperation *operation);
|
||||
int32_t ctgOpClearCache(SCtgCacheOperation *operation);
|
||||
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType);
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -381,6 +381,23 @@ _return:
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbType(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, int32_t *tbType) {
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
CTG_ERR_RET(ctgReadTbTypeFromCache(pCtg, dbFName, pTableName->tname, tbType));
|
||||
if (*tbType > 0) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
STableMeta* pMeta = NULL;
|
||||
CTG_ERR_RET(catalogGetTableMeta(pCtg, pConn, pTableName, &pMeta));
|
||||
|
||||
*tbType = pMeta->tableType;
|
||||
taosMemoryFree(pMeta);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTbIndex(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pRes) {
|
||||
CTG_ERR_RET(ctgReadTbIndexFromCache(pCtg, pTableName, pRes));
|
||||
if (*pRes) {
|
||||
|
@ -419,6 +436,20 @@ _return:
|
|||
CTG_RET(code);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbCfg(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, STableCfg** pCfg) {
|
||||
int32_t tbType = 0;
|
||||
CTG_ERR_RET(ctgGetTbType(pCtg, pConn, pTableName, &tbType));
|
||||
|
||||
if (TSDB_SUPER_TABLE == tbType) {
|
||||
CTG_ERR_RET(ctgGetTableCfgFromMnode(pCtg, pConn, pTableName, pCfg, NULL));
|
||||
} else {
|
||||
SVgroupInfo vgroupInfo = {0};
|
||||
CTG_ERR_RET(catalogGetTableHashVgroup(pCtg, pConn, pTableName, &vgroupInfo));
|
||||
CTG_ERR_RET(ctgGetTableCfgFromVnode(pCtg, pConn, pTableName, &vgroupInfo, pCfg, NULL));
|
||||
}
|
||||
|
||||
CTG_RET(TSDB_CODE_SUCCESS);
|
||||
}
|
||||
|
||||
int32_t ctgGetTbDistVgInfo(SCatalog* pCtg, SRequestConnInfo *pConn, SName* pTableName, SArray** pVgList) {
|
||||
STableMeta *tbMeta = NULL;
|
||||
|
@ -1207,6 +1238,16 @@ _return:
|
|||
CTG_API_LEAVE(code);
|
||||
}
|
||||
|
||||
int32_t catalogGetTableCfg(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg** pCfg) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
if (NULL == pCtg || NULL == pConn || NULL == pTableName || NULL == pCfg) {
|
||||
CTG_API_LEAVE(TSDB_CODE_CTG_INVALID_INPUT);
|
||||
}
|
||||
|
||||
CTG_API_LEAVE(ctgGetTbCfg(pCtg, pConn, pTableName, pCfg, NULL));
|
||||
}
|
||||
|
||||
int32_t catalogGetUdfInfo(SCatalog* pCtg, SRequestConnInfo *pConn, const char* funcName, SFuncInfo* pInfo) {
|
||||
CTG_API_ENTER();
|
||||
|
||||
|
|
|
@ -540,10 +540,10 @@ int32_t ctgReadTbVerFromCache(SCatalog *pCtg, SName *pTableName, int32_t *sver,
|
|||
}
|
||||
|
||||
|
||||
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, int32_t *tbType) {
|
||||
int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tbName, int32_t *tbType) {
|
||||
SCtgDBCache *dbCache = NULL;
|
||||
SCtgTbCache *tbCache = NULL;
|
||||
CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tableName, &dbCache, &tbCache));
|
||||
CTG_ERR_RET(ctgAcquireTbMetaFromCache(pCtg, dbFName, tbName, &dbCache, &tbCache));
|
||||
if (NULL == tbCache) {
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -552,7 +552,7 @@ int32_t ctgReadTbTypeFromCache(SCatalog* pCtg, char* dbFName, char *tableName, i
|
|||
*tbType = tbCache->pMeta->tableType;
|
||||
ctgReleaseTbMetaToCache(pCtg, dbCache, tbCache);
|
||||
|
||||
ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tableName, *tbType, dbFName);
|
||||
ctgDebug("Got tb %s tbType %d from cache, dbFName:%s", tbName, *tbType, dbFName);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
|
|
@ -550,7 +550,7 @@ int32_t ctgGetUserDbAuthFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const
|
|||
|
||||
|
||||
int32_t ctgGetTbMetaFromMnodeImpl(SCatalog* pCtg, SRequestConnInfo *pConn, char *dbFName, char* tbName, STableMetaOutput* out, SCtgTask* pTask) {
|
||||
SBuildTableMetaInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
|
||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = tbName};
|
||||
char *msg = NULL;
|
||||
SEpSet *pVnodeEpSet = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
@ -608,7 +608,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
|
||||
ctgDebug("try to get table meta from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName);
|
||||
|
||||
SBuildTableMetaInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = (char *)tNameGetTableName(pTableName)};
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
|
||||
|
@ -646,4 +646,83 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_VND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFName);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = vgroupInfo->vgId, .dbFName = dbFName, .tbName = pTableName->tname};
|
||||
|
||||
ctgDebug("try to get table cfg from vnode, vgId:%d, tbFName:%s", vgroupInfo->vgId, tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
if (pTask) {
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = reqType,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
|
||||
|
||||
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask) {
|
||||
char *msg = NULL;
|
||||
int32_t msgLen = 0;
|
||||
int32_t reqType = TDMT_MND_TABLE_CFG;
|
||||
char tbFName[TSDB_TABLE_FNAME_LEN];
|
||||
tNameExtractFullName(pTableName, tbFName);
|
||||
void*(*mallocFp)(int32_t) = pTask ? taosMemoryMalloc : rpcMallocCont;
|
||||
char dbFName[TSDB_DB_FNAME_LEN];
|
||||
tNameGetFullDbName(pTableName, dbFName);
|
||||
SBuildTableInput bInput = {.vgId = 0, .dbFName = dbFName, .tbName = pTableName->tname};
|
||||
|
||||
ctgDebug("try to get table cfg from mnode, tbFName:%s", tbFName);
|
||||
|
||||
int32_t code = queryBuildMsg[TMSG_INDEX(reqType)](&bInput, &msg, 0, &msgLen, mallocFp);
|
||||
if (code) {
|
||||
ctgError("Build get tb cfg msg failed, code:%s, tbFName:%s", tstrerror(code), tbFName);
|
||||
CTG_ERR_RET(code);
|
||||
}
|
||||
|
||||
if (pTask) {
|
||||
CTG_ERR_RET(ctgUpdateMsgCtx(&pTask->msgCtx, reqType, NULL, (char*)tbFName));
|
||||
|
||||
CTG_RET(ctgAsyncSendMsg(pCtg, pConn, pTask, reqType, msg, msgLen));
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsg = {
|
||||
.msgType = reqType,
|
||||
.pCont = msg,
|
||||
.contLen = msgLen,
|
||||
};
|
||||
|
||||
SRpcMsg rpcRsp = {0};
|
||||
rpcSendRecv(pConn->pTrans, &pConn->mgmtEps, &rpcMsg, &rpcRsp);
|
||||
|
||||
CTG_ERR_RET(ctgProcessRspMsg(out, reqType, rpcRsp.pCont, rpcRsp.contLen, rpcRsp.code, (char*)tbFName));
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp) {
|
|||
}
|
||||
|
||||
int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
|
||||
SBuildTableMetaInput *pInput = input;
|
||||
SBuildTableInput *pInput = input;
|
||||
if (NULL == input || NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
@ -221,6 +221,27 @@ int32_t queryBuildGetTbIndexMsg(void *input, char **msg, int32_t msgSize, int32_
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t queryBuildGetTbCfgMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen, void*(*mallcFp)(int32_t)) {
|
||||
if (NULL == msg || NULL == msgLen) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
SBuildTableInput *pInput = input;
|
||||
STableCfgReq cfgReq = {0};
|
||||
cfgReq.header.vgId = pInput->vgId;
|
||||
strcpy(cfgReq.dbFName, pInput->dbFName);
|
||||
strcpy(cfgReq.tbName, pInput->tbName);
|
||||
|
||||
int32_t bufLen = tSerializeSTableCfgReq(NULL, 0, &cfgReq);
|
||||
void *pBuf = (*mallcFp)(bufLen);
|
||||
tSerializeSTableCfgReq(pBuf, bufLen, &cfgReq);
|
||||
|
||||
*msg = pBuf;
|
||||
*msgLen = bufLen;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||
SUseDbOutput *pOut = output;
|
||||
|
@ -494,6 +515,24 @@ int32_t queryProcessGetTbIndexRsp(void *output, char *msg, int32_t msgSize) {
|
|||
}
|
||||
|
||||
|
||||
int32_t queryProcessGetTbCfgRsp(void *output, char *msg, int32_t msgSize) {
|
||||
if (NULL == output || NULL == msg || msgSize <= 0) {
|
||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||
}
|
||||
|
||||
STableCfgRsp *out = taosMemoryCalloc(1, sizeof(STableCfgRsp));
|
||||
if (tDeserializeSTableCfgRsp(msg, msgSize, out) != 0) {
|
||||
qError("tDeserializeSTableCfgRsp failed, msgSize:%d", msgSize);
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
|
||||
*(STableCfgRsp**)output = out;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
|
||||
void initQueryModuleMsgHandle() {
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryBuildTableMetaReqMsg;
|
||||
|
@ -504,6 +543,8 @@ void initQueryModuleMsgHandle() {
|
|||
queryBuildMsg[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryBuildRetrieveFuncMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryBuildGetUserAuthMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryBuildGetTbIndexMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
||||
queryBuildMsg[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryBuildGetTbCfgMsg;
|
||||
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_META)] = queryProcessTableMetaRsp;
|
||||
|
@ -514,6 +555,8 @@ void initQueryModuleMsgHandle() {
|
|||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_RETRIEVE_FUNC)] = queryProcessRetrieveFuncRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_USER_AUTH)] = queryProcessGetUserAuthRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_GET_TABLE_INDEX)] = queryProcessGetTbIndexRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_VND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||
queryProcessMsgRsp[TMSG_INDEX(TDMT_MND_TABLE_CFG)] = queryProcessGetTbCfgRsp;
|
||||
}
|
||||
|
||||
#pragma GCC diagnostic pop
|
||||
|
|
Loading…
Reference in New Issue