fix invalid write
This commit is contained in:
parent
72ff3200bc
commit
fee2fceaf2
|
@ -895,7 +895,7 @@ void tFreeSShowReq(SShowReq* pReq);
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t showId;
|
int64_t showId;
|
||||||
STableMetaRsp tableMeta;
|
STableMetaRsp tableMeta;
|
||||||
} SShowRsp;
|
} SShowRsp, SVShowTablesRsp;
|
||||||
|
|
||||||
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
int32_t tSerializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
||||||
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
int32_t tDeserializeSShowRsp(void* buf, int32_t bufLen, SShowRsp* pRsp);
|
||||||
|
@ -1428,11 +1428,6 @@ typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
} SVShowTablesReq;
|
} SVShowTablesReq;
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t id;
|
|
||||||
STableMetaRsp metaInfo;
|
|
||||||
} SVShowTablesRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
int32_t id;
|
int32_t id;
|
||||||
|
@ -1637,7 +1632,7 @@ int32_t tDeserializeSClientHbBatchRsp(void* buf, int32_t bufLen, SClientHbBatchR
|
||||||
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
|
static FORCE_INLINE int32_t tEncodeSKv(SCoder* pEncoder, const SKv* pKv) {
|
||||||
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
|
if (tEncodeI32(pEncoder, pKv->key) < 0) return -1;
|
||||||
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
|
if (tEncodeI32(pEncoder, pKv->valueLen) < 0) return -1;
|
||||||
if (tEncodeCStrWithLen(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
|
if (tEncodeBinary(pEncoder, (const char*)pKv->value, pKv->valueLen) < 0) return -1;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1485,7 +1485,7 @@ int32_t tSerializeSShowReq(void *buf, int32_t bufLen, SShowReq *pReq) {
|
||||||
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
|
||||||
if (tEncodeI32(&encoder, pReq->payloadLen) < 0) return -1;
|
if (tEncodeI32(&encoder, pReq->payloadLen) < 0) return -1;
|
||||||
if (pReq->payloadLen > 0) {
|
if (pReq->payloadLen > 0) {
|
||||||
if (tEncodeCStr(&encoder, pReq->payload) < 0) return -1;
|
if (tEncodeBinary(&encoder, pReq->payload, pReq->payloadLen) < 0) return -1;
|
||||||
}
|
}
|
||||||
tEndEncode(&encoder);
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
@ -1706,12 +1706,13 @@ void tFreeSShowRsp(SShowRsp *pRsp) { tFreeSTableMetaRsp(&pRsp->tableMeta); }
|
||||||
|
|
||||||
int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) {
|
int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) {
|
||||||
int32_t headLen = sizeof(SMsgHead);
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
SCoder encoder = {0};
|
if (buf != NULL) {
|
||||||
tCoderInit(&encoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_ENCODER);
|
buf = (char *)buf + headLen;
|
||||||
|
bufLen -= headLen;
|
||||||
|
}
|
||||||
|
|
||||||
SMsgHead *pHead = &pReq->header;
|
SCoder encoder = {0};
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
tCoderInit(&encoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_ENCODER);
|
||||||
pHead->contLen = htonl(pHead->vgId);
|
|
||||||
|
|
||||||
if (tStartEncode(&encoder) < 0) return -1;
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
|
if (tEncodeCStr(&encoder, pReq->dbFName) < 0) return -1;
|
||||||
|
@ -1720,16 +1721,25 @@ int32_t tSerializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq)
|
||||||
|
|
||||||
int32_t tlen = encoder.pos;
|
int32_t tlen = encoder.pos;
|
||||||
tCoderClear(&encoder);
|
tCoderClear(&encoder);
|
||||||
return tlen;
|
|
||||||
|
if (buf != NULL) {
|
||||||
|
SMsgHead *pHead = (SMsgHead *)((char *)buf - headLen);
|
||||||
|
pHead->vgId = htonl(pReq->header.vgId);
|
||||||
|
pHead->contLen = htonl(tlen + headLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
return tlen + headLen;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) {
|
int32_t tDeserializeSTableInfoReq(void *buf, int32_t bufLen, STableInfoReq *pReq) {
|
||||||
SMsgHead *pHead = &pReq->header;
|
int32_t headLen = sizeof(SMsgHead);
|
||||||
pHead->vgId = htonl(pHead->vgId);
|
|
||||||
pHead->contLen = htonl(pHead->vgId);
|
SMsgHead *pHead = buf;
|
||||||
|
pHead->vgId = pReq->header.vgId;
|
||||||
|
pHead->contLen = pReq->header.contLen;
|
||||||
|
|
||||||
SCoder decoder = {0};
|
SCoder decoder = {0};
|
||||||
tCoderInit(&decoder, TD_LITTLE_ENDIAN, buf, bufLen, TD_DECODER);
|
tCoderInit(&decoder, TD_LITTLE_ENDIAN, (char *)buf + headLen, bufLen - headLen, TD_DECODER);
|
||||||
|
|
||||||
if (tStartDecode(&decoder) < 0) return -1;
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
|
if (tDecodeCStrTo(&decoder, pReq->dbFName) < 0) return -1;
|
||||||
|
|
|
@ -120,6 +120,7 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
|
||||||
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
SShowMgmt *pMgmt = &pMnode->showMgmt;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SShowReq showReq = {0};
|
SShowReq showReq = {0};
|
||||||
|
SShowRsp showRsp = {0};
|
||||||
|
|
||||||
if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
|
if (tDeserializeSShowReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &showReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
@ -142,7 +143,6 @@ static int32_t mndProcessShowReq(SMnodeMsg *pReq) {
|
||||||
goto SHOW_OVER;
|
goto SHOW_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SShowRsp showRsp = {0};
|
|
||||||
showRsp.showId = pShow->id;
|
showRsp.showId = pShow->id;
|
||||||
showRsp.tableMeta.pSchemas = calloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
|
showRsp.tableMeta.pSchemas = calloc(TSDB_MAX_COLUMNS, sizeof(SSchema));
|
||||||
if (showRsp.tableMeta.pSchemas == NULL) {
|
if (showRsp.tableMeta.pSchemas == NULL) {
|
||||||
|
|
|
@ -1183,7 +1183,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
||||||
taosRLockLatch(&pStb->lock);
|
taosRLockLatch(&pStb->lock);
|
||||||
|
|
||||||
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
|
int32_t totalCols = pStb->numOfColumns + pStb->numOfTags;
|
||||||
pRsp->pSchemas = malloc(totalCols * sizeof(SSchema));
|
pRsp->pSchemas = calloc(totalCols, sizeof(SSchema));
|
||||||
if (pRsp->pSchemas == NULL) {
|
if (pRsp->pSchemas == NULL) {
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
@ -1222,6 +1222,7 @@ static int32_t mndBuildStbSchemaImp(SDbObj *pDb, SStbObj *pStb, const char *tbNa
|
||||||
}
|
}
|
||||||
|
|
||||||
taosRUnLockLatch(&pStb->lock);
|
taosRUnLockLatch(&pStb->lock);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
static int32_t mndBuildStbSchema(SMnode *pMnode, const char *dbFName, const char *tbName, STableMetaRsp *pRsp) {
|
||||||
|
@ -1253,7 +1254,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
||||||
STableInfoReq infoReq = {0};
|
STableInfoReq infoReq = {0};
|
||||||
STableMetaRsp metaRsp = {0};
|
STableMetaRsp metaRsp = {0};
|
||||||
|
|
||||||
if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
|
if (tDeserializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
|
||||||
terrno = TSDB_CODE_INVALID_MSG;
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
goto RETRIEVE_META_OVER;
|
goto RETRIEVE_META_OVER;
|
||||||
}
|
}
|
||||||
|
@ -1269,7 +1270,7 @@ static int32_t mndProcessStbMetaReq(SMnodeMsg *pReq) {
|
||||||
goto RETRIEVE_META_OVER;
|
goto RETRIEVE_META_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pRsp = malloc(rspLen);
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto RETRIEVE_META_OVER;
|
goto RETRIEVE_META_OVER;
|
||||||
|
|
|
@ -34,7 +34,7 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pTopic, SMqTopicObj
|
||||||
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessDropTopicMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
|
static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq);
|
||||||
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
static int32_t mndGetTopicMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta);
|
||||||
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
static int32_t mndRetrieveTopic(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows);
|
||||||
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
static void mndCancelGetNextTopic(SMnode *pMnode, void *pIter);
|
||||||
|
@ -334,11 +334,16 @@ static int32_t mndProcessDropTopicInRsp(SMnodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessTopicMetaMsg(SMnodeMsg *pReq) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pReq->pMnode;
|
||||||
STableInfoReq *pInfo = pMsg->rpcMsg.pCont;
|
STableInfoReq infoReq = {0};
|
||||||
|
|
||||||
mDebug("topic:%s, start to retrieve meta", pInfo->tbName);
|
if (tSerializeSTableInfoReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &infoReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mDebug("topic:%s, start to retrieve meta", infoReq.tbName);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
|
SDbObj *pDb = mndAcquireDbByTopic(pMnode, pInfo->tableFname);
|
||||||
|
|
|
@ -339,10 +339,13 @@ TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) {
|
||||||
|
|
||||||
// ----- meta ------
|
// ----- meta ------
|
||||||
{
|
{
|
||||||
int32_t contLen = sizeof(STableInfoReq);
|
STableInfoReq infoReq = {0};
|
||||||
STableInfoReq* pReq = (STableInfoReq*)rpcMallocCont(contLen);
|
strcpy(infoReq.dbFName, dbname);
|
||||||
strcpy(pReq->dbFName, dbname);
|
strcpy(infoReq.tbName, "stb");
|
||||||
strcpy(pReq->tbName, "stb");
|
|
||||||
|
int32_t contLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
|
||||||
|
void* pReq = rpcMallocCont(contLen);
|
||||||
|
tSerializeSTableInfoReq(pReq, contLen, &infoReq);
|
||||||
|
|
||||||
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
SRpcMsg* pMsg = test.SendReq(TDMT_MND_STB_META, pReq, contLen);
|
||||||
ASSERT_NE(pMsg, nullptr);
|
ASSERT_NE(pMsg, nullptr);
|
||||||
|
|
|
@ -71,7 +71,6 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
STableInfoReq * pReq = (STableInfoReq *)(pMsg->pCont);
|
|
||||||
STbCfg * pTbCfg = NULL;
|
STbCfg * pTbCfg = NULL;
|
||||||
STbCfg * pStbCfg = NULL;
|
STbCfg * pStbCfg = NULL;
|
||||||
tb_uid_t uid;
|
tb_uid_t uid;
|
||||||
|
@ -79,12 +78,19 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
int32_t nTagCols;
|
int32_t nTagCols;
|
||||||
SSchemaWrapper *pSW = NULL;
|
SSchemaWrapper *pSW = NULL;
|
||||||
STableMetaRsp *pTbMetaMsg = NULL;
|
STableMetaRsp *pTbMetaMsg = NULL;
|
||||||
SSchema * pTagSchema;
|
STableMetaRsp metaRsp = {0};
|
||||||
|
SSchema *pTagSchema;
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
int msgLen = 0;
|
int msgLen = 0;
|
||||||
int32_t code = TSDB_CODE_VND_APP_ERROR;
|
int32_t code = TSDB_CODE_VND_APP_ERROR;
|
||||||
|
|
||||||
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, pReq->tbName, &uid);
|
STableInfoReq infoReq = {0};
|
||||||
|
if (tDeserializeSTableInfoReq(pMsg->pCont, pMsg->contLen, &infoReq) != 0) {
|
||||||
|
terrno = TSDB_CODE_INVALID_MSG;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTbCfg = metaGetTbInfoByName(pVnode->pMeta, infoReq.tbName, &uid);
|
||||||
if (pTbCfg == NULL) {
|
if (pTbCfg == NULL) {
|
||||||
code = TSDB_CODE_VND_TB_NOT_EXIST;
|
code = TSDB_CODE_VND_TB_NOT_EXIST;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -114,16 +120,15 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
pTagSchema = NULL;
|
pTagSchema = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
STableMetaRsp metaRsp = {0};
|
metaRsp.pSchemas = calloc(nCols + nTagCols, sizeof(SSchema));
|
||||||
metaRsp.pSchemas = malloc( sizeof(SSchema) * (nCols + nTagCols));
|
|
||||||
if (metaRsp.pSchemas == NULL) {
|
if (metaRsp.pSchemas == NULL) {
|
||||||
code = TSDB_CODE_VND_OUT_OF_MEMORY;
|
code = TSDB_CODE_VND_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaRsp.dbId = htobe64(pVnode->config.dbId);
|
metaRsp.dbId = htobe64(pVnode->config.dbId);
|
||||||
memcpy(metaRsp.dbFName, pReq->dbFName, sizeof(metaRsp.dbFName));
|
memcpy(metaRsp.dbFName, infoReq.dbFName, sizeof(metaRsp.dbFName));
|
||||||
strcpy(metaRsp.tbName, pReq->tbName);
|
strcpy(metaRsp.tbName, infoReq.tbName);
|
||||||
if (pTbCfg->type == META_CHILD_TABLE) {
|
if (pTbCfg->type == META_CHILD_TABLE) {
|
||||||
strcpy(metaRsp.stbName, pStbCfg->name);
|
strcpy(metaRsp.stbName, pStbCfg->name);
|
||||||
metaRsp.suid = pTbCfg->ctbCfg.suid;
|
metaRsp.suid = pTbCfg->ctbCfg.suid;
|
||||||
|
@ -148,11 +153,12 @@ static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *pRsp = malloc(rspLen);
|
void *pRsp = rpcMallocCont(rspLen);
|
||||||
if (pRsp == NULL) {
|
if (pRsp == NULL) {
|
||||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto _exit;
|
goto _exit;
|
||||||
}
|
}
|
||||||
|
tSerializeSTableMetaRsp(pRsp, rspLen, &metaRsp);
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
|
@ -30,40 +30,33 @@ int32_t queryBuildTableMetaReqMsg(void *input, char **msg, int32_t msgSize, int3
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t estimateSize = sizeof(STableInfoReq);
|
STableInfoReq infoReq = {0};
|
||||||
if (NULL == *msg || msgSize < estimateSize) {
|
infoReq.header.vgId = pInput->vgId;
|
||||||
tfree(*msg);
|
|
||||||
*msg = rpcMallocCont(estimateSize);
|
|
||||||
if (NULL == *msg) {
|
|
||||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
STableInfoReq *bMsg = (STableInfoReq *)*msg;
|
|
||||||
|
|
||||||
bMsg->header.vgId = htonl(pInput->vgId);
|
|
||||||
|
|
||||||
if (pInput->dbFName) {
|
if (pInput->dbFName) {
|
||||||
tstrncpy(bMsg->dbFName, pInput->dbFName, tListLen(bMsg->dbFName));
|
tstrncpy(infoReq.dbFName, pInput->dbFName, TSDB_DB_FNAME_LEN);
|
||||||
}
|
}
|
||||||
|
tstrncpy(infoReq.tbName, pInput->tbName, TSDB_TABLE_NAME_LEN);
|
||||||
|
|
||||||
tstrncpy(bMsg->tbName, pInput->tbName, tListLen(bMsg->tbName));
|
int32_t bufLen = tSerializeSTableInfoReq(NULL, 0, &infoReq);
|
||||||
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
tSerializeSTableInfoReq(pBuf, bufLen, &infoReq);
|
||||||
|
|
||||||
|
*msg = pBuf;
|
||||||
|
*msgLen = bufLen;
|
||||||
|
|
||||||
*msgLen = (int32_t)sizeof(*bMsg);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
int32_t queryBuildUseDbMsg(void *input, char **msg, int32_t msgSize, int32_t *msgLen) {
|
||||||
if (NULL == input || NULL == msg || NULL == msgLen) {
|
SBuildUseDBInput *pInput = input;
|
||||||
|
if (NULL == pInput || NULL == msg || NULL == msgLen) {
|
||||||
return TSDB_CODE_TSC_INVALID_INPUT;
|
return TSDB_CODE_TSC_INVALID_INPUT;
|
||||||
}
|
}
|
||||||
|
|
||||||
SBuildUseDBInput *bInput = input;
|
|
||||||
|
|
||||||
SUseDbReq usedbReq = {0};
|
SUseDbReq usedbReq = {0};
|
||||||
strncpy(usedbReq.db, bInput->db, sizeof(usedbReq.db));
|
strncpy(usedbReq.db, pInput->db, sizeof(usedbReq.db));
|
||||||
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
|
usedbReq.db[sizeof(usedbReq.db) - 1] = 0;
|
||||||
usedbReq.vgVersion = bInput->vgVersion;
|
usedbReq.vgVersion = pInput->vgVersion;
|
||||||
|
|
||||||
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
int32_t bufLen = tSerializeSUseDbReq(NULL, 0, &usedbReq);
|
||||||
void *pBuf = rpcMallocCont(bufLen);
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
@ -99,15 +92,14 @@ int32_t queryProcessUseDBRsp(void *output, char *msg, int32_t msgSize) {
|
||||||
|
|
||||||
memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
|
memcpy(pOut->db, usedbRsp.db, TSDB_DB_FNAME_LEN);
|
||||||
pOut->dbId = usedbRsp.uid;
|
pOut->dbId = usedbRsp.uid;
|
||||||
pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
|
|
||||||
pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
|
|
||||||
|
|
||||||
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
|
pOut->dbVgroup = calloc(1, sizeof(SDBVgInfo));
|
||||||
if (NULL == pOut->dbVgroup) {
|
if (NULL == pOut->dbVgroup) {
|
||||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||||
goto PROCESS_USEDB_OVER;
|
goto PROCESS_USEDB_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pOut->dbVgroup->vgVersion = usedbRsp.vgVersion;
|
||||||
|
pOut->dbVgroup->hashMethod = usedbRsp.hashMethod;
|
||||||
pOut->dbVgroup->vgHash =
|
pOut->dbVgroup->vgHash =
|
||||||
taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
taosHashInit(usedbRsp.vgNum, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pOut->dbVgroup->vgHash) {
|
if (NULL == pOut->dbVgroup->vgHash) {
|
||||||
|
|
|
@ -172,45 +172,54 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
int32_t qwBuildAndSendShowRsp(SRpcMsg *pMsg, int32_t code) {
|
||||||
int32_t numOfCols = 6;
|
int32_t numOfCols = 6;
|
||||||
int32_t msgSize = sizeof(SVShowTablesRsp) + sizeof(SSchema) * numOfCols;
|
SVShowTablesRsp showRsp = {0};
|
||||||
|
|
||||||
SVShowTablesRsp *pRsp = (SVShowTablesRsp *)rpcMallocCont(msgSize);
|
// showRsp.showId = 1;
|
||||||
|
showRsp.tableMeta.pSchemas = calloc(numOfCols, sizeof(SSchema));
|
||||||
|
if (showRsp.tableMeta.pSchemas == NULL) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t cols = 0;
|
int32_t cols = 0;
|
||||||
SSchema *pSchema = pRsp->metaInfo.pSchemas;
|
SSchema *pSchema = showRsp.tableMeta.pSchemas;
|
||||||
|
|
||||||
const SSchema *s = tGetTbnameColumnSchema();
|
const SSchema *s = tGetTbnameColumnSchema();
|
||||||
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "name");
|
*pSchema = createSchema(s->type, s->bytes, ++cols, "name");
|
||||||
pSchema++;
|
pSchema++;
|
||||||
|
|
||||||
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
|
int32_t type = TSDB_DATA_TYPE_TIMESTAMP;
|
||||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "created");
|
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "created");
|
||||||
pSchema++;
|
pSchema++;
|
||||||
|
|
||||||
type = TSDB_DATA_TYPE_SMALLINT;
|
type = TSDB_DATA_TYPE_SMALLINT;
|
||||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "columns");
|
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "columns");
|
||||||
pSchema++;
|
pSchema++;
|
||||||
|
|
||||||
*pSchema = createSchema(s->type, htonl(s->bytes), htonl(++cols), "stable");
|
*pSchema = createSchema(s->type, s->bytes, ++cols, "stable");
|
||||||
pSchema++;
|
pSchema++;
|
||||||
|
|
||||||
type = TSDB_DATA_TYPE_BIGINT;
|
type = TSDB_DATA_TYPE_BIGINT;
|
||||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "uid");
|
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "uid");
|
||||||
pSchema++;
|
pSchema++;
|
||||||
|
|
||||||
type = TSDB_DATA_TYPE_INT;
|
type = TSDB_DATA_TYPE_INT;
|
||||||
*pSchema = createSchema(type, htonl(tDataTypes[type].bytes), htonl(++cols), "vgId");
|
*pSchema = createSchema(type, tDataTypes[type].bytes, ++cols, "vgId");
|
||||||
|
|
||||||
assert(cols == numOfCols);
|
assert(cols == numOfCols);
|
||||||
pRsp->metaInfo.numOfColumns = htonl(cols);
|
showRsp.tableMeta.numOfColumns = cols;
|
||||||
|
|
||||||
|
int32_t bufLen = tSerializeSShowRsp(NULL, 0, &showRsp);
|
||||||
|
void *pBuf = rpcMallocCont(bufLen);
|
||||||
|
tSerializeSShowRsp(pBuf, bufLen, &showRsp);
|
||||||
|
|
||||||
SRpcMsg rpcMsg = {
|
SRpcMsg rpcMsg = {
|
||||||
.handle = pMsg->handle,
|
.handle = pMsg->handle,
|
||||||
.ahandle = pMsg->ahandle,
|
.ahandle = pMsg->ahandle,
|
||||||
.pCont = pRsp,
|
.pCont = pBuf,
|
||||||
.contLen = msgSize,
|
.contLen = bufLen,
|
||||||
.code = code,
|
.code = code,
|
||||||
};
|
};
|
||||||
|
|
||||||
rpcSendResponse(&rpcMsg);
|
rpcSendResponse(&rpcMsg);
|
||||||
|
|
Loading…
Reference in New Issue