Merge pull request #10022 from taosdata/feature/tq

Feature/tq
This commit is contained in:
Liu Jicong 2022-01-26 10:18:49 +08:00 committed by GitHub
commit 297ddd2457
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 218 additions and 138 deletions

View File

@ -1533,9 +1533,7 @@ typedef struct SMqSetCVgReq {
char* sql; char* sql;
char* logicalPlan; char* logicalPlan;
char* physicalPlan; char* physicalPlan;
uint32_t qmsgLen; char* qmsg;
void* qmsg;
//SSubQueryMsg msg;
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) { static FORCE_INLINE int32_t tEncodeSSubQueryMsg(void** buf, const SSubQueryMsg* pMsg) {
@ -1567,7 +1565,6 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen += taosEncodeString(buf, pReq->sql); tlen += taosEncodeString(buf, pReq->sql);
tlen += taosEncodeString(buf, pReq->logicalPlan); tlen += taosEncodeString(buf, pReq->logicalPlan);
tlen += taosEncodeString(buf, pReq->physicalPlan); tlen += taosEncodeString(buf, pReq->physicalPlan);
tlen += taosEncodeFixedU32(buf, pReq->qmsgLen);
tlen += taosEncodeString(buf, (char*)pReq->qmsg); tlen += taosEncodeString(buf, (char*)pReq->qmsg);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg); //tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return tlen; return tlen;
@ -1582,7 +1579,6 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf = taosDecodeString(buf, &pReq->sql); buf = taosDecodeString(buf, &pReq->sql);
buf = taosDecodeString(buf, &pReq->logicalPlan); buf = taosDecodeString(buf, &pReq->logicalPlan);
buf = taosDecodeString(buf, &pReq->physicalPlan); buf = taosDecodeString(buf, &pReq->physicalPlan);
buf = taosDecodeFixedU32(buf, &pReq->qmsgLen);
buf = taosDecodeString(buf, (char**)&pReq->qmsg); buf = taosDecodeString(buf, (char**)&pReq->qmsg);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg); //buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return buf; return buf;
@ -1600,37 +1596,41 @@ typedef struct SMqColData {
int16_t colId; int16_t colId;
int16_t type; int16_t type;
int16_t bytes; int16_t bytes;
char data[]; } SMqColMeta;
} SMqColData;
typedef struct SMqTbData { typedef struct SMqTbData {
int64_t uid; int64_t uid;
int32_t numOfCols;
int32_t numOfRows; int32_t numOfRows;
SMqColData colData[]; char colData[];
} SMqTbData; } SMqTbData;
typedef struct SMqTopicBlk { typedef struct SMqTopicBlk {
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
int64_t committedOffset; int64_t committedOffset;
int64_t reqOffset; int64_t reqOffset;
int64_t rspOffset; int64_t rspOffset;
int32_t skipLogNum; int32_t skipLogNum;
int32_t bodyLen; int32_t bodyLen;
int32_t numOfTb; int32_t numOfTb;
SMqTbData tbData[]; SMqTbData* tbData;
} SMqTopicData; } SMqTopicData;
typedef struct SMqConsumeRsp { typedef struct SMqConsumeRsp {
int64_t reqId; int64_t consumerId;
int64_t consumerId; int32_t numOfCols;
int32_t bodyLen; SMqColMeta* meta;
int32_t numOfTopics; int32_t numOfTopics;
SMqTopicData data[]; SMqTopicData* data;
} SMqConsumeRsp; } SMqConsumeRsp;
static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) {
int32_t tlen = 0;
return tlen;
}
// one req for one vg+topic // one req for one vg+topic
typedef struct SMqConsumeReq { typedef struct SMqConsumeReq {
SMsgHead head;
//0: commit only, current offset //0: commit only, current offset
//1: consume only, poll next offset //1: consume only, poll next offset
//2: commit current and consume next offset //2: commit current and consume next offset
@ -1663,7 +1663,7 @@ typedef struct SMqCMGetSubEpRsp {
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) { static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI16(buf, pVgEp->vgId); tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet); tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
return tlen; return tlen;
} }

View File

@ -19,6 +19,7 @@
#include "tarray.h" #include "tarray.h"
#include "tdef.h" #include "tdef.h"
#include "tlog.h" #include "tlog.h"
#include "tmsg.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
@ -159,7 +160,7 @@ int32_t walAlter(SWal *, SWalCfg *pCfg);
void walClose(SWal *); void walClose(SWal *);
// write // write
int64_t walWrite(SWal *, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen); int64_t walWrite(SWal *, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen);
void walFsync(SWal *, bool force); void walFsync(SWal *, bool force);
// apis for lifecycle management // apis for lifecycle management

View File

@ -253,6 +253,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4) #define TSDB_CODE_MND_INVALID_TOPIC_OPTION TAOS_DEF_ERROR_CODE(0, 0x03E4)
#define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5) #define TSDB_CODE_MND_TOPIC_OPTION_UNCHNAGED TAOS_DEF_ERROR_CODE(0, 0x03E5)
#define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6) #define TSDB_CODE_MND_NAME_CONFLICT_WITH_STB TAOS_DEF_ERROR_CODE(0, 0x03E6)
#define TSDB_CODE_MND_CONSUMER_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x03E7)
// dnode // dnode
#define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400) #define TSDB_CODE_DND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0400)

View File

@ -328,6 +328,7 @@ struct tmq_t {
char clientId[256]; char clientId[256];
int64_t consumerId; int64_t consumerId;
int64_t status; int64_t status;
tsem_t rspSem;
STscObj* pTscObj; STscObj* pTscObj;
tmq_commit_cb* commit_cb; tmq_commit_cb* commit_cb;
int32_t nextTopicIdx; int32_t nextTopicIdx;
@ -344,6 +345,7 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
strcpy(pTmq->clientId, conf->clientId); strcpy(pTmq->clientId, conf->clientId);
strcpy(pTmq->groupId, conf->groupId); strcpy(pTmq->groupId, conf->groupId);
pTmq->commit_cb = conf->commit_cb; pTmq->commit_cb = conf->commit_cb;
tsem_init(&pTmq->rspSem, 0, 0);
pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1); pTmq->consumerId = generateRequestId() & ((uint64_t)-1 >> 1);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
return pTmq; return pTmq;
@ -372,11 +374,27 @@ int32_t tmq_list_append(tmq_list_t* ptr, char* src) {
} }
int32_t tmq_null_cb(void* param, const SDataBuf* pMsg, int32_t code) {
if (code == 0) {
//
}
//
return 0;
}
TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) { TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
SRequestObj *pRequest = NULL; SRequestObj *pRequest = NULL;
tmq->status = 1;
int32_t sz = topic_list->cnt; int32_t sz = topic_list->cnt;
tmq->clientTopics = taosArrayInit(sz, sizeof(void*)); //destroy ex
taosArrayDestroy(tmq->clientTopics);
tmq->clientTopics = taosArrayInit(sz, sizeof(SMqClientTopic));
SCMSubscribeReq req;
req.topicNum = sz;
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = taosArrayInit(sz, sizeof(void*));
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
char* topicName = topic_list->elems[i]; char* topicName = topic_list->elems[i];
@ -391,16 +409,21 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
} }
tNameExtractFullName(&name, topicFname); tNameExtractFullName(&name, topicFname);
tscDebug("subscribe topic: %s", topicFname); tscDebug("subscribe topic: %s", topicFname);
taosArrayPush(tmq->clientTopics, &topicFname); SMqClientTopic topic = {
.nextVgIdx = 0,
.sql = NULL,
.sqlLen = 0,
.topicId = 0,
.topicName = topicFname,
.vgs = NULL
};
topic.vgs = taosArrayInit(0, sizeof(SMqClientVg));
taosArrayPush(tmq->clientTopics, &topic);
/*SMqClientTopic topic = {*/ /*SMqClientTopic topic = {*/
/*.*/ /*.*/
/*};*/ /*};*/
taosArrayPush(req.topicNames, &topicFname);
} }
SCMSubscribeReq req;
req.topicNum = taosArrayGetSize(tmq->clientTopics);
req.consumerId = tmq->consumerId;
req.consumerGroup = strdup(tmq->groupId);
req.topicNames = tmq->clientTopics;
int tlen = tSerializeSCMSubscribeReq(NULL, &req); int tlen = tSerializeSCMSubscribeReq(NULL, &req);
void* buf = malloc(tlen); void* buf = malloc(tlen);
@ -419,18 +442,19 @@ TAOS_RES* tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
/*sendInfo->fp*/
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
if (body != NULL) { /*if (sendInfo != NULL) {*/
destroySendMsgInfo(body); /*destroySendMsgInfo(sendInfo);*/
} /*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno; pRequest->code = terrno;
@ -569,19 +593,19 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen };
pRequest->type = TDMT_MND_CREATE_TOPIC; pRequest->type = TDMT_MND_CREATE_TOPIC;
SMsgSendInfo* body = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp); SEpSet epSet = getEpSet_s(&pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, body); asyncSendMsgToServer(pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&pRequest->body.rspSem);
_return: _return:
qDestroyQuery(pQueryNode); qDestroyQuery(pQueryNode);
if (body != NULL) { /*if (sendInfo != NULL) {*/
destroySendMsgInfo(body); /*destroySendMsgInfo(sendInfo);*/
} /*}*/
if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) {
pRequest->code = terrno; pRequest->code = terrno;
@ -602,6 +626,12 @@ int32_t tmq_poll_cb_inner(void* param, const SDataBuf* pMsg, int32_t code) {
int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) { int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tmq_t* tmq = (tmq_t*)param; tmq_t* tmq = (tmq_t*)param;
if (code != 0) {
tsem_post(&tmq->rspSem);
return 0;
}
tscDebug("tmq ask ep cb called");
bool set = false;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp;
tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp); tDecodeSMqCMGetSubEpRsp(pMsg->pData, &rsp);
int32_t sz = taosArrayGetSize(rsp.topics); int32_t sz = taosArrayGetSize(rsp.topics);
@ -616,38 +646,35 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
for (int32_t j = 0; j < vgSz; j++) { for (int32_t j = 0; j < vgSz; j++) {
SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j); SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
SMqClientVg clientVg = { SMqClientVg clientVg = {
.pollCnt = 0,
.committedOffset = -1,
.currentOffset = -1,
.vgId = pVgEp->vgId, .vgId = pVgEp->vgId,
.epSet = pVgEp->epSet .epSet = pVgEp->epSet
}; };
taosArrayPush(topic.vgs, &clientVg); taosArrayPush(topic.vgs, &clientVg);
set = true;
} }
taosArrayPush(tmq->clientTopics, &topic); taosArrayPush(tmq->clientTopics, &topic);
} }
if(set) tmq->status = 1;
// unlock // unlock
tsem_post(&tmq->rspSem);
return 0; return 0;
} }
tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) { tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
if (tmq->clientTopics == NULL || taosArrayGetSize(tmq->clientTopics) == 0) {
return NULL;
}
SRequestObj *pRequest = NULL;
SMqConsumeReq req = {0};
req.reqType = 1;
req.blockingTime = blocking_time;
req.consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(req.cgroup, tmq->groupId);
if (taosArrayGetSize(tmq->clientTopics) == 0) { if (taosArrayGetSize(tmq->clientTopics) == 0 || tmq->status == 0) {
int32_t tlen = sizeof(SMqCMGetSubEpReq); int32_t tlen = sizeof(SMqCMGetSubEpReq);
SMqCMGetSubEpReq* buf = malloc(tlen); SMqCMGetSubEpReq* buf = malloc(tlen);
if (buf == NULL) { if (buf == NULL) {
tscError("failed to malloc get subscribe ep buf"); tscError("failed to malloc get subscribe ep buf");
} }
buf->consumerId = htobe64(buf->consumerId); buf->consumerId = htobe64(tmq->consumerId);
strcpy(buf->cgroup, tmq->groupId);
pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP); SRequestObj *pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_MND_GET_SUB_EP);
if (pRequest == NULL) { if (pRequest == NULL) {
tscError("failed to malloc subscribe ep request"); tscError("failed to malloc subscribe ep request");
} }
@ -664,24 +691,39 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
tsem_wait(&pRequest->body.rspSem); tsem_wait(&tmq->rspSem);
} }
SMqClientTopic* pTopic = taosArrayGetP(tmq->clientTopics, tmq->nextTopicIdx); if (taosArrayGetSize(tmq->clientTopics) == 0) {
tscDebug("consumer:%ld poll but not assigned", tmq->consumerId);
return NULL;
}
SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq));
pReq->reqType = 1;
pReq->blockingTime = blocking_time;
pReq->consumerId = tmq->consumerId;
tmq_message_t* tmq_message = NULL;
strcpy(pReq->cgroup, tmq->groupId);
SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, tmq->nextTopicIdx);
tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics); tmq->nextTopicIdx = (tmq->nextTopicIdx + 1) % taosArrayGetSize(tmq->clientTopics);
strcpy(req.topic, pTopic->topicName); strcpy(pReq->topic, pTopic->topicName);
int32_t nextVgIdx = pTopic->nextVgIdx; int32_t nextVgIdx = pTopic->nextVgIdx;
pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs); pTopic->nextVgIdx = (nextVgIdx + 1) % taosArrayGetSize(pTopic->vgs);
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx); SMqClientVg* pVg = taosArrayGet(pTopic->vgs, nextVgIdx);
req.offset = pVg->currentOffset; pReq->offset = pVg->currentOffset;
pRequest->body.requestMsg = (SDataBuf){ .pData = &req, .len = sizeof(SMqConsumeReq) }; pReq->head.vgId = htonl(pVg->vgId);
pRequest->type = TDMT_VND_CONSUME; pReq->head.contLen = htonl(sizeof(SMqConsumeReq));
SRequestObj* pRequest = createRequest(tmq->pTscObj, NULL, NULL, TDMT_VND_CONSUME);
pRequest->body.requestMsg = (SDataBuf){ .pData = pReq, .len = sizeof(SMqConsumeReq) };
SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest); SMsgSendInfo* sendInfo = buildMsgInfoImpl(pRequest);
sendInfo->requestObjRefId = 0; /*sendInfo->requestObjRefId = 0;*/
sendInfo->param = &tmq_message; /*sendInfo->param = &tmq_message;*/
sendInfo->fp = tmq_poll_cb_inner; /*sendInfo->fp = tmq_poll_cb_inner;*/
int64_t transporterId = 0; int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);

