From 2f003140efcdd0f3eb2599af16458a2d18a1c38f Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Wed, 19 Jan 2022 17:35:06 +0800 Subject: [PATCH] add dependency for scheduler --- include/common/tmsg.h | 658 ++++++++++----------- source/dnode/mnode/impl/CMakeLists.txt | 3 +- source/dnode/mnode/impl/inc/mndDef.h | 1 + source/dnode/mnode/impl/src/mndSubscribe.c | 1 - source/libs/scheduler/CMakeLists.txt | 4 +- 5 files changed, 330 insertions(+), 337 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 7d9fd53e69..15daa97dbf 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,214 +140,6 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) - -typedef struct { - int32_t keyLen; - int32_t valueLen; - void* key; - void* value; -} SKv; - -typedef struct { - int32_t connId; - int32_t hbType; -} SClientHbKey; - -typedef struct { - SClientHbKey connKey; - SHashObj* info; // hash -} SClientHbReq; - -typedef struct { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -typedef struct { - SClientHbKey connKey; - int32_t status; - int32_t bodyLen; - void* body; -} SClientHbRsp; - -typedef struct { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - -static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return taosIntHash_64(key, keyLen); -} - -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); - -int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); -void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); - -static FORCE_INLINE void tFreeClientHbReq(void *pReq) { - SClientHbReq* req = (SClientHbReq*)pReq; - if (req->info) taosHashCleanup(req->info); -} - -int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); -void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); - -static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { - SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; - if (deep) { - taosArrayDestroyEx(req->reqs, tFreeClientHbReq); - } else { - taosArrayDestroy(req->reqs); - } - free(pReq); -} - -int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); -void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); - -static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->keyLen); - tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); - tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->keyLen); - buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); - buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); - return buf; -} - -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - -typedef struct SMqHbVgInfo { - int32_t vgId; -} SMqHbVgInfo; - -static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { - buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); - return buf; -} - -typedef struct SMqHbTopicInfo { - int32_t epoch; - int64_t topicUid; - char name[TSDB_TOPIC_FNAME_LEN]; - SArray* pVgInfo; -} SMqHbTopicInfo; - -static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); - tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); - tlen += taosEncodeString(buf, pTopicInfo->name); - int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); - tlen += taosEncodeSMqVgInfo(buf, pVgInfo); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { - buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); - buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); - buf = taosDecodeStringTo(buf, pTopicInfo->name); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); - for (int32_t i = 0; i < sz; i++) { - SMqHbVgInfo vgInfo; - buf = taosDecodeSMqVgInfo(buf, &vgInfo); - taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); - } - return buf; -} - -typedef struct SMqHbMsg { - int32_t status; // ask hb endpoint - int32_t epoch; - int64_t consumerId; - SArray* pTopics; // SArray -} SMqHbMsg; - -static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pMsg->status); - tlen += taosEncodeFixedI32(buf, pMsg->epoch); - tlen += taosEncodeFixedI64(buf, pMsg->consumerId); - int32_t sz = taosArrayGetSize(pMsg->pTopics); - tlen += taosEncodeFixedI32(buf, sz); - for (int i = 0; i < sz; i++) { - SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); - tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { - buf = taosDecodeFixedI32(buf, &pMsg->status); - buf = taosDecodeFixedI32(buf, &pMsg->epoch); - buf = taosDecodeFixedI64(buf, &pMsg->consumerId); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); - for (int i = 0; i < sz; i++) { - SMqHbTopicInfo topicInfo; - buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); - taosArrayPush(pMsg->pTopics, &topicInfo); - } - return buf; -} - -typedef struct SMqSetCVgReq { - int32_t vgId; - int64_t consumerId; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cGroup[TSDB_CONSUMER_GROUP_LEN]; -} SMqSetCVgReq; - -static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { - int32_t tlen = 0; - tlen += taosEncodeFixedI32(buf, pReq->vgId); - tlen += taosEncodeFixedI64(buf, pReq->consumerId); - tlen += taosEncodeString(buf, pReq->topicName); - tlen += taosEncodeString(buf, pReq->cGroup); - return tlen; -} - -static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { - buf = taosDecodeFixedI32(buf, &pReq->vgId); - buf = taosDecodeFixedI64(buf, &pReq->consumerId); - buf = taosDecodeStringTo(buf, pReq->topicName); - buf = taosDecodeStringTo(buf, pReq->cGroup); - return buf; -} - typedef struct { int32_t vgId; char* dbName; @@ -384,6 +176,18 @@ typedef struct SSubmitBlk { char data[]; } SSubmitBlk; +typedef struct { + /* data */ +} SSubmitReq; + +typedef struct { + /* data */ +} SSubmitRsp; + +typedef struct { + /* data */ +} SSubmitReqReader; + // Submit message for this TSDB typedef struct { SMsgHead header; @@ -486,92 +290,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } - -typedef struct SMqHbRsp { - int8_t status; //idle or not - int8_t vnodeChanged; - int8_t epChanged; // should use new epset - int8_t reserved; - SEpSet epSet; -} SMqHbRsp; - -static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pRsp->status); - tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); - tlen += taosEncodeFixedI8(buf, pRsp->epChanged); - tlen += taosEncodeSEpSet(buf, &pRsp->epSet); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { - buf = taosDecodeFixedI8(buf, &pRsp->status); - buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); - buf = taosDecodeFixedI8(buf, &pRsp->epChanged); - buf = taosDecodeSEpSet(buf, &pRsp->epSet); - return buf; -} - -typedef struct SMqHbOneTopicBatchRsp { - char topicName[TSDB_TOPIC_FNAME_LEN]; - SArray* rsps; // SArray -} SMqHbOneTopicBatchRsp; - -static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { - int tlen = 0; - tlen += taosEncodeString(buf, pBatchRsp->topicName); - int32_t sz = taosArrayGetSize(pBatchRsp->rsps); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); - tlen += taosEncodeSMqHbRsp(buf, pRsp); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { - int32_t sz; - buf = taosDecodeStringTo(buf, pBatchRsp->topicName); - buf = taosDecodeFixedI32(buf, &sz); - pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); - for (int32_t i = 0; i < sz; i++) { - SMqHbRsp rsp; - buf = taosDecodeSMqHbRsp(buf, &rsp); - buf = taosArrayPush(pBatchRsp->rsps, &rsp); - } - return buf; -} - -typedef struct SMqHbBatchRsp { - int64_t consumerId; - SArray* batchRsps; // SArray -} SMqHbBatchRsp; - -static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { - int tlen = 0; - tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); - int32_t sz; - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); - tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { - buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); - for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp rsp; - buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); - buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); - } - return buf; -} - typedef struct { int32_t acctId; int64_t clusterId; @@ -1152,45 +870,7 @@ typedef struct { char desc[TSDB_STEP_DESC_LEN]; } SStartupReq; -// mq related -typedef struct { -} SMqConnectReq; - -typedef struct { -} SMqConnectRsp; - -typedef struct { -} SMqDisconnectReq; - -typedef struct { -} SMqDisconnectRsp; - -typedef struct { -} SMqAckReq; - -typedef struct { -} SMqAckRsp; - -typedef struct { -} SMqResetReq; - -typedef struct { -} SMqResetRsp; -// mq related end - -typedef struct { - /* data */ -} SSubmitReq; - -typedef struct { - /* data */ -} SSubmitRsp; - -typedef struct { - /* data */ -} SSubmitReqReader; - -typedef struct { +typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; @@ -1557,6 +1237,318 @@ typedef struct { #pragma pack(pop) +static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->contLen); + tlen += taosEncodeFixedI32(buf, pMsg->vgId); + return tlen; +} + +typedef struct SMqHbRsp { + int8_t status; //idle or not + int8_t vnodeChanged; + int8_t epChanged; // should use new epset + int8_t reserved; + SEpSet epSet; +} SMqHbRsp; + +static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI8(buf, pRsp->status); + tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); + tlen += taosEncodeFixedI8(buf, pRsp->epChanged); + tlen += taosEncodeSEpSet(buf, &pRsp->epSet); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { + buf = taosDecodeFixedI8(buf, &pRsp->status); + buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); + buf = taosDecodeFixedI8(buf, &pRsp->epChanged); + buf = taosDecodeSEpSet(buf, &pRsp->epSet); + return buf; +} + +typedef struct SMqHbOneTopicBatchRsp { + char topicName[TSDB_TOPIC_FNAME_LEN]; + SArray* rsps; // SArray +} SMqHbOneTopicBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeString(buf, pBatchRsp->topicName); + int32_t sz = taosArrayGetSize(pBatchRsp->rsps); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); + tlen += taosEncodeSMqHbRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { + int32_t sz; + buf = taosDecodeStringTo(buf, pBatchRsp->topicName); + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp rsp; + buf = taosDecodeSMqHbRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->rsps, &rsp); + } + return buf; +} + +typedef struct SMqHbBatchRsp { + int64_t consumerId; + SArray* batchRsps; // SArray +} SMqHbBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); + int32_t sz; + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { + buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp rsp; + buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); + } + return buf; +} + +typedef struct { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKv; + +typedef struct { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +typedef struct { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +typedef struct { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return taosIntHash_64(key, keyLen); +} + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); + +int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); +void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); + +static FORCE_INLINE void tFreeClientHbReq(void *pReq) { + SClientHbReq* req = (SClientHbReq*)pReq; + if (req->info) taosHashCleanup(req->info); +} + +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); +void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); + +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { + SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + if (deep) { + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + } else { + taosArrayDestroy(req->reqs); + } + free(pReq); +} + +int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); +void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); + +static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->valueLen); + tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); + tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { + buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->valueLen); + buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); + buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); + return buf; +} + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SMqHbVgInfo { + int32_t vgId; +} SMqHbVgInfo; + +static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { + buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); + return buf; +} + +typedef struct SMqHbTopicInfo { + int32_t epoch; + int64_t topicUid; + char name[TSDB_TOPIC_FNAME_LEN]; + SArray* pVgInfo; +} SMqHbTopicInfo; + +static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); + tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); + tlen += taosEncodeString(buf, pTopicInfo->name); + int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); + tlen += taosEncodeSMqVgInfo(buf, pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { + buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); + buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); + buf = taosDecodeStringTo(buf, pTopicInfo->name); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo vgInfo; + buf = taosDecodeSMqVgInfo(buf, &vgInfo); + taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); + } + return buf; +} + +typedef struct SMqHbMsg { + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray +} SMqHbMsg; + +static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->status); + tlen += taosEncodeFixedI32(buf, pMsg->epoch); + tlen += taosEncodeFixedI64(buf, pMsg->consumerId); + int32_t sz = taosArrayGetSize(pMsg->pTopics); + tlen += taosEncodeFixedI32(buf, sz); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); + tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { + buf = taosDecodeFixedI32(buf, &pMsg->status); + buf = taosDecodeFixedI32(buf, &pMsg->epoch); + buf = taosDecodeFixedI64(buf, &pMsg->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo topicInfo; + buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); + taosArrayPush(pMsg->pTopics, &topicInfo); + } + return buf; +} + +typedef struct SMqSetCVgReq { + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + SArray* tasks; // SArray +} SMqSetCVgReq; + + +static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeString(buf, pReq->topicName); + tlen += taosEncodeString(buf, pReq->cGroup); + tlen += taosEncodeString(buf, pReq->sql); + tlen += taosEncodeString(buf, pReq->logicalPlan); + tlen += taosEncodeString(buf, pReq->physicalPlan); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { + buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeStringTo(buf, pReq->topicName); + buf = taosDecodeStringTo(buf, pReq->cGroup); + buf = taosDecodeString(buf, &pReq->sql); + buf = taosDecodeString(buf, &pReq->logicalPlan); + buf = taosDecodeString(buf, &pReq->physicalPlan); + pReq->tasks = NULL; + return buf; +} + + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index adbef3b55f..3aad002d40 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -7,6 +7,7 @@ target_include_directories( ) target_link_libraries( mnode + PRIVATE scheduler PRIVATE sdb PRIVATE wal PRIVATE transport @@ -16,4 +17,4 @@ target_link_libraries( if(${BUILD_TEST}) add_subdirectory(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a069987368..45759117b5 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -26,6 +26,7 @@ #include "tlog.h" #include "trpc.h" #include "ttimer.h" +#include "scheduler.h" #include "mnode.h" diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 1aca2e9894..52f33fa89c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#define _DEFAULT_SOURCE #include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 4e297f4e17..1b4aee3ccf 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,9 +9,9 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner qcom common catalog transport + PUBLIC os util planner qcom common catalog transport ) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST})