fix query crash
This commit is contained in:
parent
a9bd009ba1
commit
ca84e47a97
|
@ -1241,6 +1241,12 @@ typedef struct {
|
|||
char data[];
|
||||
} SVShowTablesFetchRsp;
|
||||
|
||||
typedef struct SMqCMGetSubEpReq {
|
||||
int64_t consumerId;
|
||||
int32_t epoch;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
} SMqCMGetSubEpReq;
|
||||
|
||||
#pragma pack(pop)
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
|
||||
|
@ -1562,7 +1568,7 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
|||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
|
||||
tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen);
|
||||
tlen += taosEncodeString(buf, (char*)pReq->qmsg);
|
||||
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return tlen;
|
||||
}
|
||||
|
@ -1577,7 +1583,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
|
||||
buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen);
|
||||
buf = taosDecodeString(buf, (char**)&pReq->qmsg);
|
||||
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
|
||||
return buf;
|
||||
}
|
||||
|
@ -1639,6 +1645,92 @@ typedef struct SMqConsumeReq {
|
|||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
} SMqConsumeReq;
|
||||
|
||||
typedef struct SMqSubVgEp {
|
||||
int32_t vgId;
|
||||
SEpSet epSet;
|
||||
} SMqSubVgEp;
|
||||
|
||||
typedef struct SMqSubTopicEp {
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
SArray* vgs; // SArray<SMqSubVgEp>
|
||||
} SMqSubTopicEp;
|
||||
|
||||
typedef struct SMqCMGetSubEpRsp {
|
||||
int64_t consumerId;
|
||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||
SArray* topics; // SArray<SMqSubTopicEp>
|
||||
} SMqCMGetSubEpRsp;
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI16(buf, pVgEp->vgId);
|
||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
|
||||
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
||||
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
||||
if (pTopicEp->vgs == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubVgEp vgEp;
|
||||
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
||||
taosArrayPush(pTopicEp->vgs, &vgEp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSubEpRsp* pRsp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pRsp->consumerId);
|
||||
tlen += taosEncodeString(buf, pRsp->cgroup);
|
||||
int32_t sz = taosArrayGetSize(pRsp->topics);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp* pVgEp = (SMqSubTopicEp*)taosArrayGet(pRsp->topics, i);
|
||||
tlen += tEncodeSMqSubTopicEp(buf, pVgEp);
|
||||
}
|
||||
return tlen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void* tDecodeSMqCMGetSubEpRsp(void* buf, SMqCMGetSubEpRsp* pRsp) {
|
||||
buf = taosDecodeFixedI64(buf, &pRsp->consumerId);
|
||||
buf = taosDecodeStringTo(buf, pRsp->cgroup);
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pRsp->topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
||||
if (pRsp->topics == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp topicEp;
|
||||
buf = tDecodeSMqSubTopicEp(buf, &topicEp);
|
||||
taosArrayPush(pRsp->topics, &topicEp);
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -140,6 +140,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_GET_SUB_EP, "mnode-get-sub-ep", SMqCMGetSubEpReq, SMqCMGetSubEpRsp)
|
||||
TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg)
|
||||
|
||||
// Requests handled by VNODE
|
||||
|
|
|
@ -263,7 +263,7 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
|
|||
|
||||
typedef struct SMqClientVg {
|
||||
// statistics
|
||||
int64_t consumeCnt;
|
||||
int64_t pollCnt;
|
||||
// offset
|
||||
int64_t committedOffset;
|
||||
int64_t currentOffset;
|
||||
|
@ -345,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
|
|||
strcpy(pTmq->groupId, conf->groupId);
|
||||
pTmq->commit_cb = conf->commit_cb;
|
||||
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
|
||||
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
|
||||
return pTmq;
|
||||
}
|
||||
|
||||
|
@ -411,13 +412,12 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
|||
tSerializeSCMSubscribeReq(&abuf, &req);
|
||||
/*printf("formatted: %s\n", dagStr);*/
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TSDB_SQL_SELECT);
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_SUBSCRIBE);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc sqlObj");
|
||||
}
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||
pRequest->type = TDMT_MND_SUBSCRIBE;
|
||||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
@ -596,6 +596,37 @@ struct tmq_message_t {
|
|||
SMqConsumeRsp rsp;
|
||||
};
|
||||
|
||||
int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||
tmq_t* tmq = (tmq_t*)param;
|
||||
SMqCMGetSubEpRsp rsp;
|
||||
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
|
||||
int32_t sz = taosArrayGetSize(rsp.topics);
|
||||
// TODO: lock
|
||||
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqClientTopic topic = {0};
|
||||
SMqSubTopicEp* pTopicEp = taosArrayGet(rsp.topics, i);
|
||||
topic.topicName = strdup(pTopicEp->topic);
|
||||
int32_t vgSz = taosArrayGetSize(pTopicEp->vgs);
|
||||
topic.vgs = taosArrayInit(vgSz, sizeof(SMqClientVg));
|
||||
for (int32_t j = 0; j < vgSz; j++) {
|
||||
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
|
||||
SMqClientVg clientVg = {
|
||||
.vgId = pVgEp->vgId,
|
||||
.epSet = pVgEp->epSet
|
||||
};
|
||||
taosArrayPush(topic.vgs, &clientVg);
|
||||
}
|
||||
taosArrayPush(tmq->clientTopics, &topic);
|
||||
}
|
||||
// unlock
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||
return NULL;
|
||||
|
@ -605,9 +636,38 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
req.reqType = 1;
|
||||
req.blockingTime = blocking_time;
|
||||
req.consumerId = tmq->consumerId;
|
||||
tmq_message_t* tmq_message = NULL;
|
||||
strcpy(req.cgroup, tmq->groupId);
|
||||
|
||||
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
|
||||
if (taosArrayGetSize(tmq->clientTopics) == 0) {
|
||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||
SMqCMGetSubEpReq* buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
tscError("failed to malloc get subscribe ep buf");
|
||||
}
|
||||
buf->consumerId = htobe64(buf->consumerId);
|
||||
|
||||
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
|
||||
if (pRequest == NULL) {
|
||||
tscError("failed to malloc subscribe ep request");
|
||||
}
|
||||
|
||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = tmq;
|
||||
sendInfo->fp = tmq_ask_ep_cb;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
}
|
||||
|
||||
SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx);
|
||||
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
|
||||
strcpy(req.topic, pTopic->topicName);
|
||||
int32_t nextVgIdx = pTopic->nextVgIdx;
|
||||
|
@ -618,14 +678,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
|
|||
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) };
|
||||
pRequest->type = TDMT_VND_CONSUME;
|
||||
|
||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
|
||||
sendInfo->requestObjRefId = 0;
|
||||
sendInfo->param = &tmq_message;
|
||||
sendInfo->fp = tmq_poll_cb_inner;
|
||||
|
||||
int64_t transporterId = 0;
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, body);
|
||||
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
|
||||
|
||||
tsem_wait(&pRequest->body.rspSem);
|
||||
|
||||
return (tmq_message_t*)pRequest->body.resInfo.pData;
|
||||
return tmq_message;
|
||||
|
||||
/*tsem_wait(&pRequest->body.rspSem);*/
|
||||
|
||||
|
|
|
@ -570,31 +570,30 @@ TEST(testCase, create_topic_Test) {
|
|||
//taos_close(pConn);
|
||||
//}
|
||||
|
||||
TEST(testCase, tmq_subscribe_Test) {
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
//TEST(testCase, tmq_subscribe_Test) {
|
||||
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
//assert(pConn != NULL);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
//TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||
//if (taos_errno(pRes) != 0) {
|
||||
//printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
//}
|
||||
//taos_free_result(pRes);
|
||||
|
||||
//tmq_conf_t* conf = tmq_conf_new();
|
||||
//tmq_conf_set(conf, "group.id", "tg1");
|
||||
//tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg1");
|
||||
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
|
||||
//tmq_list_t* topic_list = tmq_list_new();
|
||||
//tmq_list_append(topic_list, "test_topic_1");
|
||||
//tmq_subscribe(tmq, topic_list);
|
||||
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "test_topic_1");
|
||||
tmq_subscribe(tmq, topic_list);
|
||||
|
||||
while (1) {
|
||||
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
|
||||
printf("get msg\n");
|
||||
if (msg == NULL) break;
|
||||
}
|
||||
}
|
||||
//while (1) {
|
||||
//tmq_message_t* msg = tmq_consume_poll(tmq, 0);
|
||||
//printf("get msg\n");
|
||||
//if (msg == NULL) break;
|
||||
//}
|
||||
//}
|
||||
|
||||
TEST(testCase, tmq_consume_Test) {
|
||||
}
|
||||
|
|
|
@ -115,6 +115,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
|||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
|
||||
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GET_SUB_EP)] = dndProcessMnodeReadMsg;
|
||||
|
||||
// Requests handled by VNODE
|
||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||
|
|
|
@ -25,7 +25,7 @@ extern "C" {
|
|||
int32_t mndInitConsumer(SMnode *pMnode);
|
||||
void mndCleanupConsumer(SMnode *pMnode);
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId);
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId);
|
||||
void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer);
|
||||
|
||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer);
|
||||
|
|
|
@ -40,14 +40,13 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter);
|
|||
|
||||
int32_t mndInitConsumer(SMnode *pMnode) {
|
||||
SSdbTable table = {.sdbType = SDB_CONSUMER,
|
||||
.keyType = SDB_KEY_BINARY,
|
||||
.keyType = SDB_KEY_INT64,
|
||||
.encodeFp = (SdbEncodeFp)mndConsumerActionEncode,
|
||||
.decodeFp = (SdbDecodeFp)mndConsumerActionDecode,
|
||||
.insertFp = (SdbInsertFp)mndConsumerActionInsert,
|
||||
.updateFp = (SdbUpdateFp)mndConsumerActionUpdate,
|
||||
.deleteFp = (SdbDeleteFp)mndConsumerActionDelete};
|
||||
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
|
@ -147,7 +146,7 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
|
|||
return 0;
|
||||
}
|
||||
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId) {
|
||||
SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int64_t consumerId) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SMqConsumerObj *pConsumer = sdbAcquire(pSdb, SDB_CONSUMER, &consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
|
|
|
@ -30,6 +30,8 @@
|
|||
#define MND_SUBSCRIBE_VER_NUMBER 1
|
||||
#define MND_SUBSCRIBE_RESERVE_SIZE 64
|
||||
|
||||
static char *mndMakeSubscribeKey(char *cgroup, char *topicName);
|
||||
|
||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *);
|
||||
static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw);
|
||||
static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *);
|
||||
|
@ -41,6 +43,7 @@ static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg);
|
|||
static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
||||
|
||||
static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer,
|
||||
SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic, SMqConsumerEp *pSub);
|
||||
|
@ -57,9 +60,60 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
|||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||
SMnode *pMnode = pMsg->pMnode;
|
||||
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont;
|
||||
SMqCMGetSubEpRsp rsp;
|
||||
int64_t consumerId = be64toh(pReq->consumerId);
|
||||
|
||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
|
||||
if (pConsumer == NULL) {
|
||||
/*terrno = */
|
||||
return -1;
|
||||
}
|
||||
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
|
||||
|
||||
strcpy(rsp.cgroup, pReq->cgroup);
|
||||
rsp.consumerId = consumerId;
|
||||
SArray *pTopics = pConsumer->topics;
|
||||
int32_t sz = taosArrayGetSize(pTopics);
|
||||
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqSubTopicEp topicEp;
|
||||
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pTopics, i);
|
||||
strcpy(topicEp.topic, pConsumerTopic->name);
|
||||
|
||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, pConsumerTopic->name);
|
||||
int32_t assignedSz = taosArrayGetSize(pSub->assigned);
|
||||
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
|
||||
for (int32_t j = 0; j < assignedSz; j++) {
|
||||
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i);
|
||||
if (pCEp->consumerId == consumerId) {
|
||||
taosArrayPush(pSub->assigned, pCEp);
|
||||
}
|
||||
}
|
||||
if (taosArrayGetSize(topicEp.vgs) != 0) {
|
||||
taosArrayPush(rsp.topics, &topicEp);
|
||||
}
|
||||
}
|
||||
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
|
||||
void *buf = malloc(tlen);
|
||||
if (buf == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
}
|
||||
void *abuf = buf;
|
||||
tEncodeSMqCMGetSubEpRsp(&abuf, &rsp);
|
||||
//TODO: free rsp
|
||||
pMsg->pCont = buf;
|
||||
pMsg->contLen = tlen;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
|
||||
int i = 0;
|
||||
while (key[i] != ':') {
|
||||
|
@ -97,7 +151,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
|
||||
// build msg
|
||||
|
||||
SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen);
|
||||
SMqSetCVgReq *pReq = malloc(sizeof(SMqSetCVgReq));
|
||||
if (pReq == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return -1;
|
||||
|
@ -108,7 +162,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
|||
pReq->logicalPlan = strdup(pTopic->logicalPlan);
|
||||
pReq->physicalPlan = strdup(pTopic->physicalPlan);
|
||||
pReq->qmsgLen = pCEp->qmsgLen;
|
||||
memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);
|
||||
/*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/
|
||||
pReq->qmsg = strdup(pCEp->qmsg);
|
||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
|
||||
void *reqStr = malloc(tlen);
|
||||
if (reqStr == NULL) {
|
||||
|
@ -168,14 +223,17 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
|||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||
STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
|
||||
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1],
|
||||
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||
CEp.vgId = pTaskInfo->addr.nodeId;
|
||||
CEp.qmsgLen = pTaskInfo->msg->contentLen;
|
||||
CEp.qmsg = malloc(CEp.qmsgLen);
|
||||
if (CEp.qmsg == NULL) {
|
||||
return -1;
|
||||
}
|
||||
memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);
|
||||
CEp.qmsg = strdup(pTaskInfo->msg->msg);
|
||||
CEp.qmsgLen = strlen(CEp.qmsg) + 1;
|
||||
printf("abc:\n%s\n", CEp.qmsg);
|
||||
/*CEp.qmsg = malloc(CEp.qmsgLen);*/
|
||||
/*if (CEp.qmsg == NULL) {*/
|
||||
/*return -1;*/
|
||||
/*}*/
|
||||
/*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/
|
||||
taosArrayPush(unassignedVg, &CEp);
|
||||
}
|
||||
|
||||
|
@ -219,7 +277,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
|||
STransAction action = {0};
|
||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||
action.pCont = buf;
|
||||
action.contLen = tlen;
|
||||
action.contLen = sizeof(SMsgHead) + tlen;
|
||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
|
@ -503,7 +561,6 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
|||
}
|
||||
taosArrayPush(pSub->availConsumer, &consumerId);
|
||||
|
||||
|
||||
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
|
||||
taosArrayPush(pConsumer->topics, pConsumerTopic);
|
||||
|
||||
|
|
|
@ -83,8 +83,9 @@ int32_t qCreateExecTask(void* readHandle, int32_t vgId, SSubplan* pSubplan, qTas
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
if (handle) {
|
||||
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
|
||||
}
|
||||
|
||||
_error:
|
||||
// if failed to add ref for all tables in this query, abort current query
|
||||
|
|
|
@ -1132,7 +1132,6 @@ int32_t subPlanToString(const SSubplan* subplan, char** str, int32_t* len) {
|
|||
}
|
||||
|
||||
int32_t stringToSubplan(const char* str, SSubplan** subplan) {
|
||||
printf("aa: %s\n", str);
|
||||
cJSON* json = cJSON_Parse(str);
|
||||
if (NULL == json) {
|
||||
return TSDB_CODE_FAILED;
|
||||
|
|
Loading…
Reference in New Issue