diff --git a/include/common/tmsg.h b/include/common/tmsg.h index dfd376f1e9..f6bee57c94 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -140,190 +140,6 @@ typedef enum _mgmt_table { #define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL) #define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC) #define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0) - -typedef struct { - int32_t keyLen; - int32_t valueLen; - void* key; - void* value; -} SKv; - -typedef struct { - int32_t connId; - int32_t hbType; -} SClientHbKey; - -typedef struct { - SClientHbKey connKey; - SHashObj* info; // hash -} SClientHbReq; - -typedef struct { - int64_t reqId; - SArray* reqs; // SArray -} SClientHbBatchReq; - -typedef struct { - SClientHbKey connKey; - int32_t status; - int32_t bodyLen; - void* body; -} SClientHbRsp; - -typedef struct { - int64_t reqId; - int64_t rspId; - SArray* rsps; // SArray -} SClientHbBatchRsp; - -static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { - return taosIntHash_64(key, keyLen); -} - -int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); -void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); - -int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); -void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); - -static FORCE_INLINE void tFreeClientHbReq(void *pReq) { - SClientHbReq* req = (SClientHbReq*)pReq; - if (req->info) taosHashCleanup(req->info); -} - -int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); -void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); - -static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { - SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; - if (deep) { - taosArrayDestroyEx(req->reqs, tFreeClientHbReq); - } else { - taosArrayDestroy(req->reqs); - } - free(pReq); -} - -int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); -void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); - -static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKv->keyLen); - tlen += taosEncodeFixedI32(buf, pKv->valueLen); - tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); - tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { - buf = taosDecodeFixedI32(buf, &pKv->keyLen); - buf = taosDecodeFixedI32(buf, &pKv->valueLen); - buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); - buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); - return buf; -} - -static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pKey->connId); - tlen += taosEncodeFixedI32(buf, pKey->hbType); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { - buf = taosDecodeFixedI32(buf, &pKey->connId); - buf = taosDecodeFixedI32(buf, &pKey->hbType); - return buf; -} - -typedef struct SMqHbVgInfo { - int32_t vgId; -} SMqHbVgInfo; - -static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { - buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); - return buf; -} - -typedef struct SMqHbTopicInfo { - int32_t epoch; - int64_t topicUid; - char name[TSDB_TOPIC_FNAME_LEN]; - SArray* pVgInfo; -} SMqHbTopicInfo; - -static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); - tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); - tlen += taosEncodeString(buf, pTopicInfo->name); - int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); - tlen += taosEncodeSMqVgInfo(buf, pVgInfo); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { - buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); - buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); - buf = taosDecodeStringTo(buf, pTopicInfo->name); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); - for (int32_t i = 0; i < sz; i++) { - SMqHbVgInfo vgInfo; - buf = taosDecodeSMqVgInfo(buf, &vgInfo); - taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); - } - return buf; -} - -typedef struct SMqHbMsg { - int32_t status; // ask hb endpoint - int32_t epoch; - int64_t consumerId; - SArray* pTopics; // SArray -} SMqHbMsg; - -static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { - int tlen = 0; - tlen += taosEncodeFixedI32(buf, pMsg->status); - tlen += taosEncodeFixedI32(buf, pMsg->epoch); - tlen += taosEncodeFixedI64(buf, pMsg->consumerId); - int32_t sz = taosArrayGetSize(pMsg->pTopics); - tlen += taosEncodeFixedI32(buf, sz); - for (int i = 0; i < sz; i++) { - SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); - tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { - buf = taosDecodeFixedI32(buf, &pMsg->status); - buf = taosDecodeFixedI32(buf, &pMsg->epoch); - buf = taosDecodeFixedI64(buf, &pMsg->consumerId); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); - for (int i = 0; i < sz; i++) { - SMqHbTopicInfo topicInfo; - buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); - taosArrayPush(pMsg->pTopics, &topicInfo); - } - return buf; -} - typedef struct { int32_t vgId; char* dbName; @@ -360,6 +176,18 @@ typedef struct SSubmitBlk { char data[]; } SSubmitBlk; +typedef struct { + /* data */ +} SSubmitReq; + +typedef struct { + /* data */ +} SSubmitRsp; + +typedef struct { + /* data */ +} SSubmitReqReader; + // Submit message for this TSDB typedef struct { SMsgHead header; @@ -462,92 +290,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) { } return buf; } - -typedef struct SMqHbRsp { - int8_t status; //idle or not - int8_t vnodeChanged; - int8_t epChanged; // should use new epset - int8_t reserved; - SEpSet epSet; -} SMqHbRsp; - -static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { - int tlen = 0; - tlen += taosEncodeFixedI8(buf, pRsp->status); - tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); - tlen += taosEncodeFixedI8(buf, pRsp->epChanged); - tlen += taosEncodeSEpSet(buf, &pRsp->epSet); - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { - buf = taosDecodeFixedI8(buf, &pRsp->status); - buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); - buf = taosDecodeFixedI8(buf, &pRsp->epChanged); - buf = taosDecodeSEpSet(buf, &pRsp->epSet); - return buf; -} - -typedef struct SMqHbOneTopicBatchRsp { - char topicName[TSDB_TOPIC_FNAME_LEN]; - SArray* rsps; // SArray -} SMqHbOneTopicBatchRsp; - -static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { - int tlen = 0; - tlen += taosEncodeString(buf, pBatchRsp->topicName); - int32_t sz = taosArrayGetSize(pBatchRsp->rsps); - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); - tlen += taosEncodeSMqHbRsp(buf, pRsp); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { - int32_t sz; - buf = taosDecodeStringTo(buf, pBatchRsp->topicName); - buf = taosDecodeFixedI32(buf, &sz); - pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); - for (int32_t i = 0; i < sz; i++) { - SMqHbRsp rsp; - buf = taosDecodeSMqHbRsp(buf, &rsp); - buf = taosArrayPush(pBatchRsp->rsps, &rsp); - } - return buf; -} - -typedef struct SMqHbBatchRsp { - int64_t consumerId; - SArray* batchRsps; // SArray -} SMqHbBatchRsp; - -static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { - int tlen = 0; - tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); - int32_t sz; - tlen += taosEncodeFixedI32(buf, sz); - for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); - tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); - } - return tlen; -} - -static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { - buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); - int32_t sz; - buf = taosDecodeFixedI32(buf, &sz); - pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); - for (int32_t i = 0; i < sz; i++) { - SMqHbOneTopicBatchRsp rsp; - buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); - buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); - } - return buf; -} - typedef struct { int32_t acctId; int64_t clusterId; @@ -1128,45 +870,7 @@ typedef struct { char desc[TSDB_STEP_DESC_LEN]; } SStartupReq; -// mq related -typedef struct { -} SMqConnectReq; - -typedef struct { -} SMqConnectRsp; - -typedef struct { -} SMqDisconnectReq; - -typedef struct { -} SMqDisconnectRsp; - -typedef struct { -} SMqAckReq; - -typedef struct { -} SMqAckRsp; - -typedef struct { -} SMqResetReq; - -typedef struct { -} SMqResetRsp; -// mq related end - -typedef struct { - /* data */ -} SSubmitReq; - -typedef struct { - /* data */ -} SSubmitRsp; - -typedef struct { - /* data */ -} SSubmitReqReader; - -typedef struct { +typedef struct SSubQueryMsg { SMsgHead header; uint64_t sId; uint64_t queryId; @@ -1385,6 +1089,10 @@ static FORCE_INLINE void* tDeserializeSMVSubscribeReq(void* buf, SMVSubscribeReq return buf; } +typedef struct SMqTmrMsg { + int32_t reserved; +} SMqTmrMsg; + typedef struct { int64_t status; } SMVSubscribeRsp; @@ -1533,6 +1241,318 @@ typedef struct { #pragma pack(pop) +static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->contLen); + tlen += taosEncodeFixedI32(buf, pMsg->vgId); + return tlen; +} + +typedef struct SMqHbRsp { + int8_t status; //idle or not + int8_t vnodeChanged; + int8_t epChanged; // should use new epset + int8_t reserved; + SEpSet epSet; +} SMqHbRsp; + +static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) { + int tlen = 0; + tlen += taosEncodeFixedI8(buf, pRsp->status); + tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged); + tlen += taosEncodeFixedI8(buf, pRsp->epChanged); + tlen += taosEncodeSEpSet(buf, &pRsp->epSet); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) { + buf = taosDecodeFixedI8(buf, &pRsp->status); + buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged); + buf = taosDecodeFixedI8(buf, &pRsp->epChanged); + buf = taosDecodeSEpSet(buf, &pRsp->epSet); + return buf; +} + +typedef struct SMqHbOneTopicBatchRsp { + char topicName[TSDB_TOPIC_FNAME_LEN]; + SArray* rsps; // SArray +} SMqHbOneTopicBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeString(buf, pBatchRsp->topicName); + int32_t sz = taosArrayGetSize(pBatchRsp->rsps); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i); + tlen += taosEncodeSMqHbRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) { + int32_t sz; + buf = taosDecodeStringTo(buf, pBatchRsp->topicName); + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbRsp rsp; + buf = taosDecodeSMqHbRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->rsps, &rsp); + } + return buf; +} + +typedef struct SMqHbBatchRsp { + int64_t consumerId; + SArray* batchRsps; // SArray +} SMqHbBatchRsp; + +static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) { + int tlen = 0; + tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId); + int32_t sz; + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i); + tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) { + buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp)); + for (int32_t i = 0; i < sz; i++) { + SMqHbOneTopicBatchRsp rsp; + buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp); + buf = taosArrayPush(pBatchRsp->batchRsps, &rsp); + } + return buf; +} + +typedef struct { + int32_t keyLen; + int32_t valueLen; + void* key; + void* value; +} SKv; + +typedef struct { + int32_t connId; + int32_t hbType; +} SClientHbKey; + +typedef struct { + SClientHbKey connKey; + SHashObj* info; // hash +} SClientHbReq; + +typedef struct { + int64_t reqId; + SArray* reqs; // SArray +} SClientHbBatchReq; + +typedef struct { + SClientHbKey connKey; + int32_t status; + int32_t bodyLen; + void* body; +} SClientHbRsp; + +typedef struct { + int64_t reqId; + int64_t rspId; + SArray* rsps; // SArray +} SClientHbBatchRsp; + +static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) { + return taosIntHash_64(key, keyLen); +} + +int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq); +void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq); + +int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp); +void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp); + +static FORCE_INLINE void tFreeClientHbReq(void *pReq) { + SClientHbReq* req = (SClientHbReq*)pReq; + if (req->info) taosHashCleanup(req->info); +} + +int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq); +void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq); + +static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) { + SClientHbBatchReq *req = (SClientHbBatchReq*)pReq; + if (deep) { + taosArrayDestroyEx(req->reqs, tFreeClientHbReq); + } else { + taosArrayDestroy(req->reqs); + } + free(pReq); +} + +int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp); +void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp); + +static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKv->keyLen); + tlen += taosEncodeFixedI32(buf, pKv->valueLen); + tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen); + tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) { + buf = taosDecodeFixedI32(buf, &pKv->keyLen); + buf = taosDecodeFixedI32(buf, &pKv->valueLen); + buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen); + buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen); + return buf; +} + +static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pKey->connId); + tlen += taosEncodeFixedI32(buf, pKey->hbType); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) { + buf = taosDecodeFixedI32(buf, &pKey->connId); + buf = taosDecodeFixedI32(buf, &pKey->hbType); + return buf; +} + +typedef struct SMqHbVgInfo { + int32_t vgId; +} SMqHbVgInfo; + +static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pVgInfo->vgId); + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) { + buf = taosDecodeFixedI32(buf, &pVgInfo->vgId); + return buf; +} + +typedef struct SMqHbTopicInfo { + int32_t epoch; + int64_t topicUid; + char name[TSDB_TOPIC_FNAME_LEN]; + SArray* pVgInfo; +} SMqHbTopicInfo; + +static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch); + tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid); + tlen += taosEncodeString(buf, pTopicInfo->name); + int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i); + tlen += taosEncodeSMqVgInfo(buf, pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) { + buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch); + buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid); + buf = taosDecodeStringTo(buf, pTopicInfo->name); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo)); + for (int32_t i = 0; i < sz; i++) { + SMqHbVgInfo vgInfo; + buf = taosDecodeSMqVgInfo(buf, &vgInfo); + taosArrayPush(pTopicInfo->pVgInfo, &vgInfo); + } + return buf; +} + +typedef struct SMqHbMsg { + int32_t status; // ask hb endpoint + int32_t epoch; + int64_t consumerId; + SArray* pTopics; // SArray +} SMqHbMsg; + +static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, pMsg->status); + tlen += taosEncodeFixedI32(buf, pMsg->epoch); + tlen += taosEncodeFixedI64(buf, pMsg->consumerId); + int32_t sz = taosArrayGetSize(pMsg->pTopics); + tlen += taosEncodeFixedI32(buf, sz); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i); + tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo); + } + return tlen; +} + +static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) { + buf = taosDecodeFixedI32(buf, &pMsg->status); + buf = taosDecodeFixedI32(buf, &pMsg->epoch); + buf = taosDecodeFixedI64(buf, &pMsg->consumerId); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo)); + for (int i = 0; i < sz; i++) { + SMqHbTopicInfo topicInfo; + buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo); + taosArrayPush(pMsg->pTopics, &topicInfo); + } + return buf; +} + +typedef struct SMqSetCVgReq { + int32_t vgId; + int64_t consumerId; + char topicName[TSDB_TOPIC_FNAME_LEN]; + char cGroup[TSDB_CONSUMER_GROUP_LEN]; + char* sql; + char* logicalPlan; + char* physicalPlan; + SArray* tasks; // SArray +} SMqSetCVgReq; + + +static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pReq->vgId); + tlen += taosEncodeFixedI64(buf, pReq->consumerId); + tlen += taosEncodeString(buf, pReq->topicName); + tlen += taosEncodeString(buf, pReq->cGroup); + tlen += taosEncodeString(buf, pReq->sql); + tlen += taosEncodeString(buf, pReq->logicalPlan); + tlen += taosEncodeString(buf, pReq->physicalPlan); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) { + buf = taosDecodeFixedI32(buf, &pReq->vgId); + buf = taosDecodeFixedI64(buf, &pReq->consumerId); + buf = taosDecodeStringTo(buf, pReq->topicName); + buf = taosDecodeStringTo(buf, pReq->cGroup); + buf = taosDecodeString(buf, &pReq->sql); + buf = taosDecodeString(buf, &pReq->logicalPlan); + buf = taosDecodeString(buf, &pReq->physicalPlan); + pReq->tasks = NULL; + return buf; +} + + #ifdef __cplusplus } #endif diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index bfeba885d0..93adc58eca 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -140,6 +140,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "mnode-alter-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_DROP_TOPIC, "mnode-drop-topic", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_MND_SUBSCRIBE, "mnode-subscribe", SCMSubscribeReq, SCMSubscribeRsp) + TD_DEF_MSG_TYPE(TDMT_MND_MQ_TIMER, "mnode-timer", SMqTmrMsg, SMqTmrMsg) // Requests handled by VNODE TD_NEW_MSG_SEG(TDMT_VND_MSG) diff --git a/include/dnode/mnode/sdb/sdb.h b/include/dnode/mnode/sdb/sdb.h index 5a4ac6a96f..7b022dd7c7 100644 --- a/include/dnode/mnode/sdb/sdb.h +++ b/include/dnode/mnode/sdb/sdb.h @@ -113,8 +113,8 @@ typedef enum { SDB_USER = 7, SDB_AUTH = 8, SDB_ACCT = 9, - SDB_CONSUMER = 10, - SDB_CGROUP = 11, + SDB_SUBSCRIBE = 10, + SDB_CONSUMER = 11, SDB_TOPIC = 12, SDB_VGROUP = 13, SDB_STB = 14, diff --git a/include/util/tdef.h b/include/util/tdef.h index 1847432cc9..4c29d9963d 100644 --- a/include/util/tdef.h +++ b/include/util/tdef.h @@ -178,6 +178,7 @@ do { \ #define TSDB_TABLE_FNAME_LEN (TSDB_DB_FNAME_LEN + TSDB_TABLE_NAME_LEN + TSDB_NAME_DELIMITER_LEN) #define TSDB_TOPIC_FNAME_LEN TSDB_TABLE_FNAME_LEN #define TSDB_CONSUMER_GROUP_LEN 192 +#define TSDB_SUBSCRIBE_KEY_LEN (TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN + 2) #define TSDB_COL_NAME_LEN 65 #define TSDB_MAX_SAVED_SQL_LEN TSDB_MAX_COLUMNS * 64 #define TSDB_MAX_SQL_LEN TSDB_PAYLOAD_SIZE diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 415d6a57ce..1250402caf 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -148,30 +148,30 @@ TEST(testCase, connect_Test) { // taos_close(pConn); //} // -//TEST(testCase, create_db_Test) { - //TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - //assert(pConn != NULL); +TEST(testCase, create_db_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); - //TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} + TAOS_RES* pRes = taos_query(pConn, "create database abc1 vgroups 2"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } - //TAOS_FIELD* pFields = taos_fetch_fields(pRes); - //ASSERT_TRUE(pFields == NULL); + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); - //int32_t numOfFields = taos_num_fields(pRes); - //ASSERT_EQ(numOfFields, 0); + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); - //taos_free_result(pRes); + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database abc1 vgroups 4"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_close(pConn); +} - //pRes = taos_query(pConn, "create database abc1 vgroups 4"); - //if (taos_errno(pRes) != 0) { - //printf("error in create db, reason:%s\n", taos_errstr(pRes)); - //} - //taos_close(pConn); -//} -// //TEST(testCase, create_dnode_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index ab4ae4ac53..f4fda75bd8 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -100,7 +100,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_VGROUP_LIST)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_QUERY)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_KILL_CONN)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeReadMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_HEARTBEAT)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeReadMsg; diff --git a/source/dnode/mnode/impl/CMakeLists.txt b/source/dnode/mnode/impl/CMakeLists.txt index adbef3b55f..3aad002d40 100644 --- a/source/dnode/mnode/impl/CMakeLists.txt +++ b/source/dnode/mnode/impl/CMakeLists.txt @@ -7,6 +7,7 @@ target_include_directories( ) target_link_libraries( mnode + PRIVATE scheduler PRIVATE sdb PRIVATE wal PRIVATE transport @@ -16,4 +17,4 @@ target_link_libraries( if(${BUILD_TEST}) add_subdirectory(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/dnode/mnode/impl/inc/mndConsumer.h b/source/dnode/mnode/impl/inc/mndConsumer.h index 68ba08b66e..9d1dd084ee 100644 --- a/source/dnode/mnode/impl/inc/mndConsumer.h +++ b/source/dnode/mnode/impl/inc/mndConsumer.h @@ -28,6 +28,9 @@ void mndCleanupConsumer(SMnode *pMnode); SMqConsumerObj *mndAcquireConsumer(SMnode *pMnode, int32_t consumerId); void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer); +SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); +SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index a2d6bbf4e6..ed88b42d39 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -26,6 +26,7 @@ #include "tlog.h" #include "trpc.h" #include "ttimer.h" +#include "scheduler.h" #include "mnode.h" @@ -326,17 +327,157 @@ typedef struct SMqTopicConsumer { #endif typedef struct SMqConsumerEp { - int32_t vgId; + int32_t vgId; // -1 for unassigned SEpSet epset; - int64_t consumerId; + int64_t consumerId; // -1 for unassigned + int64_t lastConsumerHbTs; + int64_t lastVgHbTs; } SMqConsumerEp; -typedef struct SMqCgroupTopicPair { - char key[TSDB_CONSUMER_GROUP_LEN + TSDB_TOPIC_FNAME_LEN]; - SArray* assigned; // SArray - SArray* unassignedConsumer; - SArray* unassignedVg; -} SMqCgroupTopicPair; +static FORCE_INLINE int32_t tEncodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { + int32_t tlen = 0; + tlen += taosEncodeFixedI32(buf, pConsumerEp->vgId); + tlen += taosEncodeSEpSet(buf, &pConsumerEp->epset); + tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId); + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerEp(void** buf, SMqConsumerEp* pConsumerEp) { + buf = taosDecodeFixedI32(buf, &pConsumerEp->vgId); + buf = taosDecodeSEpSet(buf, &pConsumerEp->epset); + buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId); + return buf; +} + +//unit for rebalance +typedef struct SMqSubscribeObj { + char key[TSDB_SUBSCRIBE_KEY_LEN]; + int32_t epoch; + //TODO: replace with priority queue + int32_t nextConsumerIdx; + SArray* availConsumer; // SArray (consumerId) + SArray* assigned; // SArray + SArray* unassignedConsumer; // SArray + SArray* unassignedVg; // SArray +} SMqSubscribeObj; + +static FORCE_INLINE SMqSubscribeObj* tNewSubscribeObj() { + SMqSubscribeObj* pSub = malloc(sizeof(SMqSubscribeObj)); + pSub->key[0] = 0; + pSub->epoch = 0; + if (pSub == NULL) { + return NULL; + } + pSub->availConsumer = taosArrayInit(0, sizeof(int64_t)); + if (pSub->availConsumer == NULL) { + free(pSub); + return NULL; + } + pSub->assigned = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + free(pSub); + return NULL; + } + pSub->unassignedConsumer = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + taosArrayDestroy(pSub->unassignedConsumer); + free(pSub); + return NULL; + } + pSub->unassignedVg = taosArrayInit(0, sizeof(SMqConsumerEp)); + if (pSub->assigned == NULL) { + taosArrayDestroy(pSub->availConsumer); + taosArrayDestroy(pSub->unassignedConsumer); + taosArrayDestroy(pSub->unassignedVg); + free(pSub); + return NULL; + } + return NULL; +} + +static FORCE_INLINE int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pSub->key); + tlen += taosEncodeFixedI32(buf, pSub->epoch); + int32_t sz; + + sz = taosArrayGetSize(pSub->availConsumer); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + int64_t* pConsumerId = taosArrayGet(pSub->availConsumer, i); + tlen += taosEncodeFixedI64(buf, *pConsumerId); + } + + sz = taosArrayGetSize(pSub->assigned); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->assigned, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + sz = taosArrayGetSize(pSub->unassignedConsumer); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedConsumer, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + sz = taosArrayGetSize(pSub->unassignedVg); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp* pCEp = taosArrayGet(pSub->unassignedVg, i); + tlen += tEncodeSMqConsumerEp(buf, pCEp); + } + + return tlen; +} + +static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub) { + buf = taosDecodeStringTo(buf, pSub->key); + buf = taosDecodeFixedI32(buf, &pSub->epoch); + + int32_t sz; + + buf = taosDecodeFixedI32(buf, &sz); + pSub->assigned = taosArrayInit(sz, sizeof(int64_t)); + if (pSub->assigned == NULL) { + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + int64_t consumerId; + buf = taosDecodeFixedI64(buf, &consumerId); + taosArrayPush(pSub->assigned, &consumerId); + } + + buf = taosDecodeFixedI32(buf, &sz); + pSub->unassignedConsumer = taosArrayInit(sz, sizeof(SMqConsumerEp)); + if (pSub->unassignedConsumer == NULL) { + taosArrayDestroy(pSub->assigned); + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp cEp; + buf = tDecodeSMqConsumerEp(buf, &cEp); + taosArrayPush(pSub->unassignedConsumer, &cEp); + } + + buf = taosDecodeFixedI32(buf, &sz); + pSub->unassignedVg = taosArrayInit(sz, sizeof(SMqConsumerEp)); + if (pSub->unassignedVg == NULL) { + taosArrayDestroy(pSub->assigned); + taosArrayDestroy(pSub->unassignedConsumer); + return NULL; + } + for (int32_t i = 0; i < sz; i++) { + SMqConsumerEp cEp; + buf = tDecodeSMqConsumerEp(buf, &cEp); + taosArrayPush(pSub->unassignedVg, &cEp); + } + + return buf; +} typedef struct SMqCGroup { char name[TSDB_CONSUMER_GROUP_LEN]; @@ -358,8 +499,8 @@ typedef struct SMqTopicObj { char *sql; char *logicalPlan; char *physicalPlan; - SHashObj *cgroups; // SHashObj - SHashObj *consumers; // SHashObj + //SHashObj *cgroups; // SHashObj + //SHashObj *consumers; // SHashObj } SMqTopicObj; // TODO: add cache and change name to id @@ -367,18 +508,93 @@ typedef struct SMqConsumerTopic { char name[TSDB_TOPIC_FNAME_LEN]; int32_t epoch; //TODO: replace with something with ep - SList *vgroups; // SList + //SList *vgroups; // SList + //vg assigned to the consumer on the topic SArray *pVgInfo; // SArray } SMqConsumerTopic; +static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqTopicObj* pTopic, SMqSubscribeObj* pSub) { + SMqConsumerTopic* pCTopic = malloc(sizeof(SMqConsumerTopic)); + if (pCTopic == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + strcpy(pCTopic->name, pTopic->name); + pCTopic->epoch = 0; + pCTopic->pVgInfo = taosArrayInit(0, sizeof(int32_t)); + + int32_t unassignedVgSz = taosArrayGetSize(pSub->unassignedVg); + if (unassignedVgSz > 0) { + SMqConsumerEp* pCEp = taosArrayPop(pSub->unassignedVg); + pCEp->consumerId = consumerId; + taosArrayPush(pCTopic->pVgInfo, &pCEp->vgId); + taosArrayPush(pSub->assigned, pCEp); + } + return pCTopic; +} + +static FORCE_INLINE int32_t tEncodeSMqConsumerTopic(void** buf, SMqConsumerTopic* pConsumerTopic) { + int32_t tlen = 0; + tlen += taosEncodeString(buf, pConsumerTopic->name); + tlen += taosEncodeFixedI32(buf, pConsumerTopic->epoch); + int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + int32_t* pVgInfo = taosArrayGet(pConsumerTopic->pVgInfo, i); + tlen += taosEncodeFixedI32(buf, *pVgInfo); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerTopic(void* buf, SMqConsumerTopic* pConsumerTopic) { + buf = taosDecodeStringTo(buf, pConsumerTopic->name); + buf = taosDecodeFixedI32(buf, &pConsumerTopic->epoch); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pConsumerTopic->pVgInfo = taosArrayInit(sz, sizeof(SMqConsumerTopic)); + for (int32_t i = 0; i < sz; i++) { + int32_t vgInfo; + buf = taosDecodeFixedI32(buf, &vgInfo); + taosArrayPush(pConsumerTopic->pVgInfo, &vgInfo); + } + return buf; +} + typedef struct SMqConsumerObj { int64_t consumerId; SRWLatch lock; char cgroup[TSDB_CONSUMER_GROUP_LEN]; SArray *topics; // SArray - SHashObj *topicHash; //SHashObj + //SHashObj *topicHash; //SHashObj } SMqConsumerObj; +static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) { + int32_t tlen = 0; + tlen += taosEncodeFixedI64(buf, pConsumer->consumerId); + tlen += taosEncodeString(buf, pConsumer->cgroup); + int32_t sz = taosArrayGetSize(pConsumer->topics); + tlen += taosEncodeFixedI32(buf, sz); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerTopic* pConsumerTopic = taosArrayGet(pConsumer->topics, i); + tlen += tEncodeSMqConsumerTopic(buf, pConsumerTopic); + } + return tlen; +} + +static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pConsumer) { + buf = taosDecodeFixedI64(buf, &pConsumer->consumerId); + buf = taosDecodeStringTo(buf, pConsumer->cgroup); + int32_t sz; + buf = taosDecodeFixedI32(buf, &sz); + pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj)); + for (int32_t i = 0; i < sz; i++) { + SMqConsumerTopic cTopic; + buf = tDecodeSMqConsumerTopic(buf, &cTopic); + taosArrayPush(pConsumer->topics, &cTopic); + } + return buf; +} + typedef struct SMqSubConsumerObj { int64_t consumerUid; // if -1, unassigned SList *vgId; // SList diff --git a/source/dnode/mnode/impl/inc/mndInt.h b/source/dnode/mnode/impl/inc/mndInt.h index d2107b9d07..29ccd43622 100644 --- a/source/dnode/mnode/impl/inc/mndInt.h +++ b/source/dnode/mnode/impl/inc/mndInt.h @@ -80,6 +80,7 @@ typedef struct SMnode { SReplica replicas[TSDB_MAX_REPLICA]; tmr_h timer; tmr_h transTimer; + tmr_h mqTimer; char *path; SMnodeCfg cfg; int64_t checkTime; diff --git a/source/dnode/mnode/impl/inc/mndSubscribe.h b/source/dnode/mnode/impl/inc/mndSubscribe.h new file mode 100644 index 0000000000..b8e651e386 --- /dev/null +++ b/source/dnode/mnode/impl/inc/mndSubscribe.h @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_MND_SUBSCRIBE_H_ +#define _TD_MND_SUBSCRIBE_H_ + +#include "mndInt.h" + +#ifdef __cplusplus +extern "C" { +#endif + +int32_t mndInitSubscribe(SMnode *pMnode); +void mndCleanupSubscribe(SMnode *pMnode); + +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName); +void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub); + +SSdbRaw *mndSubscribeActionEncode(SMqSubscribeObj *pSub); +SSdbRow *mndSubscribeActionDecode(SSdbRaw *pRaw); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_MND_SUBSCRIBE_H_*/ diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index d27bf53a90..5cdd8e77bd 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -30,24 +30,14 @@ #define MND_CONSUMER_VER_NUMBER 1 #define MND_CONSUMER_RESERVE_SIZE 64 -static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer); -static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw); static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionDelete(SSdb *pSdb, SMqConsumerObj *pConsumer); static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pConsumer, SMqConsumerObj *pNewConsumer); -static int32_t mndProcessCreateConsumerMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropConsumerMsg(SMnodeMsg *pMsg); -static int32_t mndProcessDropConsumerInRsp(SMnodeMsg *pMsg); static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg); static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndRetrieveConsumer(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter); -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); - int32_t mndInitConsumer(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_CONSUMER, .keyType = SDB_KEY_BINARY, @@ -57,26 +47,29 @@ int32_t mndInitConsumer(SMnode *pMnode) { .updateFp = (SdbUpdateFp)mndConsumerActionUpdate, .deleteFp = (SdbDeleteFp)mndConsumerActionDelete}; - mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); - /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ - /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ - mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); return sdbSetTable(pMnode->pSdb, table); } void mndCleanupConsumer(SMnode *pMnode) {} -static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { - return 0; -} - -static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { - int32_t size = sizeof(SMqConsumerObj) + MND_CONSUMER_RESERVE_SIZE; - SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, size); +SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + int32_t tlen = tEncodeSMqConsumerObj(NULL, pConsumer); + SSdbRaw *pRaw = sdbAllocRaw(SDB_CONSUMER, MND_CONSUMER_VER_NUMBER, tlen); if (pRaw == NULL) goto CM_ENCODE_OVER; + void* buf = malloc(tlen); + if (buf == NULL) goto CM_ENCODE_OVER; + + void* abuf = buf; + tEncodeSMqConsumerObj(&abuf, pConsumer); + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, CM_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, CM_ENCODE_OVER); + +#if 0 int32_t topicNum = taosArrayGetSize(pConsumer->topics); SDB_SET_INT64(pRaw, dataPos, pConsumer->consumerId, CM_ENCODE_OVER); int32_t len = strlen(pConsumer->cgroup); @@ -101,10 +94,13 @@ static SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) { /*SDB_SET_INT64(pRaw, dataPos, 0[> change to list item <]);*/ } } +#endif SDB_SET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CM_ENCODE_OVER); SDB_SET_DATALEN(pRaw, dataPos, CM_ENCODE_OVER); + terrno = TSDB_CODE_SUCCESS; + CM_ENCODE_OVER: if (terrno != 0) { mError("consumer:%ld, failed to encode to raw:%p since %s", pConsumer->consumerId, pRaw, terrstr()); @@ -116,7 +112,7 @@ CM_ENCODE_OVER: return pRaw; } -static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { +SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { terrno = TSDB_CODE_OUT_OF_MEMORY; int8_t sver = 0; @@ -127,18 +123,27 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { goto CONSUME_DECODE_OVER; } - int32_t size = sizeof(SMqConsumerObj); - SSdbRow *pRow = sdbAllocRow(size); + SSdbRow *pRow = sdbAllocRow(sizeof(SMqConsumerObj)); if (pRow == NULL) goto CONSUME_DECODE_OVER; SMqConsumerObj *pConsumer = sdbGetRowObj(pRow); if (pConsumer == NULL) goto CONSUME_DECODE_OVER; int32_t dataPos = 0; - SDB_GET_INT64(pRaw, dataPos, &pConsumer->consumerId, CONSUME_DECODE_OVER); - int32_t len, topicNum; + int32_t len; SDB_GET_INT32(pRaw, dataPos, &len, CONSUME_DECODE_OVER); - SDB_GET_BINARY(pRaw, dataPos, pConsumer->cgroup, len, CONSUME_DECODE_OVER); + void* buf = malloc(len); + if (buf == NULL) goto CONSUME_DECODE_OVER; + + SDB_GET_BINARY(pRaw, dataPos, buf, len, CONSUME_DECODE_OVER); + + tDecodeSMqConsumerObj(buf, pConsumer); + + SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE, CONSUME_DECODE_OVER); + + terrno = TSDB_CODE_SUCCESS; + +#if 0 SDB_GET_INT32(pRaw, dataPos, &topicNum, CONSUME_DECODE_OVER); for (int i = 0; i < topicNum; i++) { int32_t topicLen; @@ -154,6 +159,7 @@ static SSdbRow *mndConsumerActionDecode(SSdbRaw *pRaw) { int32_t vgSize; SDB_GET_INT32(pRaw, dataPos, &vgSize, CONSUME_DECODE_OVER); } +#endif CONSUME_DECODE_OVER: if (terrno != 0) { @@ -162,8 +168,6 @@ CONSUME_DECODE_OVER: return NULL; } - /*SDB_GET_RESERVE(pRaw, dataPos, MND_CONSUMER_RESERVE_SIZE);*/ - return pRow; } @@ -201,214 +205,13 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) { sdbRelease(pSdb, pConsumer); } -static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { - SMnode *pMnode = pMsg->pMnode; - char *msgStr = pMsg->rpcMsg.pCont; - SCMSubscribeReq subscribe; - tDeserializeSCMSubscribeReq(msgStr, &subscribe); - int64_t consumerId = subscribe.consumerId; - char *consumerGroup = subscribe.consumerGroup; - int32_t cgroupLen = strlen(consumerGroup); - - SArray *newSub = NULL; - int newTopicNum = subscribe.topicNum; - if (newTopicNum) { - newSub = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); - } - SMqConsumerTopic *pConsumerTopics = calloc(newTopicNum, sizeof(SMqConsumerTopic)); - if (pConsumerTopics == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - for (int i = 0; i < newTopicNum; i++) { - char *newTopicName = taosArrayGetP(newSub, i); - SMqConsumerTopic *pConsumerTopic = &pConsumerTopics[i]; - - strcpy(pConsumerTopic->name, newTopicName); - pConsumerTopic->vgroups = tdListNew(sizeof(int64_t)); - } - - taosArrayAddBatch(newSub, pConsumerTopics, newTopicNum); - free(pConsumerTopics); - taosArraySortString(newSub, taosArrayCompareString); - - SArray *oldSub = NULL; - int oldTopicNum = 0; - // create consumer if not exist - SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); - if (pConsumer == NULL) { - // create consumer - pConsumer = malloc(sizeof(SMqConsumerObj)); - if (pConsumer == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pConsumer->consumerId = consumerId; - strcpy(pConsumer->cgroup, consumerGroup); - - } else { - oldSub = pConsumer->topics; - oldTopicNum = taosArrayGetSize(oldSub); - } - STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); - if (pTrans == NULL) { - //TODO: free memory - return -1; - } - - int i = 0, j = 0; - while (i < newTopicNum || j < oldTopicNum) { - SMqConsumerTopic *pOldTopic = NULL; - SMqConsumerTopic *pNewTopic = NULL; - if (i >= newTopicNum) { - // encode unset topic msg to all vnodes related to that topic - pOldTopic = taosArrayGet(oldSub, j); - j++; - } else if (j >= oldTopicNum) { - pNewTopic = taosArrayGet(newSub, i); - i++; - } else { - pNewTopic = taosArrayGet(newSub, i); - pOldTopic = taosArrayGet(oldSub, j); - - char *newName = pNewTopic->name; - char *oldName = pOldTopic->name; - int comp = compareLenPrefixedStr(newName, oldName); - if (comp == 0) { - // do nothing - pOldTopic = pNewTopic = NULL; - i++; - j++; - continue; - } else if (comp < 0) { - pOldTopic = NULL; - i++; - } else { - pNewTopic = NULL; - j++; - } - } - - if (pOldTopic != NULL) { - //cancel subscribe of that old topic - ASSERT(pNewTopic == NULL); - char *oldTopicName = pOldTopic->name; - SList *vgroups = pOldTopic->vgroups; - SListIter iter; - tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); - SListNode *pn; - - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); - ASSERT(pTopic != NULL); - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - while ((pn = tdListNext(&iter)) != NULL) { - int32_t vgId = *(int64_t *)pn->data; - // acquire and get epset - SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); - // TODO what time to release? - if (pVgObj == NULL) { - // TODO handle error - continue; - } - //build reset msg - void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); - // TODO:serialize - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - STransAction action = {0}; - action.epSet = mndGetVgroupEpset(pMnode, pVgObj); - action.pCont = pMqVgSetReq; - action.contLen = 0; // TODO - action.msgType = TDMT_VND_MQ_SET_CONN; - if (mndTransAppendRedoAction(pTrans, &action) != 0) { - free(pMqVgSetReq); - mndTransDrop(pTrans); - // TODO free - return -1; - } - } - //delete data in mnode - taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); - mndReleaseTopic(pMnode, pTopic); - - } else if (pNewTopic != NULL) { - // save subscribe info to mnode - ASSERT(pOldTopic == NULL); - - char *newTopicName = pNewTopic->name; - SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); - ASSERT(pTopic != NULL); - - SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); - if (pGroup == NULL) { - // add new group - pGroup = malloc(sizeof(SMqCGroup)); - if (pGroup == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->consumerIds = tdListNew(sizeof(int64_t)); - if (pGroup->consumerIds == NULL) { - free(pGroup); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - pGroup->status = 0; - // add into cgroups - taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); - } - /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ - - // put the consumer into list - // rebalance will be triggered by timer - tdListAppend(pGroup->consumerIds, &consumerId); - - SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); - sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); - // TODO: error handling - mndTransAppendRedolog(pTrans, pTopicRaw); - - mndReleaseTopic(pMnode, pTopic); - - } else { - ASSERT(0); - } - } - // destroy old sub - taosArrayDestroy(oldSub); - // put new sub into consumerobj - pConsumer->topics = newSub; - - // persist consumerObj - SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); - sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); - // TODO: error handling - mndTransAppendRedolog(pTrans, pConsumerRaw); - - if (mndTransPrepare(pMnode, pTrans) != 0) { - mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return -1; - } - - // TODO: free memory - mndTransDrop(pTrans); - mndReleaseConsumer(pMnode, pConsumer); - return 0; -} - -static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } - +#if 0 static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; STableInfoReq *pInfo = pMsg->rpcMsg.pCont; mDebug("consumer:%s, start to retrieve meta", pInfo->tableFname); -#if 0 SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); if (pDb == NULL) { terrno = TSDB_CODE_MND_DB_NOT_SELECTED; @@ -463,7 +266,6 @@ static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { pMsg->contLen = contLen; mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); -#endif return 0; } @@ -546,3 +348,4 @@ static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { SSdb *pSdb = pMnode->pSdb; sdbCancelFetch(pSdb, pIter); } +#endif diff --git a/source/dnode/mnode/impl/src/mndProfile.c b/source/dnode/mnode/impl/src/mndProfile.c index 3773750ed3..22fdfde2ac 100644 --- a/source/dnode/mnode/impl/src/mndProfile.c +++ b/source/dnode/mnode/impl/src/mndProfile.c @@ -273,6 +273,7 @@ static int32_t mndSaveQueryStreamList(SConnObj *pConn, SHeartBeatReq *pReq) { } static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { +#if 0 SClientHbRsp* pRsp = malloc(sizeof(SClientHbRsp)); if (pRsp == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -332,6 +333,8 @@ static SClientHbRsp* mndMqHbBuildRsp(SMnode* pMnode, SClientHbReq* pReq) { pRsp->body = buf; pRsp->bodyLen = tlen; return pRsp; +#endif + return NULL; } static int32_t mndProcessHeartBeatReq(SMnodeMsg *pReq) { diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c new file mode 100644 index 0000000000..7b95e93257 --- /dev/null +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -0,0 +1,690 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "mndSubscribe.h" +#include "mndConsumer.h" +#include "mndDb.h" +#include "mndDnode.h" +#include "mndMnode.h" +#include "mndShow.h" +#include "mndStb.h" +#include "mndTopic.h" +#include "mndTrans.h" +#include "mndUser.h" +#include "mndVgroup.h" +#include "tcompare.h" +#include "tname.h" + +#define MND_SUBSCRIBE_VER_NUMBER 1 +#define MND_SUBSCRIBE_RESERVE_SIZE 64 + +static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *); +static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw); +static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *); +static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *); +static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub); + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg); +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg); +static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg); + +static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic); + +int32_t mndInitSubscribe(SMnode *pMnode) { + SSdbTable table = {.sdbType = SDB_SUBSCRIBE, + .keyType = SDB_KEY_BINARY, + .encodeFp = (SdbEncodeFp)mndSubActionEncode, + .decodeFp = (SdbDecodeFp)mndSubActionDecode, + .insertFp = (SdbInsertFp)mndSubActionInsert, + .updateFp = (SdbUpdateFp)mndSubActionUpdate, + .deleteFp = (SdbDeleteFp)mndSubActionDelete}; + + mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE, mndProcessSubscribeReq); + /*mndSetMsgHandle(pMnode, TDMT_MND_SUBSCRIBE_RSP, mndProcessSubscribeRsp);*/ + /*mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE, mndProcessSubscribeInternalReq);*/ + mndSetMsgHandle(pMnode, TDMT_VND_SUBSCRIBE_RSP, mndProcessSubscribeInternalRsp); + mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg); + return sdbSetTable(pMnode->pSdb, table); +} + +static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) { + int i = 0; + while (key[i] != ':') { + i++; + } + key[i] = 0; + *topic = strdup(key); + key[i] = ':'; + *cgroup = strdup(&key[i + 1]); + return 0; +} + +static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + SMqSubscribeObj *pSub = NULL; + void *pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); + int sz; + while (pIter != NULL) { + if ((sz = taosArrayGetSize(pSub->unassignedVg)) > 0) { + char *topic = NULL; + char *cgroup = NULL; + mndSplitSubscribeKey(pSub->key, &topic, &cgroup); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); + + // create trans + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + for (int i = 0; i < sz; i++) { + int64_t consumerId = *(int64_t *)taosArrayGet(pSub->availConsumer, pSub->nextConsumerIdx); + SMqConsumerEp *pCEp = taosArrayPop(pSub->unassignedVg); + pCEp->consumerId = consumerId; + taosArrayPush(pSub->assigned, pCEp); + pSub->nextConsumerIdx++; + + // build msg + SMqSetCVgReq req = { + .vgId = pCEp->vgId, + .consumerId = consumerId, + }; + strcpy(req.cGroup, cgroup); + strcpy(req.topicName, topic); + strcpy(req.sql, pTopic->sql); + strcpy(req.logicalPlan, pTopic->logicalPlan); + strcpy(req.physicalPlan, pTopic->physicalPlan); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *reqStr = malloc(tlen); + if (reqStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = reqStr; + tEncodeSMqSetCVgReq(abuf, &req); + + // persist msg + STransAction action = {0}; + action.epSet = pCEp->epset; + action.pCont = reqStr; + action.contLen = tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + mndTransAppendRedoAction(pTrans, &action); + + // persist raw + SSdbRaw *pRaw = mndSubActionEncode(pSub); + mndTransAppendRedolog(pTrans, pRaw); + + tfree(topic); + tfree(cgroup); + } + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + } + mndReleaseTopic(pMnode, pTopic); + mndTransDrop(pTrans); + } + pIter = sdbFetch(pSdb, SDB_SUBSCRIBE, NULL, (void **)&pSub); + } + return 0; +} + +static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unassignedVg) { + SMqConsumerEp CEp; + CEp.lastConsumerHbTs = CEp.lastVgHbTs = -1; + int32_t sz; + SVgObj *pVgroup = NULL; + SSdb *pSdb = pMnode->pSdb; + void *pIter = sdbFetch(pSdb, SDB_VGROUP, NULL, (void **)&pVgroup); + while (pIter != NULL) { + if (pVgroup->dbUid == pTopic->dbUid) { + CEp.epset = mndGetVgroupEpset(pMnode, pVgroup); + CEp.vgId = pVgroup->vgId; + taosArrayPush(unassignedVg, &CEp); + } + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + } + return 0; +} + +static int mndBuildMqSetConsumerVgReq(SMnode *pMnode, STrans *pTrans, SMqConsumerObj *pConsumer, + SMqConsumerTopic *pConsumerTopic, SMqTopicObj *pTopic) { + int32_t sz = taosArrayGetSize(pConsumerTopic->pVgInfo); + for (int32_t i = 0; i < sz; i++) { + int32_t vgId = *(int32_t *)taosArrayGet(pConsumerTopic->pVgInfo, i); + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + SMqSetCVgReq req = { + .vgId = vgId, + .consumerId = pConsumer->consumerId, + }; + strcpy(req.cGroup, pConsumer->cgroup); + strcpy(req.topicName, pTopic->name); + strcpy(req.sql, pTopic->sql); + strcpy(req.logicalPlan, pTopic->logicalPlan); + strcpy(req.physicalPlan, pTopic->physicalPlan); + int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req); + void *reqStr = malloc(tlen); + if (reqStr == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + void *abuf = reqStr; + tEncodeSMqSetCVgReq(&abuf, &req); + + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = reqStr; + action.contLen = tlen; + action.msgType = TDMT_VND_MQ_SET_CONN; + + mndReleaseVgroup(pMnode, pVgObj); + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(reqStr); + return -1; + } + } + return 0; +} + +void mndCleanupSubscribe(SMnode *pMnode) {} + +static SSdbRaw *mndSubActionEncode(SMqSubscribeObj *pSub) { + int32_t tlen = tEncodeSubscribeObj(NULL, pSub); + int32_t size = tlen + MND_SUBSCRIBE_RESERVE_SIZE; + + SSdbRaw *pRaw = sdbAllocRaw(SDB_SUBSCRIBE, MND_SUBSCRIBE_VER_NUMBER, size); + if (pRaw == NULL) goto SUB_ENCODE_OVER; + + void *buf = malloc(tlen); + if (buf == NULL) { + goto SUB_ENCODE_OVER; + } + void *abuf = buf; + + tEncodeSubscribeObj(&buf, pSub); + + int32_t dataPos = 0; + SDB_SET_INT32(pRaw, dataPos, tlen, SUB_ENCODE_OVER); + SDB_SET_BINARY(pRaw, dataPos, buf, tlen, SUB_ENCODE_OVER); + SDB_SET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_ENCODE_OVER); + SDB_SET_DATALEN(pRaw, dataPos, SUB_ENCODE_OVER); + +SUB_ENCODE_OVER: + if (terrno != 0) { + mError("subscribe:%s, failed to encode to raw:%p since %s", pSub->key, pRaw, terrstr()); + sdbFreeRaw(pRaw); + return NULL; + } + + mTrace("subscribe:%s, encode to raw:%p, row:%p", pSub->key, pRaw, pSub); + return pRaw; +} + +static SSdbRow *mndSubActionDecode(SSdbRaw *pRaw) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + + int8_t sver = 0; + if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto SUB_DECODE_OVER; + + if (sver != MND_SUBSCRIBE_VER_NUMBER) { + terrno = TSDB_CODE_SDB_INVALID_DATA_VER; + goto SUB_DECODE_OVER; + } + + int32_t size = sizeof(SMqSubscribeObj); + SSdbRow *pRow = sdbAllocRow(size); + if (pRow == NULL) goto SUB_DECODE_OVER; + + SMqSubscribeObj *pSub = sdbGetRowObj(pRow); + if (pSub == NULL) goto SUB_DECODE_OVER; + + int32_t dataPos = 0; + int32_t tlen; + void *buf = malloc(tlen + 1); + if (buf == NULL) goto SUB_DECODE_OVER; + SDB_GET_INT32(pRaw, dataPos, &tlen, SUB_DECODE_OVER); + SDB_GET_BINARY(pRaw, dataPos, buf, tlen, SUB_DECODE_OVER); + SDB_GET_RESERVE(pRaw, dataPos, MND_SUBSCRIBE_RESERVE_SIZE, SUB_DECODE_OVER); + + if (tDecodeSubscribeObj(buf, pSub) == NULL) { + goto SUB_DECODE_OVER; + } + +SUB_DECODE_OVER: + if (terrno != 0) { + mError("subscribe:%s, failed to decode from raw:%p since %s", pSub->key, pRaw, terrstr()); + // TODO free subscribeobj + tfree(pRow); + return NULL; + } + + return pRow; +} + +static int32_t mndSubActionInsert(SSdb *pSdb, SMqSubscribeObj *pSub) { + mTrace("subscribe:%s, perform insert action", pSub->key); + return 0; +} + +static int32_t mndSubActionDelete(SSdb *pSdb, SMqSubscribeObj *pSub) { + mTrace("subscribe:%s, perform delete action", pSub->key); + return 0; +} + +static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubscribeObj *pNewSub) { + mTrace("subscribe:%s, perform update action", pOldSub->key); + return 0; +} + +static void *mndBuildMqVGroupSetReq(SMnode *pMnode, char *topicName, int32_t vgId, int64_t consumerId, char *cgroup) { + return 0; +} + +static char *mndMakeSubscribeKey(char *cgroup, char *topicName) { + char *key = malloc(TSDB_SHOW_SUBQUERY_LEN); + if (key == NULL) { + return NULL; + } + int tlen = strlen(cgroup); + memcpy(key, cgroup, tlen); + key[tlen] = ':'; + strcpy(key + tlen + 1, topicName); + return key; +} + +SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) { + SSdb *pSdb = pMnode->pSdb; + char *key = mndMakeSubscribeKey(cgroup, topicName); + SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key); + free(key); + if (pSub == NULL) { + /*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/ + } + return pSub; +} + +void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pSub); +} + +static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + char *msgStr = pMsg->rpcMsg.pCont; + SCMSubscribeReq subscribe; + tDeserializeSCMSubscribeReq(msgStr, &subscribe); + int64_t consumerId = subscribe.consumerId; + char *consumerGroup = subscribe.consumerGroup; + int32_t cgroupLen = strlen(consumerGroup); + + SArray *newSub = subscribe.topicNames; + int newTopicNum = subscribe.topicNum; + + taosArraySortString(newSub, taosArrayCompareString); + + SArray *oldSub = NULL; + int oldTopicNum = 0; + // create consumer if not exist + SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, consumerId); + if (pConsumer == NULL) { + // create consumer + pConsumer = malloc(sizeof(SMqConsumerObj)); + if (pConsumer == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pConsumer->consumerId = consumerId; + strcpy(pConsumer->cgroup, consumerGroup); + taosInitRWLatch(&pConsumer->lock); + } else { + oldSub = pConsumer->topics; + } + pConsumer->topics = taosArrayInit(newTopicNum, sizeof(SMqConsumerTopic)); + + if (oldSub != NULL) { + oldTopicNum = taosArrayGetSize(oldSub); + } + + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg); + if (pTrans == NULL) { + // TODO: free memory + return -1; + } + + int i = 0, j = 0; + while (i < newTopicNum || j < oldTopicNum) { + char *newTopicName = NULL; + char *oldTopicName = NULL; + if (i >= newTopicNum) { + // encode unset topic msg to all vnodes related to that topic + oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; + j++; + } else if (j >= oldTopicNum) { + newTopicName = taosArrayGet(newSub, i); + i++; + } else { + newTopicName = taosArrayGet(newSub, i); + oldTopicName = ((SMqConsumerTopic *)taosArrayGet(oldSub, j))->name; + + int comp = compareLenPrefixedStr(newTopicName, oldTopicName); + if (comp == 0) { + // do nothing + oldTopicName = newTopicName = NULL; + i++; + j++; + continue; + } else if (comp < 0) { + oldTopicName = NULL; + i++; + } else { + newTopicName = NULL; + j++; + } + } + + if (oldTopicName != NULL) { +#if 0 + // cancel subscribe of that old topic + ASSERT(pNewTopic == NULL); + char *oldTopicName = pOldTopic->name; + SList *vgroups = pOldTopic->vgroups; + SListIter iter; + tdListInitIter(vgroups, &iter, TD_LIST_FORWARD); + SListNode *pn; + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, oldTopicName); + ASSERT(pTopic != NULL); + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, oldTopicName); + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); + while ((pn = tdListNext(&iter)) != NULL) { + int32_t vgId = *(int64_t *)pn->data; + // acquire and get epset + SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId); + // TODO what time to release? + if (pVgObj == NULL) { + // TODO handle error + continue; + } + // build reset msg + void *pMqVgSetReq = mndBuildMqVGroupSetReq(pMnode, oldTopicName, vgId, consumerId, consumerGroup); + // TODO:serialize + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + STransAction action = {0}; + action.epSet = mndGetVgroupEpset(pMnode, pVgObj); + action.pCont = pMqVgSetReq; + action.contLen = 0; // TODO + action.msgType = TDMT_VND_MQ_SET_CONN; + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + free(pMqVgSetReq); + mndTransDrop(pTrans); + // TODO free + return -1; + } + } + // delete data in mnode + taosHashRemove(pTopic->cgroups, consumerGroup, cgroupLen); + mndReleaseSubscribe(pMnode, pSub); + mndReleaseTopic(pMnode, pTopic); +#endif + } else if (newTopicName != NULL) { + // save subscribe info to mnode + ASSERT(oldTopicName == NULL); + + SMqTopicObj *pTopic = mndAcquireTopic(pMnode, newTopicName); + if (pTopic == NULL) { + /*terrno = */ + continue; + } + + SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, consumerGroup, newTopicName); + if (pSub == NULL) { + pSub = tNewSubscribeObj(); + if (pSub == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + // set unassigned vg + mndInitUnassignedVg(pMnode, pTopic, pSub->unassignedVg); + } + taosArrayPush(pSub->availConsumer, &consumerId); + + //TODO: no need + SMqConsumerTopic *pConsumerTopic = tNewConsumerTopic(consumerId, pTopic, pSub); + taosArrayPush(pConsumer->topics, pConsumerTopic); + + if (taosArrayGetSize(pConsumerTopic->pVgInfo) > 0) { + int32_t vgId = *(int32_t *)taosArrayGetLast(pConsumerTopic->pVgInfo); + // send setmsg to vnode + if (mndBuildMqSetConsumerVgReq(pMnode, pTrans, pConsumer, pConsumerTopic, pTopic) < 0) { + // TODO + return -1; + } + } + taosArrayDestroy(pConsumerTopic->pVgInfo); + free(pConsumerTopic); + SSdbRaw *pRaw = mndSubActionEncode(pSub); + /*sdbSetRawStatus(pRaw, SDB_STATUS_READY);*/ + mndTransAppendRedolog(pTrans, pRaw); +#if 0 + SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen); + if (pGroup == NULL) { + // add new group + pGroup = malloc(sizeof(SMqCGroup)); + if (pGroup == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->consumerIds = tdListNew(sizeof(int64_t)); + if (pGroup->consumerIds == NULL) { + free(pGroup); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pGroup->status = 0; + // add into cgroups + taosHashPut(pTopic->cgroups, consumerGroup, cgroupLen, pGroup, sizeof(SMqCGroup)); + } + /*taosHashPut(pTopic->consumers, &pConsumer->consumerId, sizeof(int64_t), pConsumer, sizeof(SMqConsumerObj));*/ + + // put the consumer into list + // rebalance will be triggered by timer + tdListAppend(pGroup->consumerIds, &consumerId); + + SSdbRaw *pTopicRaw = mndTopicActionEncode(pTopic); + sdbSetRawStatus(pTopicRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pTopicRaw); + +#endif + mndReleaseTopic(pMnode, pTopic); + mndReleaseSubscribe(pMnode, pSub); + } + } + // part3. persist consumerObj + + // destroy old sub + if (oldSub) taosArrayDestroy(oldSub); + // put new sub into consumerobj + + // persist consumerObj + SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer); + sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY); + // TODO: error handling + mndTransAppendRedolog(pTrans, pConsumerRaw); + + if (mndTransPrepare(pMnode, pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return -1; + } + + // TODO: free memory + if (newSub) taosArrayDestroy(newSub); + mndTransDrop(pTrans); + mndReleaseConsumer(pMnode, pConsumer); + return 0; +} + +static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessConsumerMetaMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + STableInfoReq *pInfo = pMsg->rpcMsg.pCont; + + mDebug("subscribe:%s, start to retrieve meta", pInfo->tableFname); + +#if 0 + SDbObj *pDb = mndAcquireDbByConsumer(pMnode, pInfo->tableFname); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + mError("consumer:%s, failed to retrieve meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + SConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pInfo->tableFname); + if (pConsumer == NULL) { + mndReleaseDb(pMnode, pDb); + terrno = TSDB_CODE_MND_INVALID_CONSUMER; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + taosRLockLatch(&pConsumer->lock); + int32_t totalCols = pConsumer->numOfColumns + pConsumer->numOfTags; + int32_t contLen = sizeof(STableMetaRsp) + totalCols * sizeof(SSchema); + + STableMetaRsp *pMeta = rpcMallocCont(contLen); + if (pMeta == NULL) { + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + terrno = TSDB_CODE_OUT_OF_MEMORY; + mError("consumer:%s, failed to get meta since %s", pInfo->tableFname, terrstr()); + return -1; + } + + memcpy(pMeta->consumerFname, pConsumer->name, TSDB_TABLE_FNAME_LEN); + pMeta->numOfTags = htonl(pConsumer->numOfTags); + pMeta->numOfColumns = htonl(pConsumer->numOfColumns); + pMeta->precision = pDb->cfg.precision; + pMeta->tableType = TSDB_SUPER_TABLE; + pMeta->update = pDb->cfg.update; + pMeta->sversion = htonl(pConsumer->version); + pMeta->tuid = htonl(pConsumer->uid); + + for (int32_t i = 0; i < totalCols; ++i) { + SSchema *pSchema = &pMeta->pSchema[i]; + SSchema *pSrcSchema = &pConsumer->pSchema[i]; + memcpy(pSchema->name, pSrcSchema->name, TSDB_COL_NAME_LEN); + pSchema->type = pSrcSchema->type; + pSchema->colId = htonl(pSrcSchema->colId); + pSchema->bytes = htonl(pSrcSchema->bytes); + } + taosRUnLockLatch(&pConsumer->lock); + mndReleaseDb(pMnode, pDb); + mndReleaseConsumer(pMnode, pConsumer); + + pMsg->pCont = pMeta; + pMsg->contLen = contLen; + + mDebug("consumer:%s, meta is retrieved, cols:%d tags:%d", pInfo->tableFname, pConsumer->numOfColumns, pConsumer->numOfTags); +#endif + return 0; +} + +static int32_t mndGetNumOfConsumers(SMnode *pMnode, char *dbName, int32_t *pNumOfConsumers) { + SSdb *pSdb = pMnode->pSdb; + + SDbObj *pDb = mndAcquireDb(pMnode, dbName); + if (pDb == NULL) { + terrno = TSDB_CODE_MND_DB_NOT_SELECTED; + return -1; + } + + int32_t numOfConsumers = 0; + void *pIter = NULL; + while (1) { + SMqConsumerObj *pConsumer = NULL; + pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer); + if (pIter == NULL) break; + + numOfConsumers++; + + sdbRelease(pSdb, pConsumer); + } + + *pNumOfConsumers = numOfConsumers; + return 0; +} + +static int32_t mndGetConsumerMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaRsp *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + if (mndGetNumOfConsumers(pMnode, pShow->db, &pShow->numOfRows) != 0) { + return -1; + } + + int32_t cols = 0; + SSchema *pSchema = pMeta->pSchema; + + pShow->bytes[cols] = TSDB_TABLE_NAME_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "name"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create_time"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "columns"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 4; + pSchema[cols].type = TSDB_DATA_TYPE_INT; + strcpy(pSchema[cols].name, "tags"); + pSchema[cols].bytes = htonl(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htonl(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_CONSUMER); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tbFname, mndShowStr(pShow->type)); + + return 0; +} + +static void mndCancelGetNextConsumer(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index 16a9828e71..1d4cbf37ce 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -79,8 +79,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { SDB_SET_BINARY(pRaw, dataPos, pTopic->logicalPlan, logicalPlanLen, TOPIC_ENCODE_OVER); int32_t physicalPlanLen = strlen(pTopic->physicalPlan) + 1; - pTopic->physicalPlan = calloc(physicalPlanLen, sizeof(char)); - if (pTopic->physicalPlan == NULL) goto TOPIC_ENCODE_OVER; SDB_SET_INT32(pRaw, dataPos, strlen(pTopic->physicalPlan)+1, TOPIC_ENCODE_OVER); SDB_SET_BINARY(pRaw, dataPos, pTopic->physicalPlan, physicalPlanLen, TOPIC_ENCODE_OVER); @@ -92,12 +90,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) { TOPIC_ENCODE_OVER: if (terrno != TSDB_CODE_SUCCESS) { mError("topic:%s, failed to encode to raw:%p since %s", pTopic->name, pRaw, terrstr()); - /*if (pTopic->logicalPlan) {*/ - /*free(pTopic->logicalPlan);*/ - /*}*/ - /*if (pTopic->physicalPlan) {*/ - /*free(pTopic->physicalPlan);*/ - /*}*/ sdbFreeRaw(pRaw); return NULL; } @@ -138,7 +130,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - pTopic->logicalPlan = calloc(len+1, sizeof(char)); + pTopic->logicalPlan = calloc(len + 1, sizeof(char)); if (pTopic->logicalPlan == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; goto TOPIC_DECODE_OVER; @@ -146,7 +138,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { SDB_GET_BINARY(pRaw, dataPos, pTopic->logicalPlan, len+1, TOPIC_DECODE_OVER); SDB_GET_INT32(pRaw, dataPos, &len, TOPIC_DECODE_OVER); - pTopic->logicalPlan = calloc(len + 1, sizeof(char)); + pTopic->physicalPlan = calloc(len + 1, sizeof(char)); if (pTopic->physicalPlan == NULL) { free(pTopic->logicalPlan); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -154,7 +146,7 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) { } SDB_GET_BINARY(pRaw, dataPos, pTopic->physicalPlan, len+1, TOPIC_DECODE_OVER); - SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER) + SDB_GET_RESERVE(pRaw, dataPos, MND_TOPIC_RESERVE_SIZE, TOPIC_DECODE_OVER); terrno = TSDB_CODE_SUCCESS; diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index cab30702ea..e57ee3eabc 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -18,6 +18,7 @@ #include "mndAuth.h" #include "mndBnode.h" #include "mndCluster.h" +#include "mndConsumer.h" #include "mndDb.h" #include "mndDnode.h" #include "mndFunc.h" @@ -27,6 +28,7 @@ #include "mndShow.h" #include "mndSnode.h" #include "mndStb.h" +#include "mndSubscribe.h" #include "mndSync.h" #include "mndTelem.h" #include "mndTopic.h" @@ -69,15 +71,15 @@ static void mndTransReExecute(void *param, void *tmrId) { taosTmrReset(mndTransReExecute, 3000, pMnode, pMnode->timer, &pMnode->transTimer); } -static void mndCalMqRebalance(void* param, void* tmrId) { - SMnode* pMnode = param; +static void mndCalMqRebalance(void *param, void *tmrId) { + SMnode *pMnode = param; if (mndIsMaster(pMnode)) { - // iterate cgroup, cal rebalance - // sync with raft - // write sdb + SMqTmrMsg *pMsg = rpcMallocCont(sizeof(SMqTmrMsg)); + SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pMsg, .contLen = sizeof(SMqTmrMsg)}; + pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg); } - taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->transTimer); + taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer); } static int32_t mndInitTimer(SMnode *pMnode) { @@ -95,6 +97,11 @@ static int32_t mndInitTimer(SMnode *pMnode) { return -1; } + if (taosTmrReset(mndCalMqRebalance, 3000, pMnode, pMnode->timer, &pMnode->mqTimer)) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; } @@ -102,6 +109,8 @@ static void mndCleanupTimer(SMnode *pMnode) { if (pMnode->timer != NULL) { taosTmrStop(pMnode->transTimer); pMnode->transTimer = NULL; + taosTmrStop(pMnode->mqTimer); + pMnode->mqTimer = NULL; taosTmrCleanUp(pMnode->timer); pMnode->timer = NULL; } @@ -171,6 +180,8 @@ static int32_t mndInitSteps(SMnode *pMnode) { if (mndAllocStep(pMnode, "mnode-auth", mndInitAuth, mndCleanupAuth) != 0) return -1; if (mndAllocStep(pMnode, "mnode-acct", mndInitAcct, mndCleanupAcct) != 0) return -1; if (mndAllocStep(pMnode, "mnode-topic", mndInitTopic, mndCleanupTopic) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-consumer", mndInitConsumer, mndCleanupConsumer) != 0) return -1; + if (mndAllocStep(pMnode, "mnode-subscribe", mndInitSubscribe, mndCleanupSubscribe) != 0) return -1; if (mndAllocStep(pMnode, "mnode-vgroup", mndInitVgroup, mndCleanupVgroup) != 0) return -1; if (mndAllocStep(pMnode, "mnode-stb", mndInitStb, mndCleanupStb) != 0) return -1; if (mndAllocStep(pMnode, "mnode-db", mndInitDb, mndCleanupDb) != 0) return -1; @@ -377,7 +388,7 @@ SMnodeMsg *mndInitMsg(SMnode *pMnode, SRpcMsg *pRpcMsg) { return NULL; } - if (pRpcMsg->msgType != TDMT_MND_TRANS) { + if (pRpcMsg->msgType != TDMT_MND_TRANS && pRpcMsg->msgType != TDMT_MND_MQ_TIMER) { SRpcConnInfo connInfo = {0}; if ((pRpcMsg->msgType & 1U) && rpcGetConnInfo(pRpcMsg->handle, &connInfo) != 0) { taosFreeQitem(pMsg); diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 8fdb6b1657..a9267b0ea3 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -38,10 +38,10 @@ const char *sdbTableName(ESdbType type) { return "auth"; case SDB_ACCT: return "acct"; + case SDB_SUBSCRIBE: + return "subscribe"; case SDB_CONSUMER: return "consumer"; - case SDB_CGROUP: - return "cgroup"; case SDB_TOPIC: return "topic"; case SDB_VGROUP: diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index bb863d6ed0..ea69f4c870 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -108,6 +108,15 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: handle error } break; + case TDMT_VND_MQ_SET_CONN: { + char* reqStr = ptr; + SMqSetCVgReq req; + /*tDecodeSMqSetCVgReq(reqStr, &req);*/ + // create topic if not exist + // convert to task + // write mq meta + } + break; default: ASSERT(0); break; diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index 4e297f4e17..1b4aee3ccf 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -9,9 +9,9 @@ target_include_directories( target_link_libraries( scheduler - PRIVATE os util planner qcom common catalog transport + PUBLIC os util planner qcom common catalog transport ) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) -endif(${BUILD_TEST}) \ No newline at end of file +endif(${BUILD_TEST}) diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index 792200639a..4b14f9f2c7 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -68,6 +68,25 @@ typedef void* queue[2]; QUEUE_PREV_NEXT(e) = QUEUE_NEXT(e); \ QUEUE_NEXT_PREV(e) = QUEUE_PREV(e); \ } +#define QUEUE_SPLIT(h, q, n) \ + do { \ + QUEUE_PREV(n) = QUEUE_PREV(h); \ + QUEUE_PREV_NEXT(n) = (n); \ + QUEUE_NEXT(n) = (q); \ + QUEUE_PREV(h) = QUEUE_PREV(q); \ + QUEUE_PREV_NEXT(h) = (h); \ + QUEUE_PREV(q) = (n); \ + } while (0) + +#define QUEUE_MOVE(h, n) \ + do { \ + if (QUEUE_IS_EMPTY(h)) { \ + QUEUE_INIT(n); \ + } else { \ + queue* q = QUEUE_HEAD(h); \ + QUEUE_SPLIT(h, q, n); \ + } \ + } while (0) /* Return the element at the front of the queue. */ #define QUEUE_HEAD(q) (QUEUE_NEXT(q)) diff --git a/source/libs/transport/src/trans.c b/source/libs/transport/src/trans.c index 89361b13ad..cb8ef87b48 100644 --- a/source/libs/transport/src/trans.c +++ b/source/libs/transport/src/trans.c @@ -35,6 +35,7 @@ void* rpcOpen(const SRpcInit* pInit) { if (pInit->label) { tstrncpy(pRpc->label, pInit->label, strlen(pInit->label)); } + pRpc->cfp = pInit->cfp; pRpc->numOfThreads = pInit->numOfThreads > TSDB_MAX_RPC_THREADS ? TSDB_MAX_RPC_THREADS : pInit->numOfThreads; pRpc->connType = pInit->connType; pRpc->tcphandle = (*taosHandle[pRpc->connType])(0, pInit->localPort, pRpc->label, pRpc->numOfThreads, NULL, pRpc); diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 29f3361b10..f197e72ec5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -20,12 +20,16 @@ typedef struct SCliConn { uv_connect_t connReq; uv_stream_t* stream; + uv_write_t* writeReq; void* data; queue conn; + char spi; + char secured; } SCliConn; typedef struct SCliMsg { SRpcReqContext* context; queue q; + uint64_t st; } SCliMsg; typedef struct SCliThrdObj { @@ -45,86 +49,169 @@ typedef struct SClientObj { SCliThrdObj** pThreadObj; } SClientObj; -static void clientWriteCb(uv_write_t* req, int status); +// conn pool +static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port); +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn); + +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf); static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf); -static void clientConnCb(struct uv_connect_s* req, int status); +static void clientWriteCb(uv_write_t* req, int status); +static void clientConnCb(uv_connect_t* req, int status); static void clientAsyncCb(uv_async_t* handle); +static void clientDestroy(uv_handle_t* handle); +static void clientConnDestroy(SCliConn* pConn); static void* clientThread(void* arg); +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd); + +static void clientAllocrReadBufferCb(uv_handle_t* handle, size_t suggested_size, uv_buf_t* buf) { + // impl later +} +static void clientReadCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { + // impl later + SCliConn* conn = handle->data; + if (nread > 0) { + return; + } + // + uv_close((uv_handle_t*)handle, clientDestroy); +} + +static void clientConnDestroy(SCliConn* conn) { + // impl later + // +} +static void clientDestroy(uv_handle_t* handle) { + SCliConn* conn = handle->data; + clientConnDestroy(conn); +} + static void clientWriteCb(uv_write_t* req, int status) { - // impl later -} -static void clientFailedCb(uv_handle_t* handle) { - // impl later - tDebug("close handle"); -} -static void clientReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { - // impl later -} -static void clientConnCb(struct uv_connect_s* req, int status) { SCliConn* pConn = req->data; + if (status == 0) { + tDebug("data already was written on stream"); + } else { + uv_close((uv_handle_t*)pConn->stream, clientDestroy); + return; + } + + uv_read_start((uv_stream_t*)pConn->stream, clientAllocrReadBufferCb, clientReadCb); + // impl later +} + +static void clientWrite(SCliConn* pConn) { SCliMsg* pMsg = pConn->data; - SEpSet* pEpSet = &pMsg->context->epSet; + SRpcHead* pHead = rpcHeadFromCont(pMsg->context->pCont); + int msgLen = rpcMsgLenFromCont(pMsg->context->contLen); + char* msg = (char*)(pHead); + + uv_buf_t wb = uv_buf_init(msg, msgLen); + uv_write(pConn->writeReq, (uv_stream_t*)pConn->stream, &wb, 1, clientWriteCb); +} +static void clientConnCb(uv_connect_t* req, int status) { + // impl later + SCliConn* pConn = req->data; + if (status != 0) { + tError("failed to connect %s", uv_err_name(status)); + clientConnDestroy(pConn); + return; + } + + SCliMsg* pMsg = pConn->data; + SEpSet* pEpSet = &pMsg->context->epSet; + SRpcMsg rpcMsg; + // rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; if (status != 0) { // call user fp later tError("failed to connect server(%s, %d), errmsg: %s", fqdn, port, uv_strerror(status)); - uv_close((uv_handle_t*)req->handle, clientFailedCb); + SRpcInfo* pRpc = pMsg->context->pRpc; + (pRpc->cfp)(NULL, &rpcMsg, pEpSet); + uv_close((uv_handle_t*)req->handle, clientDestroy); return; } assert(pConn->stream == req->handle); - - // impl later } static SCliConn* getConnFromCache(void* cache, char* ip, uint32_t port) { // impl later + return NULL; } -static void clientAsyncCb(uv_async_t* handle) { - SCliThrdObj* pThrd = handle->data; - SCliMsg* pMsg = NULL; - pthread_mutex_lock(&pThrd->msgMtx); - if (!QUEUE_IS_EMPTY(&pThrd->msg)) { - queue* head = QUEUE_HEAD(&pThrd->msg); - pMsg = QUEUE_DATA(head, SCliMsg, q); - QUEUE_REMOVE(head); - } - pthread_mutex_unlock(&pThrd->msgMtx); +static void addConnToCache(void* cache, char* ip, uint32_t port, SCliConn* conn) { + // impl later +} + +static void clientHandleReq(SCliMsg* pMsg, SCliThrdObj* pThrd) { + SEpSet* pEpSet = &pMsg->context->epSet; - SEpSet* pEpSet = &pMsg->context->epSet; char* fqdn = pEpSet->fqdn[pEpSet->inUse]; uint32_t port = pEpSet->port[pEpSet->inUse]; + uint64_t el = taosGetTimestampUs() - pMsg->st; + tDebug("msg tran time cost: %" PRIu64 "", el); + SCliConn* conn = getConnFromCache(pThrd->cache, fqdn, port); if (conn != NULL) { // impl later + conn->data = pMsg; + conn->writeReq->data = conn; + clientWrite(conn); + // uv_buf_t wb; + // uv_write(conn->writeReq, (uv_stream_t*)conn->stream, &wb, 1, clientWriteCb); } else { SCliConn* conn = malloc(sizeof(SCliConn)); conn->stream = (uv_stream_t*)malloc(sizeof(uv_tcp_t)); uv_tcp_init(pThrd->loop, (uv_tcp_t*)(conn->stream)); + conn->writeReq = malloc(sizeof(uv_write_t)); conn->connReq.data = conn; conn->data = pMsg; - struct sockaddr_in addr; uv_ip4_addr(fqdn, port, &addr); - // handle error in callback if connect error + // handle error in callback if fail to connect uv_tcp_connect(&conn->connReq, (uv_tcp_t*)(conn->stream), (const struct sockaddr*)&addr, clientConnCb); + + // SRpcMsg rpcMsg; + // SEpSet* pEpSet = &pMsg->context->epSet; + // SRpcInfo* pRpc = pMsg->context->pRpc; + //// rpcMsg.ahandle = pMsg->context->ahandle; + // rpcMsg.pCont = NULL; + // rpcMsg.ahandle = pMsg->context->ahandle; + // uint64_t el1 = taosGetTimestampUs() - et; + // tError("msg tran back first: time cost: %" PRIu64 "", el1); + // et = taosGetTimestampUs(); + //(pRpc->cfp)(NULL, &rpcMsg, pEpSet); + // uint64_t el2 = taosGetTimestampUs() - et; + // tError("msg tran back second: time cost: %" PRIu64 "", el2); } +} +static void clientAsyncCb(uv_async_t* handle) { + SCliThrdObj* pThrd = handle->data; + SCliMsg* pMsg = NULL; + queue wq; - // SRpcReqContext* pCxt = pMsg->context; + // batch process to avoid to lock/unlock frequently + pthread_mutex_lock(&pThrd->msgMtx); + QUEUE_MOVE(&pThrd->msg, &wq); + pthread_mutex_unlock(&pThrd->msgMtx); - // SRpcHead* pHead = rpcHeadFromCont(pCtx->pCont); - // char* msg = (char*)pHead; - // int len = rpcMsgLenFromCont(pCtx->contLen); - // tmsg_t msgType = pCtx->msgType; - - // impl later + int count = 0; + while (!QUEUE_IS_EMPTY(&wq)) { + queue* h = QUEUE_HEAD(&wq); + QUEUE_REMOVE(h); + pMsg = QUEUE_DATA(h, SCliMsg, q); + clientHandleReq(pMsg, pThrd); + count++; + if (count >= 2) { + tError("send batch size: %d", count); + } + } } static void* clientThread(void* arg) { @@ -142,9 +229,6 @@ void* taosInitClient(uint32_t ip, uint32_t port, char* label, int numOfThreads, SCliThrdObj* pThrd = (SCliThrdObj*)calloc(1, sizeof(SCliThrdObj)); QUEUE_INIT(&pThrd->msg); pthread_mutex_init(&pThrd->msgMtx, NULL); - - // QUEUE_INIT(&pThrd->clientCache); - pThrd->loop = (uv_loop_t*)malloc(sizeof(uv_loop_t)); uv_loop_init(pThrd->loop); @@ -186,6 +270,7 @@ void rpcSendRequest(void* shandle, const SEpSet* pEpSet, SRpcMsg* pMsg, int64_t* } SCliMsg* msg = malloc(sizeof(SCliMsg)); msg->context = pContext; + msg->st = taosGetTimestampUs(); SCliThrdObj* thrd = ((SClientObj*)pRpc->tcphandle)->pThreadObj[index % pRpc->numOfThreads]; diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index 0bf39b9985..bc4cc695b0 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -277,10 +277,6 @@ void uvOnReadCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { } return; } - if (terrno != 0) { - // handle err code - } - if (nread != UV_EOF) { tDebug("Read error %s\n", uv_err_name(nread)); } @@ -309,21 +305,23 @@ void uvOnWriteCb(uv_write_t* req, int status) { void uvWorkerAsyncCb(uv_async_t* handle) { SWorkThrdObj* pThrd = container_of(handle, SWorkThrdObj, workerAsync); SConn* conn = NULL; - - // opt later + queue wq; + // batch process to avoid to lock/unlock frequently pthread_mutex_lock(&pThrd->connMtx); - if (!QUEUE_IS_EMPTY(&pThrd->conn)) { - queue* head = QUEUE_HEAD(&pThrd->conn); - conn = QUEUE_DATA(head, SConn, queue); - QUEUE_REMOVE(head); - } + QUEUE_MOVE(&pThrd->conn, &wq); pthread_mutex_unlock(&pThrd->connMtx); - if (conn == NULL) { - tError("except occurred, do nothing"); - return; + + while (!QUEUE_IS_EMPTY(&wq)) { + queue* head = QUEUE_HEAD(&wq); + QUEUE_REMOVE(head); + SConn* conn = QUEUE_DATA(head, SConn, queue); + if (conn == NULL) { + tError("except occurred, do nothing"); + return; + } + uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); + uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } - uv_buf_t wb = uv_buf_init(conn->writeBuf.buf, conn->writeBuf.len); - uv_write(conn->pWriter, (uv_stream_t*)conn->pTcp, &wb, 1, uvOnWriteCb); } void uvOnAcceptCb(uv_stream_t* stream, int status) { diff --git a/source/libs/transport/test/rclient.c b/source/libs/transport/test/rclient.c index 58fbf6ae85..6339e58560 100644 --- a/source/libs/transport/test/rclient.c +++ b/source/libs/transport/test/rclient.c @@ -34,8 +34,8 @@ typedef struct { static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { SInfo *pInfo = (SInfo *)pMsg->ahandle; - tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, - pMsg->code); + // tDebug("thread:%d, response is received, type:%d contLen:%d code:0x%x", pInfo->index, pMsg->msgType, pMsg->contLen, + // pMsg->code); if (pEpSet) pInfo->epSet = *pEpSet; @@ -57,7 +57,7 @@ static void *sendRequest(void *param) { rpcMsg.contLen = pInfo->msgSize; rpcMsg.ahandle = pInfo; rpcMsg.msgType = 1; - tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); + // tDebug("thread:%d, send request, contLen:%d num:%d", pInfo->index, pInfo->msgSize, pInfo->num); rpcSendRequest(pInfo->pRpc, &pInfo->epSet, &rpcMsg, NULL); if (pInfo->num % 20000 == 0) tInfo("thread:%d, %d requests have been sent", pInfo->index, pInfo->num); tsem_wait(&pInfo->rspSem); diff --git a/tests/script/sh/massiveTable/deployCluster.sh b/tests/script/sh/massiveTable/deployCluster.sh index a86e9220cd..4febe40054 100755 --- a/tests/script/sh/massiveTable/deployCluster.sh +++ b/tests/script/sh/massiveTable/deployCluster.sh @@ -5,6 +5,49 @@ set -e #set -x +masterDnode=slave +dataRootDir="/data" +firstEp="trd02:7000" +startPort=7000 +dnodeNumber=1 +updateSrc=no + +while getopts "hm:f:n:r:p:u:" arg +do + case $arg in + m) + masterDnode=$( echo $OPTARG ) + ;; + n) + dnodeNumber=$(echo $OPTARG) + ;; + u) + updateSrc=$(echo $OPTARG) + ;; + f) + firstEp=$(echo $OPTARG) + ;; + p) + startPort=$(echo $OPTARG) + ;; + r) + dataRootDir=$(echo $OPTARG) + ;; + h) + echo "Usage: `basename $0` -m [if master dnode] " + echo " -n [ dnode number] " + echo " -f [ first ep] " + echo " -p [ start port] " + echo " -r [ dnode root dir] " + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + # deployCluster.sh curr_dir=$(readlink -f "$(dirname "$0")") echo $curr_dir @@ -12,13 +55,21 @@ echo $curr_dir ${curr_dir}/cleanCluster.sh -r "/data" ${curr_dir}/cleanCluster.sh -r "/data2" -${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" +if [[ "${updateSrc}" == "yes" ]]; then + ${curr_dir}/compileVersion.sh -r ${curr_dir}/../../../../ -v "3.0" +fi -${curr_dir}/setupDnodes.sh -r "/data" -n 1 -f "trd02:7000" -p 7000 -${curr_dir}/setupDnodes.sh -r "/data2" -n 1 -f "trd02:7000" -p 8000 +${curr_dir}/setupDnodes.sh -r "/data" -n ${dnodeNumber} -f ${firstEp} -p 7000 +${curr_dir}/setupDnodes.sh -r "/data2" -n ${dnodeNumber} -f ${firstEp} -p 8000 -#./setupDnodes.sh -r "/data" -n 2 -f trd02:7000 -p 7000 -#./setupDnodes.sh -r "/data2" -n 2 -f trd02:7000 -p 8000 +if [[ "${masterDnode}" == "master" ]]; then + # create all dnode into cluster + taos -s "create dnode trd02 port 8000;" + taos -s "create dnode trd03 port 7000;" + taos -s "create dnode trd03 port 8000;" + taos -s "create dnode trd04 port 7000;" + taos -s "create dnode trd04 port 8000;" +fi diff --git a/tests/test/c/create_table.c b/tests/test/c/create_table.c index d8fced3011..d387bf483b 100644 --- a/tests/test/c/create_table.c +++ b/tests/test/c/create_table.c @@ -28,9 +28,14 @@ int32_t numOfThreads = 1; int64_t numOfTables = 200000; int32_t createTable = 1; int32_t insertData = 0; -int32_t batchNum = 100; +int32_t batchNumOfTbl = 100; +int32_t batchNumOfRow = 1; int32_t numOfVgroups = 2; int32_t showTablesFlag = 0; +int32_t queryFlag = 0; + +int64_t startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000 + typedef struct { int64_t tableBeginIndex; @@ -167,7 +172,7 @@ void showTables() { void *threadFunc(void *param) { SThreadInfo *pInfo = (SThreadInfo *)param; - char *qstr = malloc(2000 * 1000); + char *qstr = malloc(batchNumOfTbl * batchNumOfRow * 128); int32_t code = 0; TAOS *con = taos_connect(NULL, "root", "taosdata", NULL, 0); @@ -192,7 +197,7 @@ void *threadFunc(void *param) { // batch = MIN(batch, batchNum); int32_t len = sprintf(qstr, "create table"); - for (int32_t i = 0; i < batchNum;) { + for (int32_t i = 0; i < batchNumOfTbl;) { len += sprintf(qstr + len, " %s_t%" PRId64 " using %s tags(%" PRId64 ")", stbName, t, stbName, t); t++; i++; @@ -204,7 +209,7 @@ void *threadFunc(void *param) { int64_t startTs = taosGetTimestampUs(); TAOS_RES *pRes = taos_query(con, qstr); code = taos_errno(pRes); - if ((code != 0) && (code != 0x0002)) { + if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { pError("failed to create table t%" PRId64 ", reason:%s", t, tstrerror(code)); } taos_free_result(pRes); @@ -227,31 +232,49 @@ void *threadFunc(void *param) { if (insertData) { int64_t curMs = 0; int64_t beginMs = taosGetTimestampMs(); + pInfo->startMs = beginMs; + int64_t t = pInfo->tableBeginIndex; + for (; t <= pInfo->tableEndIndex;) { + // int64_t batch = (pInfo->tableEndIndex - t); + // batch = MIN(batch, batchNum); - pInfo->startMs = taosGetTimestampMs(); - for (int64_t t = pInfo->tableBeginIndex; t < pInfo->tableEndIndex; ++t) { - int64_t batch = (pInfo->tableEndIndex - t); - batch = MIN(batch, batchNum); - - int32_t len = sprintf(qstr, "insert into"); - for (int32_t i = 0; i < batch; ++i) { - len += sprintf(qstr + len, " t%" PRId64 " values(now, %" PRId64 ")", t + i, t + i); - } + int32_t len = sprintf(qstr, "insert into "); + + for (int32_t i = 0; i < batchNumOfTbl;) { + int64_t ts = startTimestamp; + len += sprintf(qstr + len, "%s_t%" PRId64 " values ", stbName, t); + for (int32_t j = 0; j < batchNumOfRow; j++) { + len += sprintf(qstr + len, "(%" PRId64 ", 6666) ", ts++); + } + + t++; + i++; + if (t > pInfo->tableEndIndex) { + break; + } + } + int64_t startTs = taosGetTimestampUs(); TAOS_RES *pRes = taos_query(con, qstr); code = taos_errno(pRes); - if (code != 0) { - pError("failed to insert table t%" PRId64 ", reason:%s", t, tstrerror(code)); + if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + pError("failed to insert %s_t%" PRId64 ", reason:%s", stbName, t, tstrerror(code)); } taos_free_result(pRes); + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + // printf("==== %"PRId64" - %"PRId64", %"PRId64"\n", startTs, endTs, delay); + if (delay > pInfo->maxDelay) pInfo->maxDelay = delay; + if (delay < pInfo->minDelay) pInfo->minDelay = delay; curMs = taosGetTimestampMs(); if (curMs - beginMs > 10000) { + beginMs = curMs; + // printf("==== tableBeginIndex: %"PRId64", t: %"PRId64"\n", pInfo->tableBeginIndex, t); printInsertProgress(pInfo, t); } - t += (batch - 1); } - printInsertProgress(pInfo, pInfo->tableEndIndex); + printInsertProgress(pInfo, t); } taos_close(con); @@ -280,9 +303,13 @@ void printHelp() { printf("%s%s\n", indent, "-i"); printf("%s%s%s%d\n", indent, indent, "insertData, default is ", insertData); printf("%s%s\n", indent, "-b"); - printf("%s%s%s%d\n", indent, indent, "batchNum, default is ", batchNum); + printf("%s%s%s%d\n", indent, indent, "batchNumOfTbl, default is ", batchNumOfTbl); printf("%s%s\n", indent, "-w"); printf("%s%s%s%d\n", indent, indent, "showTablesFlag, default is ", showTablesFlag); + printf("%s%s\n", indent, "-q"); + printf("%s%s%s%d\n", indent, indent, "queryFlag, default is ", queryFlag); + printf("%s%s\n", indent, "-l"); + printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", batchNumOfRow); exit(EXIT_SUCCESS); } @@ -309,10 +336,15 @@ void parseArgument(int32_t argc, char *argv[]) { } else if (strcmp(argv[i], "-i") == 0) { insertData = atoi(argv[++i]); } else if (strcmp(argv[i], "-b") == 0) { - batchNum = atoi(argv[++i]); + batchNumOfTbl = atoi(argv[++i]); + } else if (strcmp(argv[i], "-l") == 0) { + batchNumOfRow = atoi(argv[++i]); } else if (strcmp(argv[i], "-w") == 0) { showTablesFlag = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q") == 0) { + queryFlag = atoi(argv[++i]); } else { + pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC); } } @@ -324,8 +356,10 @@ void parseArgument(int32_t argc, char *argv[]) { pPrint("%s numOfVgroups:%d %s", GREEN, numOfVgroups, NC); pPrint("%s createTable:%d %s", GREEN, createTable, NC); pPrint("%s insertData:%d %s", GREEN, insertData, NC); - pPrint("%s batchNum:%d %s", GREEN, batchNum, NC); + pPrint("%s batchNumOfTbl:%d %s", GREEN, batchNumOfTbl, NC); + pPrint("%s batchNumOfRow:%d %s", GREEN, batchNumOfRow, NC); pPrint("%s showTablesFlag:%d %s", GREEN, showTablesFlag, NC); + pPrint("%s queryFlag:%d %s", GREEN, queryFlag, NC); pPrint("%s start create table performace test %s", GREEN, NC); } @@ -338,8 +372,15 @@ int32_t main(int32_t argc, char *argv[]) { return 0; } - createDbAndStb(); + if (queryFlag) { + //selectRowsFromTable(); + return 0; + } + if (createTable) { + createDbAndStb(); + } + pPrint("%d threads are spawned to create %" PRId64 " tables", numOfThreads, numOfTables); pthread_attr_t thattr; @@ -396,9 +437,11 @@ int32_t main(int32_t argc, char *argv[]) { insertDataSpeed += pInfo[i].insertDataSpeed; } - pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 + if (createTable) { + pPrint("%s total %" PRId64 " tables, %.1f tables/second, threads:%d, maxDelay: %" PRId64 "us, minDelay: %" PRId64 "us %s", GREEN, numOfTables, createTableSpeed, numOfThreads, maxDelay, minDelay, NC); + } if (insertData) { pPrint("%s total %" PRId64 " tables, %.1f rows/second, threads:%d %s", GREEN, numOfTables, insertDataSpeed,