diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1f7ec3be1e..cb0c76eb71 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1527,7 +1527,6 @@ typedef struct SMqSetCVgReq { SArray* tasks; // SArray } SMqSetCVgReq; - static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->vgId); @@ -1552,10 +1551,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { return buf; } +typedef struct SMqSetCVgRsp { + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqSetCVgRsp; + typedef struct SMqCVConsumeReq { int64_t reqId; int64_t offset; int64_t clientId; + int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCVConsumeReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ed7fdea2a8..52fff978bc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -160,7 +160,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 51aaa7d903..641b485f4c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -80,8 +80,8 @@ typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWal typedef struct SWalReadHead { int8_t headVer; - uint8_t msgType; - int8_t reserved[2]; + int16_t msgType; + int8_t reserved; int32_t len; int64_t ingestTs; // not implemented int64_t version; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e786a8972c..7a57956f3b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -95,7 +95,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); - pSub->nextConsumerIdx++; + pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer); // build msg SMqSetCVgReq req = { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 1a41a02673..eb82d19a4f 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -27,6 +27,7 @@ #include "trpc.h" #include "ttimer.h" #include "tutil.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -161,7 +162,8 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int32_t status; + int8_t status; + int64_t offset; void* dst; SSubQueryMsg* pMsg; } STqTaskItem; @@ -183,6 +185,7 @@ typedef struct STqClientHandle { int64_t committedOffset; int64_t currentOffset; STqBuffer buffer; + SWalReadHandle* pReadhandle; } STqClientHandle; typedef struct STqQueryMsg { @@ -321,25 +324,20 @@ void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp); +#if 0 +int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp); int tqSetCursor(STQ*, STqSetCurReq* pMsg); int tqBufferSetOffset(STqTopic*, int64_t offset); - STqTopic* tqFindTopic(STqGroup*, int64_t topicId); - STqGroup* tqGetGroup(STQ*, int64_t clientId); - STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); +#endif -int tqSerializeGroup(const STqGroup*, STqSerializedHead**); - -const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); - -static int tqQueryExecuting(int32_t status) { return status; } +int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp); typedef struct STqReadHandle { int64_t ver; @@ -350,9 +348,6 @@ typedef struct STqReadHandle { SMeta* pMeta; } STqReadHandle; -typedef struct SSubmitBlkScanInfo { -} SSubmitBlkScanInfo; - STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); bool tqNextDataBlock(STqReadHandle* pHandle); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 107f5d5103..b4e1f57384 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -43,6 +43,9 @@ extern int32_t tqDebugFlag; // delete persistent storage for meta info // int tqDropTCGroup(STQ*, const char* topic, int cgId); +int tqSerializeGroup(const STqGroup*, STqSerializedHead**); +const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup); +static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a5be0ec29a..b83cfe99bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -608,6 +608,40 @@ int tqItemSSize() { return 0; } +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) { + SMqCVConsumeReq* pReq = pMsg->pCont; + int64_t reqId = pReq->reqId; + int64_t clientId = pReq->clientId; + int64_t offset = pReq->offset; + int64_t blockingTime = pReq->blockingTime; + + STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId); + int8_t pos = offset % TQ_BUFFER_SIZE; + int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); + if (old == 1) { + // do nothing + } + if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { + //TODO + } + SWalHead *pHead = pHandle->pReadhandle->pHead; + while (pHead->head.msgType != TDMT_VND_SUBMIT) { + // read until find TDMT_VND_SUBMIT + } + SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body; + + SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + + void* outputData; + + atomic_store_8(&pHandle->buffer.output[pos].status, 1); + // launch query + // get result + // put into + SMqCvConsumeRsp* pRsp; + return 0; +} + STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 729d64f8b3..96791488fa 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); case TDMT_VND_CONSUME: - return 0; + return tqProcessConsume(pVnode->pTq, pMsg, pRsp); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 2f3a4d5409..deb8a714a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -110,6 +110,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { + //TODO: wrap in a function char* reqStr = ptr; SMqSetCVgReq req; tDecodeSMqSetCVgReq(reqStr, &req);