add msg for vnode consume

This commit is contained in:
Liu Jicong 2022-01-20 10:45:15 +08:00
parent 6cf90b0a92
commit 5d77fab535
8 changed files with 99 additions and 23 deletions

View File

@ -1552,6 +1552,25 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf; return buf;
} }
typedef struct SMqCVConsumeReq {
int64_t reqId;
int64_t offset;
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq;
typedef struct SMqCVConsumeRsp {
int64_t reqId;
int64_t clientId;
int64_t committedOffset;
int64_t receiveOffset;
int64_t rspOffset;
int32_t skipLogNum;
int32_t bodyLen;
char topicName[TSDB_TOPIC_FNAME_LEN];
char body[];
} SMqCvConsumeRsp;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -175,6 +175,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_SCHEDULE_DATA_SINK, "vnode-schedule-data-sink", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp) TD_DEF_MSG_TYPE(TDMT_VND_SUBSCRIBE, "vnode-subscribe", SMVSubscribeReq, SMVSubscribeRsp)
TD_DEF_MSG_TYPE(TDMT_VND_CONSUME, "vnode-consume", SMqCVConsumeReq, SMqCVConsumeRsp)
// Requests handled by QNODE // Requests handled by QNODE

View File

@ -464,7 +464,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
} }
taosArrayPush(pSub->availConsumer, &consumerId); taosArrayPush(pSub->availConsumer, &consumerId);
//TODO: no need // TODO: no need
SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub);
taosArrayPush(pConsumer->topics, pConsumerTopic); taosArrayPush(pConsumer->topics, pConsumerTopic);
@ -542,7 +542,10 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return 0; return 0;
} }
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pRsp) {
mndTransProcessRsp(pRsp);
return 0;
}
static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) {
SMnode *pMnode = pMsg->pMnode; SMnode *pMnode = pMsg->pMnode;

View File

@ -25,6 +25,7 @@ target_link_libraries(
PUBLIC bdb PUBLIC bdb
PUBLIC tfs PUBLIC tfs
PUBLIC wal PUBLIC wal
PUBLIC scheduler
PUBLIC qworker PUBLIC qworker
) )

View File

@ -18,14 +18,15 @@
#include "common.h" #include "common.h"
#include "mallocator.h" #include "mallocator.h"
#include "meta.h"
#include "os.h" #include "os.h"
#include "scheduler.h"
#include "taoserror.h" #include "taoserror.h"
#include "tmsg.h"
#include "tlist.h" #include "tlist.h"
#include "tmsg.h"
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "meta.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -150,15 +151,40 @@ typedef struct STqListHandle {
} STqList; } STqList;
typedef struct STqGroup { typedef struct STqGroup {
int64_t clientId; int64_t clientId;
int64_t cgId; int64_t cgId;
void* ahandle; void* ahandle;
int32_t topicNum; int32_t topicNum;
STqList* head; STqList* head;
SList* topicList; // SList<STqTopic> SList* topicList; // SList<STqTopic>
STqRspHandle rspHandle; STqRspHandle rspHandle;
} STqGroup; } STqGroup;
typedef struct STqTaskItem {
int32_t status;
void* dst;
SSubQueryMsg* pMsg;
} STqTaskItem;
// new version
typedef struct STqBuffer {
int64_t firstOffset;
int64_t lastOffset;
STqTaskItem output[TQ_BUFFER_SIZE];
} STqBuffer;
typedef struct STqClientHandle {
int64_t clientId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_TOPIC_FNAME_LEN];
char* sql;
char* logicalPlan;
char* physicalPlan;
int64_t committedOffset;
int64_t currentOffset;
STqBuffer buffer;
} STqClientHandle;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
STqMsgItem* item; STqMsgItem* item;
struct STqQueryMsg* next; struct STqQueryMsg* next;
@ -253,7 +279,7 @@ typedef struct STqMetaStore {
// a table head // a table head
STqMetaList* unpersistHead; STqMetaList* unpersistHead;
// topics that are not connectted // topics that are not connectted
STqMetaList* unconnectTopic; STqMetaList* unconnectTopic;
// TODO:temporaral use, to be replaced by unified tfile // TODO:temporaral use, to be replaced by unified tfile
int fileFd; int fileFd;
@ -316,24 +342,22 @@ const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static int tqQueryExecuting(int32_t status) { return status; } static int tqQueryExecuting(int32_t status) { return status; }
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
SSubmitMsg* pMsg; SSubmitMsg* pMsg;
SSubmitBlk* pBlock; SSubmitBlk* pBlock;
SSubmitMsgIter msgIter; SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter; SSubmitBlkIter blkIter;
SMeta* pMeta; SMeta* pMeta;
} STqReadHandle; } STqReadHandle;
typedef struct SSubmitBlkScanInfo { typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo; } SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle); bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo *pBlockInfo); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);
//return SArray<SColumnInfoData> // return SArray<SColumnInfoData>
SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList); SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList);
//int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -177,4 +177,4 @@ bool vmaIsFull(SVMemAllocator* pVMA);
} }
#endif #endif
#endif /*_TD_VNODE_DEF_H_*/ #endif /*_TD_VNODE_DEF_H_*/

View File

@ -57,6 +57,8 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg); // return qWorkerProcessShowFetchMsg(pVnode->pMeta, pVnode->pQuery, pMsg);
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME:
return 0;
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;

View File

@ -14,6 +14,7 @@
*/ */
#include "vnd.h" #include "vnd.h"
#include "tq.h"
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) { int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) { switch (pMsg->msgType) {
@ -111,9 +112,34 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
char* reqStr = ptr; char* reqStr = ptr;
SMqSetCVgReq req; SMqSetCVgReq req;
/*tDecodeSMqSetCVgReq(reqStr, &req);*/ tDecodeSMqSetCVgReq(reqStr, &req);
// create topic if not exist STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1);
if (pHandle == NULL) {
// TODO: handle error
}
strcpy(pHandle->topicName, req.topicName);
strcpy(pHandle->cGroup, req.cGroup);
strcpy(pHandle->sql, req.sql);
strcpy(pHandle->logicalPlan, req.logicalPlan);
strcpy(pHandle->physicalPlan, req.physicalPlan);
SArray *pArray;
//TODO: deserialize to SQueryDag
SQueryDag *pDag;
// convert to task // convert to task
if (schedulerConvertDagToTaskList(pDag, &pArray) < 0) {
// TODO: handle error
}
ASSERT(taosArrayGetSize(pArray) == 0);
STaskInfo *pInfo = taosArrayGet(pArray, 0);
SArray* pTasks;
schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE);
pHandle->buffer.firstOffset = -1;
pHandle->buffer.lastOffset = -1;
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
SSubQueryMsg* pMsg = taosArrayGet(pTasks, i);
pHandle->buffer.output[i].pMsg = pMsg;
pHandle->buffer.output[i].status = 0;
}
// write mq meta // write mq meta
} }
break; break;