From f7726ce8a437ffbb371a743945e92613d8d02a60 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 14:38:55 +0800 Subject: [PATCH 1/5] add tq process consumer msg --- include/common/tmsg.h | 9 +++++- include/common/tmsgdef.h | 2 +- include/libs/wal/wal.h | 4 +-- source/dnode/mnode/impl/src/mndSubscribe.c | 2 +- source/dnode/vnode/inc/tq.h | 21 +++++-------- source/dnode/vnode/src/inc/tqInt.h | 3 ++ source/dnode/vnode/src/tq/tq.c | 34 ++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 2 +- source/dnode/vnode/src/vnd/vnodeWrite.c | 1 + 9 files changed, 59 insertions(+), 19 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 1f7ec3be1e..cb0c76eb71 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1527,7 +1527,6 @@ typedef struct SMqSetCVgReq { SArray* tasks; // SArray } SMqSetCVgReq; - static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { int32_t tlen = 0; tlen += taosEncodeFixedI32(buf, pReq->vgId); @@ -1552,10 +1551,18 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { return buf; } +typedef struct SMqSetCVgRsp { + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; +} SMqSetCVgRsp; + typedef struct SMqCVConsumeReq { int64_t reqId; int64_t offset; int64_t clientId; + int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCVConsumeReq; diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index ed7fdea2a8..52fff978bc 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -160,7 +160,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_VND_MQ_QUERY, "vnode-mq-query", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_CONNECT, "vnode-mq-connect", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_MQ_DISCONNECT, "vnode-mq-disconnect", NULL, NULL) - TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CONN, "vnode-mq-set-conn", SMqSetCVgReq, SMqSetCVgRsp) TD_DEF_MSG_TYPE(TDMT_VND_MQ_SET_CUR, "vnode-mq-set-cur", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_RES_READY, "vnode-res-ready", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_VND_TASKS_STATUS, "vnode-tasks-status", NULL, NULL) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index 51aaa7d903..641b485f4c 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -80,8 +80,8 @@ typedef enum { TAOS_WAL_NOLOG = 0, TAOS_WAL_WRITE = 1, TAOS_WAL_FSYNC = 2 } EWal typedef struct SWalReadHead { int8_t headVer; - uint8_t msgType; - int8_t reserved[2]; + int16_t msgType; + int8_t reserved; int32_t len; int64_t ingestTs; // not implemented int64_t version; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index e786a8972c..7a57956f3b 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -95,7 +95,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); - pSub->nextConsumerIdx++; + pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer); // build msg SMqSetCVgReq req = { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 1a41a02673..eb82d19a4f 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -27,6 +27,7 @@ #include "trpc.h" #include "ttimer.h" #include "tutil.h" +#include "wal.h" #ifdef __cplusplus extern "C" { @@ -161,7 +162,8 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int32_t status; + int8_t status; + int64_t offset; void* dst; SSubQueryMsg* pMsg; } STqTaskItem; @@ -183,6 +185,7 @@ typedef struct STqClientHandle { int64_t committedOffset; int64_t currentOffset; STqBuffer buffer; + SWalReadHandle* pReadhandle; } STqClientHandle; typedef struct STqQueryMsg { @@ -321,25 +324,20 @@ void tqClose(STQ*); // void* will be replace by a msg type int tqPushMsg(STQ*, void* msg, int64_t version); int tqCommit(STQ*); -int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp); +#if 0 +int tqConsume(STQ*, SRpcMsg* pReq, SRpcMsg** pRsp); int tqSetCursor(STQ*, STqSetCurReq* pMsg); int tqBufferSetOffset(STqTopic*, int64_t offset); - STqTopic* tqFindTopic(STqGroup*, int64_t topicId); - STqGroup* tqGetGroup(STQ*, int64_t clientId); - STqGroup* tqOpenGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqCloseGroup(STQ*, int64_t topicId, int64_t cgId, int64_t cId); int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); +#endif -int tqSerializeGroup(const STqGroup*, STqSerializedHead**); - -const void* tqDeserializeGroup(const STqSerializedHead*, STqGroup**); - -static int tqQueryExecuting(int32_t status) { return status; } +int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp); typedef struct STqReadHandle { int64_t ver; @@ -350,9 +348,6 @@ typedef struct STqReadHandle { SMeta* pMeta; } STqReadHandle; -typedef struct SSubmitBlkScanInfo { -} SSubmitBlkScanInfo; - STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg); bool tqNextDataBlock(STqReadHandle* pHandle); int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo); diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index 107f5d5103..b4e1f57384 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -43,6 +43,9 @@ extern int32_t tqDebugFlag; // delete persistent storage for meta info // int tqDropTCGroup(STQ*, const char* topic, int cgId); +int tqSerializeGroup(const STqGroup*, STqSerializedHead**); +const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup); +static int FORCE_INLINE tqQueryExecuting(int32_t status) { return status; } #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index a5be0ec29a..b83cfe99bc 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -608,6 +608,40 @@ int tqItemSSize() { return 0; } +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) { + SMqCVConsumeReq* pReq = pMsg->pCont; + int64_t reqId = pReq->reqId; + int64_t clientId = pReq->clientId; + int64_t offset = pReq->offset; + int64_t blockingTime = pReq->blockingTime; + + STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId); + int8_t pos = offset % TQ_BUFFER_SIZE; + int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); + if (old == 1) { + // do nothing + } + if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { + //TODO + } + SWalHead *pHead = pHandle->pReadhandle->pHead; + while (pHead->head.msgType != TDMT_VND_SUBMIT) { + // read until find TDMT_VND_SUBMIT + } + SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body; + + SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + + void* outputData; + + atomic_store_8(&pHandle->buffer.output[pos].status, 1); + // launch query + // get result + // put into + SMqCvConsumeRsp* pRsp; + return 0; +} + STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 729d64f8b3..96791488fa 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -58,7 +58,7 @@ int vnodeProcessFetchReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg, pRsp); case TDMT_VND_CONSUME: - return 0; + return tqProcessConsume(pVnode->pTq, pMsg, pRsp); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 2f3a4d5409..deb8a714a6 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -110,6 +110,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { } break; case TDMT_VND_MQ_SET_CONN: { + //TODO: wrap in a function char* reqStr = ptr; SMqSetCVgReq req; tDecodeSMqSetCVgReq(reqStr, &req); From 51f02913f4e60636297b16f382d2a159aea54a31 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 14:53:09 +0800 Subject: [PATCH 2/5] tq handle multiple topic --- include/common/tmsg.h | 2 +- source/dnode/mnode/impl/src/mndSubscribe.c | 4 +- source/dnode/vnode/inc/tq.h | 30 +++-- source/dnode/vnode/src/tq/tq.c | 145 +++++++++++---------- 4 files changed, 95 insertions(+), 86 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cb0c76eb71..fe96f4ee52 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1561,7 +1561,7 @@ typedef struct SMqSetCVgRsp { typedef struct SMqCVConsumeReq { int64_t reqId; int64_t offset; - int64_t clientId; + int64_t consumerId; int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7a57956f3b..4158d7e7c7 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,8 +55,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); @@ -95,7 +93,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); - pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer); + pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); // build msg SMqSetCVgReq req = { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index eb82d19a4f..aaf7f33061 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -162,7 +162,7 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int8_t status; + int8_t status; int64_t offset; void* dst; SSubQueryMsg* pMsg; @@ -175,18 +175,22 @@ typedef struct STqBuffer { STqTaskItem output[TQ_BUFFER_SIZE]; } STqBuffer; -typedef struct STqClientHandle { - int64_t clientId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_TOPIC_FNAME_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - int64_t committedOffset; - int64_t currentOffset; - STqBuffer buffer; +typedef struct STqTopicHandle { + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + int64_t committedOffset; + int64_t currentOffset; + STqBuffer buffer; SWalReadHandle* pReadhandle; -} STqClientHandle; +} STqTopicHandle; + +typedef struct STqConsumerHandle { + int64_t consumerId; + SArray* topics; // SArray +} STqConsumerHandle; typedef struct STqQueryMsg { STqMsgItem* item; @@ -337,7 +341,7 @@ int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif -int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp); +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); typedef struct STqReadHandle { int64_t ver; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b83cfe99bc..c4a4f087c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -37,7 +37,7 @@ const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); int tqInit() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); - if(old == 1) return 0; + if (old == 1) return 0; tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); return 0; @@ -45,7 +45,7 @@ int tqInit() { void tqCleanUp() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0); - if(old == 0) return; + if (old == 0) return; taosTmrStop(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer); } @@ -150,7 +150,7 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup memset(pGroup, 0, sizeof(STqGroup)); pGroup->topicList = tdListNew(sizeof(STqTopic)); - if(pGroup->topicList == NULL) { + if (pGroup->topicList == NULL) { free(pGroup); return -1; } @@ -190,7 +190,7 @@ static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) { int totSize = 0; int numOfMsgs = 0; // TODO: make it a macro - int sizeLimit = 4 * 1024; + int sizeLimit = 4 * 1024; void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit); if (ptr == NULL) { @@ -329,9 +329,9 @@ int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) { } int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { - STqConsumeReq *pMsg = pReq->pCont; - int64_t clientId = pMsg->head.clientId; - STqGroup* pGroup = tqGetGroup(pTq, clientId); + STqConsumeReq* pMsg = pReq->pCont; + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); if (pGroup == NULL) { terrno = TSDB_CODE_TQ_GROUP_NOT_SET; return -1; @@ -343,9 +343,8 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { int numOfMsgs = 0; int sizeLimit = 4096; - - STqConsumeRsp *pCsmRsp = (*pRsp)->pCont; - void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); + STqConsumeRsp* pCsmRsp = (*pRsp)->pCont; + void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); if (ptr == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; @@ -356,16 +355,16 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { tdListInitIter(topicList, &iter, TD_LIST_FORWARD); STqMsgContent* buffer = NULL; - SArray* pArray = taosArrayInit(0, sizeof(void*)); + SArray* pArray = taosArrayInit(0, sizeof(void*)); - SListNode *pn; - while((pn = tdListNext(&iter)) != NULL) { - STqTopic* pTopic = *(STqTopic**)pn->data; - int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; + SListNode* pn; + while ((pn = tdListNext(&iter)) != NULL) { + STqTopic* pTopic = *(STqTopic**)pn->data; + int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; STqMsgItem* pItem = &pTopic->buffer[idx]; if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) { - if(pItem->status == TQ_ITEM_READY) { - //if has data + if (pItem->status == TQ_ITEM_READY) { + // if has data totSize += pTopic->buffer[idx].size; if (totSize > sizeLimit) { void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize); @@ -388,13 +387,13 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { if (totSize > sizeLimit) { break; } - } else if(pItem->status == TQ_ITEM_PROCESS) { - //if not have data but in process + } else if (pItem->status == TQ_ITEM_PROCESS) { + // if not have data but in process - } else if(pItem->status == TQ_ITEM_EMPTY){ - //if not have data and not in process + } else if (pItem->status == TQ_ITEM_EMPTY) { + // if not have data and not in process int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS); - if(old != TQ_ITEM_EMPTY) { + if (old != TQ_ITEM_EMPTY) { continue; } pItem->offset = pTopic->floatingCursor; @@ -416,22 +415,22 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { } // fetched a num of msgs, rpc response - for(int i = 0; i < pArray->size; i++) { + for (int i = 0; i < pArray->size; i++) { STqMsgItem* pItem = taosArrayGet(pArray, i); - //read from wal + // read from wal void* raw = NULL; /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); - if(code < 0) { - //TODO: error + if (code < 0) { + // TODO: error } - //get msgType - //if submitblk + // get msgType + // if submitblk pItem->executor->assign(pItem->executor->runtimeEnv, raw); SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); pItem->content = content; - //if other type, send just put into buffer + // if other type, send just put into buffer /*pItem->content = raw;*/ int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); @@ -608,41 +607,48 @@ int tqItemSSize() { return 0; } -int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) { +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SMqCVConsumeReq* pReq = pMsg->pCont; - int64_t reqId = pReq->reqId; - int64_t clientId = pReq->clientId; - int64_t offset = pReq->offset; - int64_t blockingTime = pReq->blockingTime; + int64_t reqId = pReq->reqId; + int64_t consumerId = pReq->consumerId; + int64_t offset = pReq->offset; + int64_t blockingTime = pReq->blockingTime; - STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId); - int8_t pos = offset % TQ_BUFFER_SIZE; - int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); - if (old == 1) { - // do nothing + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + int sz = taosArrayGetSize(pConsumer->topics); + + for (int i = 0 ; i < sz; i++) { + STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i); + + int8_t pos = offset % TQ_BUFFER_SIZE; + int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); + if (old == 1) { + // do nothing + } + if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { + // TODO + } + SWalHead* pHead = pHandle->pReadhandle->pHead; + while (pHead->head.msgType != TDMT_VND_SUBMIT) { + // read until find TDMT_VND_SUBMIT + } + SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + + SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + + void* outputData; + atomic_store_8(&pHandle->buffer.output[pos].status, 1); + + // put output into rsp } - if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { - //TODO - } - SWalHead *pHead = pHandle->pReadhandle->pHead; - while (pHead->head.msgType != TDMT_VND_SUBMIT) { - // read until find TDMT_VND_SUBMIT - } - SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body; - SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; - - void* outputData; - - atomic_store_8(&pHandle->buffer.output[pos].status, 1); // launch query // get result - // put into SMqCvConsumeRsp* pRsp; return 0; } -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { return NULL; @@ -655,39 +661,39 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { } bool tqNextDataBlock(STqReadHandle* pHandle) { - if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; } return true; } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - SMemRow row; - int32_t sversion = pHandle->pBlock->sversion; + SMemRow row; + int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); pBlockInfo->numOfCols = pSchema->nCols; pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; - //TODO: filter out unused column + // TODO: filter out unused column return 0; } -SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { - int32_t sversion = pHandle->pBlock->sversion; +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { + int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); - SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); + STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); + SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; } SColumnInfoData colInfo; - int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; + int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; colInfo.pData = malloc(sz); if (colInfo.pData == NULL) { return NULL; } for (int i = 0; i < pTschema->numOfCols; i++) { - //TODO: filter out unused column + // TODO: filter out unused column taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId)); } @@ -695,16 +701,17 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { int32_t kvIdx; while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { - //TODO: filter out unused column - STColumn *pCol = schemaColAt(pTschema, i); + // TODO: filter out unused column + STColumn* pCol = schemaColAt(pTschema, i); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - //TODO: handle varlen + // TODO: handle varlen memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); } } taosArrayPush(pArray, &colInfo); return pArray; } -/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/ - /*return 0;*/ +/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t + * status) {*/ +/*return 0;*/ /*}*/ From fa73f1d497436170f9cdeae75455aaf8e31332f6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 15:04:40 +0800 Subject: [PATCH 3/5] fix compile error --- include/common/tmsg.h | 8 +++++++- source/dnode/vnode/inc/tq.h | 1 + source/dnode/vnode/src/vnd/vnodeWrite.c | 24 +++++++++++++----------- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fe96f4ee52..a3d62cc52d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1567,6 +1567,12 @@ typedef struct SMqCVConsumeReq { char cgroup[TSDB_CONSUMER_GROUP_LEN]; } SMqCVConsumeReq; +typedef struct SMqConsumeRspBlock { + int32_t bodyLen; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char body[]; +} SMqConsumeRspBlock; + typedef struct SMqCVConsumeRsp { int64_t reqId; int64_t clientId; @@ -1576,7 +1582,7 @@ typedef struct SMqCVConsumeRsp { int32_t skipLogNum; int32_t bodyLen; char topicName[TSDB_TOPIC_FNAME_LEN]; - char body[]; + SMqConsumeRspBlock blocks[]; } SMqCvConsumeRsp; #ifdef __cplusplus diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index aaf7f33061..588305c8ae 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -189,6 +189,7 @@ typedef struct STqTopicHandle { typedef struct STqConsumerHandle { int64_t consumerId; + int64_t epoch; SArray* topics; // SArray } STqConsumerHandle; diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index deb8a714a6..f05520a960 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -114,15 +114,17 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { char* reqStr = ptr; SMqSetCVgReq req; tDecodeSMqSetCVgReq(reqStr, &req); - STqClientHandle* pHandle = calloc(sizeof(STqClientHandle), 1); - if (pHandle == NULL) { + STqConsumerHandle* pConsumer = calloc(sizeof(STqConsumerHandle), 1); + + STqTopicHandle* pTopic = calloc(sizeof(STqTopicHandle), 1); + if (pTopic == NULL) { // TODO: handle error } - strcpy(pHandle->topicName, req.topicName); - strcpy(pHandle->cGroup, req.cGroup); - strcpy(pHandle->sql, req.sql); - strcpy(pHandle->logicalPlan, req.logicalPlan); - strcpy(pHandle->physicalPlan, req.physicalPlan); + strcpy(pTopic->topicName, req.topicName); + strcpy(pTopic->cGroup, req.cGroup); + strcpy(pTopic->sql, req.sql); + strcpy(pTopic->logicalPlan, req.logicalPlan); + strcpy(pTopic->physicalPlan, req.physicalPlan); SArray *pArray; //TODO: deserialize to SQueryDag SQueryDag *pDag; @@ -134,12 +136,12 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { STaskInfo *pInfo = taosArrayGet(pArray, 0); SArray* pTasks; schedulerCopyTask(pInfo, &pTasks, TQ_BUFFER_SIZE); - pHandle->buffer.firstOffset = -1; - pHandle->buffer.lastOffset = -1; + pTopic->buffer.firstOffset = -1; + pTopic->buffer.lastOffset = -1; for (int i = 0; i < TQ_BUFFER_SIZE; i++) { SSubQueryMsg* pMsg = taosArrayGet(pTasks, i); - pHandle->buffer.output[i].pMsg = pMsg; - pHandle->buffer.output[i].status = 0; + pTopic->buffer.output[i].pMsg = pMsg; + pTopic->buffer.output[i].status = 0; } // write mq meta } From 3203e8fe42eec55890c44a96478bd6efde763c64 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 15:18:33 +0800 Subject: [PATCH 4/5] try integrating allocator --- source/dnode/vnode/inc/tq.h | 14 +------------- source/dnode/vnode/src/tq/tq.c | 15 +++++---------- source/dnode/vnode/src/vnd/vnodeMain.c | 4 ++-- 3 files changed, 8 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 588305c8ae..ec886286f3 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -198,17 +198,6 @@ typedef struct STqQueryMsg { struct STqQueryMsg* next; } STqQueryMsg; -typedef struct STqLogHandle { - void* logHandle; - void* (*openLogReader)(void* logHandle); - void (*closeLogReader)(void* logReader); - int32_t (*logRead)(void* logReader, void** data, int64_t ver); - - int64_t (*logGetFirstVer)(void* logHandle); - int64_t (*logGetSnapshotVer)(void* logHandle); - int64_t (*logGetLastVer)(void* logHandle); -} STqLogHandle; - typedef struct STqCfg { // TODO } STqCfg; @@ -306,7 +295,6 @@ typedef struct STQ { // the handle of meta kvstore char* path; STqCfg* tqConfig; - STqLogHandle* tqLogHandle; STqMemRef tqMemRef; STqMetaStore* tqMeta; } STQ; @@ -323,7 +311,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac); +STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac); void tqClose(STQ*); // void* will be replace by a msg type diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c4a4f087c6..2b2272caea 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -50,7 +50,7 @@ void tqCleanUp() { taosTmrCleanUp(tqMgmt.timer); } -STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -58,20 +58,15 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogHandle* tqLogHandle, SMemA } pTq->path = strdup(path); pTq->tqConfig = tqConfig; - pTq->tqLogHandle = tqLogHandle; -#if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { // TODO: error code of buffer pool } -#endif pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { - free(pTq); -#if 0 allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); -#endif + free(pTq); return NULL; } @@ -421,10 +416,10 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { // read from wal void* raw = NULL; /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ - int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); - if (code < 0) { + /*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/ + /*if (code < 0) {*/ // TODO: error - } + /*}*/ // get msgType // if submitblk pItem->executor->assign(pItem->executor->runtimeEnv, raw); diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index 76b7ccf0d9..cb0d76ed29 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -119,7 +119,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { // TODO: Open TQ sprintf(dir, "%s/tq", pVnode->path); - pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), NULL, vBufPoolGetMAF(pVnode)); + pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) { // TODO: handle error return -1; @@ -151,4 +151,4 @@ static void vnodeCloseImpl(SVnode *pVnode) { tqClose(pVnode->pTq); walClose(pVnode->pWal); } -} \ No newline at end of file +} From 6bf4614bfa0fbaa6fcf13024ef4f25b4b0d96677 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Thu, 20 Jan 2022 15:25:13 +0800 Subject: [PATCH 5/5] revert and remove allocator --- source/dnode/vnode/src/tq/tq.c | 6 +++++- source/dnode/vnode/src/vnd/vnodeMain.c | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2b2272caea..ad71005153 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -58,15 +58,19 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) } pTq->path = strdup(path); pTq->tqConfig = tqConfig; +#if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); if (pTq->tqMemRef.pAllocator == NULL) { // TODO: error code of buffer pool } +#endif pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0); if (pTq->tqMeta == NULL) { - allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); free(pTq); +#if 0 + allocFac->destroy(allocFac, pTq->tqMemRef.pAllocator); +#endif return NULL; } diff --git a/source/dnode/vnode/src/vnd/vnodeMain.c b/source/dnode/vnode/src/vnd/vnodeMain.c index cb0d76ed29..fd79341adf 100644 --- a/source/dnode/vnode/src/vnd/vnodeMain.c +++ b/source/dnode/vnode/src/vnd/vnodeMain.c @@ -117,7 +117,7 @@ static int vnodeOpenImpl(SVnode *pVnode) { return -1; } - // TODO: Open TQ + // Open TQ sprintf(dir, "%s/tq", pVnode->path); pVnode->pTq = tqOpen(dir, &(pVnode->config.tqCfg), vBufPoolGetMAF(pVnode)); if (pVnode->pTq == NULL) {