fix query crash

This commit is contained in:
Liu Jicong 2022-01-25 16:33:21 +08:00
parent ca84e47a97
commit 0ff19d6d5b
6 changed files with 62 additions and 61 deletions

View File

@ -1533,9 +1533,7 @@ typedef struct SMqSetCVgReq {
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
uint32_t qmsgLen; char* qmsg;
void* qmsg;
//SSubQueryMsg msg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
@ -1567,7 +1565,6 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
tlen += taosEncodeString(buf, (char*)pReq->qmsg); tlen += taosEncodeString(buf, (char*)pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
@ -1582,7 +1579,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
buf = taosDecodeString(buf, (char**)&pReq->qmsg); buf = taosDecodeString(buf, (char**)&pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;

View File

@ -374,9 +374,17 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
tmq->status = 1;
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); //destroy ex
taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
char* topicName = topic_list->elems[i]; char* topicName = topic_list->elems[i];
@ -391,16 +399,21 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
} }
tNameExtractFullName(&name, topicFname); tNameExtractFullName(&name, topicFname);
tscDebug("subscribe topic: %s", topicFname); tscDebug("subscribe topic: %s", topicFname);
taosArrayPush(tmq->clientTopics, &topicFname); SMqClientTopic topic = {
.nextVgIdx = 0,
.sql = NULL,
.sqlLen = 0,
.topicId = 0,
.topicName = topicFname,
.vgs = NULL
};
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic);
/*SMqClientTopic topic = {*/ /*SMqClientTopic topic = {*/
/*.*/ /*.*/
/*};*/ /*};*/
taosArrayPush(req.topicNames, &topicFname);
} }
SCMSubscribeReq req;
req.topicNum = taosArrayGetSize(tmq->clientTopics);
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = tmq->clientTopics;
int tlen = tSerializeSCMSubscribeReq(NULL, &req); int tlen = tSerializeSCMSubscribeReq(NULL, &req);
void* buf = malloc(tlen); void* buf = malloc(tlen);
@ -419,17 +432,17 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
if (body != NULL) { if (sendInfo != NULL) {
destroySendMsgInfo(body); destroySendMsgInfo(sendInfo);
} }
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
@ -601,6 +614,8 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
} }
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tscDebug("tmq ask ep cb called");
bool set = false;
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
@ -620,17 +635,16 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
.epSet = pVgEp->epSet .epSet = pVgEp->epSet
}; };
taosArrayPush(topic.vgs, &clientVg); taosArrayPush(topic.vgs, &clientVg);
set = true;
} }
taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(tmq->clientTopics, &topic);
} }
if(set) tmq->status = 1;
// unlock // unlock
return 0; return 0;
} }
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
return NULL;
}
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
SMqConsumeReq req = {0}; SMqConsumeReq req = {0};
req.reqType = 1; req.reqType = 1;
@ -639,7 +653,7 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
tmq_message_t* tmq_message = NULL; tmq_message_t* tmq_message = NULL;
strcpy(req.cgroup, tmq->groupId); strcpy(req.cgroup, tmq->groupId);
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) {
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen); SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
@ -667,7 +681,12 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
} }
SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
return NULL;
}
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(req.topic, pTopic->topicName); strcpy(req.topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx; int32_t nextVgIdx = pTopic->nextVgIdx;

View File

@ -570,30 +570,30 @@ TEST(testCase, create_topic_Test) {
//taos_close(pConn); //taos_close(pConn);
//} //}
//TEST(testCase, tmq_subscribe_Test) { TEST(testCase, tmq_subscribe_Test) {
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
//assert(pConn != NULL); assert(pConn != NULL);
//TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
//if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
//printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
//} }
//taos_free_result(pRes); taos_free_result(pRes);
//tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "group.id", "tg1"); tmq_conf_set(conf, "group.id", "tg1");
//tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
//tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
//tmq_list_append(topic_list, "test_topic_1"); tmq_list_append(topic_list, "test_topic_1");
//tmq_subscribe(tmq, topic_list); tmq_subscribe(tmq, topic_list);
//while (1) { while (1) {
//tmq_message_t* msg = tmq_consume_poll(tmq, 0); tmq_message_t* msg = tmq_consume_poll(tmq, 0);
//printf("get msg\n"); printf("get msg\n");
//if (msg == NULL) break; if (msg == NULL) break;
//} }
//} }
TEST(testCase, tmq_consume_Test) { TEST(testCase, tmq_consume_Test) {
} }

View File

@ -363,9 +363,7 @@ typedef struct SMqConsumerEp {
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
uint32_t qmsgLen;
char* qmsg; char* qmsg;
//SSubQueryMsg qExec;
} SMqConsumerEp; } SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
@ -375,8 +373,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen); tlen += taosEncodeString(buf, pConsumerEp->qmsg);
tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return tlen; return tlen;
} }
@ -386,8 +383,7 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen); buf = taosDecodeString(buf, &pConsumerEp->qmsg);
buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return buf; return buf;
} }

View File

@ -161,8 +161,6 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pReq->sql = strdup(pTopic->sql); pReq->sql = strdup(pTopic->sql);
pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->logicalPlan = strdup(pTopic->logicalPlan);
pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan);
pReq->qmsgLen = pCEp->qmsgLen;
/*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/
pReq->qmsg = strdup(pCEp->qmsg); pReq->qmsg = strdup(pCEp->qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
@ -227,13 +225,6 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
CEp.qmsg = strdup(pTaskInfo->msg->msg); CEp.qmsg = strdup(pTaskInfo->msg->msg);
CEp.qmsgLen = strlen(CEp.qmsg) + 1;
printf("abc:\n%s\n", CEp.qmsg);
/*CEp.qmsg = malloc(CEp.qmsgLen);*/
/*if (CEp.qmsg == NULL) {*/
/*return -1;*/
/*}*/
/*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &CEp);
} }
@ -257,8 +248,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
req.sql = pTopic->sql; req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan; req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan; req.physicalPlan = pTopic->physicalPlan;
req.qmsg = strdup(pCEp->qmsg); req.qmsg = pCEp->qmsg;
req.qmsgLen = strlen(req.qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {

View File

@ -1125,8 +1125,8 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
*str = cJSON_Print(json); *str = cJSON_Print(json);
cJSON_Delete(json); cJSON_Delete(json);
printf("====Physical plan:====\n"); /*printf("====Physical plan:====\n");*/
printf("%s\n", *str); /*printf("%s\n", *str);*/
*len = strlen(*str) + 1; *len = strlen(*str) + 1;
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }