From 255d79544295df25703b18a8d03b436859cf7ec9 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 8 Mar 2022 17:22:21 +0800 Subject: [PATCH] add tq push --- include/common/tcommon.h | 6 +- include/common/tmsg.h | 12 +- include/libs/qcom/query.h | 145 +++++++++++------- source/client/src/clientImpl.c | 4 +- source/client/src/tmq.c | 30 ++-- source/common/src/tmsg.c | 4 +- source/dnode/mnode/impl/src/mndDb.c | 6 +- source/dnode/mnode/impl/src/mndScheduler.c | 31 ++-- source/dnode/mnode/impl/src/mndSubscribe.c | 15 +- source/dnode/mnode/impl/src/mnode.c | 8 +- source/dnode/mnode/impl/test/db/db.cpp | 12 +- source/dnode/vnode/inc/tq.h | 14 +- source/dnode/vnode/src/inc/tqInt.h | 10 +- source/dnode/vnode/src/inc/tqPush.h | 77 ++++++++++ source/dnode/vnode/src/tq/tq.c | 63 ++++---- source/dnode/vnode/src/tq/tqPush.c | 84 ++++++++++ source/dnode/vnode/src/vnd/vnodeQuery.c | 12 +- source/libs/catalog/src/catalog.c | 2 +- source/libs/catalog/test/catalogTests.cpp | 48 +++--- source/libs/executor/src/executorimpl.c | 4 +- source/libs/parser/src/dCDAstProcess.c | 4 +- .../libs/parser/test/mockCatalogService.cpp | 22 +-- source/libs/planner/src/physicalPlan.c | 4 +- source/libs/planner/src/physicalPlanJson.c | 10 +- source/libs/scheduler/src/scheduler.c | 6 +- source/libs/scheduler/test/schedulerTests.cpp | 14 +- tests/test/c/tmqDemo.c | 6 +- 27 files changed, 438 insertions(+), 215 deletions(-) create mode 100644 source/dnode/vnode/src/inc/tqPush.h create mode 100644 source/dnode/vnode/src/tq/tqPush.c diff --git a/include/common/tcommon.h b/include/common/tcommon.h index d0ec5c9296..1d3ab4f340 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -135,7 +135,7 @@ static FORCE_INLINE void* tDecodeDataBlock(const void* buf, SSDataBlock* pBlock) return (void*)buf; } -static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp* pRsp) { +static FORCE_INLINE int32_t tEncodeSMqPollRsp(void** buf, const SMqPollRsp* pRsp) { int32_t tlen = 0; int32_t sz = 0; tlen += taosEncodeFixedI64(buf, pRsp->consumerId); @@ -156,7 +156,7 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp return tlen; } -static FORCE_INLINE void* tDecodeSMqConsumeRsp(void* buf, SMqConsumeRsp* pRsp) { +static FORCE_INLINE void* tDecodeSMqPollRsp(void* buf, SMqPollRsp* pRsp) { int32_t sz; buf = taosDecodeFixedI64(buf, &pRsp->consumerId); buf = taosDecodeFixedI64(buf, &pRsp->reqOffset); @@ -194,7 +194,7 @@ static FORCE_INLINE void tDeleteSSDataBlock(SSDataBlock* pBlock) { // tfree(pBlock); } -static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqConsumeRsp* pRsp) { +static FORCE_INLINE void tDeleteSMqConsumeRsp(SMqPollRsp* pRsp) { if (pRsp->schemas) { if (pRsp->schemas->nCols) { tfree(pRsp->schemas->pSchema); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d56db2046e..238753e5b7 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -795,7 +795,7 @@ typedef struct SVgroupInfo { int32_t vgId; uint32_t hashBegin; uint32_t hashEnd; - SEpSet epset; + SEpSet epSet; } SVgroupInfo; typedef struct { @@ -1876,8 +1876,8 @@ typedef struct { } SVCreateTSmaReq; typedef struct { - int8_t type; // 0 status report, 1 update data - char indexName[TSDB_INDEX_NAME_LEN + 1]; // + int8_t type; // 0 status report, 1 update data + char indexName[TSDB_INDEX_NAME_LEN + 1]; // STimeWindow windows; } STSmaMsg; @@ -2073,7 +2073,7 @@ typedef struct { int32_t skipLogNum; int32_t numOfTopics; SArray* pBlockData; // SArray -} SMqConsumeRsp; +} SMqPollRsp; // one req for one vg+topic typedef struct { @@ -2086,7 +2086,7 @@ typedef struct { int64_t currentOffset; char topic[TSDB_TOPIC_FNAME_LEN]; -} SMqConsumeReq; +} SMqPollReq; typedef struct { int32_t vgId; @@ -2108,7 +2108,7 @@ typedef struct { struct tmq_message_t { SMqRspHead head; union { - SMqConsumeRsp consumeRsp; + SMqPollRsp consumeRsp; SMqCMGetSubEpRsp getEpRsp; }; void* extra; diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 1f56254476..5d5ab74ba9 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -44,10 +44,10 @@ enum { }; typedef struct STableComInfo { - uint8_t numOfTags; // the number of tags in schema - uint8_t precision; // the number of precision - int16_t numOfColumns; // the number of columns - int32_t rowSize; // row size of the schema + uint8_t numOfTags; // the number of tags in schema + uint8_t precision; // the number of precision + int16_t numOfColumns; // the number of columns + int32_t rowSize; // row size of the schema } STableComInfo; /* @@ -56,49 +56,45 @@ typedef struct STableComInfo { * The cached child table meta info. For each child table, 24 bytes are required to keep the essential table info. */ typedef struct SCTableMeta { - int32_t vgId:24; + int32_t vgId : 24; int8_t tableType; uint64_t uid; uint64_t suid; } SCTableMeta; /* - * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a SCTableMeta. + * Note that the first 24 bytes of STableMeta are identical to SCTableMeta, it is safe to cast a STableMeta to be a + * SCTableMeta. */ typedef struct STableMeta { - //BEGIN: KEEP THIS PART SAME WITH SCTableMeta - int32_t vgId:24; - int8_t tableType; - uint64_t uid; - uint64_t suid; - //END: KEEP THIS PART SAME WITH SCTableMeta - - // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta info - int16_t sversion; - int16_t tversion; - STableComInfo tableInfo; - SSchema schema[]; + // BEGIN: KEEP THIS PART SAME WITH SCTableMeta + int32_t vgId : 24; + int8_t tableType; + uint64_t uid; + uint64_t suid; + // END: KEEP THIS PART SAME WITH SCTableMeta + + // if the table is TSDB_CHILD_TABLE, the following information is acquired from the corresponding super table meta + // info + int16_t sversion; + int16_t tversion; + STableComInfo tableInfo; + SSchema schema[]; } STableMeta; typedef struct SDBVgInfo { - int32_t vgVersion; + int32_t vgVersion; int8_t hashMethod; - SHashObj *vgHash; //key:vgId, value:SVgroupInfo + SHashObj* vgHash; // key:vgId, value:SVgroupInfo } SDBVgInfo; typedef struct SUseDbOutput { - char db[TSDB_DB_FNAME_LEN]; - uint64_t dbId; - SDBVgInfo *dbVgroup; + char db[TSDB_DB_FNAME_LEN]; + uint64_t dbId; + SDBVgInfo* dbVgroup; } SUseDbOutput; -enum { - META_TYPE_NULL_TABLE = 1, - META_TYPE_CTABLE, - META_TYPE_TABLE, - META_TYPE_BOTH_TABLE -}; - +enum { META_TYPE_NULL_TABLE = 1, META_TYPE_CTABLE, META_TYPE_TABLE, META_TYPE_BOTH_TABLE }; typedef struct STableMetaOutput { int32_t metaType; @@ -107,30 +103,30 @@ typedef struct STableMetaOutput { char ctbName[TSDB_TABLE_NAME_LEN]; char tbName[TSDB_TABLE_NAME_LEN]; SCTableMeta ctbMeta; - STableMeta *tbMeta; + STableMeta* tbMeta; } STableMetaOutput; typedef struct SDataBuf { - void *pData; - uint32_t len; - void *handle; + void* pData; + uint32_t len; + void* handle; } SDataBuf; typedef int32_t (*__async_send_cb_fn_t)(void* param, const SDataBuf* pMsg, int32_t code); typedef int32_t (*__async_exec_fn_t)(void* param); typedef struct SMsgSendInfo { - __async_send_cb_fn_t fp; //async callback function - void *param; - uint64_t requestId; - uint64_t requestObjRefId; - int32_t msgType; - SDataBuf msgInfo; + __async_send_cb_fn_t fp; // async callback function + void* param; + uint64_t requestId; + uint64_t requestObjRefId; + int32_t msgType; + SDataBuf msgInfo; } SMsgSendInfo; typedef struct SQueryNodeAddr { int32_t nodeId; // vgId or qnodeId - SEpSet epset; + SEpSet epSet; } SQueryNodeAddr; int32_t initTaskQueue(); @@ -154,32 +150,67 @@ int32_t taosAsyncExec(__async_exec_fn_t execFn, void* execParam, int32_t* code); * @param pInfo * @return */ -int32_t asyncSendMsgToServer(void *pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); +int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransporterId, const SMsgSendInfo* pInfo); -int32_t queryBuildUseDbOutput(SUseDbOutput *pOut, SUseDbRsp *usedbRsp); +int32_t queryBuildUseDbOutput(SUseDbOutput* pOut, SUseDbRsp* usedbRsp); void initQueryModuleMsgHandle(); const SSchema* tGetTbnameColumnSchema(); -bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); +bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); -int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta **pMeta); +int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); -extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char **msg, int32_t msgSize, int32_t *msgLen); -extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char *msg, int32_t msgSize); +extern int32_t (*queryBuildMsg[TDMT_MAX])(void* input, char** msg, int32_t msgSize, int32_t* msgLen); +extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t msgSize); -#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE -#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE -#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE +#define SET_META_TYPE_NULL(t) (t) = META_TYPE_NULL_TABLE +#define SET_META_TYPE_CTABLE(t) (t) = META_TYPE_CTABLE +#define SET_META_TYPE_TABLE(t) (t) = META_TYPE_TABLE #define SET_META_TYPE_BOTH_TABLE(t) (t) = META_TYPE_BOTH_TABLE -#define qFatal(...) do { if (qDebugFlag & DEBUG_FATAL) { taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); }} while(0) -#define qError(...) do { if (qDebugFlag & DEBUG_ERROR) { taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); }} while(0) -#define qWarn(...) do { if (qDebugFlag & DEBUG_WARN) { taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); }} while(0) -#define qInfo(...) do { if (qDebugFlag & DEBUG_INFO) { taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebug(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); }} while(0) -#define qTrace(...) do { if (qDebugFlag & DEBUG_TRACE) { taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); }} while(0) -#define qDebugL(...) do { if (qDebugFlag & DEBUG_DEBUG) { taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); }} while(0) +#define qFatal(...) \ + do { \ + if (qDebugFlag & DEBUG_FATAL) { \ + taosPrintLog("QRY FATAL ", DEBUG_FATAL, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qError(...) \ + do { \ + if (qDebugFlag & DEBUG_ERROR) { \ + taosPrintLog("QRY ERROR ", DEBUG_ERROR, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qWarn(...) \ + do { \ + if (qDebugFlag & DEBUG_WARN) { \ + taosPrintLog("QRY WARN ", DEBUG_WARN, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qInfo(...) \ + do { \ + if (qDebugFlag & DEBUG_INFO) { \ + taosPrintLog("QRY ", DEBUG_INFO, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qDebug(...) \ + do { \ + if (qDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qTrace(...) \ + do { \ + if (qDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("QRY ", DEBUG_TRACE, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) +#define qDebugL(...) \ + do { \ + if (qDebugFlag & DEBUG_DEBUG) { \ + taosPrintLongString("QRY ", DEBUG_DEBUG, qDebugFlag, __VA_ARGS__); \ + } \ + } while (0) #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 259d0e5799..e9febef7e2 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -517,7 +517,7 @@ void* doFetchRow(SRequestObj* pRequest) { SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; SVgroupInfo* pVgroupInfo = taosArrayGet(pShowReqInfo->pArray, pShowReqInfo->currentIndex); - epSet = pVgroupInfo->epset; + epSet = pVgroupInfo->epSet; } else if (pRequest->type == TDMT_VND_SHOW_TABLES_FETCH) { pRequest->type = TDMT_VND_SHOW_TABLES; SShowReqInfo* pShowReqInfo = &pRequest->body.showInfo; @@ -534,7 +534,7 @@ void* doFetchRow(SRequestObj* pRequest) { pRequest->body.requestMsg.pData = pShowReq; SMsgSendInfo* body = buildMsgInfoImpl(pRequest); - epSet = pVgroupInfo->epset; + epSet = pVgroupInfo->epSet; int64_t transporterId = 0; STscObj* pTscObj = pRequest->pTscObj; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 10dc378518..60103cc9c5 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -13,8 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE - #include "clientInt.h" #include "clientLog.h" #include "parser.h" @@ -606,17 +604,17 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { int32_t tmqGetSkipLogNum(tmq_message_t* tmq_message) { if (tmq_message == NULL) return 0; - SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; + SMqPollRsp* pRsp = &tmq_message->consumeRsp; return pRsp->skipLogNum; } void tmqShowMsg(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - static bool noPrintSchema; - char pBuf[128]; - SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; - int32_t colNum = pRsp->schemas->nCols; + static bool noPrintSchema; + char pBuf[128]; + SMqPollRsp* pRsp = &tmq_message->consumeRsp; + int32_t colNum = pRsp->schemas->nCols; if (!noPrintSchema) { printf("|"); for (int32_t i = 0; i < colNum; i++) { @@ -703,7 +701,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) { goto WRITE_QUEUE_FAIL; } memcpy(pRsp, pMsg->pData, sizeof(SMqRspHead)); - tDecodeSMqConsumeRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); + tDecodeSMqPollRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pRsp->consumeRsp); /*printf("rsp commit off:%ld rsp off:%ld has data:%d\n", pRsp->committedOffset, pRsp->rspOffset, pRsp->numOfTopics);*/ if (pRsp->consumeRsp.numOfTopics == 0) { /*printf("no data\n");*/ @@ -874,7 +872,7 @@ tmq_resp_err_t tmq_seek(tmq_t* tmq, const tmq_topic_vgroup_t* offset) { return TMQ_RESP_ERR__FAIL; } -SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { +SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTopic* pTopic, SMqClientVg* pVg) { int64_t reqOffset; if (pVg->currentOffset >= 0) { reqOffset = pVg->currentOffset; @@ -886,7 +884,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien reqOffset = tmq->resetOffsetCfg; } - SMqConsumeReq* pReq = malloc(sizeof(SMqConsumeReq)); + SMqPollReq* pReq = malloc(sizeof(SMqPollReq)); if (pReq == NULL) { return NULL; } @@ -900,7 +898,7 @@ SMqConsumeReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClien pReq->currentOffset = reqOffset; pReq->head.vgId = htonl(pVg->vgId); - pReq->head.contLen = htonl(sizeof(SMqConsumeReq)); + pReq->head.contLen = htonl(sizeof(SMqPollReq)); return pReq; } @@ -914,7 +912,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { /*if (vgStatus != TMQ_VG_STATUS__IDLE) {*/ /*continue;*/ /*}*/ - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); // TODO: out of mem @@ -941,7 +939,7 @@ tmq_message_t* tmqSyncPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqConsumeReq), + .len = sizeof(SMqPollReq), .handle = NULL, }; sendInfo->requestId = generateRequestId(); @@ -982,7 +980,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { if (vgStatus != TMQ_VG_STATUS__IDLE) { continue; } - SMqConsumeReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); + SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg); if (pReq == NULL) { atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); tsem_post(&tmq->rspSem); @@ -1011,7 +1009,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) { sendInfo->msgInfo = (SDataBuf){ .pData = pReq, - .len = sizeof(SMqConsumeReq), + .len = sizeof(SMqPollReq), .handle = NULL, }; sendInfo->requestId = generateRequestId(); @@ -1271,7 +1269,7 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* tmq_topic_v void tmq_message_destroy(tmq_message_t* tmq_message) { if (tmq_message == NULL) return; - SMqConsumeRsp* pRsp = &tmq_message->consumeRsp; + SMqPollRsp* pRsp = &tmq_message->consumeRsp; tDeleteSMqConsumeRsp(pRsp); /*free(tmq_message);*/ taosFreeQitem(tmq_message); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index e7309eacc8..97b19c1c79 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1481,7 +1481,7 @@ static int32_t tSerializeSUseDbRspImp(SCoder *pEncoder, SUseDbRsp *pRsp) { if (tEncodeI32(pEncoder, pVgInfo->vgId) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashBegin) < 0) return -1; if (tEncodeU32(pEncoder, pVgInfo->hashEnd) < 0) return -1; - if (tEncodeSEpSet(pEncoder, &pVgInfo->epset) < 0) return -1; + if (tEncodeSEpSet(pEncoder, &pVgInfo->epSet) < 0) return -1; } return 0; @@ -1541,7 +1541,7 @@ int32_t tDeserializeSUseDbRspImp(SCoder *pDecoder, SUseDbRsp *pRsp) { if (tDecodeI32(pDecoder, &vgInfo.vgId) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashBegin) < 0) return -1; if (tDecodeU32(pDecoder, &vgInfo.hashEnd) < 0) return -1; - if (tDecodeSEpSet(pDecoder, &vgInfo.epset) < 0) return -1; + if (tDecodeSEpSet(pDecoder, &vgInfo.epSet) < 0) return -1; taosArrayPush(pRsp->pVgroupInfos, &vgInfo); } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 974f5fc982..9cde9201ed 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -900,10 +900,10 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { vgInfo.vgId = pVgroup->vgId; vgInfo.hashBegin = pVgroup->hashBegin; vgInfo.hashEnd = pVgroup->hashEnd; - vgInfo.epset.numOfEps = pVgroup->replica; + vgInfo.epSet.numOfEps = pVgroup->replica; for (int32_t gid = 0; gid < pVgroup->replica; ++gid) { SVnodeGid *pVgid = &pVgroup->vnodeGid[gid]; - SEp *pEp = &vgInfo.epset.eps[gid]; + SEp *pEp = &vgInfo.epSet.eps[gid]; SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId); if (pDnode != NULL) { memcpy(pEp->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); @@ -911,7 +911,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) { } mndReleaseDnode(pMnode, pDnode); if (pVgid->role == TAOS_SYNC_STATE_LEADER) { - vgInfo.epset.inUse = gid; + vgInfo.epSet.inUse = gid; } } vindex++; diff --git a/source/dnode/mnode/impl/src/mndScheduler.c b/source/dnode/mnode/impl/src/mndScheduler.c index 5308c0c0f6..dfde6a9258 100644 --- a/source/dnode/mnode/impl/src/mndScheduler.c +++ b/source/dnode/mnode/impl/src/mndScheduler.c @@ -33,23 +33,29 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib SSdb* pSdb = pMnode->pSdb; SVgObj* pVgroup = NULL; SQueryDag* pDag = qStringToDag(pTopic->physicalPlan); - SArray* pAray = NULL; - SArray* unassignedVg = pSub->unassignedVg; + if (pDag == NULL) { + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } ASSERT(pSub->vgNum == 0); int32_t levelNum = taosArrayGetSize(pDag->pSubplans); if (levelNum != 1) { + qDestroyQueryDag(pDag); + terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; return -1; } - SArray* inner = taosArrayGet(pDag->pSubplans, 0); + SArray* plans = taosArrayGet(pDag->pSubplans, 0); - int32_t opNum = taosArrayGetSize(inner); + int32_t opNum = taosArrayGetSize(plans); if (opNum != 1) { + qDestroyQueryDag(pDag); + terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; return -1; } - SSubplan* plan = taosArrayGetP(inner, 0); + SSubplan* plan = taosArrayGetP(plans, 0); void* pIter = NULL; while (1) { @@ -62,17 +68,24 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib pSub->vgNum++; plan->execNode.nodeId = pVgroup->vgId; - plan->execNode.epset = mndGetVgroupEpset(pMnode, pVgroup); + plan->execNode.epSet = mndGetVgroupEpset(pMnode, pVgroup); SMqConsumerEp consumerEp = {0}; consumerEp.status = 0; consumerEp.consumerId = -1; - consumerEp.epSet = plan->execNode.epset; + consumerEp.epSet = plan->execNode.epSet; consumerEp.vgId = plan->execNode.nodeId; int32_t msgLen; - int32_t code = qSubPlanToString(plan, &consumerEp.qmsg, &msgLen); - taosArrayPush(unassignedVg, &consumerEp); + if (qSubPlanToString(plan, &consumerEp.qmsg, &msgLen) < 0) { + sdbRelease(pSdb, pVgroup); + qDestroyQueryDag(pDag); + terrno = TSDB_CODE_QRY_INVALID_INPUT; + return -1; + } + taosArrayPush(pSub->unassignedVg, &consumerEp); } + qDestroyQueryDag(pDag); + return 0; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 9acd881897..1d964c4383 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -93,7 +93,6 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj strcpy(pSub->key, key); if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) { - terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC; tDeleteSMqSubscribeObj(pSub); free(pSub); return NULL; @@ -295,7 +294,11 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) { for (int32_t k = 0; k < vgsz; k++) { char offsetKey[TSDB_PARTITION_KEY_LEN]; SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k); - SMqSubVgEp vgEp = {.epSet = pConsumerEp->epSet, .vgId = pConsumerEp->vgId, .offset = -1}; + SMqSubVgEp vgEp = { + .epSet = pConsumerEp->epSet, + .vgId = pConsumerEp->vgId, + .offset = -1, + }; mndMakePartitionKey(offsetKey, pConsumer->cgroup, topicName, pConsumerEp->vgId); SMqOffsetObj *pOffsetObj = mndAcquireOffset(pMnode, offsetKey); if (pOffsetObj != NULL) { @@ -345,7 +348,7 @@ static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) { if (pRebSub == NULL) { pRebSub = tNewSMqRebSubscribe(key); if (pRebSub == NULL) { - // TODO + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe)); @@ -412,7 +415,11 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { } if (taosHashGetSize(pRebMsg->rebSubHash) != 0) { mInfo("mq rebalance will be triggered"); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)}; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_MQ_DO_REBALANCE, + .pCont = pRebMsg, + .contLen = sizeof(SMqDoRebalanceMsg), + }; pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } else { taosHashCleanup(pRebMsg->rebSubHash); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 64c7a66bf9..d3642f4204 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -96,7 +96,11 @@ static void mndCalMqRebalance(void *param, void *tmrId) { if (mndIsMaster(pMnode)) { int32_t contLen = 0; void *pReq = mndBuildTimerMsg(&contLen); - SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen}; + SRpcMsg rpcMsg = { + .msgType = TDMT_MND_MQ_TIMER, + .pCont = pReq, + .contLen = contLen, + }; pMnode->putReqToMReadQFp(pMnode->pDnode, &rpcMsg); } @@ -631,4 +635,4 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr } return 0; -} \ No newline at end of file +} diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 17fda48cd7..9dbc1be4e9 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -292,9 +292,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, 0); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX / 2 - 1); - EXPECT_EQ(pInfo->epset.inUse, 0); - EXPECT_EQ(pInfo->epset.numOfEps, 1); - SEp* pAddr = &pInfo->epset.eps[0]; + EXPECT_EQ(pInfo->epSet.inUse, 0); + EXPECT_EQ(pInfo->epSet.numOfEps, 1); + SEp* pAddr = &pInfo->epSet.eps[0]; EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } @@ -307,9 +307,9 @@ TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_GT(pInfo->vgId, 0); EXPECT_EQ(pInfo->hashBegin, UINT32_MAX / 2); EXPECT_EQ(pInfo->hashEnd, UINT32_MAX); - EXPECT_EQ(pInfo->epset.inUse, 0); - EXPECT_EQ(pInfo->epset.numOfEps, 1); - SEp* pAddr = &pInfo->epset.eps[0]; + EXPECT_EQ(pInfo->epSet.inUse, 0); + EXPECT_EQ(pInfo->epSet.numOfEps, 1); + SEp* pAddr = &pInfo->epSet.eps[0]; EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } diff --git a/source/dnode/vnode/inc/tq.h b/source/dnode/vnode/inc/tq.h index 36626514ec..b4d45e83fb 100644 --- a/source/dnode/vnode/inc/tq.h +++ b/source/dnode/vnode/inc/tq.h @@ -13,16 +13,14 @@ * along with this program. If not, see . */ -#ifndef _TD_TQ_H_ -#define _TD_TQ_H_ +#ifndef _TQ_H_ +#define _TQ_H_ -#include "tcommon.h" #include "executor.h" -#include "tmallocator.h" #include "meta.h" -#include "scheduler.h" #include "taoserror.h" -#include "tlist.h" +#include "tcommon.h" +#include "tmallocator.h" #include "tmsg.h" #include "trpc.h" #include "ttimer.h" @@ -54,7 +52,7 @@ void tqClose(STQ*); int tqPushMsg(STQ*, void* msg, tmsg_t msgType, int64_t version); int tqCommit(STQ*); -int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg); +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessSetConnReq(STQ* pTq, char* msg); int32_t tqProcessRebReq(STQ* pTq, char* msg); @@ -62,4 +60,4 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg); } #endif -#endif /*_TD_TQ_H_*/ +#endif /*_TQ_H_*/ diff --git a/source/dnode/vnode/src/inc/tqInt.h b/source/dnode/vnode/src/inc/tqInt.h index f416413859..fc6a3699d5 100644 --- a/source/dnode/vnode/src/inc/tqInt.h +++ b/source/dnode/vnode/src/inc/tqInt.h @@ -19,7 +19,7 @@ #include "meta.h" #include "tlog.h" #include "tq.h" -#include "trpc.h" +#include "tqPush.h" #ifdef __cplusplus extern "C" { @@ -31,30 +31,35 @@ extern "C" { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ } \ } + #define tqError(...) \ { \ if (tqDebugFlag & DEBUG_ERROR) { \ taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ } \ } + #define tqWarn(...) \ { \ if (tqDebugFlag & DEBUG_WARN) { \ taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ } \ } + #define tqInfo(...) \ { \ if (tqDebugFlag & DEBUG_INFO) { \ taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \ } \ } + #define tqDebug(...) \ { \ if (tqDebugFlag & DEBUG_DEBUG) { \ taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \ } \ } + #define tqTrace(...) \ { \ if (tqDebugFlag & DEBUG_TRACE) { \ @@ -138,9 +143,7 @@ typedef struct { // topics that are not connectted STqMetaList* unconnectTopic; - // TODO:temporaral use, to be replaced by unified tfile TdFilePtr pFile; - // TODO:temporaral use, to be replaced by unified tfile TdFilePtr pIdxFile; char* dirPath; @@ -157,6 +160,7 @@ struct STQ { STqCfg* tqConfig; STqMemRef tqMemRef; STqMetaStore* tqMeta; + STqPushMgr* tqPushMgr; SWal* pWal; SMeta* pVnodeMeta; }; diff --git a/source/dnode/vnode/src/inc/tqPush.h b/source/dnode/vnode/src/inc/tqPush.h new file mode 100644 index 0000000000..32fd7c3ddf --- /dev/null +++ b/source/dnode/vnode/src/inc/tqPush.h @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TQ_PUSH_H_ +#define _TQ_PUSH_H_ + +#include "thash.h" +#include "trpc.h" +#include "ttimer.h" + +#ifdef __cplusplus +extern "C" { +#endif + +enum { + TQ_PUSHER_TYPE__CLIENT = 1, + TQ_PUSHER_TYPE__STREAM, +}; + +typedef struct { + int8_t type; + int8_t reserved[3]; + int32_t ttl; + int64_t consumerId; + SRpcMsg* pMsg; + // SMqPollRsp* rsp; +} STqClientPusher; + +typedef struct { + int8_t type; + int8_t nodeType; + int8_t reserved[6]; + int64_t streamId; + SEpSet epSet; +} STqStreamPusher; + +typedef struct { + int8_t type; // mq or stream +} STqPusher; + +typedef struct { + SHashObj* pHash; // +} STqPushMgr; + +typedef struct { + int8_t inited; + tmr_h timer; +} STqPushMgmt; + +static STqPushMgmt tqPushMgmt; + +int32_t tqPushMgrInit(); +void tqPushMgrCleanUp(); + +STqPushMgr* tqPushMgrOpen(); +void tqPushMgrClose(STqPushMgr* pushMgr); + +STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl); +STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet); + +#ifdef __cplusplus +} +#endif + +#endif /*_TQ_PUSH_H_*/ diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index d540b589b7..16809f1527 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -12,28 +12,16 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "tcompare.h" #include "tqInt.h" #include "tqMetaStore.h" -int tqInit() { - int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 0, 1); - if (old == 1) return 0; +int32_t tqInit() { return tqPushMgrInit(); } - tqMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); - return 0; -} +void tqCleanUp() { tqPushMgrCleanUp(); } -void tqCleanUp() { - int8_t old = atomic_val_compare_exchange_8(&tqMgmt.inited, 1, 0); - if (old == 0) return; - taosTmrStop(tqMgmt.timer); - taosTmrCleanUp(tqMgmt.timer); -} - -STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { +STQ* tqOpen(const char* path, SWal* pWal, SMeta* pVnodeMeta, STqCfg* tqConfig, SMemAllocatorFactory* allocFac) { STQ* pTq = malloc(sizeof(STQ)); if (pTq == NULL) { terrno = TSDB_CODE_TQ_OUT_OF_MEMORY; @@ -42,7 +30,7 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl pTq->path = strdup(path); pTq->tqConfig = tqConfig; pTq->pWal = pWal; - pTq->pVnodeMeta = pMeta; + pTq->pVnodeMeta = pVnodeMeta; #if 0 pTq->tqMemRef.pAllocatorFactory = allocFac; pTq->tqMemRef.pAllocator = allocFac->create(allocFac); @@ -60,6 +48,13 @@ STQ* tqOpen(const char* path, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, SMemAl return NULL; } + pTq->tqPushMgr = tqPushMgrOpen(); + if (pTq->tqPushMgr == NULL) { + // free store + free(pTq); + return NULL; + } + return pTq; } @@ -72,6 +67,8 @@ void tqClose(STQ* pTq) { } int tqPushMsg(STQ* pTq, void* msg, tmsg_t msgType, int64_t version) { + // iterate hash + // process all msg // if waiting // memcpy and send msg to fetch thread // TODO: add reference @@ -199,7 +196,10 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu for (int j = 0; j < TQ_BUFFER_SIZE; j++) { pTopic->buffer.output[j].status = 0; STqReadHandle* pReadHandle = tqInitSubmitMsgScanner(pTq->pVnodeMeta); - SReadHandle handle = {.reader = pReadHandle, .meta = pTq->pVnodeMeta}; + SReadHandle handle = { + .reader = pReadHandle, + .meta = pTq->pVnodeMeta, + }; pTopic->buffer.output[j].pReadHandle = pReadHandle; pTopic->buffer.output[j].task = qCreateStreamExecTaskInfo(pTopic->qmsg, &handle); } @@ -208,11 +208,11 @@ int32_t tqDeserializeConsumer(STQ* pTq, const STqSerializedHead* pHead, STqConsu return 0; } -int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { - SMqConsumeReq* pReq = pMsg->pCont; - int64_t consumerId = pReq->consumerId; - int64_t fetchOffset; - int64_t blockingTime = pReq->blockingTime; +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { + SMqPollReq* pReq = pMsg->pCont; + int64_t consumerId = pReq->consumerId; + int64_t fetchOffset; + int64_t blockingTime = pReq->blockingTime; if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) { fetchOffset = 0; @@ -222,7 +222,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { fetchOffset = pReq->currentOffset + 1; } - SMqConsumeRsp rsp = { + SMqPollRsp rsp = { .consumerId = consumerId, .numOfTopics = 0, .pBlockData = NULL, @@ -236,6 +236,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rpcSendResponse(pMsg); return 0; } + int sz = taosArrayGetSize(pConsumer->topics); ASSERT(sz == 1); STqTopic* pTopic = taosArrayGet(pConsumer->topics, 0); @@ -247,13 +248,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { SWalHead* pHead; while (1) { - int8_t pos = fetchOffset % TQ_BUFFER_SIZE; + /*if (fetchOffset > walGetLastVer(pTq->pWal) || walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) {*/ if (walReadWithHandle(pTopic->pReadhandle, fetchOffset) < 0) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user break; } + int8_t pos = fetchOffset % TQ_BUFFER_SIZE; pHead = pTopic->pReadhandle->pHead; if (pHead->head.msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->head.body; @@ -280,7 +282,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { rsp.numOfTopics = 1; rsp.pBlockData = pRes; - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; @@ -290,7 +292,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ((SMqRspHead*)buf)->epoch = pReq->epoch; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqConsumeRsp(&abuf, &rsp); + tEncodeSMqPollRsp(&abuf, &rsp); taosArrayDestroyEx(rsp.pBlockData, (void (*)(void*))tDeleteSSDataBlock); pMsg->pCont = buf; pMsg->contLen = tlen; @@ -304,7 +306,10 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { } } - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqConsumeRsp(NULL, &rsp); + /*if (blockingTime != 0) {*/ + /*tqAddClientPusher(pTq->tqPushMgr, pMsg, consumerId, blockingTime);*/ + /*} else {*/ + int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqPollRsp(NULL, &rsp); void* buf = rpcMallocCont(tlen); if (buf == NULL) { pMsg->code = -1; @@ -314,12 +319,14 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) { ((SMqRspHead*)buf)->epoch = pReq->epoch; void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqConsumeRsp(&abuf, &rsp); + tEncodeSMqPollRsp(&abuf, &rsp); rsp.pBlockData = NULL; pMsg->pCont = buf; pMsg->contLen = tlen; pMsg->code = 0; rpcSendResponse(pMsg); + /*}*/ + return 0; } diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c new file mode 100644 index 0000000000..fea65846be --- /dev/null +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -0,0 +1,84 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "tqPush.h" + +int32_t tqPushMgrInit() { + // + int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 0, 1); + if (old == 1) return 0; + + tqPushMgmt.timer = taosTmrInit(0, 0, 0, "TQ"); + return 0; +} + +void tqPushMgrCleanUp() { + int8_t old = atomic_val_compare_exchange_8(&tqPushMgmt.inited, 1, 0); + if (old == 0) return; + taosTmrStop(tqPushMgmt.timer); + taosTmrCleanUp(tqPushMgmt.timer); +} + +STqPushMgr* tqPushMgrOpen() { + STqPushMgr* mgr = malloc(sizeof(STqPushMgr)); + if (mgr == NULL) { + return NULL; + } + mgr->pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + return mgr; +} + +void tqPushMgrClose(STqPushMgr* pushMgr) { + taosHashCleanup(pushMgr->pHash); + free(pushMgr); +} + +STqClientPusher* tqAddClientPusher(STqPushMgr* pushMgr, SRpcMsg* pMsg, int64_t consumerId, int64_t ttl) { + STqClientPusher* clientPusher = malloc(sizeof(STqClientPusher)); + if (clientPusher == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + clientPusher->type = TQ_PUSHER_TYPE__CLIENT; + clientPusher->pMsg = pMsg; + clientPusher->consumerId = consumerId; + clientPusher->ttl = ttl; + if (taosHashPut(pushMgr->pHash, &consumerId, sizeof(int64_t), &clientPusher, sizeof(void*)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + free(clientPusher); + // TODO send rsp back + return NULL; + } + return clientPusher; +} + +STqStreamPusher* tqAddStreamPusher(STqPushMgr* pushMgr, int64_t streamId, SEpSet* pEpSet) { + STqStreamPusher* streamPusher = malloc(sizeof(STqStreamPusher)); + if (streamPusher == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + streamPusher->type = TQ_PUSHER_TYPE__STREAM; + streamPusher->nodeType = 0; + streamPusher->streamId = streamId; + memcpy(&streamPusher->epSet, pEpSet, sizeof(SEpSet)); + + if (taosHashPut(pushMgr->pHash, &streamId, sizeof(int64_t), &streamPusher, sizeof(void*)) < 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + free(streamPusher); + return NULL; + } + return streamPusher; +} diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index 4b47413715..ccb59c26de 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -29,7 +29,7 @@ int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) { SReadHandle handle = {.reader = pVnode->pTsdb, .meta = pVnode->pMeta}; switch (pMsg->msgType) { - case TDMT_VND_QUERY:{ + case TDMT_VND_QUERY: { return qWorkerProcessQueryMsg(&handle, pVnode->pQuery, pMsg); } case TDMT_VND_QUERY_CONTINUE: @@ -63,7 +63,7 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { case TDMT_VND_TABLE_META: return vnodeGetTableMeta(pVnode, pMsg); case TDMT_VND_CONSUME: - return tqProcessConsumeReq(pVnode->pTq, pMsg); + return tqProcessPollReq(pVnode->pTq, pMsg); default: vError("unknown msg type:%d in fetch queue", pMsg->msgType); return TSDB_CODE_VND_APP_ERROR; @@ -71,8 +71,8 @@ int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg) { } static int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { - STbCfg * pTbCfg = NULL; - STbCfg * pStbCfg = NULL; + STbCfg *pTbCfg = NULL; + STbCfg *pStbCfg = NULL; tb_uid_t uid; int32_t nCols; int32_t nTagCols; @@ -204,9 +204,9 @@ static void freeItemHelper(void *pItem) { */ static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg) { SMTbCursor *pCur = metaOpenTbCursor(pVnode->pMeta); - SArray * pArray = taosArrayInit(10, POINTER_BYTES); + SArray *pArray = taosArrayInit(10, POINTER_BYTES); - char * name = NULL; + char *name = NULL; int32_t totalLen = 0; int32_t numOfTables = 0; while ((name = metaTbCursorNext(pCur)) != NULL) { diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 56c0ec1130..77d25fa164 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -845,7 +845,7 @@ int32_t ctgGetTableMetaFromVnode(SCatalog* pCtg, void *pTrans, const SEpSet* pMg }; SRpcMsg rpcRsp = {0}; - rpcSendRecv(pTrans, &vgroupInfo->epset, &rpcMsg, &rpcRsp); + rpcSendRecv(pTrans, &vgroupInfo->epSet, &rpcMsg, &rpcRsp); if (TSDB_CODE_SUCCESS != rpcRsp.code) { if (CTG_TABLE_NOT_EXIST(rpcRsp.code)) { diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index aeac6b3eee..c7867c4da5 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -228,10 +228,10 @@ void ctgTestBuildDBVgroup(SDBVgInfo **pdbVgroup) { vgInfo.vgId = i + 1; vgInfo.hashBegin = i * hashUnit; vgInfo.hashEnd = hashUnit * (i + 1) - 1; - vgInfo.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; - vgInfo.epset.inUse = i % vgInfo.epset.numOfEps; - for (int32_t n = 0; n < vgInfo.epset.numOfEps; ++n) { - SEp *addr = &vgInfo.epset.eps[n]; + vgInfo.epSet.numOfEps = i % TSDB_MAX_REPLICA + 1; + vgInfo.epSet.inUse = i % vgInfo.epSet.numOfEps; + for (int32_t n = 0; n < vgInfo.epSet.numOfEps; ++n) { + SEp *addr = &vgInfo.epSet.eps[n]; strcpy(addr->fqdn, "a0"); addr->port = n + 22; } @@ -301,10 +301,10 @@ void ctgTestRspDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg * vg.hashEnd = htonl(UINT32_MAX); } - vg.epset.numOfEps = i % TSDB_MAX_REPLICA + 1; - vg.epset.inUse = i % vg.epset.numOfEps; - for (int32_t n = 0; n < vg.epset.numOfEps; ++n) { - SEp *addr = &vg.epset.eps[n]; + vg.epSet.numOfEps = i % TSDB_MAX_REPLICA + 1; + vg.epSet.inUse = i % vg.epSet.numOfEps; + for (int32_t n = 0; n < vg.epSet.numOfEps; ++n) { + SEp *addr = &vg.epSet.eps[n]; strcpy(addr->fqdn, "a0"); addr->port = n + 22; } @@ -877,7 +877,7 @@ TEST(tableMeta, normalTable) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (0 == ctgDbgGetClusterCacheNum(pCtg, CTG_DBG_DB_NUM)) { usleep(50000); @@ -1384,7 +1384,7 @@ TEST(refreshGetMeta, normal2normal) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1463,7 +1463,7 @@ TEST(refreshGetMeta, normal2notexist) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1537,7 +1537,7 @@ TEST(refreshGetMeta, normal2child) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1621,7 +1621,7 @@ TEST(refreshGetMeta, stable2child) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1706,7 +1706,7 @@ TEST(refreshGetMeta, stable2stable) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1794,7 +1794,7 @@ TEST(refreshGetMeta, child2stable) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); while (true) { uint64_t n = 0; @@ -1879,7 +1879,7 @@ TEST(tableDistVgroup, normalTable) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 8); - ASSERT_EQ(vgInfo->epset.numOfEps, 3); + ASSERT_EQ(vgInfo->epSet.numOfEps, 3); catalogDestroy(); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); @@ -1921,7 +1921,7 @@ TEST(tableDistVgroup, childTableCase) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 9); - ASSERT_EQ(vgInfo->epset.numOfEps, 4); + ASSERT_EQ(vgInfo->epSet.numOfEps, 4); catalogDestroy(); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); @@ -1964,13 +1964,13 @@ TEST(tableDistVgroup, superTableCase) { ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(vgInfo->vgId, 1); - ASSERT_EQ(vgInfo->epset.numOfEps, 1); + ASSERT_EQ(vgInfo->epSet.numOfEps, 1); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); ASSERT_EQ(vgInfo->vgId, 2); - ASSERT_EQ(vgInfo->epset.numOfEps, 2); + ASSERT_EQ(vgInfo->epSet.numOfEps, 2); vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); ASSERT_EQ(vgInfo->vgId, 3); - ASSERT_EQ(vgInfo->epset.numOfEps, 3); + ASSERT_EQ(vgInfo->epSet.numOfEps, 3); catalogDestroy(); memset(&gCtgMgmt, 0, sizeof(gCtgMgmt)); @@ -2025,14 +2025,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 8); - ASSERT_EQ(vgInfo.epset.numOfEps, 3); + ASSERT_EQ(vgInfo.epSet.numOfEps, 3); code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(pvgInfo->vgId, 8); - ASSERT_EQ(pvgInfo->epset.numOfEps, 3); + ASSERT_EQ(pvgInfo->epSet.numOfEps, 3); taosArrayDestroy(vgList); ctgTestBuildDBVgroup(&dbVgroup); @@ -2053,14 +2053,14 @@ TEST(dbVgroup, getSetDbVgroupCase) { code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgInfo); ASSERT_EQ(code, 0); ASSERT_EQ(vgInfo.vgId, 7); - ASSERT_EQ(vgInfo.epset.numOfEps, 2); + ASSERT_EQ(vgInfo.epSet.numOfEps, 2); code = catalogGetTableDistVgInfo(pCtg, mockPointer, (const SEpSet *)mockPointer, &n, &vgList); ASSERT_EQ(code, 0); ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); pvgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); ASSERT_EQ(pvgInfo->vgId, 8); - ASSERT_EQ(pvgInfo->epset.numOfEps, 3); + ASSERT_EQ(pvgInfo->epSet.numOfEps, 3); taosArrayDestroy(vgList); catalogDestroy(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 3ebad151fd..eeb48a1f3d 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -4977,7 +4977,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf SSourceDataInfo *pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); qDebug("%s build fetch msg and send to vgId:%d, ep:%s, taskId:0x%" PRIx64 ", %d/%" PRIzu, - GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epset.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); + GET_TASKID(pTaskInfo), pSource->addr.nodeId, pSource->addr.epSet.eps[0].fqdn, pSource->taskId, sourceIndex, totalSources); pMsg->header.vgId = htonl(pSource->addr.nodeId); pMsg->sId = htobe64(pSource->schedId); @@ -5000,7 +5000,7 @@ static int32_t doSendFetchDataRequest(SExchangeInfo *pExchangeInfo, SExecTaskInf pMsgSendInfo->fp = loadRemoteDataCallback; int64_t transporterId = 0; - int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epset, &transporterId, pMsgSendInfo); + int32_t code = asyncSendMsgToServer(pExchangeInfo->pTransporter, &pSource->addr.epSet, &transporterId, pMsgSendInfo); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/parser/src/dCDAstProcess.c b/source/libs/parser/src/dCDAstProcess.c index 713847807d..7ec2d05d44 100644 --- a/source/libs/parser/src/dCDAstProcess.c +++ b/source/libs/parser/src/dCDAstProcess.c @@ -58,7 +58,7 @@ static int32_t setShowInfo(SShowInfo* pShowInfo, SParseContext* pCtx, void** out SVgroupInfo* info = taosArrayGet(array, 0); pShowReq->head.vgId = htonl(info->vgId); - *pEpSet = info->epset; + *pEpSet = info->epSet; *outputLen = sizeof(SVShowTablesReq); *output = pShowReq; @@ -902,4 +902,4 @@ SVnodeModifOpStmtInfo* qParserValidateCreateTbSqlNode(SSqlInfo* pInfo, SParseCon } return pModifSqlStmt; -} \ No newline at end of file +} diff --git a/source/libs/parser/test/mockCatalogService.cpp b/source/libs/parser/test/mockCatalogService.cpp index cdb547ee1b..cd3ffcdce9 100644 --- a/source/libs/parser/test/mockCatalogService.cpp +++ b/source/libs/parser/test/mockCatalogService.cpp @@ -43,11 +43,11 @@ public: SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0, }; - vgroup.epset.eps[0] = (SEp){"dnode_1", 6030}; - vgroup.epset.eps[1] = (SEp){"dnode_2", 6030}; - vgroup.epset.eps[2] = (SEp){"dnode_3", 6030}; - vgroup.epset.inUse = 0; - vgroup.epset.numOfEps = 3; + vgroup.epSet.eps[0] = (SEp){"dnode_1", 6030}; + vgroup.epSet.eps[1] = (SEp){"dnode_2", 6030}; + vgroup.epSet.eps[2] = (SEp){"dnode_3", 6030}; + vgroup.epSet.inUse = 0; + vgroup.epSet.numOfEps = 3; meta_->vgs.emplace_back(vgroup); return *this; @@ -122,7 +122,7 @@ public: int32_t catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { // todo vgInfo->vgId = 1; - addEpIntoEpSet(&vgInfo->epset, "node1", 6030); + addEpIntoEpSet(&vgInfo->epSet, "node1", 6030); return 0; } @@ -143,10 +143,10 @@ public: meta_[db][tbname]->schema->uid = id_++; SVgroupInfo vgroup = {.vgId = vgid, .hashBegin = 0, .hashEnd = 0,}; - addEpIntoEpSet(&vgroup.epset, "dnode_1", 6030); - addEpIntoEpSet(&vgroup.epset, "dnode_2", 6030); - addEpIntoEpSet(&vgroup.epset, "dnode_3", 6030); - vgroup.epset.inUse = 0; + addEpIntoEpSet(&vgroup.epSet, "dnode_1", 6030); + addEpIntoEpSet(&vgroup.epSet, "dnode_2", 6030); + addEpIntoEpSet(&vgroup.epSet, "dnode_3", 6030); + vgroup.epSet.inUse = 0; meta_[db][tbname]->vgs.emplace_back(vgroup); // super table @@ -313,4 +313,4 @@ int32_t MockCatalogService::catalogGetTableMeta(const SName* pTableName, STableM int32_t MockCatalogService::catalogGetTableHashVgroup(const SName* pTableName, SVgroupInfo* vgInfo) const { return impl_->catalogGetTableHashVgroup(pTableName, vgInfo); -} \ No newline at end of file +} diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index 748be8fcd8..dbc0340b34 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -254,7 +254,7 @@ static SSubplan* initSubplan(SPlanContext* pCxt, int32_t type) { static void vgroupInfoToNodeAddr(const SVgroupInfo* vg, SQueryNodeAddr* pNodeAddr) { pNodeAddr->nodeId = vg->vgId; - pNodeAddr->epset = vg->epset; + pNodeAddr->epSet = vg->epSet; } static uint64_t splitSubplanByTable(SPlanContext* pCxt, SQueryPlanNode* pPlanNode, SQueryTableInfo* pTableInfo) { @@ -363,7 +363,7 @@ static void splitModificationOpSubPlan(SPlanContext* pCxt, SQueryPlanNode* pPlan SSubplan* subplan = initSubplan(pCxt, QUERY_TYPE_MODIFY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pPayload->payload, i); - subplan->execNode.epset = blocks->vg.epset; + subplan->execNode.epSet = blocks->vg.epSet; subplan->pDataSink = createDataInserter(pCxt, blocks, NULL); subplan->pNode = NULL; subplan->type = QUERY_TYPE_MODIFY; diff --git a/source/libs/planner/src/physicalPlanJson.c b/source/libs/planner/src/physicalPlanJson.c index b2109c0a4f..f61102a66d 100644 --- a/source/libs/planner/src/physicalPlanJson.c +++ b/source/libs/planner/src/physicalPlanJson.c @@ -762,11 +762,11 @@ static bool queryNodeAddrToJson(const void* obj, cJSON* json) { bool res = cJSON_AddNumberToObject(json, jkNodeAddrId, pAddr->nodeId); if (res) { - res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epset.inUse); + res = cJSON_AddNumberToObject(json, jkNodeAddrInUse, pAddr->epSet.inUse); } if (res) { - res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epset.eps, sizeof(SEp), pAddr->epset.numOfEps); + res = addRawArray(json, jkNodeAddrEpAddrs, epAddrToJson, pAddr->epSet.eps, sizeof(SEp), pAddr->epSet.numOfEps); } return res; } @@ -775,11 +775,11 @@ static bool queryNodeAddrFromJson(const cJSON* json, void* obj) { SQueryNodeAddr* pAddr = (SQueryNodeAddr*) obj; pAddr->nodeId = getNumber(json, jkNodeAddrId); - pAddr->epset.inUse = getNumber(json, jkNodeAddrInUse); + pAddr->epSet.inUse = getNumber(json, jkNodeAddrInUse); int32_t numOfEps = 0; - bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epset.eps, sizeof(SEp), &numOfEps); - pAddr->epset.numOfEps = numOfEps; + bool res = fromRawArray(json, jkNodeAddrEpAddrs, epAddrFromJson, pAddr->epSet.eps, sizeof(SEp), &numOfEps); + pAddr->epSet.numOfEps = numOfEps; return res; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index f1ed0cef7d..8ed39eb0b7 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -423,13 +423,13 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - if (pTask->plan->execNode.epset.numOfEps > 0) { + if (pTask->plan->execNode.epSet.numOfEps > 0) { if (NULL == taosArrayPush(pTask->candidateAddrs, &pTask->plan->execNode)) { SCH_TASK_ELOG("taosArrayPush execNode to candidate addrs failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epset.numOfEps); + SCH_TASK_DLOG("use execNode from plan as candidate addr, numOfEps:%d", pTask->plan->execNode.epSet.numOfEps); return TSDB_CODE_SUCCESS; } @@ -1061,7 +1061,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, isCandidateAddr = true; } - SEpSet epSet = addr->epset; + SEpSet epSet = addr->epSet; switch (msgType) { case TDMT_VND_CREATE_TABLE: diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index daf65cf251..8ed963d875 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -104,8 +104,8 @@ void schtBuildQueryDag(SQueryDag *dag) { scanPlan->type = QUERY_TYPE_SCAN; scanPlan->execNode.nodeId = 1; - scanPlan->execNode.epset.inUse = 0; - addEpIntoEpSet(&scanPlan->execNode.epset, "ep0", 6030); + scanPlan->execNode.epSet.inUse = 0; + addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030); scanPlan->pChildren = NULL; scanPlan->level = 1; @@ -118,7 +118,7 @@ void schtBuildQueryDag(SQueryDag *dag) { mergePlan->id.subplanId = 0x5555555555; mergePlan->type = QUERY_TYPE_MERGE; mergePlan->level = 0; - mergePlan->execNode.epset.numOfEps = 0; + mergePlan->execNode.epSet.numOfEps = 0; mergePlan->pChildren = taosArrayInit(1, POINTER_BYTES); mergePlan->pParents = NULL; @@ -157,8 +157,8 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[0].level = 0; insertPlan[0].execNode.nodeId = 1; - insertPlan[0].execNode.epset.inUse = 0; - addEpIntoEpSet(&insertPlan[0].execNode.epset, "ep0", 6030); + insertPlan[0].execNode.epSet.inUse = 0; + addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030); insertPlan[0].pChildren = NULL; insertPlan[0].pParents = NULL; @@ -173,8 +173,8 @@ void schtBuildInsertDag(SQueryDag *dag) { insertPlan[1].level = 0; insertPlan[1].execNode.nodeId = 1; - insertPlan[1].execNode.epset.inUse = 0; - addEpIntoEpSet(&insertPlan[1].execNode.epset, "ep0", 6030); + insertPlan[1].execNode.epSet.inUse = 0; + addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030); insertPlan[1].pChildren = NULL; insertPlan[1].pParents = NULL; diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c index 3fe0425c91..3eb8e60d56 100644 --- a/tests/test/c/tmqDemo.c +++ b/tests/test/c/tmqDemo.c @@ -352,7 +352,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { int32_t cnt = 0; /*clock_t startTime = clock();*/ while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); if (tmqmessage) { cnt++; msg_process(tmqmessage); @@ -383,7 +383,7 @@ void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { } while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); @@ -411,7 +411,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog int32_t skipLogNum = 0; int64_t startTime = taosGetTimestampUs(); while (running) { - tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1); if (tmqmessage) { batchCnt++; skipLogNum += tmqGetSkipLogNum(tmqmessage);