fix compile error

This commit is contained in:
Liu Jicong 2022-01-20 15:04:40 +08:00
parent 51f02913f4
commit fa73f1d497
3 changed files with 21 additions and 12 deletions

View File

@ -1567,6 +1567,12 @@ typedef struct SMqCVConsumeReq {
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq; } SMqCVConsumeReq;
typedef struct SMqConsumeRspBlock {
int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqConsumeRspBlock;
typedef struct SMqCVConsumeRsp { typedef struct SMqCVConsumeRsp {
int64_t reqId; int64_t reqId;
int64_t clientId; int64_t clientId;
@ -1576,7 +1582,7 @@ typedef struct SMqCVConsumeRsp {
int32_t skipLogNum; int32_t skipLogNum;
int32_t bodyLen; int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char body[]; SMqConsumeRspBlock blocks[];
} SMqCvConsumeRsp; } SMqCvConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -189,6 +189,7 @@ typedef struct STqTopicHandle {
typedef struct STqConsumerHandle { typedef struct STqConsumerHandle {
int64_t consumerId; int64_t consumerId;
int64_t epoch;
SArray* topics; // SArray<STqClientTopic> SArray* topics; // SArray<STqClientTopic>
} STqConsumerHandle; } STqConsumerHandle;

View File

@ -114,15 +114,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
char* reqStr = ptr; char* reqStr = ptr;
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(reqStr, &req); tDecodeSMqSetCVgReq(reqStr, &req);
STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1); STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1);
if (pHandle == NULL) {
STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1);
if (pTopic == NULL) {
// TODO: handle error // TODO: handle error
} }
strcpy(pHandle->topicName, req.topicName); strcpy(pTopic->topicName, req.topicName);
strcpy(pHandle->cGroup, req.cGroup); strcpy(pTopic->cGroup, req.cGroup);
strcpy(pHandle->sql, req.sql); strcpy(pTopic->sql, req.sql);
strcpy(pHandle->logicalPlan, req.logicalPlan); strcpy(pTopic->logicalPlan, req.logicalPlan);
strcpy(pHandle->physicalPlan, req.physicalPlan); strcpy(pTopic->physicalPlan, req.physicalPlan);
SArray *pArray; SArray *pArray;
//TODO: deserialize to SQueryDag //TODO: deserialize to SQueryDag
SQueryDag *pDag; SQueryDag *pDag;
@ -134,12 +136,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STaskInfo *pInfo = taosArrayGet(pArray, 0); STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks; SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pHandle->buffer.firstOffset = -1; pTopic->buffer.firstOffset = -1;
pHandle->buffer.lastOffset = -1; pTopic->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) { for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pHandle->buffer.output[i].pMsg = pMsg; pTopic->buffer.output[i].pMsg = pMsg;
pHandle->buffer.output[i].status = 0; pTopic->buffer.output[i].status = 0;
} }
// write mq meta // write mq meta
} }