add tq process consumer msg

This commit is contained in:
Liu Jicong 2022-01-20 14:38:55 +08:00
parent 5d77fab535
commit f7726ce8a4
9 changed files with 59 additions and 19 deletions

View File

@ -1527,7 +1527,6 @@ typedef struct SMqSetCVgReq {
SArray* tasks; // SArray<SSubQueryMsg> SArray* tasks; // SArray<SSubQueryMsg>
} SMqSetCVgReq; } SMqSetCVgReq;
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
int32_t tlen = 0; int32_t tlen = 0;
tlen += taosEncodeFixedI32(buf, pReq->vgId); tlen += taosEncodeFixedI32(buf, pReq->vgId);
@ -1552,10 +1551,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
return buf; return buf;
} }
typedef struct SMqSetCVgRsp {
int32_t vgId;
int64_t consumerId;
char topicName[TSDB_TOPIC_FNAME_LEN];
char cGroup[TSDB_CONSUMER_GROUP_LEN];
} SMqSetCVgRsp;
typedef struct SMqCVConsumeReq { typedef struct SMqCVConsumeReq {
int64_t reqId; int64_t reqId;
int64_t offset; int64_t offset;
int64_t clientId; int64_t clientId;
int64_t blockingTime;
char topicName[TSDB_TOPIC_FNAME_LEN]; char topicName[TSDB_TOPIC_FNAME_LEN];
char cgroup[TSDB_CONSUMER_GROUP_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN];
} SMqCVConsumeReq; } SMqCVConsumeReq;

View File

@ -160,7 +160,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp)
TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL)

View File

@ -80,8 +80,8 @@ typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWal
typedef struct SWalReadHead { typedef struct SWalReadHead {
int8_t headVer; int8_t headVer;
uint8_t msgType; int16_t msgType;
int8_t reserved[2]; int8_t reserved;
int32_t len; int32_t len;
int64_t ingestTs; // not implemented int64_t ingestTs; // not implemented
int64_t version; int64_t version;

View File

@ -95,7 +95,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg);
pCEp->consumerId = consumerId; pCEp->consumerId = consumerId;
taosArrayPush(pSub->assigned, pCEp); taosArrayPush(pSub->assigned, pCEp);
pSub->nextConsumerIdx++; pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer);
// build msg // build msg
SMqSetCVgReq req = { SMqSetCVgReq req = {

View File

@ -27,6 +27,7 @@
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#include "wal.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -161,7 +162,8 @@ typedef struct STqGroup {
} STqGroup; } STqGroup;
typedef struct STqTaskItem { typedef struct STqTaskItem {
int32_t status; int8_t status;
int64_t offset;
void* dst; void* dst;
SSubQueryMsg* pMsg; SSubQueryMsg* pMsg;
} STqTaskItem; } STqTaskItem;
@ -183,6 +185,7 @@ typedef struct STqClientHandle {
int64_t committedOffset; int64_t committedOffset;
int64_t currentOffset; int64_t currentOffset;
STqBuffer buffer; STqBuffer buffer;
SWalReadHandle* pReadhandle;
} STqClientHandle; } STqClientHandle;
typedef struct STqQueryMsg { typedef struct STqQueryMsg {
@ -321,25 +324,20 @@ void tqClose(STQ*);
// void* will be replace by a msg type // void* will be replace by a msg type
int tqPushMsg(STQ*, void* msg, int64_t version); int tqPushMsg(STQ*, void* msg, int64_t version);
int tqCommit(STQ*); int tqCommit(STQ*);
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
#if 0
int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp);
int tqSetCursor(STQ*, STqSetCurReq* pMsg); int tqSetCursor(STQ*, STqSetCurReq* pMsg);
int tqBufferSetOffset(STqTopic*, int64_t offset); int tqBufferSetOffset(STqTopic*, int64_t offset);
STqTopic* tqFindTopic(STqGroup*, int64_t topicId); STqTopic* tqFindTopic(STqGroup*, int64_t topicId);
STqGroup* tqGetGroup(STQ*, int64_t clientId); STqGroup* tqGetGroup(STQ*, int64_t clientId);
STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId);
int tqRegisterContext(STqGroup*, void* ahandle); int tqRegisterContext(STqGroup*, void* ahandle);
int tqSendLaunchQuery(STqMsgItem*, int64_t offset); int tqSendLaunchQuery(STqMsgItem*, int64_t offset);
#endif
int tqSerializeGroup(const STqGroup*, STqSerializedHead**); int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp);
const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**);
static int tqQueryExecuting(int32_t status) { return status; }
typedef struct STqReadHandle { typedef struct STqReadHandle {
int64_t ver; int64_t ver;
@ -350,9 +348,6 @@ typedef struct STqReadHandle {
SMeta* pMeta; SMeta* pMeta;
} STqReadHandle; } STqReadHandle;
typedef struct SSubmitBlkScanInfo {
} SSubmitBlkScanInfo;
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg);
bool tqNextDataBlock(STqReadHandle* pHandle); bool tqNextDataBlock(STqReadHandle* pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo);

View File

@ -43,6 +43,9 @@ extern int32_t tqDebugFlag;
// delete persistent storage for meta info // delete persistent storage for meta info
// int tqDropTCGroup(STQ*, const char* topic, int cgId); // int tqDropTCGroup(STQ*, const char* topic, int cgId);
int tqSerializeGroup(const STqGroup*, STqSerializedHead**);
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup);
static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; }
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -608,6 +608,40 @@ int tqItemSSize() {
return 0; return 0;
} }
int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) {
SMqCVConsumeReq* pReq = pMsg->pCont;
int64_t reqId = pReq->reqId;
int64_t clientId = pReq->clientId;
int64_t offset = pReq->offset;
int64_t blockingTime = pReq->blockingTime;
STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId);
int8_t pos = offset % TQ_BUFFER_SIZE;
int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1);
if (old == 1) {
// do nothing
}
if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) {
//TODO
}
SWalHead *pHead = pHandle->pReadhandle->pHead;
while (pHead->head.msgType != TDMT_VND_SUBMIT) {
// read until find TDMT_VND_SUBMIT
}
SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body;
SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg;
void* outputData;
atomic_store_8(&pHandle->buffer.output[pos].status, 1);
// launch query
// get result
// put into
SMqCvConsumeRsp* pRsp;
return 0;
}
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) {
STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle));
if (pReadHandle == NULL) { if (pReadHandle == NULL) {

View File

@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
case TDMT_VND_TABLE_META: case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg, pRsp); return vnodeGetTableMeta(pVnode, pMsg, pRsp);
case TDMT_VND_CONSUME: case TDMT_VND_CONSUME:
return 0; return tqProcessConsume(pVnode->pTq, pMsg, pRsp);
default: default:
vError("unknown msg type:%d in fetch queue", pMsg->msgType); vError("unknown msg type:%d in fetch queue", pMsg->msgType);
return TSDB_CODE_VND_APP_ERROR; return TSDB_CODE_VND_APP_ERROR;

View File

@ -110,6 +110,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
break; break;
case TDMT_VND_MQ_SET_CONN: { case TDMT_VND_MQ_SET_CONN: {
//TODO: wrap in a function
char* reqStr = ptr; char* reqStr = ptr;
SMqSetCVgReq req; SMqSetCVgReq req;
tDecodeSMqSetCVgReq(reqStr, &req); tDecodeSMqSetCVgReq(reqStr, &req);