diff --git a/include/common/tmsg.h b/include/common/tmsg.h index cb0c76eb71..fe96f4ee52 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1561,7 +1561,7 @@ typedef struct SMqSetCVgRsp { typedef struct SMqCVConsumeReq { int64_t reqId; int64_t offset; - int64_t clientId; + int64_t consumerId; int64_t blockingTime; char topicName[TSDB_TOPIC_FNAME_LEN]; char cgroup[TSDB_CONSUMER_GROUP_LEN]; diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 7a57956f3b..4158d7e7c7 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -55,8 +55,6 @@ int32_t mndInitSubscribe(SMnode *pMnode) { .deleteFp = (SdbDeleteFp)mndSubActionDelete}; mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); return sdbSetTable(pMnode->pSdb, table); @@ -95,7 +93,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); pCEp->consumerId = consumerId; taosArrayPush(pSub->assigned, pCEp); - pSub->nextConsumerIdx = (pSub->nextConsumerIdx+1) % taosArrayGetSize(pSub->availConsumer); + pSub->nextConsumerIdx = (pSub->nextConsumerIdx + 1) % taosArrayGetSize(pSub->availConsumer); // build msg SMqSetCVgReq req = { diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index eb82d19a4f..aaf7f33061 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -162,7 +162,7 @@ typedef struct STqGroup { } STqGroup; typedef struct STqTaskItem { - int8_t status; + int8_t status; int64_t offset; void* dst; SSubQueryMsg* pMsg; @@ -175,18 +175,22 @@ typedef struct STqBuffer { STqTaskItem output[TQ_BUFFER_SIZE]; } STqBuffer; -typedef struct STqClientHandle { - int64_t clientId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_TOPIC_FNAME_LEN]; - char* sql; - char* logicalPlan; - char* physicalPlan; - int64_t committedOffset; - int64_t currentOffset; - STqBuffer buffer; +typedef struct STqTopicHandle { + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_TOPIC_FNAME_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + int64_t committedOffset; + int64_t currentOffset; + STqBuffer buffer; SWalReadHandle* pReadhandle; -} STqClientHandle; +} STqTopicHandle; + +typedef struct STqConsumerHandle { + int64_t consumerId; + SArray* topics; // SArray +} STqConsumerHandle; typedef struct STqQueryMsg { STqMsgItem* item; @@ -337,7 +341,7 @@ int tqRegisterContext(STqGroup*, void* ahandle); int tqSendLaunchQuery(STqMsgItem*, int64_t offset); #endif -int32_t tqProcessConsume(STQ* pTq, SRpcMsg *pMsg, SRpcMsg **ppRsp); +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp); typedef struct STqReadHandle { int64_t ver; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b83cfe99bc..c4a4f087c6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -37,7 +37,7 @@ const void* tqDeserializeItem(const void* pBytes, STqMsgItem* pItem); int tqInit() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); - if(old == 1) return 0; + if (old == 1) return 0; tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); return 0; @@ -45,7 +45,7 @@ int tqInit() { void tqCleanUp() { int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0); - if(old == 0) return; + if (old == 0) return; taosTmrStop(tqMgmt.timer); taosTmrCleanUp(tqMgmt.timer); } @@ -150,7 +150,7 @@ int tqCreateGroup(STQ* pTq, int64_t topicId, int64_t cgId, int64_t cId, STqGroup memset(pGroup, 0, sizeof(STqGroup)); pGroup->topicList = tdListNew(sizeof(STqTopic)); - if(pGroup->topicList == NULL) { + if (pGroup->topicList == NULL) { free(pGroup); return -1; } @@ -190,7 +190,7 @@ static int tqFetch(STqGroup* pGroup, STqConsumeRsp** pRsp) { int totSize = 0; int numOfMsgs = 0; // TODO: make it a macro - int sizeLimit = 4 * 1024; + int sizeLimit = 4 * 1024; void* ptr = realloc(*pRsp, sizeof(STqConsumeRsp) + sizeLimit); if (ptr == NULL) { @@ -329,9 +329,9 @@ int tqProcessCMsg(STQ* pTq, STqConsumeReq* pMsg, STqRspHandle* pRsp) { } int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { - STqConsumeReq *pMsg = pReq->pCont; - int64_t clientId = pMsg->head.clientId; - STqGroup* pGroup = tqGetGroup(pTq, clientId); + STqConsumeReq* pMsg = pReq->pCont; + int64_t clientId = pMsg->head.clientId; + STqGroup* pGroup = tqGetGroup(pTq, clientId); if (pGroup == NULL) { terrno = TSDB_CODE_TQ_GROUP_NOT_SET; return -1; @@ -343,9 +343,8 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { int numOfMsgs = 0; int sizeLimit = 4096; - - STqConsumeRsp *pCsmRsp = (*pRsp)->pCont; - void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); + STqConsumeRsp* pCsmRsp = (*pRsp)->pCont; + void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + sizeLimit); if (ptr == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; return -1; @@ -356,16 +355,16 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { tdListInitIter(topicList, &iter, TD_LIST_FORWARD); STqMsgContent* buffer = NULL; - SArray* pArray = taosArrayInit(0, sizeof(void*)); + SArray* pArray = taosArrayInit(0, sizeof(void*)); - SListNode *pn; - while((pn = tdListNext(&iter)) != NULL) { - STqTopic* pTopic = *(STqTopic**)pn->data; - int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; + SListNode* pn; + while ((pn = tdListNext(&iter)) != NULL) { + STqTopic* pTopic = *(STqTopic**)pn->data; + int idx = pTopic->floatingCursor % TQ_BUFFER_SIZE; STqMsgItem* pItem = &pTopic->buffer[idx]; if (pItem->content != NULL && pItem->offset == pTopic->floatingCursor) { - if(pItem->status == TQ_ITEM_READY) { - //if has data + if (pItem->status == TQ_ITEM_READY) { + // if has data totSize += pTopic->buffer[idx].size; if (totSize > sizeLimit) { void* ptr = realloc((*pRsp)->pCont, sizeof(STqConsumeRsp) + totSize); @@ -388,13 +387,13 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { if (totSize > sizeLimit) { break; } - } else if(pItem->status == TQ_ITEM_PROCESS) { - //if not have data but in process + } else if (pItem->status == TQ_ITEM_PROCESS) { + // if not have data but in process - } else if(pItem->status == TQ_ITEM_EMPTY){ - //if not have data and not in process + } else if (pItem->status == TQ_ITEM_EMPTY) { + // if not have data and not in process int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_EMPTY, TQ_ITEM_PROCESS); - if(old != TQ_ITEM_EMPTY) { + if (old != TQ_ITEM_EMPTY) { continue; } pItem->offset = pTopic->floatingCursor; @@ -416,22 +415,22 @@ int tqConsume(STQ* pTq, SRpcMsg* pReq, SRpcMsg** pRsp) { } // fetched a num of msgs, rpc response - for(int i = 0; i < pArray->size; i++) { + for (int i = 0; i < pArray->size; i++) { STqMsgItem* pItem = taosArrayGet(pArray, i); - //read from wal + // read from wal void* raw = NULL; /*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/ int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset); - if(code < 0) { - //TODO: error + if (code < 0) { + // TODO: error } - //get msgType - //if submitblk + // get msgType + // if submitblk pItem->executor->assign(pItem->executor->runtimeEnv, raw); SSDataBlock* content = pItem->executor->exec(pItem->executor->runtimeEnv); pItem->content = content; - //if other type, send just put into buffer + // if other type, send just put into buffer /*pItem->content = raw;*/ int32_t old = atomic_val_compare_exchange_32(&pItem->status, TQ_ITEM_PROCESS, TQ_ITEM_READY); @@ -608,41 +607,48 @@ int tqItemSSize() { return 0; } -int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg **ppRsp) { +int32_t tqProcessConsume(STQ* pTq, SRpcMsg* pMsg, SRpcMsg** ppRsp) { SMqCVConsumeReq* pReq = pMsg->pCont; - int64_t reqId = pReq->reqId; - int64_t clientId = pReq->clientId; - int64_t offset = pReq->offset; - int64_t blockingTime = pReq->blockingTime; + int64_t reqId = pReq->reqId; + int64_t consumerId = pReq->consumerId; + int64_t offset = pReq->offset; + int64_t blockingTime = pReq->blockingTime; - STqClientHandle* pHandle = tqHandleGet(pTq->tqMeta, clientId); - int8_t pos = offset % TQ_BUFFER_SIZE; - int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); - if (old == 1) { - // do nothing + STqConsumerHandle* pConsumer = tqHandleGet(pTq->tqMeta, consumerId); + int sz = taosArrayGetSize(pConsumer->topics); + + for (int i = 0 ; i < sz; i++) { + STqTopicHandle *pHandle = taosArrayGet(pConsumer->topics, i); + + int8_t pos = offset % TQ_BUFFER_SIZE; + int8_t old = atomic_val_compare_exchange_8(&pHandle->buffer.output[pos].status, 0, 1); + if (old == 1) { + // do nothing + } + if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { + // TODO + } + SWalHead* pHead = pHandle->pReadhandle->pHead; + while (pHead->head.msgType != TDMT_VND_SUBMIT) { + // read until find TDMT_VND_SUBMIT + } + SSubmitMsg* pCont = (SSubmitMsg*)&pHead->head.body; + + SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; + + void* outputData; + atomic_store_8(&pHandle->buffer.output[pos].status, 1); + + // put output into rsp } - if (walReadWithHandle(pHandle->pReadhandle, offset) < 0) { - //TODO - } - SWalHead *pHead = pHandle->pReadhandle->pHead; - while (pHead->head.msgType != TDMT_VND_SUBMIT) { - // read until find TDMT_VND_SUBMIT - } - SSubmitMsg *pCont = (SSubmitMsg*)&pHead->head.body; - SSubQueryMsg* pQueryMsg = pHandle->buffer.output[pos].pMsg; - - void* outputData; - - atomic_store_8(&pHandle->buffer.output[pos].status, 1); // launch query // get result - // put into SMqCvConsumeRsp* pRsp; return 0; } -STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { +STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg* pMsg) { STqReadHandle* pReadHandle = malloc(sizeof(STqReadHandle)); if (pReadHandle == NULL) { return NULL; @@ -655,39 +661,39 @@ STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta, SSubmitMsg *pMsg) { } bool tqNextDataBlock(STqReadHandle* pHandle) { - if(tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { + if (tGetSubmitMsgNext(&pHandle->msgIter, &pHandle->pBlock) < 0) { return false; } return true; } int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) { - SMemRow row; - int32_t sversion = pHandle->pBlock->sversion; + SMemRow row; + int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchema = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, false); pBlockInfo->numOfCols = pSchema->nCols; pBlockInfo->rows = pHandle->pBlock->numOfRows; pBlockInfo->uid = pHandle->pBlock->uid; - //TODO: filter out unused column + // TODO: filter out unused column return 0; } -SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { - int32_t sversion = pHandle->pBlock->sversion; +SArray* tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { + int32_t sversion = pHandle->pBlock->sversion; SSchemaWrapper* pSchemaWrapper = metaGetTableSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion, true); - STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); - SArray *pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); + STSchema* pTschema = metaGetTbTSchema(pHandle->pMeta, pHandle->pBlock->uid, sversion); + SArray* pArray = taosArrayInit(pSchemaWrapper->nCols, sizeof(SColumnInfoData)); if (pArray == NULL) { return NULL; } SColumnInfoData colInfo; - int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; + int sz = pSchemaWrapper->nCols * pSchemaWrapper->pSchema->bytes; colInfo.pData = malloc(sz); if (colInfo.pData == NULL) { return NULL; } for (int i = 0; i < pTschema->numOfCols; i++) { - //TODO: filter out unused column + // TODO: filter out unused column taosArrayPush(pColumnIdList, &(schemaColAt(pTschema, i)->colId)); } @@ -695,16 +701,17 @@ SArray *tqRetrieveDataBlock(STqReadHandle* pHandle, SArray* pColumnIdList) { int32_t kvIdx; while ((row = tGetSubmitBlkNext(&pHandle->blkIter)) != NULL) { for (int i = 0; i < pTschema->numOfCols && kvIdx < pTschema->numOfCols; i++) { - //TODO: filter out unused column - STColumn *pCol = schemaColAt(pTschema, i); + // TODO: filter out unused column + STColumn* pCol = schemaColAt(pTschema, i); void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx); - //TODO: handle varlen + // TODO: handle varlen memcpy(POINTER_SHIFT(colInfo.pData, pCol->offset), val, pCol->bytes); } } taosArrayPush(pArray, &colInfo); return pArray; } -/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t status) {*/ - /*return 0;*/ +/*int tqLoadDataBlock(SExecTaskInfo* pTaskInfo, SSubmitBlkScanInfo* pSubmitBlkScanInfo, SSDataBlock* pBlock, uint32_t + * status) {*/ +/*return 0;*/ /*}*/