View File

@ -536,7 +536,7 @@ TEST(testCase, create_topic_Test) {
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
} }
taos_free_result(pRes); //taos_free_result(pRes);
TAOS_FIELD* pFields = taos_fetch_fields(pRes); TAOS_FIELD* pFields = taos_fetch_fields(pRes);
ASSERT_TRUE(pFields == nullptr); ASSERT_TRUE(pFields == nullptr);
@ -570,30 +570,32 @@ TEST(testCase, create_topic_Test) {
//taos_close(pConn); //taos_close(pConn);
//} //}
//TEST(testCase, tmq_subscribe_Test) { #if 0
//TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); TEST(testCase, tmq_subscribe_Test) {
//assert(pConn != NULL); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
//TAOS_RES* pRes = taos_query(pConn, "use abc1"); TAOS_RES* pRes = taos_query(pConn, "use abc1");
//if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
//printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
//} }
//taos_free_result(pRes); taos_free_result(pRes);
//tmq_conf_t* conf = tmq_conf_new(); tmq_conf_t* conf = tmq_conf_new();
//tmq_conf_set(conf, "group.id", "tg1"); tmq_conf_set(conf, "group.id", "tg1");
//tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0); tmq_t* tmq = taos_consumer_new(pConn, conf, NULL, 0);
//tmq_list_t* topic_list = tmq_list_new(); tmq_list_t* topic_list = tmq_list_new();
//tmq_list_append(topic_list, "test_topic_1"); tmq_list_append(topic_list, "test_topic_1");
//tmq_subscribe(tmq, topic_list); tmq_subscribe(tmq, topic_list);
//while (1) { while (1) {
//tmq_message_t* msg = tmq_consume_poll(tmq, 0); tmq_message_t* msg = tmq_consume_poll(tmq, 0);
//printf("get msg\n"); printf("get msg\n");
//if (msg == NULL) break; //if (msg == NULL) break;
//} }
//} }
#endif
TEST(testCase, tmq_consume_Test) { TEST(testCase, tmq_consume_Test) {
} }

View File

@ -149,6 +149,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) {
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_SHOW_TABLES_FETCH)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CONN)] = dndProcessVnodeWriteMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_VND_MQ_SET_CUR)] = dndProcessVnodeFetchMsg;
pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CONSUME)] = dndProcessVnodeFetchMsg;
} }
static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) { static void dndProcessResponse(void *parent, SRpcMsg *pRsp, SEpSet *pEpSet) {

View File

@ -363,9 +363,7 @@ typedef struct SMqConsumerEp {
int64_t consumerId; // -1 for unassigned int64_t consumerId; // -1 for unassigned
int64_t lastConsumerHbTs; int64_t lastConsumerHbTs;
int64_t lastVgHbTs; int64_t lastVgHbTs;
uint32_t qmsgLen;
char* qmsg; char* qmsg;
//SSubQueryMsg qExec;
} SMqConsumerEp; } SMqConsumerEp;
static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) {
@ -374,9 +372,10 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pCon
tlen += taosEncodeFixedI32(buf, pConsumerEp->status); tlen += taosEncodeFixedI32(buf, pConsumerEp->status);
tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet); tlen += taosEncodeSEpSet(buf, &pConsumerEp->epSet);
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastConsumerHbTs);
tlen += taosEncodeFixedI64(buf, pConsumerEp->lastVgHbTs);
//tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec); //tlen += tEncodeSSubQueryMsg(buf, &pConsumerEp->qExec);
tlen += taosEncodeFixedU32(buf, pConsumerEp->qmsgLen); tlen += taosEncodeString(buf, pConsumerEp->qmsg);
tlen += taosEncodeBinary(buf, pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return tlen; return tlen;
} }
@ -385,9 +384,10 @@ static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsu
buf = taosDecodeFixedI32(buf, &pConsumerEp->status); buf = taosDecodeFixedI32(buf, &pConsumerEp->status);
buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet); buf = taosDecodeSEpSet(buf, &pConsumerEp->epSet);
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastConsumerHbTs);
buf = taosDecodeFixedI64(buf, &pConsumerEp->lastVgHbTs);
//buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec); //buf = tDecodeSSubQueryMsg(buf, &pConsumerEp->qExec);
buf = taosDecodeFixedU32(buf, &pConsumerEp->qmsgLen); buf = taosDecodeString(buf, &pConsumerEp->qmsg);
buf = taosDecodeBinary(buf, (void**)&pConsumerEp->qmsg, pConsumerEp->qmsgLen);
return buf; return buf;
} }
@ -423,18 +423,27 @@ static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() {
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->lostConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->assigned);
free(pSub);
return NULL;
}
pSub->idleConsumer = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
free(pSub); free(pSub);
return NULL; return NULL;
} }
pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer); taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->idleConsumer);
taosArrayDestroy(pSub->unassignedVg);
free(pSub); free(pSub);
return NULL; return NULL;
} }
@ -461,6 +470,13 @@ static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeOb
tlen += tEncodeSMqConsumerEp(buf, pCEp); tlen += tEncodeSMqConsumerEp(buf, pCEp);
} }
sz = taosArrayGetSize(pSub->lostConsumer);
tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp* pCEp = taosArrayGet(pSub->lostConsumer, i);
tlen += tEncodeSMqConsumerEp(buf, pCEp);
}
sz = taosArrayGetSize(pSub->idleConsumer); sz = taosArrayGetSize(pSub->idleConsumer);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
@ -485,20 +501,47 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
int32_t sz; int32_t sz;
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(int64_t)); pSub->availConsumer = taosArrayInit(sz, sizeof(int64_t));
if (pSub->assigned == NULL) { if (pSub->availConsumer == NULL) {
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
int64_t consumerId; int64_t consumerId;
buf = taosDecodeFixedI64(buf, &consumerId); buf = taosDecodeFixedI64(buf, &consumerId);
taosArrayPush(pSub->assigned, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->assigned = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->assigned == NULL) {
taosArrayDestroy(pSub->availConsumer);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->assigned, &cEp);
}
buf = taosDecodeFixedI32(buf, &sz);
pSub->lostConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->lostConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned);
return NULL;
}
for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp cEp;
buf = tDecodeSMqConsumerEp(buf, &cEp);
taosArrayPush(pSub->lostConsumer, &cEp);
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->idleConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->idleConsumer == NULL) { if (pSub->idleConsumer == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
return NULL; return NULL;
} }
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
@ -507,10 +550,13 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
taosArrayPush(pSub->idleConsumer, &cEp); taosArrayPush(pSub->idleConsumer, &cEp);
} }
buf = taosDecodeFixedI32(buf, &sz); buf = taosDecodeFixedI32(buf, &sz);
pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp));
if (pSub->unassignedVg == NULL) { if (pSub->unassignedVg == NULL) {
taosArrayDestroy(pSub->availConsumer);
taosArrayDestroy(pSub->assigned); taosArrayDestroy(pSub->assigned);
taosArrayDestroy(pSub->lostConsumer);
taosArrayDestroy(pSub->idleConsumer); taosArrayDestroy(pSub->idleConsumer);
return NULL; return NULL;
} }
@ -580,6 +626,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeString(buf, pConsumerTopic->name); tlen += taosEncodeString(buf, pConsumerTopic->name);
tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch);
ASSERT(pConsumerTopic->pVgInfo);
int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo);
tlen += taosEncodeFixedI32(buf, sz); tlen += taosEncodeFixedI32(buf, sz);
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {

View File

@ -66,13 +66,13 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;
SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->pCont; SMqCMGetSubEpReq *pReq = (SMqCMGetSubEpReq *)pMsg->rpcMsg.pCont;
SMqCMGetSubEpRsp rsp; SMqCMGetSubEpRsp rsp;
int64_t consumerId = be64toh(pReq->consumerId); int64_t consumerId = be64toh(pReq->consumerId);
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId); SMqConsumerObj *pConsumer = mndAcquireConsumer(pMsg->pMnode, consumerId);
if (pConsumer == NULL) { if (pConsumer == NULL) {
/*terrno = */ terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
return -1; return -1;
} }
ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0); ASSERT(strcmp(pReq->cgroup, pConsumer->cgroup) == 0);
@ -91,9 +91,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
int32_t assignedSz = taosArrayGetSize(pSub->assigned); int32_t assignedSz = taosArrayGetSize(pSub->assigned);
topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp)); topicEp.vgs = taosArrayInit(assignedSz, sizeof(SMqSubVgEp));
for (int32_t j = 0; j < assignedSz; j++) { for (int32_t j = 0; j < assignedSz; j++) {
SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, i); SMqConsumerEp *pCEp = taosArrayGet(pSub->assigned, j);
if (pCEp->consumerId == consumerId) { if (pCEp->consumerId == consumerId) {
taosArrayPush(pSub->assigned, pCEp); SMqSubVgEp vgEp = {
.epSet = pCEp->epSet,
.vgId = pCEp->vgId
};
taosArrayPush(topicEp.vgs, &vgEp);
} }
} }
if (taosArrayGetSize(topicEp.vgs) != 0) { if (taosArrayGetSize(topicEp.vgs) != 0) {
@ -101,7 +105,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
} }
} }
int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp); int32_t tlen = tEncodeSMqCMGetSubEpRsp(NULL, &rsp);
void *buf = malloc(tlen); void *buf = rpcMallocCont(tlen);
if (buf == NULL) { if (buf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1; return -1;
@ -161,8 +165,6 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
pReq->sql = strdup(pTopic->sql); pReq->sql = strdup(pTopic->sql);
pReq->logicalPlan = strdup(pTopic->logicalPlan); pReq->logicalPlan = strdup(pTopic->logicalPlan);
pReq->physicalPlan = strdup(pTopic->physicalPlan); pReq->physicalPlan = strdup(pTopic->physicalPlan);
pReq->qmsgLen = pCEp->qmsgLen;
/*memcpy(pReq->qmsg, pCEp->qmsg, pCEp->qmsgLen);*/
pReq->qmsg = strdup(pCEp->qmsg); pReq->qmsg = strdup(pCEp->qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq); int32_t tlen = tEncodeSMqSetCVgReq(NULL, pReq);
void *reqStr = malloc(tlen); void *reqStr = malloc(tlen);
@ -192,7 +194,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
if (mndTransPrepare(pMnode, pTrans) != 0) { if (mndTransPrepare(pMnode, pTrans) != 0) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
} }
mndReleaseTopic(pMnode, pTopic); /*mndReleaseTopic(pMnode, pTopic);*/
mndTransDrop(pTrans); mndTransDrop(pTrans);
} }
pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub);
@ -220,6 +222,7 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
for (int32_t i = 0; i < sz; i++) { for (int32_t i = 0; i < sz; i++) {
SMqConsumerEp CEp; SMqConsumerEp CEp;
CEp.status = 0; CEp.status = 0;
CEp.consumerId = -1;
CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1;
STaskInfo *pTaskInfo = taosArrayGet(pArray, i); STaskInfo *pTaskInfo = taosArrayGet(pArray, i);
tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr); tConvertQueryAddrToEpSet(&CEp.epSet, &pTaskInfo->addr);
@ -227,13 +230,6 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
* CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/ * CEp.epSet.fqdn[2], CEp.epSet.fqdn[3], CEp.epSet.fqdn[4]);*/
CEp.vgId = pTaskInfo->addr.nodeId; CEp.vgId = pTaskInfo->addr.nodeId;
CEp.qmsg = strdup(pTaskInfo->msg->msg); CEp.qmsg = strdup(pTaskInfo->msg->msg);
CEp.qmsgLen = strlen(CEp.qmsg) + 1;
printf("abc:\n%s\n", CEp.qmsg);
/*CEp.qmsg = malloc(CEp.qmsgLen);*/
/*if (CEp.qmsg == NULL) {*/
/*return -1;*/
/*}*/
/*memcpy(CEp.qmsg, pTaskInfo->msg->msg, pTaskInfo->msg->contentLen);*/
taosArrayPush(unassignedVg, &CEp); taosArrayPush(unassignedVg, &CEp);
} }
@ -257,8 +253,7 @@ static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsume
req.sql = pTopic->sql; req.sql = pTopic->sql;
req.logicalPlan = pTopic->logicalPlan; req.logicalPlan = pTopic->logicalPlan;
req.physicalPlan = pTopic->physicalPlan; req.physicalPlan = pTopic->physicalPlan;
req.qmsg = strdup(pCEp->qmsg); req.qmsg = pCEp->qmsg;
req.qmsgLen = strlen(req.qmsg);
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
void *buf = malloc(sizeof(SMsgHead) + tlen); void *buf = malloc(sizeof(SMsgHead) + tlen);
if (buf == NULL) { if (buf == NULL) {
@ -631,14 +626,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
if (newSub) taosArrayDestroy(newSub); if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); /*mndReleaseConsumer(pMnode, pConsumer);*/
return -1; return -1;
} }
if (newSub) taosArrayDestroy(newSub); if (newSub) taosArrayDestroy(newSub);
mndTransDrop(pTrans); mndTransDrop(pTrans);
mndReleaseConsumer(pMnode, pConsumer); /*mndReleaseConsumer(pMnode, pConsumer);*/
return 0; return TSDB_CODE_MND_ACTION_IN_PROGRESS;
} }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) { static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {

View File

@ -237,7 +237,7 @@ static int32_t mndCreateTopic(SMnode *pMnode, SMnodeMsg *pMsg, SCMCreateTopicReq
tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN); tstrncpy(topicObj.db, pDb->name, TSDB_DB_FNAME_LEN);
topicObj.createTime = taosGetTimestampMs(); topicObj.createTime = taosGetTimestampMs();
topicObj.updateTime = topicObj.createTime; topicObj.updateTime = topicObj.createTime;
topicObj.uid = mndGenerateUid(pCreate->name, TSDB_TABLE_FNAME_LEN); topicObj.uid = mndGenerateUid(pCreate->name, strlen(pCreate->name));
topicObj.dbUid = pDb->uid; topicObj.dbUid = pDb->uid;
topicObj.version = 1; topicObj.version = 1;
topicObj.sql = strdup(pCreate->sql); topicObj.sql = strdup(pCreate->sql);

View File

@ -679,6 +679,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int rspLen = 0; int rspLen = 0;
STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId);
ASSERT(pConsumer);
int sz = taosArrayGetSize(pConsumer->topics); int sz = taosArrayGetSize(pConsumer->topics);
for (int i = 0; i < sz; i++) { for (int i = 0; i < sz; i++) {
@ -735,23 +736,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
break; break;
} }
if (pDataBlock != NULL) { if (pDataBlock != NULL) {
SMqTbData tbData = {
.uid = pDataBlock->info.uid,
.numOfCols = pDataBlock->info.numOfCols,
.numOfRows = pDataBlock->info.rows,
};
for (int i = 0; i < pDataBlock->info.numOfCols; i++) {
SColumnInfoData* pColData = taosArrayGet(pDataBlock->pDataBlock, i);
int32_t sz = pColData->info.bytes * pDataBlock->info.rows;
SMqColData colData = {
.bytes = pColData->info.bytes,
.colId = pColData->info.colId,
.type = pColData->info.type,
};
memcpy(colData.data, pColData->pData, colData.bytes * pDataBlock->info.rows);
memcpy(&tbData.colData[i], &colData, sz);
}
/*pDataBlock->info.*/
taosArrayPush(pRes, pDataBlock); taosArrayPush(pRes, pDataBlock);
} else { } else {
break; break;
@ -792,6 +776,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
} }
strcpy(pConsumer->cgroup, req.cgroup); strcpy(pConsumer->cgroup, req.cgroup);
pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle)); pConsumer->topics = taosArrayInit(0, sizeof(STqTopicHandle));
pConsumer->consumerId = req.newConsumerId;
pConsumer->epoch = 0;
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) { if (pTopic == NULL) {
@ -802,6 +788,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->sql = strdup(req.sql); pTopic->sql = strdup(req.sql);
pTopic->logicalPlan = strdup(req.logicalPlan); pTopic->logicalPlan = strdup(req.logicalPlan);
pTopic->physicalPlan = strdup(req.physicalPlan); pTopic->physicalPlan = strdup(req.physicalPlan);
pTopic->committedOffset = -1;
pTopic->currentOffset = -1;
pTopic->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pTopic->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
@ -814,6 +802,8 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle); pTopic->buffer.output[i].task = qCreateStreamExecTaskInfo(req.qmsg, pReadHandle);
} }
taosArrayPush(pConsumer->topics, pTopic); taosArrayPush(pConsumer->topics, pTopic);
tqHandleMovePut(pTq->tqMeta, req.newConsumerId, pConsumer);
tqHandleCommit(pTq->tqMeta, req.newConsumerId);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
return 0; return 0;
} }
@ -838,7 +828,7 @@ void tqReadHandleSetMsg(STqReadHandle* pReadHandle, SSubmitMsg* pMsg, int64_t ve
} }
bool tqNextDataBlock(STqReadHandle* pHandle) { bool tqNextDataBlock(STqReadHandle* pHandle) {
while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) >= 0) { while (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock)) {
if (pHandle->tbUid == pHandle->pBlock->uid) return true; if (pHandle->tbUid == pHandle->pBlock->uid) return true;
} }
return false; return false;

View File

@ -11,6 +11,7 @@ target_link_libraries(
PUBLIC cjson PUBLIC cjson
PUBLIC os PUBLIC os
PUBLIC util PUBLIC util
PUBLIC common
) )
if(${BUILD_TEST}) if(${BUILD_TEST})

View File

@ -257,7 +257,7 @@ static int walWriteIndex(SWal *pWal, int64_t ver, int64_t offset) {
return 0; return 0;
} }
int64_t walWrite(SWal *pWal, int64_t index, uint8_t msgType, const void *body, int32_t bodyLen) { int64_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, int32_t bodyLen) {
if (pWal == NULL) return -1; if (pWal == NULL) return -1;
int code = 0; int code = 0;