Merge pull request #12339 from taosdata/feature/tq
enh(tmq): only read committed log
This commit is contained in:
commit
4a7910973e
|
@ -291,6 +291,109 @@ typedef struct {
|
||||||
SSchema* pSchema;
|
SSchema* pSchema;
|
||||||
} SSchemaWrapper;
|
} SSchemaWrapper;
|
||||||
|
|
||||||
|
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
||||||
|
if (pSW == NULL) return pSW;
|
||||||
|
pSW->nCols = pSchemaWrapper->nCols;
|
||||||
|
pSW->sver = pSchemaWrapper->sver;
|
||||||
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) {
|
||||||
|
taosMemoryFree(pSW);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
|
||||||
|
return pSW;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
|
||||||
|
taosMemoryFree(pSchemaWrapper->pSchema);
|
||||||
|
taosMemoryFree(pSchemaWrapper);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pSchema->flags);
|
||||||
|
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
||||||
|
tlen += taosEncodeFixedI16(buf, pSchema->colId);
|
||||||
|
tlen += taosEncodeString(buf, pSchema->name);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
|
||||||
|
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
||||||
|
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
||||||
|
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
||||||
|
buf = taosDecodeStringTo(buf, pSchema->name);
|
||||||
|
return (void*)buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
|
||||||
|
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
|
||||||
|
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
|
||||||
|
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
|
||||||
|
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
|
||||||
|
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
|
||||||
|
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
|
||||||
|
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
|
||||||
|
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeVariantI32(buf, pSW->nCols);
|
||||||
|
tlen += taosEncodeVariantI32(buf, pSW->sver);
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
|
}
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
||||||
|
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
||||||
|
buf = taosDecodeVariantI32(buf, &pSW->sver);
|
||||||
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
||||||
|
}
|
||||||
|
return (void*)buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
||||||
|
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
|
||||||
|
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
||||||
|
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
||||||
|
|
||||||
|
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
||||||
|
if (pSW->pSchema == NULL) return -1;
|
||||||
|
for (int32_t i = 0; i < pSW->nCols; i++) {
|
||||||
|
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
STSchema* tdGetSTSChemaFromSSChema(SSchema** pSchema, int32_t nCols);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -1873,7 +1976,7 @@ typedef struct SMqHbTopicInfo {
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
int64_t topicUid;
|
int64_t topicUid;
|
||||||
char name[TSDB_TOPIC_FNAME_LEN];
|
char name[TSDB_TOPIC_FNAME_LEN];
|
||||||
SArray* pVgInfo;
|
SArray* pVgInfo; // SArray<SMqHbVgInfo>
|
||||||
} SMqHbTopicInfo;
|
} SMqHbTopicInfo;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
static FORCE_INLINE int32_t taosEncodeSMqHbTopicInfoMsg(void** buf, const SMqHbTopicInfo* pTopicInfo) {
|
||||||
|
@ -1992,49 +2095,6 @@ static FORCE_INLINE void* tDecodeSMqRebVgReq(const void* buf, SMqRebVgReq* pReq)
|
||||||
return (void*)buf;
|
return (void*)buf;
|
||||||
}
|
}
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int8_t reserved;
|
|
||||||
} SMqRebVgRsp;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t leftForVer;
|
|
||||||
int32_t vgId;
|
|
||||||
int32_t epoch;
|
|
||||||
int64_t consumerId;
|
|
||||||
char topicName[TSDB_TOPIC_FNAME_LEN];
|
|
||||||
char cgroup[TSDB_CGROUP_LEN];
|
|
||||||
char* sql;
|
|
||||||
char* physicalPlan;
|
|
||||||
char* qmsg;
|
|
||||||
} SMqSetCVgReq;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq* pReq) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->leftForVer);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->vgId);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pReq->epoch);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pReq->consumerId);
|
|
||||||
tlen += taosEncodeString(buf, pReq->topicName);
|
|
||||||
tlen += taosEncodeString(buf, pReq->cgroup);
|
|
||||||
tlen += taosEncodeString(buf, pReq->sql);
|
|
||||||
tlen += taosEncodeString(buf, pReq->physicalPlan);
|
|
||||||
tlen += taosEncodeString(buf, pReq->qmsg);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->leftForVer);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->vgId);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pReq->epoch);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pReq->consumerId);
|
|
||||||
buf = taosDecodeStringTo(buf, pReq->topicName);
|
|
||||||
buf = taosDecodeStringTo(buf, pReq->cgroup);
|
|
||||||
buf = taosDecodeString(buf, &pReq->sql);
|
|
||||||
buf = taosDecodeString(buf, &pReq->physicalPlan);
|
|
||||||
buf = taosDecodeString(buf, &pReq->qmsg);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
@ -2056,109 +2116,6 @@ int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset);
|
||||||
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
|
int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq);
|
||||||
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
|
int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq);
|
||||||
|
|
||||||
static FORCE_INLINE SSchemaWrapper* tCloneSSchemaWrapper(const SSchemaWrapper* pSchemaWrapper) {
|
|
||||||
SSchemaWrapper* pSW = (SSchemaWrapper*)taosMemoryMalloc(sizeof(SSchemaWrapper));
|
|
||||||
if (pSW == NULL) return pSW;
|
|
||||||
pSW->nCols = pSchemaWrapper->nCols;
|
|
||||||
pSW->sver = pSchemaWrapper->sver;
|
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
|
||||||
if (pSW->pSchema == NULL) {
|
|
||||||
taosMemoryFree(pSW);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
memcpy(pSW->pSchema, pSchemaWrapper->pSchema, pSW->nCols * sizeof(SSchema));
|
|
||||||
return pSW;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSSchemaWrapper(SSchemaWrapper* pSchemaWrapper) {
|
|
||||||
taosMemoryFree(pSchemaWrapper->pSchema);
|
|
||||||
taosMemoryFree(pSchemaWrapper);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSSchema(void** buf, const SSchema* pSchema) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI8(buf, pSchema->type);
|
|
||||||
tlen += taosEncodeFixedI8(buf, pSchema->flags);
|
|
||||||
tlen += taosEncodeFixedI32(buf, pSchema->bytes);
|
|
||||||
tlen += taosEncodeFixedI16(buf, pSchema->colId);
|
|
||||||
tlen += taosEncodeString(buf, pSchema->name);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchema(const void* buf, SSchema* pSchema) {
|
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->type);
|
|
||||||
buf = taosDecodeFixedI8(buf, &pSchema->flags);
|
|
||||||
buf = taosDecodeFixedI32(buf, &pSchema->bytes);
|
|
||||||
buf = taosDecodeFixedI16(buf, &pSchema->colId);
|
|
||||||
buf = taosDecodeStringTo(buf, pSchema->name);
|
|
||||||
return (void*)buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchema(SEncoder* pEncoder, const SSchema* pSchema) {
|
|
||||||
if (tEncodeI8(pEncoder, pSchema->type) < 0) return -1;
|
|
||||||
if (tEncodeI8(pEncoder, pSchema->flags) < 0) return -1;
|
|
||||||
if (tEncodeI32v(pEncoder, pSchema->bytes) < 0) return -1;
|
|
||||||
if (tEncodeI16v(pEncoder, pSchema->colId) < 0) return -1;
|
|
||||||
if (tEncodeCStr(pEncoder, pSchema->name) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tDecodeSSchema(SDecoder* pDecoder, SSchema* pSchema) {
|
|
||||||
if (tDecodeI8(pDecoder, &pSchema->type) < 0) return -1;
|
|
||||||
if (tDecodeI8(pDecoder, &pSchema->flags) < 0) return -1;
|
|
||||||
if (tDecodeI32v(pDecoder, &pSchema->bytes) < 0) return -1;
|
|
||||||
if (tDecodeI16v(pDecoder, &pSchema->colId) < 0) return -1;
|
|
||||||
if (tDecodeCStrTo(pDecoder, pSchema->name) < 0) return -1;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWrapper* pSW) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeVariantI32(buf, pSW->nCols);
|
|
||||||
tlen += taosEncodeVariantI32(buf, pSW->sver);
|
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
|
||||||
tlen += taosEncodeSSchema(buf, &pSW->pSchema[i]);
|
|
||||||
}
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
|
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->nCols);
|
|
||||||
buf = taosDecodeVariantI32(buf, &pSW->sver);
|
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
|
||||||
if (pSW->pSchema == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
|
||||||
buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
|
|
||||||
}
|
|
||||||
return (void*)buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSSchemaWrapper(SEncoder* pEncoder, const SSchemaWrapper* pSW) {
|
|
||||||
if (tEncodeI32v(pEncoder, pSW->nCols) < 0) return -1;
|
|
||||||
if (tEncodeI32v(pEncoder, pSW->sver) < 0) return -1;
|
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
|
||||||
if (tEncodeSSchema(pEncoder, &pSW->pSchema[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tDecodeSSchemaWrapper(SDecoder* pDecoder, SSchemaWrapper* pSW) {
|
|
||||||
if (tDecodeI32v(pDecoder, &pSW->nCols) < 0) return -1;
|
|
||||||
if (tDecodeI32v(pDecoder, &pSW->sver) < 0) return -1;
|
|
||||||
|
|
||||||
pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
|
|
||||||
if (pSW->pSchema == NULL) return -1;
|
|
||||||
for (int32_t i = 0; i < pSW->nCols; i++) {
|
|
||||||
if (tDecodeSSchema(pDecoder, &pSW->pSchema[i]) < 0) return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_TABLE_FNAME_LEN];
|
char name[TSDB_TABLE_FNAME_LEN];
|
||||||
char stb[TSDB_TABLE_FNAME_LEN];
|
char stb[TSDB_TABLE_FNAME_LEN];
|
||||||
|
@ -2427,6 +2384,21 @@ typedef struct {
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqSubVgEp;
|
} SMqSubVgEp;
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
||||||
|
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
|
||||||
|
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
||||||
|
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
||||||
|
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
|
||||||
|
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||||
int8_t isSchemaAdaptive;
|
int8_t isSchemaAdaptive;
|
||||||
|
@ -2434,6 +2406,43 @@ typedef struct {
|
||||||
SSchemaWrapper schema;
|
SSchemaWrapper schema;
|
||||||
} SMqSubTopicEp;
|
} SMqSubTopicEp;
|
||||||
|
|
||||||
|
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
||||||
|
int32_t tlen = 0;
|
||||||
|
tlen += taosEncodeString(buf, pTopicEp->topic);
|
||||||
|
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
|
||||||
|
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
||||||
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
|
||||||
|
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
|
||||||
|
}
|
||||||
|
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
||||||
|
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
||||||
|
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
|
||||||
|
int32_t sz;
|
||||||
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
||||||
|
if (pTopicEp->vgs == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
SMqSubVgEp vgEp;
|
||||||
|
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
||||||
|
taosArrayPush(pTopicEp->vgs, &vgEp);
|
||||||
|
}
|
||||||
|
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
|
||||||
|
return buf;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
||||||
|
// taosMemoryFree(pSubTopicEp->schema.pSchema);
|
||||||
|
taosArrayDestroy(pSubTopicEp->vgs);
|
||||||
|
}
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
SMqRspHead head;
|
SMqRspHead head;
|
||||||
int64_t reqOffset;
|
int64_t reqOffset;
|
||||||
|
@ -2512,58 +2521,6 @@ typedef struct {
|
||||||
SArray* topics; // SArray<SMqSubTopicEp>
|
SArray* topics; // SArray<SMqSubTopicEp>
|
||||||
} SMqAskEpRsp;
|
} SMqAskEpRsp;
|
||||||
|
|
||||||
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
|
|
||||||
// taosMemoryFree(pSubTopicEp->schema.pSchema);
|
|
||||||
taosArrayDestroy(pSubTopicEp->vgs);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSubVgEp(void** buf, const SMqSubVgEp* pVgEp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeFixedI32(buf, pVgEp->vgId);
|
|
||||||
tlen += taosEncodeFixedI64(buf, pVgEp->offset);
|
|
||||||
tlen += taosEncodeSEpSet(buf, &pVgEp->epSet);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqSubVgEp(void* buf, SMqSubVgEp* pVgEp) {
|
|
||||||
buf = taosDecodeFixedI32(buf, &pVgEp->vgId);
|
|
||||||
buf = taosDecodeFixedI64(buf, &pVgEp->offset);
|
|
||||||
buf = taosDecodeSEpSet(buf, &pVgEp->epSet);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqSubTopicEp(void** buf, const SMqSubTopicEp* pTopicEp) {
|
|
||||||
int32_t tlen = 0;
|
|
||||||
tlen += taosEncodeString(buf, pTopicEp->topic);
|
|
||||||
tlen += taosEncodeFixedI8(buf, pTopicEp->isSchemaAdaptive);
|
|
||||||
int32_t sz = taosArrayGetSize(pTopicEp->vgs);
|
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqSubVgEp* pVgEp = (SMqSubVgEp*)taosArrayGet(pTopicEp->vgs, i);
|
|
||||||
tlen += tEncodeSMqSubVgEp(buf, pVgEp);
|
|
||||||
}
|
|
||||||
tlen += taosEncodeSSchemaWrapper(buf, &pTopicEp->schema);
|
|
||||||
return tlen;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicEp) {
|
|
||||||
buf = taosDecodeStringTo(buf, pTopicEp->topic);
|
|
||||||
buf = taosDecodeFixedI8(buf, &pTopicEp->isSchemaAdaptive);
|
|
||||||
int32_t sz;
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
|
||||||
pTopicEp->vgs = taosArrayInit(sz, sizeof(SMqSubVgEp));
|
|
||||||
if (pTopicEp->vgs == NULL) {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
|
||||||
SMqSubVgEp vgEp;
|
|
||||||
buf = tDecodeSMqSubVgEp(buf, &vgEp);
|
|
||||||
taosArrayPush(pTopicEp->vgs, &vgEp);
|
|
||||||
}
|
|
||||||
buf = taosDecodeSSchemaWrapper(buf, &pTopicEp->schema);
|
|
||||||
return buf;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
|
static FORCE_INLINE int32_t tEncodeSMqAskEpRsp(void** buf, const SMqAskEpRsp* pRsp) {
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
// tlen += taosEncodeString(buf, pRsp->cgroup);
|
// tlen += taosEncodeString(buf, pRsp->cgroup);
|
||||||
|
|
|
@ -208,6 +208,7 @@ int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readN
|
||||||
int64_t walGetFirstVer(SWal *);
|
int64_t walGetFirstVer(SWal *);
|
||||||
int64_t walGetSnapshotVer(SWal *);
|
int64_t walGetSnapshotVer(SWal *);
|
||||||
int64_t walGetLastVer(SWal *);
|
int64_t walGetLastVer(SWal *);
|
||||||
|
int64_t walGetCommittedVer(SWal *);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -452,6 +452,7 @@ typedef struct {
|
||||||
int8_t withSchema;
|
int8_t withSchema;
|
||||||
int8_t withTag;
|
int8_t withTag;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
|
int32_t consumerCnt;
|
||||||
int32_t sqlLen;
|
int32_t sqlLen;
|
||||||
int32_t astLen;
|
int32_t astLen;
|
||||||
char* sql;
|
char* sql;
|
||||||
|
|
|
@ -787,6 +787,8 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// do not show for cleared subscription
|
||||||
|
#if 0
|
||||||
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
int32_t sz = taosArrayGetSize(pSub->unassignedVgs);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
SMqVgEp *pVgEp = taosArrayGetP(pSub->unassignedVgs, i);
|
||||||
|
@ -829,6 +831,7 @@ static int32_t mndRetrieveSubscribe(SNodeMsg *pReq, SShowObj *pShow, SSDataBlock
|
||||||
numOfRows++;
|
numOfRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
taosRUnLockLatch(&pSub->lock);
|
taosRUnLockLatch(&pSub->lock);
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,6 +89,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
|
SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);
|
||||||
|
|
||||||
|
SDB_SET_INT32(pRaw, dataPos, pTopic->consumerCnt, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
SDB_SET_BINARY(pRaw, dataPos, pTopic->sql, pTopic->sqlLen, TOPIC_ENCODE_OVER);
|
||||||
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
|
SDB_SET_INT32(pRaw, dataPos, pTopic->astLen, TOPIC_ENCODE_OVER);
|
||||||
|
@ -152,6 +154,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
|
SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);
|
||||||
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
|
SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->consumerCnt, TOPIC_DECODE_OVER);
|
||||||
|
|
||||||
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
SDB_GET_INT32(pRaw, dataPos, &pTopic->sqlLen, TOPIC_DECODE_OVER);
|
||||||
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
|
pTopic->sql = taosMemoryCalloc(pTopic->sqlLen, sizeof(char));
|
||||||
if (pTopic->sql == NULL) {
|
if (pTopic->sql == NULL) {
|
||||||
|
|
|
@ -401,7 +401,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__EARLIEAST) {
|
||||||
fetchOffset = walGetFirstVer(pTq->pWal);
|
fetchOffset = walGetFirstVer(pTq->pWal);
|
||||||
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
} else if (pReq->currentOffset == TMQ_CONF__RESET_OFFSET__LATEST) {
|
||||||
fetchOffset = walGetLastVer(pTq->pWal);
|
fetchOffset = walGetCommittedVer(pTq->pWal);
|
||||||
} else {
|
} else {
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
fetchOffset = pReq->currentOffset + 1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,8 @@ int64_t FORCE_INLINE walGetSnaphostVer(SWal* pWal) { return pWal->vers.snapshotV
|
||||||
|
|
||||||
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
|
int64_t FORCE_INLINE walGetLastVer(SWal* pWal) { return pWal->vers.lastVer; }
|
||||||
|
|
||||||
|
int64_t FORCE_INLINE walGetCommittedVer(SWal* pWal) { return pWal->vers.commitVer; }
|
||||||
|
|
||||||
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
static FORCE_INLINE int walBuildMetaName(SWal* pWal, int metaVer, char* buf) {
|
||||||
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
return sprintf(buf, "%s/meta-ver%d", pWal->path, metaVer);
|
||||||
}
|
}
|
||||||
|
|
|
@ -143,7 +143,11 @@ void walSetReaderCapacity(SWalReadHandle *pRead, int32_t capacity) { pRead->capa
|
||||||
|
|
||||||
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
|
int32_t walFetchHead(SWalReadHandle *pRead, int64_t ver, SWalHead *pHead) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
// TODO: valid ver
|
// TODO: valid ver
|
||||||
|
if (ver > pRead->pWal->vers.commitVer) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
if (pRead->curVersion != ver) {
|
if (pRead->curVersion != ver) {
|
||||||
code = walReadSeekVer(pRead, ver);
|
code = walReadSeekVer(pRead, ver);
|
||||||
|
|
Loading…
Reference in New Issue