commit
fab1f8d912
|
@ -1519,15 +1519,17 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqSetCVgReq {
|
typedef struct SMqSetCVgReq {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t oldConsumerId;
|
int64_t oldConsumerId;
|
||||||
int64_t newConsumerId;
|
int64_t newConsumerId;
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
char* sql;
|
char* sql;
|
||||||
char* logicalPlan;
|
char* logicalPlan;
|
||||||
char* physicalPlan;
|
char* physicalPlan;
|
||||||
SSubQueryMsg msg;
|
uint32_t qmsgLen;
|
||||||
|
void* qmsg;
|
||||||
|
//SSubQueryMsg msg;
|
||||||
} SMqSetCVgReq;
|
} SMqSetCVgReq;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
|
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
|
||||||
|
@ -1536,7 +1538,7 @@ static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg*
|
||||||
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
|
tlen += taosEncodeFixedU64(buf, pMsg->queryId);
|
||||||
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
|
tlen += taosEncodeFixedU64(buf, pMsg->taskId);
|
||||||
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
|
tlen += taosEncodeFixedU32(buf, pMsg->contentLen);
|
||||||
tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
|
//tlen += taosEncodeBinary(buf, pMsg->msg, pMsg->contentLen);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1545,7 +1547,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
|
||||||
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
|
buf = taosDecodeFixedU64(buf, &pMsg->queryId);
|
||||||
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
|
buf = taosDecodeFixedU64(buf, &pMsg->taskId);
|
||||||
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
|
buf = taosDecodeFixedU32(buf, &pMsg->contentLen);
|
||||||
buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
|
//buf = taosDecodeBinaryTo(buf, pMsg->msg, pMsg->contentLen);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1559,7 +1561,9 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
|
||||||
tlen += taosEncodeString(buf, pReq->sql);
|
tlen += taosEncodeString(buf, pReq->sql);
|
||||||
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
||||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
||||||
tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
|
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
|
||||||
|
tlen += taosEncodeBinary(buf, pReq->qmsg, pReq->qmsgLen);
|
||||||
|
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1572,15 +1576,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
||||||
buf = taosDecodeString(buf, &pReq->sql);
|
buf = taosDecodeString(buf, &pReq->sql);
|
||||||
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
||||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
||||||
buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
|
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
|
||||||
|
buf = taosDecodeBinary(buf, &pReq->qmsg, pReq->qmsgLen);
|
||||||
|
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct SMqSetCVgRsp {
|
typedef struct SMqSetCVgRsp {
|
||||||
int32_t vgId;
|
SMsgHead header;
|
||||||
int64_t consumerId;
|
int32_t vgId;
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
int64_t consumerId;
|
||||||
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
||||||
|
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
} SMqSetCVgRsp;
|
} SMqSetCVgRsp;
|
||||||
|
|
||||||
typedef struct SMqColData {
|
typedef struct SMqColData {
|
||||||
|
|
|
@ -32,7 +32,7 @@ struct SSubplan;
|
||||||
* @param streamReadHandle
|
* @param streamReadHandle
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg *pMsg, void* streamReadHandle);
|
qTaskInfo_t qCreateStreamExecTaskInfo(void *msg, void* streamReadHandle);
|
||||||
|
|
||||||
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
|
int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input);
|
||||||
|
|
||||||
|
|
|
@ -314,10 +314,10 @@ tmq_conf_t* tmq_conf_new() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
|
||||||
if (strcmp(key, "group.id")) {
|
if (strcmp(key, "group.id") == 0) {
|
||||||
strcpy(conf->groupId, value);
|
strcpy(conf->groupId, value);
|
||||||
}
|
}
|
||||||
if (strcmp(key, "client.id")) {
|
if (strcmp(key, "client.id") == 0) {
|
||||||
strcpy(conf->clientId, value);
|
strcpy(conf->clientId, value);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -365,7 +365,7 @@ tmq_list_t* tmq_list_new() {
|
||||||
|
|
||||||
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
|
int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
|
||||||
if (ptr->cnt >= ptr->tot-1) return -1;
|
if (ptr->cnt >= ptr->tot-1) return -1;
|
||||||
ptr->elems[ptr->cnt] = src;
|
ptr->elems[ptr->cnt] = strdup(src);
|
||||||
ptr->cnt++;
|
ptr->cnt++;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -377,8 +377,23 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
int32_t sz = topic_list->cnt;
|
int32_t sz = topic_list->cnt;
|
||||||
tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
|
tmq->clientTopics = taosArrayInit(sz, sizeof(void*));
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
char* topicName = strdup(topic_list->elems[i]);
|
char* topicName = topic_list->elems[i];
|
||||||
taosArrayPush(tmq->clientTopics, &topicName);
|
|
||||||
|
SName name = {0};
|
||||||
|
char* dbName = getDbOfConnection(tmq->pTscObj);
|
||||||
|
tNameSetDbName(&name, tmq->pTscObj->acctId, dbName, strlen(dbName));
|
||||||
|
tNameFromString(&name, topicName, T_NAME_TABLE);
|
||||||
|
|
||||||
|
char* topicFname = calloc(1, TSDB_TOPIC_FNAME_LEN);
|
||||||
|
if (topicFname == NULL) {
|
||||||
|
|
||||||
|
}
|
||||||
|
tNameExtractFullName(&name, topicFname);
|
||||||
|
tscDebug("subscribe topic: %s", topicFname);
|
||||||
|
taosArrayPush(tmq->clientTopics, &topicFname);
|
||||||
|
/*SMqClientTopic topic = {*/
|
||||||
|
/*.*/
|
||||||
|
/*};*/
|
||||||
}
|
}
|
||||||
SCMSubscribeReq req;
|
SCMSubscribeReq req;
|
||||||
req.topicNum = taosArrayGetSize(tmq->clientTopics);
|
req.topicNum = taosArrayGetSize(tmq->clientTopics);
|
||||||
|
@ -402,7 +417,7 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
|
||||||
pRequest->type = TDMT_MND_CREATE_TOPIC;
|
pRequest->type = TDMT_MND_SUBSCRIBE;
|
||||||
|
|
||||||
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
SMsgSendInfo* body = buildMsgInfoImpl(pRequest);
|
||||||
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
|
||||||
|
@ -535,7 +550,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
|
||||||
|
|
||||||
SCMCreateTopicReq req = {
|
SCMCreateTopicReq req = {
|
||||||
.name = (char*) topicFname,
|
.name = (char*) topicFname,
|
||||||
.igExists = 0,
|
.igExists = 1,
|
||||||
.physicalPlan = (char*) pStr,
|
.physicalPlan = (char*) pStr,
|
||||||
.sql = (char*) sql,
|
.sql = (char*) sql,
|
||||||
.logicalPlan = "no logic plan",
|
.logicalPlan = "no logic plan",
|
||||||
|
|
|
@ -458,10 +458,9 @@ TEST(testCase, show_table_Test) {
|
||||||
assert(pConn != NULL);
|
assert(pConn != NULL);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, "show tables");
|
TAOS_RES* pRes = taos_query(pConn, "show tables");
|
||||||
ASSERT_NE(taos_errno(pRes), 0);
|
|
||||||
|
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("expected failed to show tables, reason:%s\n", taos_errstr(pRes));
|
printf("failed to show tables, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
@ -537,6 +536,7 @@ TEST(testCase, create_topic_Test) {
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||||
}
|
}
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
TAOS_FIELD* pFields = taos_fetch_fields(pRes);
|
||||||
ASSERT_TRUE(pFields == nullptr);
|
ASSERT_TRUE(pFields == nullptr);
|
||||||
|
@ -570,6 +570,51 @@ TEST(testCase, insert_test) {
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
TEST(testCase, tmq_subscribe_Test) {
|
||||||
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
tmq_conf_t* conf = tmq_conf_new();
|
||||||
|
tmq_conf_set(conf, "group.id", "tg1");
|
||||||
|
tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
|
||||||
|
|
||||||
|
tmq_list_t* topic_list = tmq_list_new();
|
||||||
|
tmq_list_append(topic_list, "test_topic_1");
|
||||||
|
tmq_subscribe(tmq, topic_list);
|
||||||
|
|
||||||
|
while (1) {
|
||||||
|
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
|
||||||
|
printf("get msg\n");
|
||||||
|
if (msg == NULL) break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, tmq_consume_Test) {
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, tmq_commit_TEST) {
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
//TEST(testCase, insert_test) {
|
||||||
|
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
|
// ASSERT_NE(pConn, nullptr);
|
||||||
|
//
|
||||||
|
// TAOS_RES* pRes = taos_query(pConn, "use abc1");
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
//
|
||||||
|
// pRes = taos_query(pConn, "insert into t_2 values(now, 1)");
|
||||||
|
// if (taos_errno(pRes) != 0) {
|
||||||
|
// printf("failed to create multiple tables, reason:%s\n", taos_errstr(pRes));
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
// ASSERT_TRUE(false);
|
||||||
|
// }
|
||||||
|
//
|
||||||
|
// taos_free_result(pRes);
|
||||||
|
// taos_close(pConn);
|
||||||
|
//}
|
||||||
|
|
||||||
TEST(testCase, projection_query_tables) {
|
TEST(testCase, projection_query_tables) {
|
||||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||||
ASSERT_NE(pConn, nullptr);
|
ASSERT_NE(pConn, nullptr);
|
||||||
|
|
|
@ -112,6 +112,9 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_CREATE_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_ALTER_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_DROP_TOPIC)] = dndProcessMnodeWriteMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SUBSCRIBE)] = dndProcessMnodeWriteMsg;
|
||||||
|
/*pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBSCRIBE_RSP)] = dndProcessMnodeWriteMsg;*/
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN_RSP)] = dndProcessMnodeWriteMsg;
|
||||||
|
|
||||||
// Requests handled by VNODE
|
// Requests handled by VNODE
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SUBMIT)] = dndProcessVnodeWriteMsg;
|
||||||
|
@ -142,6 +145,8 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_DROP_TABLE)] = dndProcessVnodeWriteMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES)] = dndProcessVnodeFetchMsg;
|
||||||
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
|
||||||
|
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {
|
||||||
|
|
|
@ -363,8 +363,9 @@ typedef struct SMqConsumerEp {
|
||||||
int64_t consumerId; // -1 for unassigned
|
int64_t consumerId; // -1 for unassigned
|
||||||
int64_t lastConsumerHbTs;
|
int64_t lastConsumerHbTs;
|
||||||
int64_t lastVgHbTs;
|
int64_t lastVgHbTs;
|
||||||
int32_t execLen;
|
uint32_t qmsgLen;
|
||||||
SSubQueryMsg qExec;
|
char* qmsg;
|
||||||
|
//SSubQueryMsg qExec;
|
||||||
} SMqConsumerEp;
|
} SMqConsumerEp;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
|
||||||
|
@ -373,7 +374,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
|
||||||
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
|
tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
|
||||||
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||||
tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
|
tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen);
|
||||||
|
tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen);
|
||||||
return tlen;
|
return tlen;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -382,8 +385,9 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
|
||||||
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
|
buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
|
||||||
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||||
buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
|
||||||
pConsumerEp->execLen = sizeof(SSubQueryMsg) + pConsumerEp->qExec.contentLen;
|
buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen);
|
||||||
|
buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen);
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -402,11 +406,12 @@ typedef struct SMqSubscribeObj {
|
||||||
|
|
||||||
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
||||||
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
|
SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj));
|
||||||
pSub->key[0] = 0;
|
|
||||||
pSub->epoch = 0;
|
|
||||||
if (pSub == NULL) {
|
if (pSub == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
pSub->key[0] = 0;
|
||||||
|
pSub->epoch = 0;
|
||||||
|
|
||||||
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
|
pSub->availConsumer = taosArrayInit(0, sizeof(int64_t));
|
||||||
if (pSub->availConsumer == NULL) {
|
if (pSub->availConsumer == NULL) {
|
||||||
free(pSub);
|
free(pSub);
|
||||||
|
@ -433,7 +438,7 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
|
||||||
free(pSub);
|
free(pSub);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
return NULL;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
|
static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) {
|
||||||
|
|
|
@ -56,7 +56,9 @@ void mndCleanupConsumer(SMnode *pMnode) {}
|
||||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
|
int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer);
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen);
|
int32_t size = sizeof(int32_t) + tlen + MND_CONSUMER_RESERVE_SIZE;
|
||||||
|
|
||||||
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
if (pRaw == NULL) goto CM_ENCODE_OVER;
|
||||||
|
|
||||||
void* buf = malloc(tlen);
|
void* buf = malloc(tlen);
|
||||||
|
@ -68,34 +70,6 @@ SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER);
|
||||||
|
|
||||||
#if 0
|
|
||||||
int32_t topicNum = taosArrayGetSize(pConsumer->topics);
|
|
||||||
SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER);
|
|
||||||
int32_t len = strlen(pConsumer->cgroup);
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CM_ENCODE_OVER);
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, topicNum, CM_ENCODE_OVER);
|
|
||||||
for (int i = 0; i < topicNum; i++) {
|
|
||||||
int32_t len;
|
|
||||||
SMqConsumerTopic *pConsumerTopic = taosArrayGet(pConsumer->topics, i);
|
|
||||||
len = strlen(pConsumerTopic->name);
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, len, CM_ENCODE_OVER);
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pConsumerTopic->name, len, CM_ENCODE_OVER);
|
|
||||||
int vgSize;
|
|
||||||
if (pConsumerTopic->vgroups == NULL) {
|
|
||||||
vgSize = 0;
|
|
||||||
} else {
|
|
||||||
vgSize = listNEles(pConsumerTopic->vgroups);
|
|
||||||
}
|
|
||||||
SDB_SET_INT32(pRaw, dataPos, vgSize, CM_ENCODE_OVER);
|
|
||||||
for (int j = 0; j < vgSize; j++) {
|
|
||||||
// SList* head;
|
|
||||||
/*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER);
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
|
SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER);
|
||||||
|
|
||||||
|
@ -116,53 +90,35 @@ SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int8_t sver = 0;
|
int8_t sver = 0;
|
||||||
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CONSUME_DECODE_OVER;
|
if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto CM_DECODE_OVER;
|
||||||
|
|
||||||
if (sver != MND_CONSUMER_VER_NUMBER) {
|
if (sver != MND_CONSUMER_VER_NUMBER) {
|
||||||
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
|
||||||
goto CONSUME_DECODE_OVER;
|
goto CM_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
|
SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj));
|
||||||
if (pRow == NULL) goto CONSUME_DECODE_OVER;
|
if (pRow == NULL) goto CM_DECODE_OVER;
|
||||||
|
|
||||||
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
|
SMqConsumerObj *pConsumer = sdbGetRowObj(pRow);
|
||||||
if (pConsumer == NULL) goto CONSUME_DECODE_OVER;
|
if (pConsumer == NULL) goto CM_DECODE_OVER;
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &len, CM_DECODE_OVER);
|
||||||
void* buf = malloc(len);
|
void* buf = malloc(len);
|
||||||
if (buf == NULL) goto CONSUME_DECODE_OVER;
|
if (buf == NULL) goto CM_DECODE_OVER;
|
||||||
|
SDB_GET_BINARY(pRaw, dataPos, buf, len, CM_DECODE_OVER);
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_DECODE_OVER);
|
||||||
|
|
||||||
tDecodeSMqConsumerObj(buf, pConsumer);
|
if (tDecodeSMqConsumerObj(buf, pConsumer) == NULL) {
|
||||||
|
goto CM_DECODE_OVER;
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER);
|
}
|
||||||
|
|
||||||
terrno = TSDB_CODE_SUCCESS;
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
#if 0
|
CM_DECODE_OVER:
|
||||||
SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER);
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
for (int i = 0; i < topicNum; i++) {
|
|
||||||
int32_t topicLen;
|
|
||||||
SMqConsumerTopic *pConsumerTopic = malloc(sizeof(SMqConsumerTopic));
|
|
||||||
if (pConsumerTopic == NULL) {
|
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
// TODO
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
/*pConsumerTopic->vgroups = taosArrayInit(topicNum, sizeof(SMqConsumerTopic));*/
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &topicLen, CONSUME_DECODE_OVER);
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pConsumerTopic->name, topicLen, CONSUME_DECODE_OVER);
|
|
||||||
int32_t vgSize;
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
CONSUME_DECODE_OVER:
|
|
||||||
if (terrno != 0) {
|
|
||||||
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
mError("consumer:%ld, failed to decode from raw:%p since %s", pConsumer->consumerId, pRaw, terrstr());
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -55,7 +55,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||||
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
.deleteFp = (SdbDeleteFp)mndSubActionDelete};
|
||||||
|
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
@ -96,25 +96,27 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
|
pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer);
|
||||||
|
|
||||||
// build msg
|
// build msg
|
||||||
SMqSetCVgReq req = {
|
|
||||||
.vgId = pCEp->vgId,
|
SMqSetCVgReq* pReq = malloc(sizeof(SMqSetCVgReq) + pCEp->qmsgLen);
|
||||||
.oldConsumerId = -1,
|
if (pReq == NULL) {
|
||||||
.newConsumerId = consumerId,
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
};
|
return -1;
|
||||||
strcpy(req.cgroup, cgroup);
|
}
|
||||||
strcpy(req.topicName, topic);
|
strcpy(pReq->cgroup, cgroup);
|
||||||
strcpy(req.sql, pTopic->sql);
|
strcpy(pReq->topicName, topic);
|
||||||
strcpy(req.logicalPlan, pTopic->logicalPlan);
|
pReq->sql = strdup(pTopic->sql);
|
||||||
strcpy(req.physicalPlan, pTopic->physicalPlan);
|
pReq->logicalPlan = strdup(pTopic->logicalPlan);
|
||||||
memcpy(&req.msg, &pCEp->qExec, pCEp->execLen);
|
pReq->physicalPlan = strdup(pTopic->physicalPlan);
|
||||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
pReq->qmsgLen = pCEp->qmsgLen;
|
||||||
|
memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);
|
||||||
|
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
|
||||||
void *reqStr = malloc(tlen);
|
void *reqStr = malloc(tlen);
|
||||||
if (reqStr == NULL) {
|
if (reqStr == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void *abuf = reqStr;
|
void *abuf = reqStr;
|
||||||
tEncodeSMqSetCVgReq(abuf, &req);
|
tEncodeSMqSetCVgReq(&abuf, pReq);
|
||||||
|
|
||||||
// persist msg
|
// persist msg
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
@ -128,6 +130,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
mndTransAppendRedolog(pTrans, pRaw);
|
mndTransAppendRedolog(pTrans, pRaw);
|
||||||
|
|
||||||
|
free(pReq);
|
||||||
tfree(topic);
|
tfree(topic);
|
||||||
tfree(cgroup);
|
tfree(cgroup);
|
||||||
}
|
}
|
||||||
|
@ -146,6 +149,14 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
||||||
//convert phyplan to dag
|
//convert phyplan to dag
|
||||||
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
SQueryDag *pDag = qStringToDag(pTopic->physicalPlan);
|
||||||
SArray *pArray;
|
SArray *pArray;
|
||||||
|
SArray* inner = taosArrayGet(pDag->pSubplans, 0);
|
||||||
|
SSubplan *plan = taosArrayGetP(inner, 0);
|
||||||
|
plan->execNode.inUse = 0;
|
||||||
|
strcpy(plan->execNode.epAddr[0].fqdn, "localhost");
|
||||||
|
plan->execNode.epAddr[0].port = 6030;
|
||||||
|
plan->execNode.nodeId = 2;
|
||||||
|
plan->execNode.numOfEps = 1;
|
||||||
|
|
||||||
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -157,11 +168,18 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
|
||||||
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
|
||||||
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
STaskInfo* pTaskInfo = taosArrayGet(pArray, i);
|
||||||
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
|
||||||
|
/*mDebug("subscribe convert ep %d %s %s %s %s %s\n", CEp.epSet.numOfEps, CEp.epSet.fqdn[0], CEp.epSet.fqdn[1], CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
|
||||||
CEp.vgId = pTaskInfo->addr.nodeId;
|
CEp.vgId = pTaskInfo->addr.nodeId;
|
||||||
|
CEp.qmsgLen = pTaskInfo->msg->contentLen;
|
||||||
|
CEp.qmsg = malloc(CEp.qmsgLen);
|
||||||
|
if (CEp.qmsg == NULL) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);
|
||||||
taosArrayPush(unassignedVg, &CEp);
|
taosArrayPush(unassignedVg, &CEp);
|
||||||
}
|
}
|
||||||
|
|
||||||
qDestroyQueryDag(pDag);
|
/*qDestroyQueryDag(pDag);*/
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,27 +196,33 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
||||||
};
|
};
|
||||||
strcpy(req.cgroup, pConsumer->cgroup);
|
strcpy(req.cgroup, pConsumer->cgroup);
|
||||||
strcpy(req.topicName, pTopic->name);
|
strcpy(req.topicName, pTopic->name);
|
||||||
strcpy(req.sql, pTopic->sql);
|
req.sql = pTopic->sql;
|
||||||
strcpy(req.logicalPlan, pTopic->logicalPlan);
|
req.logicalPlan = pTopic->logicalPlan;
|
||||||
strcpy(req.physicalPlan, pTopic->physicalPlan);
|
req.physicalPlan = pTopic->physicalPlan;
|
||||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||||
void *reqStr = malloc(tlen);
|
void *buf = malloc(sizeof(SMsgHead) + tlen);
|
||||||
if (reqStr == NULL) {
|
if (buf == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
void *abuf = reqStr;
|
|
||||||
|
SMsgHead* pMsgHead = (SMsgHead*)buf;
|
||||||
|
|
||||||
|
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
|
||||||
|
pMsgHead->vgId = htonl(vgId);
|
||||||
|
|
||||||
|
void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
tEncodeSMqSetCVgReq(&abuf, &req);
|
tEncodeSMqSetCVgReq(&abuf, &req);
|
||||||
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
action.epSet = mndGetVgroupEpset(pMnode, pVgObj);
|
||||||
action.pCont = reqStr;
|
action.pCont = buf;
|
||||||
action.contLen = tlen;
|
action.contLen = tlen;
|
||||||
action.msgType = TDMT_VND_MQ_SET_CONN;
|
action.msgType = TDMT_VND_MQ_SET_CONN;
|
||||||
|
|
||||||
mndReleaseVgroup(pMnode, pVgObj);
|
mndReleaseVgroup(pMnode, pVgObj);
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
free(reqStr);
|
free(buf);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -208,19 +232,18 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
|
||||||
void mndCleanupSubscribe(SMnode *pMnode) {}
|
void mndCleanupSubscribe(SMnode *pMnode) {}
|
||||||
|
|
||||||
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
||||||
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
int32_t tlen = tEncodeSubscribeObj(NULL, pSub);
|
||||||
int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
int32_t size = sizeof(int32_t) + tlen + MND_SUBSCRIBE_RESERVE_SIZE;
|
||||||
|
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
if (pRaw == NULL) goto SUB_ENCODE_OVER;
|
||||||
|
|
||||||
void *buf = malloc(tlen);
|
void *buf = malloc(tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) goto SUB_ENCODE_OVER;
|
||||||
goto SUB_ENCODE_OVER;
|
|
||||||
}
|
|
||||||
void *abuf = buf;
|
|
||||||
|
|
||||||
tEncodeSubscribeObj(&buf, pSub);
|
void *abuf = buf;
|
||||||
|
tEncodeSubscribeObj(&abuf, pSub);
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER);
|
||||||
|
@ -228,6 +251,8 @@ static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) {
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER);
|
||||||
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
|
SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER);
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SUB_ENCODE_OVER:
|
SUB_ENCODE_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != 0) {
|
||||||
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
|
mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr());
|
||||||
|
@ -259,9 +284,9 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||||
|
|
||||||
int32_t dataPos = 0;
|
int32_t dataPos = 0;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
||||||
void *buf = malloc(tlen + 1);
|
void *buf = malloc(tlen + 1);
|
||||||
if (buf == NULL) goto SUB_DECODE_OVER;
|
if (buf == NULL) goto SUB_DECODE_OVER;
|
||||||
SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER);
|
|
||||||
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER);
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER);
|
||||||
|
|
||||||
|
@ -269,8 +294,10 @@ static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) {
|
||||||
goto SUB_DECODE_OVER;
|
goto SUB_DECODE_OVER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
SUB_DECODE_OVER:
|
SUB_DECODE_OVER:
|
||||||
if (terrno != 0) {
|
if (terrno != TSDB_CODE_SUCCESS) {
|
||||||
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
|
mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr());
|
||||||
// TODO free subscribeobj
|
// TODO free subscribeobj
|
||||||
tfree(pRow);
|
tfree(pRow);
|
||||||
|
@ -379,10 +406,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
|
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
|
||||||
j++;
|
j++;
|
||||||
} else if (j >= oldTopicNum) {
|
} else if (j >= oldTopicNum) {
|
||||||
newTopicName = taosArrayGet(newSub, i);
|
newTopicName = taosArrayGetP(newSub, i);
|
||||||
i++;
|
i++;
|
||||||
} else {
|
} else {
|
||||||
newTopicName = taosArrayGet(newSub, i);
|
newTopicName = taosArrayGetP(newSub, i);
|
||||||
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
|
oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name;
|
||||||
|
|
||||||
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
|
int comp = compareLenPrefixedStr(newTopicName, oldTopicName);
|
||||||
|
@ -466,6 +493,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
char* key = mndMakeSubscribeKey(consumerGroup, newTopicName);
|
||||||
|
strcpy(pSub->key, key);
|
||||||
// set unassigned vg
|
// set unassigned vg
|
||||||
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg);
|
||||||
//TODO: disable alter
|
//TODO: disable alter
|
||||||
|
@ -486,7 +515,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
/*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/
|
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||||
mndTransAppendRedolog(pTrans, pRaw);
|
mndTransAppendRedolog(pTrans, pRaw);
|
||||||
#if 0
|
#if 0
|
||||||
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
|
||||||
|
@ -519,8 +548,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
mndTransAppendRedolog(pTrans, pTopicRaw);
|
mndTransAppendRedolog(pTrans, pTopicRaw);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
/*mndReleaseTopic(pMnode, pTopic);*/
|
||||||
mndReleaseSubscribe(pMnode, pSub);
|
/*mndReleaseSubscribe(pMnode, pSub);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// part3. persist consumerObj
|
// part3. persist consumerObj
|
||||||
|
|
|
@ -60,7 +60,9 @@ void mndCleanupTopic(SMnode *pMnode) {}
|
||||||
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
|
||||||
int32_t size = sizeof(SMqTopicObj) + MND_TOPIC_RESERVE_SIZE;
|
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
|
||||||
|
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
||||||
|
int32_t size = sizeof(SMqTopicObj) + logicalPlanLen + physicalPlanLen + pTopic->sqlLen + MND_TOPIC_RESERVE_SIZE;
|
||||||
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
SSdbRaw *pRaw = sdbAllocRaw(SDB_TOPIC, MND_TOPIC_VER_NUMBER, size);
|
||||||
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
if (pRaw == NULL) goto TOPIC_ENCODE_OVER;
|
||||||
|
|
||||||
|
@ -74,12 +76,10 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->version, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
int32_t logicalPlanLen = strlen(pTopic->logicalPlan) + 1;
|
SDB_SET_INT32(pRaw, dataPos, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->logicalPlan)+1, TOPIC_ENCODE_OVER);
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1;
|
SDB_SET_INT32(pRaw, dataPos, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER);
|
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
SDB_SET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_ENCODE_OVER);
|
||||||
|
@ -135,7 +135,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto TOPIC_DECODE_OVER;
|
goto TOPIC_DECODE_OVER;
|
||||||
}
|
}
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER);
|
||||||
pTopic->physicalPlan = calloc(len + 1, sizeof(char));
|
pTopic->physicalPlan = calloc(len + 1, sizeof(char));
|
||||||
|
@ -144,7 +144,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
goto TOPIC_DECODE_OVER;
|
goto TOPIC_DECODE_OVER;
|
||||||
}
|
}
|
||||||
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER);
|
SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
|
@ -231,6 +231,7 @@ static int32_t mndCheckCreateTopicMsg(SCMCreateTopicReq *creattopReq) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq *pCreate, SDbObj *pDb) {
|
||||||
|
mDebug("topic:%s to create", pCreate->name);
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
tstrncpy(topicObj.name, pCreate->name, TSDB_TOPIC_FNAME_LEN);
|
||||||
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
|
||||||
|
@ -273,7 +274,7 @@ static int32_t mndProcessCreateTopicMsg(SMnodeMsg *pMsg) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
terrno = TSDB_CODE_MND_TOPIC_ALREADY_EXIST;
|
||||||
mError("db:%s, failed to create since %s", createTopicReq.name, terrstr());
|
mError("topic:%s, failed to create since already exists", createTopicReq.name);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -319,7 +319,7 @@ int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp);
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg);
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -783,7 +783,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
int32_t tqProcessSetConnReq(STQ* pTq, char* msg, SRpcMsg** ppRsp) {
|
||||||
SMqSetCVgReq req;
|
SMqSetCVgReq req;
|
||||||
tDecodeSMqSetCVgReq(msg, &req);
|
tDecodeSMqSetCVgReq(msg, &req);
|
||||||
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
|
||||||
|
@ -799,9 +799,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
strcpy(pTopic->topicName, req.topicName);
|
strcpy(pTopic->topicName, req.topicName);
|
||||||
strcpy(pTopic->sql, req.sql);
|
pTopic->sql = strdup(req.sql);
|
||||||
strcpy(pTopic->logicalPlan, req.logicalPlan);
|
pTopic->logicalPlan = strdup(req.logicalPlan);
|
||||||
strcpy(pTopic->physicalPlan, req.physicalPlan);
|
pTopic->physicalPlan = strdup(req.physicalPlan);
|
||||||
|
|
||||||
pTopic->buffer.firstOffset = -1;
|
pTopic->buffer.firstOffset = -1;
|
||||||
pTopic->buffer.lastOffset = -1;
|
pTopic->buffer.lastOffset = -1;
|
||||||
|
@ -811,9 +811,10 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
|
||||||
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
|
||||||
pTopic->buffer.output[i].status = 0;
|
pTopic->buffer.output[i].status = 0;
|
||||||
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pMeta);
|
||||||
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.msg, pReadHandle);
|
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(&req.qmsg, pReadHandle);
|
||||||
}
|
}
|
||||||
taosArrayPush(pConsumer->topics, pTopic);
|
taosArrayPush(pConsumer->topics, pTopic);
|
||||||
|
terrno = TSDB_CODE_SUCCESS;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -826,7 +827,7 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
pReadHandle->pMsg = NULL;
|
pReadHandle->pMsg = NULL;
|
||||||
pReadHandle->ver = -1;
|
pReadHandle->ver = -1;
|
||||||
pReadHandle->pColIdList = NULL;
|
pReadHandle->pColIdList = NULL;
|
||||||
return NULL;
|
return pReadHandle;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
|
void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ver) {
|
||||||
|
|
|
@ -115,7 +115,8 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case TDMT_VND_MQ_SET_CONN: {
|
case TDMT_VND_MQ_SET_CONN: {
|
||||||
if (tqProcessSetConnReq(pVnode->pTq, ptr) < 0) {
|
if (tqProcessSetConnReq(pVnode->pTq, POINTER_SHIFT(ptr, sizeof(SMsgHead)), NULL) < 0) {
|
||||||
|
// TODO: handle error
|
||||||
}
|
}
|
||||||
} break;
|
} break;
|
||||||
default:
|
default:
|
||||||
|
|
|
@ -60,8 +60,8 @@ int32_t qSetStreamInput(qTaskInfo_t tinfo, void* input) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle) {
|
qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) {
|
||||||
if (pMsg == NULL || streamReadHandle == NULL) {
|
if (msg == NULL || streamReadHandle == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +74,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(SSubQueryMsg* pMsg, void* streamReadHandle
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
struct SSubplan* plan = NULL;
|
struct SSubplan* plan = NULL;
|
||||||
int32_t code = qStringToSubplan(pMsg->msg, &plan);
|
int32_t code = qStringToSubplan(msg, &plan);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
terrno = code;
|
terrno = code;
|
||||||
return NULL;
|
return NULL;
|
||||||
|
|
|
@ -87,6 +87,7 @@ static bool fromObjectWithAlloc(const cJSON* json, const char* name, FFromJson f
|
||||||
static const char* jkPnodeType = "Type";
|
static const char* jkPnodeType = "Type";
|
||||||
static int32_t getPnodeTypeSize(cJSON* json) {
|
static int32_t getPnodeTypeSize(cJSON* json) {
|
||||||
switch (getNumber(json, jkPnodeType)) {
|
switch (getNumber(json, jkPnodeType)) {
|
||||||
|
case OP_StreamScan:
|
||||||
case OP_TableScan:
|
case OP_TableScan:
|
||||||
case OP_DataBlocksOptScan:
|
case OP_DataBlocksOptScan:
|
||||||
case OP_TableSeqScan:
|
case OP_TableSeqScan:
|
||||||
|
@ -869,6 +870,7 @@ static bool specificPhyNodeFromJson(const cJSON* json, void* obj) {
|
||||||
SPhyNode* phyNode = (SPhyNode*)obj;
|
SPhyNode* phyNode = (SPhyNode*)obj;
|
||||||
switch (phyNode->info.type) {
|
switch (phyNode->info.type) {
|
||||||
case OP_TableScan:
|
case OP_TableScan:
|
||||||
|
case OP_StreamScan:
|
||||||
case OP_DataBlocksOptScan:
|
case OP_DataBlocksOptScan:
|
||||||
case OP_TableSeqScan:
|
case OP_TableSeqScan:
|
||||||
return tableScanNodeFromJson(json, obj);
|
return tableScanNodeFromJson(json, obj);
|
||||||
|
@ -1189,14 +1191,14 @@ SQueryDag* qJsonToDag(const cJSON* pRoot) {
|
||||||
if(pDag == NULL) {
|
if(pDag == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "numOfSubplans"));
|
pDag->numOfSubplans = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "Number"));
|
||||||
pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "queryId"));
|
pDag->queryId = cJSON_GetNumberValue(cJSON_GetObjectItem(pRoot, "QueryId"));
|
||||||
pDag->pSubplans = taosArrayInit(0, sizeof(SArray));
|
pDag->pSubplans = taosArrayInit(0, sizeof(SArray));
|
||||||
if (pDag->pSubplans == NULL) {
|
if (pDag->pSubplans == NULL) {
|
||||||
free(pDag);
|
free(pDag);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
cJSON* pLevels = cJSON_GetObjectItem(pRoot, "pSubplans");
|
cJSON* pLevels = cJSON_GetObjectItem(pRoot, "Subplans");
|
||||||
int level = cJSON_GetArraySize(pLevels);
|
int level = cJSON_GetArraySize(pLevels);
|
||||||
for(int i = 0; i < level; i++) {
|
for(int i = 0; i < level; i++) {
|
||||||
SArray* plansOneLevel = taosArrayInit(0, sizeof(void*));
|
SArray* plansOneLevel = taosArrayInit(0, sizeof(void*));
|
||||||
|
|
|
@ -1490,7 +1490,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) {
|
||||||
|
|
||||||
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
|
SSubQueryMsg *pMsg = (SSubQueryMsg*) msg;
|
||||||
|
|
||||||
pMsg->header.vgId = htonl(tInfo.addr.nodeId);
|
pMsg->header.vgId = tInfo.addr.nodeId;
|
||||||
|
|
||||||
pMsg->sId = schMgmt.sId;
|
pMsg->sId = schMgmt.sId;
|
||||||
pMsg->queryId = plan->id.queryId;
|
pMsg->queryId = plan->id.queryId;
|
||||||
|
|
Loading…
Reference in New Issue