|
|
|
@ -140,214 +140,6 @@ typedef enum _mgmt_table {
|
|
|
|
|
#define TSDB_COL_IS_NORMAL_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_NORMAL)
|
|
|
|
|
#define TSDB_COL_IS_UD_COL(f) ((f & (~(TSDB_COL_NULL))) == TSDB_COL_UDC)
|
|
|
|
|
#define TSDB_COL_REQ_NULL(f) (((f)&TSDB_COL_NULL) != 0)
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t keyLen;
|
|
|
|
|
int32_t valueLen;
|
|
|
|
|
void* key;
|
|
|
|
|
void* value;
|
|
|
|
|
} SKv;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t connId;
|
|
|
|
|
int32_t hbType;
|
|
|
|
|
} SClientHbKey;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
SClientHbKey connKey;
|
|
|
|
|
SHashObj* info; // hash<Skv.key, Skv>
|
|
|
|
|
} SClientHbReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int64_t reqId;
|
|
|
|
|
SArray* reqs; // SArray<SClientHbReq>
|
|
|
|
|
} SClientHbBatchReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
SClientHbKey connKey;
|
|
|
|
|
int32_t status;
|
|
|
|
|
int32_t bodyLen;
|
|
|
|
|
void* body;
|
|
|
|
|
} SClientHbRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int64_t reqId;
|
|
|
|
|
int64_t rspId;
|
|
|
|
|
SArray* rsps; // SArray<SClientHbRsp>
|
|
|
|
|
} SClientHbBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
|
|
|
|
|
return taosIntHash_64(key, keyLen);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
|
|
|
|
|
void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
|
|
|
|
|
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
|
|
|
|
|
SClientHbReq* req = (SClientHbReq*)pReq;
|
|
|
|
|
if (req->info) taosHashCleanup(req->info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
|
|
|
|
|
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
|
|
|
|
|
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
|
|
|
|
|
if (deep) {
|
|
|
|
|
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
|
|
|
|
} else {
|
|
|
|
|
taosArrayDestroy(req->reqs);
|
|
|
|
|
}
|
|
|
|
|
free(pReq);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
|
|
|
|
|
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
|
|
|
|
|
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
|
|
|
|
|
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
|
|
|
|
|
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
|
|
|
|
|
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKey->connId);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKey->hbType);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKey->connId);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKey->hbType);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbVgInfo {
|
|
|
|
|
int32_t vgId;
|
|
|
|
|
} SMqHbVgInfo;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbTopicInfo {
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
int64_t topicUid;
|
|
|
|
|
char name[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
SArray* pVgInfo;
|
|
|
|
|
} SMqHbTopicInfo;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
|
|
|
|
|
tlen += taosEncodeString(buf, pTopicInfo->name);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
|
|
|
|
|
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pTopicInfo->name);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbVgInfo vgInfo;
|
|
|
|
|
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
|
|
|
|
|
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbMsg {
|
|
|
|
|
int32_t status; // ask hb endpoint
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
|
|
|
|
} SMqHbMsg;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->status);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pMsg->pTopics);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
|
|
|
|
|
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pMsg->status);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
|
|
|
|
|
for (int i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbTopicInfo topicInfo;
|
|
|
|
|
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
|
|
|
|
|
taosArrayPush(pMsg->pTopics, &topicInfo);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqSetCVgReq {
|
|
|
|
|
int32_t vgId;
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
|
|
|
|
} SMqSetCVgReq;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
|
|
|
|
int32_t tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->topicName);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->cGroup);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pReq->topicName);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pReq->cGroup);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t vgId;
|
|
|
|
|
char* dbName;
|
|
|
|
@ -384,6 +176,18 @@ typedef struct SSubmitBlk {
|
|
|
|
|
char data[];
|
|
|
|
|
} SSubmitBlk;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitReqReader;
|
|
|
|
|
|
|
|
|
|
// Submit message for this TSDB
|
|
|
|
|
typedef struct {
|
|
|
|
|
SMsgHead header;
|
|
|
|
@ -486,92 +290,6 @@ static FORCE_INLINE void* taosDecodeSEpSet(void* buf, SEpSet* pEp) {
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbRsp {
|
|
|
|
|
int8_t status; //idle or not
|
|
|
|
|
int8_t vnodeChanged;
|
|
|
|
|
int8_t epChanged; // should use new epset
|
|
|
|
|
int8_t reserved;
|
|
|
|
|
SEpSet epSet;
|
|
|
|
|
} SMqHbRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->status);
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
|
|
|
|
|
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->status);
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
|
|
|
|
|
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbOneTopicBatchRsp {
|
|
|
|
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
SArray* rsps; // SArray<SMqHbRsp>
|
|
|
|
|
} SMqHbOneTopicBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeString(buf, pBatchRsp->topicName);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
|
|
|
|
|
tlen += taosEncodeSMqHbRsp(buf, pRsp);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbRsp rsp;
|
|
|
|
|
buf = taosDecodeSMqHbRsp(buf, &rsp);
|
|
|
|
|
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbBatchRsp {
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
|
|
|
|
|
} SMqHbBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
|
|
|
|
|
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbOneTopicBatchRsp rsp;
|
|
|
|
|
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
|
|
|
|
|
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t acctId;
|
|
|
|
|
int64_t clusterId;
|
|
|
|
@ -1152,45 +870,7 @@ typedef struct {
|
|
|
|
|
char desc[TSDB_STEP_DESC_LEN];
|
|
|
|
|
} SStartupReq;
|
|
|
|
|
|
|
|
|
|
// mq related
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqConnectReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqConnectRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqDisconnectReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqDisconnectRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqAckReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqAckRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqResetReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
} SMqResetRsp;
|
|
|
|
|
// mq related end
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
/* data */
|
|
|
|
|
} SSubmitReqReader;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
typedef struct SSubQueryMsg {
|
|
|
|
|
SMsgHead header;
|
|
|
|
|
uint64_t sId;
|
|
|
|
|
uint64_t queryId;
|
|
|
|
@ -1557,6 +1237,318 @@ typedef struct {
|
|
|
|
|
|
|
|
|
|
#pragma pack(pop)
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int32_t tEncodeSMsgHead(void** buf, const SMsgHead* pMsg) {
|
|
|
|
|
int32_t tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->contLen);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->vgId);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbRsp {
|
|
|
|
|
int8_t status; //idle or not
|
|
|
|
|
int8_t vnodeChanged;
|
|
|
|
|
int8_t epChanged; // should use new epset
|
|
|
|
|
int8_t reserved;
|
|
|
|
|
SEpSet epSet;
|
|
|
|
|
} SMqHbRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbRsp(void** buf, const SMqHbRsp* pRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->status);
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->vnodeChanged);
|
|
|
|
|
tlen += taosEncodeFixedI8(buf, pRsp->epChanged);
|
|
|
|
|
tlen += taosEncodeSEpSet(buf, &pRsp->epSet);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbRsp(void* buf, SMqHbRsp* pRsp) {
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->status);
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->vnodeChanged);
|
|
|
|
|
buf = taosDecodeFixedI8(buf, &pRsp->epChanged);
|
|
|
|
|
buf = taosDecodeSEpSet(buf, &pRsp->epSet);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbOneTopicBatchRsp {
|
|
|
|
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
SArray* rsps; // SArray<SMqHbRsp>
|
|
|
|
|
} SMqHbOneTopicBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbOneTopicBatchRsp(void** buf, const SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeString(buf, pBatchRsp->topicName);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pBatchRsp->rsps);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbRsp* pRsp = (SMqHbRsp*)taosArrayGet(pBatchRsp->rsps, i);
|
|
|
|
|
tlen += taosEncodeSMqHbRsp(buf, pRsp);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbOneTopicBatchRsp(void* buf, SMqHbOneTopicBatchRsp* pBatchRsp) {
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeStringTo(buf, pBatchRsp->topicName);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pBatchRsp->rsps = taosArrayInit(sz, sizeof(SMqHbRsp));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbRsp rsp;
|
|
|
|
|
buf = taosDecodeSMqHbRsp(buf, &rsp);
|
|
|
|
|
buf = taosArrayPush(pBatchRsp->rsps, &rsp);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbBatchRsp {
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
SArray* batchRsps; // SArray<SMqHbOneTopicBatchRsp>
|
|
|
|
|
} SMqHbBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbBatchRsp(void** buf, const SMqHbBatchRsp* pBatchRsp) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pBatchRsp->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbOneTopicBatchRsp* pRsp = (SMqHbOneTopicBatchRsp*) taosArrayGet(pBatchRsp->batchRsps, i);
|
|
|
|
|
tlen += taosEncodeSMqHbOneTopicBatchRsp(buf, pRsp);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbBatchRsp(void* buf, SMqHbBatchRsp* pBatchRsp) {
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pBatchRsp->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pBatchRsp->batchRsps = taosArrayInit(sz, sizeof(SMqHbOneTopicBatchRsp));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbOneTopicBatchRsp rsp;
|
|
|
|
|
buf = taosDecodeSMqHbOneTopicBatchRsp(buf, &rsp);
|
|
|
|
|
buf = taosArrayPush(pBatchRsp->batchRsps, &rsp);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t keyLen;
|
|
|
|
|
int32_t valueLen;
|
|
|
|
|
void* key;
|
|
|
|
|
void* value;
|
|
|
|
|
} SKv;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int32_t connId;
|
|
|
|
|
int32_t hbType;
|
|
|
|
|
} SClientHbKey;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
SClientHbKey connKey;
|
|
|
|
|
SHashObj* info; // hash<Skv.key, Skv>
|
|
|
|
|
} SClientHbReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int64_t reqId;
|
|
|
|
|
SArray* reqs; // SArray<SClientHbReq>
|
|
|
|
|
} SClientHbBatchReq;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
SClientHbKey connKey;
|
|
|
|
|
int32_t status;
|
|
|
|
|
int32_t bodyLen;
|
|
|
|
|
void* body;
|
|
|
|
|
} SClientHbRsp;
|
|
|
|
|
|
|
|
|
|
typedef struct {
|
|
|
|
|
int64_t reqId;
|
|
|
|
|
int64_t rspId;
|
|
|
|
|
SArray* rsps; // SArray<SClientHbRsp>
|
|
|
|
|
} SClientHbBatchRsp;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE uint32_t hbKeyHashFunc(const char* key, uint32_t keyLen) {
|
|
|
|
|
return taosIntHash_64(key, keyLen);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbReq(void** buf, const SClientHbReq* pReq);
|
|
|
|
|
void* tDeserializeSClientHbReq(void* buf, SClientHbReq* pReq);
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbRsp(void** buf, const SClientHbRsp* pRsp);
|
|
|
|
|
void* tDeserializeSClientHbRsp(void* buf, SClientHbRsp* pRsp);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void tFreeClientHbReq(void *pReq) {
|
|
|
|
|
SClientHbReq* req = (SClientHbReq*)pReq;
|
|
|
|
|
if (req->info) taosHashCleanup(req->info);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbBatchReq(void** buf, const SClientHbBatchReq* pReq);
|
|
|
|
|
void* tDeserializeSClientHbBatchReq(void* buf, SClientHbBatchReq* pReq);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void tFreeClientHbBatchReq(void* pReq, bool deep) {
|
|
|
|
|
SClientHbBatchReq *req = (SClientHbBatchReq*)pReq;
|
|
|
|
|
if (deep) {
|
|
|
|
|
taosArrayDestroyEx(req->reqs, tFreeClientHbReq);
|
|
|
|
|
} else {
|
|
|
|
|
taosArrayDestroy(req->reqs);
|
|
|
|
|
}
|
|
|
|
|
free(pReq);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
int tSerializeSClientHbBatchRsp(void** buf, const SClientHbBatchRsp* pBatchRsp);
|
|
|
|
|
void* tDeserializeSClientHbBatchRsp(void* buf, SClientHbBatchRsp* pBatchRsp);
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSKv(void** buf, const SKv* pKv) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKv->keyLen);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKv->valueLen);
|
|
|
|
|
tlen += taosEncodeBinary(buf, pKv->key, pKv->keyLen);
|
|
|
|
|
tlen += taosEncodeBinary(buf, pKv->value, pKv->valueLen);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSKv(void* buf, SKv* pKv) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKv->keyLen);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKv->valueLen);
|
|
|
|
|
buf = taosDecodeBinary(buf, &pKv->key, pKv->keyLen);
|
|
|
|
|
buf = taosDecodeBinary(buf, &pKv->value, pKv->valueLen);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSClientHbKey(void** buf, const SClientHbKey* pKey) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKey->connId);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pKey->hbType);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSClientHbKey(void* buf, SClientHbKey* pKey) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKey->connId);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pKey->hbType);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbVgInfo {
|
|
|
|
|
int32_t vgId;
|
|
|
|
|
} SMqHbVgInfo;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqVgInfo(void** buf, const SMqHbVgInfo* pVgInfo) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pVgInfo->vgId);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqVgInfo(void* buf, SMqHbVgInfo* pVgInfo) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pVgInfo->vgId);
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbTopicInfo {
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
int64_t topicUid;
|
|
|
|
|
char name[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
SArray* pVgInfo;
|
|
|
|
|
} SMqHbTopicInfo;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pTopicInfo->epoch);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pTopicInfo->topicUid);
|
|
|
|
|
tlen += taosEncodeString(buf, pTopicInfo->name);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pTopicInfo->pVgInfo);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbVgInfo* pVgInfo = (SMqHbVgInfo*)taosArrayGet(pTopicInfo->pVgInfo, i);
|
|
|
|
|
tlen += taosEncodeSMqVgInfo(buf, pVgInfo);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqHbTopicInfoMsg(void* buf, SMqHbTopicInfo* pTopicInfo) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pTopicInfo->epoch);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pTopicInfo->topicUid);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pTopicInfo->name);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pTopicInfo->pVgInfo = taosArrayInit(sz, sizeof(SMqHbVgInfo));
|
|
|
|
|
for (int32_t i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbVgInfo vgInfo;
|
|
|
|
|
buf = taosDecodeSMqVgInfo(buf, &vgInfo);
|
|
|
|
|
taosArrayPush(pTopicInfo->pVgInfo, &vgInfo);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqHbMsg {
|
|
|
|
|
int32_t status; // ask hb endpoint
|
|
|
|
|
int32_t epoch;
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
SArray* pTopics; // SArray<SMqHbTopicInfo>
|
|
|
|
|
} SMqHbMsg;
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int taosEncodeSMqMsg(void** buf, const SMqHbMsg* pMsg) {
|
|
|
|
|
int tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->status);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pMsg->epoch);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pMsg->consumerId);
|
|
|
|
|
int32_t sz = taosArrayGetSize(pMsg->pTopics);
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, sz);
|
|
|
|
|
for (int i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbTopicInfo* topicInfo = (SMqHbTopicInfo*)taosArrayGet(pMsg->pTopics, i);
|
|
|
|
|
tlen += taosEncodeSMqHbTopicInfoMsg(buf, topicInfo);
|
|
|
|
|
}
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pMsg->status);
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pMsg->epoch);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pMsg->consumerId);
|
|
|
|
|
int32_t sz;
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &sz);
|
|
|
|
|
pMsg->pTopics = taosArrayInit(sz, sizeof(SMqHbTopicInfo));
|
|
|
|
|
for (int i = 0; i < sz; i++) {
|
|
|
|
|
SMqHbTopicInfo topicInfo;
|
|
|
|
|
buf = taosDecodeSMqHbTopicInfoMsg(buf, &topicInfo);
|
|
|
|
|
taosArrayPush(pMsg->pTopics, &topicInfo);
|
|
|
|
|
}
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
typedef struct SMqSetCVgReq {
|
|
|
|
|
int32_t vgId;
|
|
|
|
|
int64_t consumerId;
|
|
|
|
|
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
|
|
|
char cGroup[TSDB_CONSUMER_GROUP_LEN];
|
|
|
|
|
char* sql;
|
|
|
|
|
char* logicalPlan;
|
|
|
|
|
char* physicalPlan;
|
|
|
|
|
SArray* tasks; // SArray<SSubQueryMsg>
|
|
|
|
|
} SMqSetCVgReq;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
|
|
|
|
int32_t tlen = 0;
|
|
|
|
|
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
|
|
|
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->topicName);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->cGroup);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->sql);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->logicalPlan);
|
|
|
|
|
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
|
|
|
|
return tlen;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|
|
|
|
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
|
|
|
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pReq->topicName);
|
|
|
|
|
buf = taosDecodeStringTo(buf, pReq->cGroup);
|
|
|
|
|
buf = taosDecodeString(buf, &pReq->sql);
|
|
|
|
|
buf = taosDecodeString(buf, &pReq->logicalPlan);
|
|
|
|
|
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
|
|
|
|
pReq->tasks = NULL;
|
|
|
|
|
return buf;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#ifdef __cplusplus
|
|
|
|
|
}
|
|
|
|
|
#endif
|
|
|
|
|