fix query crash

This commit is contained in:
Liu Jicong 2022-01-25 18:08:38 +08:00
parent 0ff19d6d5b
commit 5f7ae72271
5 changed files with 11 additions and 9 deletions

View File

@ -253,6 +253,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4) #define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4)
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) #define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) #define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)

View File

@ -659,7 +659,8 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (buf == NULL) { if (buf == NULL) {
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
} }
buf->consumerId = htobe64(buf->consumerId); buf->consumerId = htobe64(tmq->consumerId);
strcpy(buf->cgroup, tmq->groupId);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) { if (pRequest == NULL) {

View File

@ -536,7 +536,7 @@ TEST(testCase, create_topic_Test) {
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); //taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);

View File

@ -66,13 +66,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp;
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
/*terrno = */ terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1; return -1;
} }
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
@ -190,7 +190,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
} }
mndReleaseTopic(pMnode, pTopic); /*mndReleaseTopic(pMnode, pTopic);*/
mndTransDrop(pTrans); mndTransDrop(pTrans);
} }
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
@ -621,14 +621,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
if (newSub) taosArrayDestroy(newSub); if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); /*mndReleaseConsumer(pMnode, pConsumer);*/
return -1; return -1;
} }
if (newSub) taosArrayDestroy(newSub); if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); /*mndReleaseConsumer(pMnode, pConsumer);*/
return 0; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {

View File

@ -237,7 +237,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
topicObj.createTime = taosGetTimestampMs(); topicObj.createTime = taosGetTimestampMs();
topicObj.updateTime = topicObj.createTime; topicObj.updateTime = topicObj.createTime;
topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TOPIC_FNAME_LEN);
topicObj.dbUid = pDb->uid; topicObj.dbUid = pDb->uid;
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = strdup(pCreate->sql);