From 9e71d03d37dc1a3de0bcd8061bfb6bf7d5e6f705 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 18 May 2022 15:31:05 +0800 Subject: [PATCH 1/2] enh(tmq): commit when consumer_close called --- source/client/src/tmq.c | 39 +++++++++++++++++------ source/dnode/mnode/impl/src/mndConsumer.c | 27 +++++++++------- 2 files changed, 46 insertions(+), 20 deletions(-) diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index ecfc991331..f7ddca8f69 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -119,7 +119,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, - TMQ_CONSUMER_STATUS__NO_TOPIC, + /*TMQ_CONSUMER_STATUS__NO_TOPIC,*/ }; enum { @@ -753,12 +753,19 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { + if (topic_list == NULL) { + return TMQ_RESP_ERR__FAIL; + } const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; SCMSubscribeReq req = {0}; int32_t code = -1; + if (sz == 0) { + return TMQ_RESP_ERR__FAIL; + } + req.consumerId = tmq->consumerId; tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -830,10 +837,12 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } // init hb timer - tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + if (tmq->hbTimer == NULL) { + tmq->hbTimer = taosTmrStart(tmqAssignDelayedHbTask, 1000, tmq, tmqMgmt.timer); + } // init auto commit timer - if (tmq->autoCommit) { + if (tmq->autoCommit && tmq->commitTimer == NULL) { tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, tmq, tmqMgmt.timer); } @@ -1074,10 +1083,13 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { taosHashCleanup(pHash); tmq->clientTopics = newTopics; - if (taosArrayGetSize(tmq->clientTopics) == 0) - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); - else - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + ASSERT(taosArrayGetSize(tmq->clientTopics) != 0); + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); + + /*if (taosArrayGetSize(tmq->clientTopics) == 0)*/ + /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);*/ + /*else*/ + /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);*/ atomic_store_32(&tmq->epoch, epoch); return set; @@ -1456,9 +1468,18 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) { tmq_resp_err_t tmq_consumer_close(tmq_t* tmq) { if (tmq->status == TMQ_CONSUMER_STATUS__READY) { - tmq_list_t* lst = tmq_list_new(); - tmq_resp_err_t rsp = tmq_subscribe(tmq, lst); + tmq_resp_err_t rsp = tmq_commit_sync(tmq, NULL); + if (rsp == TMQ_RESP_ERR__SUCCESS) { + // TODO: free resources + return TMQ_RESP_ERR__SUCCESS; + } else { + return TMQ_RESP_ERR__FAIL; + } + + tmq_list_t* lst = tmq_list_new(); + rsp = tmq_subscribe(tmq, lst); tmq_list_destroy(lst); + if (rsp == TMQ_RESP_ERR__SUCCESS) { // TODO: free resources return TMQ_RESP_ERR__SUCCESS; diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 503c0f404a..274876567e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -627,21 +627,26 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer, if (pNewConsumer->updateType == CONSUMER_UPDATE__MODIFY) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); - SArray *tmp = pOldConsumer->rebNewTopics; - pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; - pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->rebRemovedTopics; - pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; - pNewConsumer->rebRemovedTopics = tmp; + if (taosArrayGetSize(pNewConsumer->rebNewTopics) == 0 && taosArrayGetSize(pNewConsumer->rebRemovedTopics) == 0) { + pOldConsumer->status = MQ_CONSUMER_STATUS__READY; + } else { + SArray *tmp = pOldConsumer->rebNewTopics; + pOldConsumer->rebNewTopics = pNewConsumer->rebNewTopics; + pNewConsumer->rebNewTopics = tmp; - tmp = pOldConsumer->assignedTopics; - pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; - pNewConsumer->assignedTopics = tmp; + tmp = pOldConsumer->rebRemovedTopics; + pOldConsumer->rebRemovedTopics = pNewConsumer->rebRemovedTopics; + pNewConsumer->rebRemovedTopics = tmp; - pOldConsumer->subscribeTime = pNewConsumer->upTime; + tmp = pOldConsumer->assignedTopics; + pOldConsumer->assignedTopics = pNewConsumer->assignedTopics; + pNewConsumer->assignedTopics = tmp; - pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + pOldConsumer->subscribeTime = pNewConsumer->upTime; + + pOldConsumer->status = MQ_CONSUMER_STATUS__MODIFY; + } } else if (pNewConsumer->updateType == CONSUMER_UPDATE__LOST) { ASSERT(taosArrayGetSize(pOldConsumer->rebNewTopics) == 0); ASSERT(taosArrayGetSize(pOldConsumer->rebRemovedTopics) == 0); From 4e640b6d67433c52c90d981ef6fa0cbcc2e4a100 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 18 May 2022 16:13:58 +0800 Subject: [PATCH 2/2] enh(tmq): update tb list for existing topic --- include/libs/executor/executor.h | 6 +++--- source/client/src/tmq.c | 20 +++++--------------- source/dnode/vnode/src/inc/vnodeInt.h | 5 +++-- source/dnode/vnode/src/tq/tq.c | 15 +++++++++++++++ source/dnode/vnode/src/vnd/vnodeSvr.c | 7 ++++++- source/libs/executor/src/executor.c | 8 ++++---- 6 files changed, 36 insertions(+), 25 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 162b6fb2ed..8f300c96c5 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -42,7 +42,7 @@ typedef struct SReadHandle { #define STREAM_DATA_TYPE_SSDATA_BLOCK 0x2 typedef enum { - OPTR_EXEC_MODEL_BATCH = 0x1, + OPTR_EXEC_MODEL_BATCH = 0x1, OPTR_EXEC_MODEL_STREAM = 0x2, } EOPTR_EXEC_MODEL; @@ -81,7 +81,7 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO * @param isAdd * @return */ -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd); +int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd); /** * Create the exec task object according to task json @@ -169,7 +169,7 @@ int32_t qUpdateQueriedTableIdList(qTaskInfo_t tinfo, int64_t uid, int32_t type); void qProcessFetchRsp(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t *resNum, SExplainExecInfo **pRes); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes); #ifdef __cplusplus } diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index f7ddca8f69..2a2fdea537 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -119,7 +119,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, - /*TMQ_CONSUMER_STATUS__NO_TOPIC,*/ + TMQ_CONSUMER_STATUS__NO_TOPIC, }; enum { @@ -753,19 +753,12 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in } tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { - if (topic_list == NULL) { - return TMQ_RESP_ERR__FAIL; - } const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); void* buf = NULL; SCMSubscribeReq req = {0}; int32_t code = -1; - if (sz == 0) { - return TMQ_RESP_ERR__FAIL; - } - req.consumerId = tmq->consumerId; tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); @@ -1083,13 +1076,10 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqAskEpRsp* pRsp) { taosHashCleanup(pHash); tmq->clientTopics = newTopics; - ASSERT(taosArrayGetSize(tmq->clientTopics) != 0); - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); - - /*if (taosArrayGetSize(tmq->clientTopics) == 0)*/ - /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC);*/ - /*else*/ - /*atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY);*/ + if (taosArrayGetSize(tmq->clientTopics) == 0) + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__NO_TOPIC); + else + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_32(&tmq->epoch, epoch); return set; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 9727d9df9f..6a5ad63822 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -96,8 +96,8 @@ STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid, bool deepCopy) SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); SArray* metaGetSmaTbUids(SMeta* pMeta); -int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); -int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); +int32_t metaCreateTSma(SMeta* pMeta, int64_t version, SSmaCfg* pCfg); +int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); // tsdb int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb, const char* dir, STsdbKeepCfg* pKeepCfg); @@ -117,6 +117,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); int tqCommit(STQ*); +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList); int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen); int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 6cc986d54b..80aa350e4d 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -101,6 +101,21 @@ static void tdSRowDemo() { taosMemoryFree(pTSChema); } +int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList) { + void* pIter = NULL; + STqExec* pExec = NULL; + while (1) { + pIter = taosHashIterate(pTq->execs, pIter); + if (pIter == NULL) break; + pExec = (STqExec*)pIter; + for (int32_t i = 0; i < 5; i++) { + int32_t code = qUpdateQualifiedTableId(pExec->task[i], tbUidList, true); + ASSERT(code == 0); + } + } + return 0; +} + int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { if (msgType != TDMT_VND_SUBMIT) return 0; void* pIter = NULL; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 464331319e..5c8cd362fd 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -333,6 +333,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, SVCreateTbRsp cRsp = {0}; char tbName[TSDB_TABLE_FNAME_LEN]; STbUidStore *pStore = NULL; + SArray *tbUids = NULL; pRsp->msgType = TDMT_VND_CREATE_TABLE_RSP; pRsp->code = TSDB_CODE_SUCCESS; @@ -348,7 +349,8 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, } rsp.pArray = taosArrayInit(req.nReqs, sizeof(cRsp)); - if (rsp.pArray == NULL) { + tbUids = taosArrayInit(req.nReqs, sizeof(int64_t)); + if (rsp.pArray == NULL || tbUids == NULL) { rcode = -1; terrno = TSDB_CODE_OUT_OF_MEMORY; goto _exit; @@ -376,6 +378,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, } else { cRsp.code = TSDB_CODE_SUCCESS; tdFetchTbUidList(pVnode->pSma, &pStore, pCreateReq->ctb.suid, pCreateReq->uid); + taosArrayPush(tbUids, &pCreateReq->uid); } taosArrayPush(rsp.pArray, &cRsp); @@ -383,6 +386,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, tDecoderClear(&decoder); + tqUpdateTbUidList(pVnode->pTq, tbUids); tdUpdateTbUidList(pVnode->pSma, pStore); tdUidStoreFree(pStore); @@ -402,6 +406,7 @@ static int vnodeProcessCreateTbReq(SVnode *pVnode, int64_t version, void *pReq, _exit: taosArrayDestroy(rsp.pArray); + taosArrayDestroy(tbUids); tDecoderClear(&decoder); tEncoderClear(&encoder); return rcode; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 320450eb6e..66073b70eb 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -126,7 +126,7 @@ qTaskInfo_t qCreateStreamExecTaskInfo(void* msg, void* streamReadHandle) { return pTaskInfo; } -int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isAdd) { +int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, const SArray* tableIdList, bool isAdd) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; // traverse to the stream scanner node to add this table id @@ -141,12 +141,12 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA SMetaReader mr = {0}; metaReaderInit(&mr, pScanInfo->readHandle.meta, 0); - for(int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { + for (int32_t i = 0; i < taosArrayGetSize(tableIdList); ++i) { int64_t* id = (int64_t*)taosArrayGet(tableIdList, i); int32_t code = metaGetTableEntryByUid(&mr, *id); if (code != TSDB_CODE_SUCCESS) { - qError("failed to get table meta, uid:%"PRIu64" code:%s", *id, tstrerror(terrno)); + qError("failed to get table meta, uid:%" PRIu64 " code:%s", *id, tstrerror(terrno)); continue; } @@ -160,7 +160,7 @@ int32_t qUpdateQualifiedTableId(qTaskInfo_t tinfo, SArray* tableIdList, bool isA metaReaderClear(&mr); - qDebug(" %d qualified child tables added into stream scanner", (int32_t) taosArrayGetSize(qa)); + qDebug(" %d qualified child tables added into stream scanner", (int32_t)taosArrayGetSize(qa)); int32_t code = tqReadHandleAddTbUidList(pScanInfo->streamBlockReader, qa); if (code != TSDB_CODE_SUCCESS) { return code;