Merge remote-tracking branch 'origin/3.0' into feature/privilege
This commit is contained in:
commit
9b75a243d3
|
@ -1209,10 +1209,10 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
||||||
++pColInfo->numOfBound;
|
++pColInfo->numOfBound;
|
||||||
switch (pSchema[t].type) {
|
switch (pSchema[t].type) {
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||||
|
@ -1243,10 +1243,10 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
||||||
++pColInfo->numOfBound;
|
++pColInfo->numOfBound;
|
||||||
switch (pSchema[t].type) {
|
switch (pSchema[t].type) {
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
pColInfo->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||||
|
|
|
@ -44,11 +44,11 @@ int32_t init_env() {
|
||||||
}
|
}
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create stable if not exists st1 (ts timestamp, k int) tags(a int)");
|
pRes = taos_query(pConn, "create stable st1 (ts timestamp, k int) tags(a int)");
|
||||||
if (taos_errno(pRes) != 0) {
|
/*if (taos_errno(pRes) != 0) {*/
|
||||||
printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));
|
/*printf("failed to create super table 123_$^), reason:%s\n", taos_errstr(pRes));*/
|
||||||
return -1;
|
/*return -1;*/
|
||||||
}
|
/*}*/
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
pRes = taos_query(pConn, "create table tu using st1 tags(1)");
|
||||||
|
@ -114,19 +114,19 @@ void basic_consume_loop(tmq_t *tmq,
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
int32_t cnt = 0;
|
int32_t cnt = 0;
|
||||||
clock_t startTime = clock();
|
/*clock_t startTime = clock();*/
|
||||||
while (running) {
|
while (running) {
|
||||||
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
|
tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 0);
|
||||||
if (tmqmessage) {
|
if (tmqmessage) {
|
||||||
cnt++;
|
cnt++;
|
||||||
/*msg_process(tmqmessage);*/
|
msg_process(tmqmessage);
|
||||||
tmq_message_destroy(tmqmessage);
|
tmq_message_destroy(tmqmessage);
|
||||||
} else {
|
/*} else {*/
|
||||||
break;
|
/*break;*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
clock_t endTime = clock();
|
/*clock_t endTime = clock();*/
|
||||||
printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);
|
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
|
||||||
|
|
||||||
err = tmq_consumer_close(tmq);
|
err = tmq_consumer_close(tmq);
|
||||||
if (err)
|
if (err)
|
||||||
|
|
|
@ -1096,7 +1096,7 @@ static FORCE_INLINE void* tDeserializeSCMSubscribeReq(void* buf, SCMSubscribeReq
|
||||||
buf = taosDecodeString(buf, &pReq->consumerGroup);
|
buf = taosDecodeString(buf, &pReq->consumerGroup);
|
||||||
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
|
pReq->topicNames = taosArrayInit(pReq->topicNum, sizeof(void*));
|
||||||
for (int i = 0; i < pReq->topicNum; i++) {
|
for (int i = 0; i < pReq->topicNum; i++) {
|
||||||
char* name = NULL;
|
char* name;
|
||||||
buf = taosDecodeString(buf, &name);
|
buf = taosDecodeString(buf, &name);
|
||||||
taosArrayPush(pReq->topicNames, &name);
|
taosArrayPush(pReq->topicNames, &name);
|
||||||
}
|
}
|
||||||
|
@ -1173,9 +1173,60 @@ typedef struct {
|
||||||
} SMqTmrMsg;
|
} SMqTmrMsg;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t consumerId;
|
const char* key;
|
||||||
|
SArray* lostConsumers; //SArray<int64_t>
|
||||||
|
SArray* removedConsumers; //SArray<int64_t>
|
||||||
|
SArray* newConsumers; //SArray<int64_t>
|
||||||
|
} SMqRebSubscribe;
|
||||||
|
|
||||||
|
static FORCE_INLINE SMqRebSubscribe* tNewSMqRebSubscribe(const char* key) {
|
||||||
|
SMqRebSubscribe* pRebSub = (SMqRebSubscribe*)calloc(1, sizeof(SMqRebSubscribe));
|
||||||
|
if (pRebSub == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pRebSub->key = key;
|
||||||
|
pRebSub->lostConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
|
if (pRebSub->lostConsumers == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pRebSub->removedConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
|
if (pRebSub->removedConsumers == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
pRebSub->newConsumers = taosArrayInit(0, sizeof(int64_t));
|
||||||
|
if (pRebSub->newConsumers == NULL) {
|
||||||
|
goto _err;
|
||||||
|
}
|
||||||
|
return pRebSub;
|
||||||
|
_err:
|
||||||
|
taosArrayDestroy(pRebSub->lostConsumers);
|
||||||
|
taosArrayDestroy(pRebSub->removedConsumers);
|
||||||
|
taosArrayDestroy(pRebSub->newConsumers);
|
||||||
|
tfree(pRebSub);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// this message is sent from mnode to mnode(read thread to write thread), so there is no need for serialization / deserialization
|
||||||
|
typedef struct {
|
||||||
|
//SArray* rebSubscribes; //SArray<SMqRebSubscribe>
|
||||||
|
SHashObj* rebSubHash; // SHashObj<key, SMqRebSubscribe>
|
||||||
} SMqDoRebalanceMsg;
|
} SMqDoRebalanceMsg;
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
static FORCE_INLINE SMqDoRebalanceMsg* tNewSMqDoRebalanceMsg() {
|
||||||
|
SMqDoRebalanceMsg *pMsg = malloc(sizeof(SMqDoRebalanceMsg));
|
||||||
|
if (pMsg == NULL) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
pMsg->rebSubscribes = taosArrayInit(0, sizeof(SMqRebSubscribe));
|
||||||
|
if (pMsg->rebSubscribes == NULL) {
|
||||||
|
free(pMsg);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t status;
|
int64_t status;
|
||||||
} SMVSubscribeRsp;
|
} SMVSubscribeRsp;
|
||||||
|
|
|
@ -423,8 +423,16 @@ static FORCE_INLINE int32_t tdSRowSetInfo(SRowBuilder *pBuilder, int32_t nCols,
|
||||||
*/
|
*/
|
||||||
static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols,
|
static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t nCols, int32_t nBoundCols,
|
||||||
int32_t flen, int32_t allNullLen, int32_t boundNullLen) {
|
int32_t flen, int32_t allNullLen, int32_t boundNullLen) {
|
||||||
if ((boundNullLen > 0) && (allNullLen > 0) && isSelectKVRow(boundNullLen, allNullLen)) {
|
if ((boundNullLen > 0) && (allNullLen > 0) && (nBoundCols > 0)) {
|
||||||
pBuilder->rowType = TD_ROW_KV;
|
uint32_t tpLen = allNullLen;
|
||||||
|
uint32_t kvLen = sizeof(col_id_t) + sizeof(SKvRowIdx) * nBoundCols + boundNullLen;
|
||||||
|
if (isSelectKVRow(kvLen, tpLen)) {
|
||||||
|
pBuilder->rowType = TD_ROW_KV;
|
||||||
|
} else {
|
||||||
|
pBuilder->rowType = TD_ROW_TP;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
pBuilder->rowType = TD_ROW_TP;
|
pBuilder->rowType = TD_ROW_TP;
|
||||||
}
|
}
|
||||||
|
|
|
@ -222,7 +222,7 @@ int32_t sdbWriteNotFree(SSdb *pSdb, SSdbRaw *pRaw);
|
||||||
* @param pKey The key value of the row.
|
* @param pKey The key value of the row.
|
||||||
* @return void* The object of the row.
|
* @return void* The object of the row.
|
||||||
*/
|
*/
|
||||||
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey);
|
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Release a row from sdb.
|
* @brief Release a row from sdb.
|
||||||
|
|
|
@ -22,6 +22,16 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
enum {
|
||||||
|
MQ_CONSUMER_STATUS__INIT = 1,
|
||||||
|
MQ_CONSUMER_STATUS__IDLE,
|
||||||
|
MQ_CONSUMER_STATUS__ACTIVE,
|
||||||
|
MQ_CONSUMER_STATUS__LOST,
|
||||||
|
MQ_CONSUMER_STATUS__MODIFY
|
||||||
|
};
|
||||||
|
|
||||||
|
|
||||||
int32_t mndInitConsumer(SMnode *pMnode);
|
int32_t mndInitConsumer(SMnode *pMnode);
|
||||||
void mndCleanupConsumer(SMnode *pMnode);
|
void mndCleanupConsumer(SMnode *pMnode);
|
||||||
|
|
||||||
|
|
|
@ -391,6 +391,7 @@ static FORCE_INLINE int32_t tEncodeSMqSubConsumer(void** buf, const SMqSubConsum
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||||
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
|
int32_t sz = taosArrayGetSize(pConsumer->vgInfo);
|
||||||
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
|
SMqConsumerEp* pCEp = taosArrayGet(pConsumer->vgInfo, i);
|
||||||
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
tlen += tEncodeSMqConsumerEp(buf, pCEp);
|
||||||
|
@ -501,9 +502,9 @@ static FORCE_INLINE void* tDecodeSubscribeObj(void* buf, SMqSubscribeObj* pSub)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
SMqConsumerEp cEp = {0};
|
SMqConsumerEp consumerEp = {0};
|
||||||
buf = tDecodeSMqConsumerEp(buf, &cEp);
|
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
|
||||||
taosArrayPush(pSub->unassignedVg, &cEp);
|
taosArrayPush(pSub->unassignedVg, &consumerEp);
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
@ -542,7 +543,8 @@ typedef struct {
|
||||||
int64_t connId;
|
int64_t connId;
|
||||||
SRWLatch lock;
|
SRWLatch lock;
|
||||||
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
char cgroup[TSDB_CONSUMER_GROUP_LEN];
|
||||||
SArray* topics; // SArray<char*>
|
SArray* currentTopics; // SArray<char*>
|
||||||
|
SArray* recentRemovedTopics; // SArray<char*>
|
||||||
int64_t epoch;
|
int64_t epoch;
|
||||||
// stat
|
// stat
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
|
@ -555,16 +557,25 @@ typedef struct {
|
||||||
} SMqConsumerObj;
|
} SMqConsumerObj;
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
|
static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer) {
|
||||||
|
int32_t sz;
|
||||||
int32_t tlen = 0;
|
int32_t tlen = 0;
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
tlen += taosEncodeFixedI64(buf, pConsumer->consumerId);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
|
tlen += taosEncodeFixedI64(buf, pConsumer->connId);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
|
tlen += taosEncodeFixedI64(buf, pConsumer->epoch);
|
||||||
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
|
tlen += taosEncodeFixedI64(buf, pConsumer->pollCnt);
|
||||||
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
tlen += taosEncodeString(buf, pConsumer->cgroup);
|
||||||
int32_t sz = taosArrayGetSize(pConsumer->topics);
|
|
||||||
|
sz = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
tlen += taosEncodeFixedI32(buf, sz);
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char* topic = taosArrayGetP(pConsumer->topics, i);
|
char* topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
|
tlen += taosEncodeString(buf, topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
sz = taosArrayGetSize(pConsumer->recentRemovedTopics);
|
||||||
|
tlen += taosEncodeFixedI32(buf, sz);
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char* topic = taosArrayGetP(pConsumer->recentRemovedTopics, i);
|
||||||
tlen += taosEncodeString(buf, topic);
|
tlen += taosEncodeString(buf, topic);
|
||||||
}
|
}
|
||||||
return tlen;
|
return tlen;
|
||||||
|
@ -577,12 +588,21 @@ static FORCE_INLINE void* tDecodeSMqConsumerObj(void* buf, SMqConsumerObj* pCons
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
|
buf = taosDecodeFixedI64(buf, &pConsumer->epoch);
|
||||||
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
|
buf = taosDecodeFixedI64(buf, &pConsumer->pollCnt);
|
||||||
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
buf = taosDecodeStringTo(buf, pConsumer->cgroup);
|
||||||
|
|
||||||
buf = taosDecodeFixedI32(buf, &sz);
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
pConsumer->topics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
pConsumer->currentTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
||||||
for (int32_t i = 0; i < sz; i++) {
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
char* topic;
|
char* topic;
|
||||||
buf = taosDecodeString(buf, &topic);
|
buf = taosDecodeString(buf, &topic);
|
||||||
taosArrayPush(pConsumer->topics, &topic);
|
taosArrayPush(pConsumer->currentTopics, &topic);
|
||||||
|
}
|
||||||
|
|
||||||
|
buf = taosDecodeFixedI32(buf, &sz);
|
||||||
|
pConsumer->recentRemovedTopics = taosArrayInit(sz, sizeof(SMqConsumerObj));
|
||||||
|
for (int32_t i = 0; i < sz; i++) {
|
||||||
|
char* topic;
|
||||||
|
buf = taosDecodeString(buf, &topic);
|
||||||
|
taosArrayPush(pConsumer->recentRemovedTopics, &topic);
|
||||||
}
|
}
|
||||||
return buf;
|
return buf;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,7 +25,8 @@ extern "C" {
|
||||||
int32_t mndInitSubscribe(SMnode *pMnode);
|
int32_t mndInitSubscribe(SMnode *pMnode);
|
||||||
void mndCleanupSubscribe(SMnode *pMnode);
|
void mndCleanupSubscribe(SMnode *pMnode);
|
||||||
|
|
||||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *CGroup, char *topicName);
|
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *CGroup, const char *topicName);
|
||||||
|
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char* key);
|
||||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -61,6 +61,7 @@ SMqConsumerObj* mndCreateConsumer(int64_t consumerId, const char* cgroup) {
|
||||||
}
|
}
|
||||||
pConsumer->epoch = 1;
|
pConsumer->epoch = 1;
|
||||||
pConsumer->consumerId = consumerId;
|
pConsumer->consumerId = consumerId;
|
||||||
|
pConsumer->status = MQ_CONSUMER_STATUS__INIT;
|
||||||
strcpy(pConsumer->cgroup, cgroup);
|
strcpy(pConsumer->cgroup, cgroup);
|
||||||
taosInitRWLatch(&pConsumer->lock);
|
taosInitRWLatch(&pConsumer->lock);
|
||||||
return pConsumer;
|
return pConsumer;
|
||||||
|
|
|
@ -33,12 +33,6 @@
|
||||||
|
|
||||||
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
#define MND_SUBSCRIBE_REBALANCE_CNT 3
|
||||||
|
|
||||||
enum {
|
|
||||||
MQ_CONSUMER_STATUS__INIT = 1,
|
|
||||||
MQ_CONSUMER_STATUS__ACTIVE,
|
|
||||||
MQ_CONSUMER_STATUS__LOST,
|
|
||||||
};
|
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
|
MQ_SUBSCRIBE_STATUS__ACTIVE = 1,
|
||||||
MQ_SUBSCRIBE_STATUS__DELETED,
|
MQ_SUBSCRIBE_STATUS__DELETED,
|
||||||
|
@ -58,10 +52,13 @@ static int32_t mndProcessSubscribeInternalReq(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
static int32_t mndProcessSubscribeInternalRsp(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
|
||||||
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg);
|
||||||
|
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg);
|
||||||
|
|
||||||
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
static int mndPersistMqSetConnReq(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic, const char *cgroup,
|
||||||
const SMqConsumerEp *pSub);
|
const SMqConsumerEp *pSub);
|
||||||
|
|
||||||
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp);
|
||||||
|
|
||||||
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
|
static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSubscribeObj *pSub);
|
||||||
|
|
||||||
int32_t mndInitSubscribe(SMnode *pMnode) {
|
int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||||
|
@ -77,6 +74,7 @@ int32_t mndInitSubscribe(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
|
mndSetMsgHandle(pMnode, TDMT_VND_MQ_SET_CONN_RSP, mndProcessSubscribeInternalRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_TIMER, mndProcessMqTimerMsg);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_GET_SUB_EP, mndProcessGetSubEpReq);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_MQ_DO_REBALANCE, mndProcessDoRebalanceMsg);
|
||||||
return sdbSetTable(pMnode->pSdb, table);
|
return sdbSetTable(pMnode->pSdb, table);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -106,18 +104,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicObj *pTopic,
|
static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsumerEp *pConsumerEp) {
|
||||||
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
|
SMqSetCVgReq req = {
|
||||||
SMqSetCVgReq req = {0};
|
.vgId = pConsumerEp->vgId,
|
||||||
strcpy(req.cgroup, cgroup);
|
.oldConsumerId = pConsumerEp->oldConsumerId,
|
||||||
strcpy(req.topicName, pTopic->name);
|
.newConsumerId = pConsumerEp->consumerId,
|
||||||
req.sql = pTopic->sql;
|
};
|
||||||
req.logicalPlan = pTopic->logicalPlan;
|
|
||||||
req.physicalPlan = pTopic->physicalPlan;
|
|
||||||
req.qmsg = pConsumerEp->qmsg;
|
|
||||||
req.oldConsumerId = pConsumerEp->oldConsumerId;
|
|
||||||
req.newConsumerId = pConsumerEp->consumerId;
|
|
||||||
req.vgId = pConsumerEp->vgId;
|
|
||||||
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
int32_t tlen = tEncodeSMqSetCVgReq(NULL, &req);
|
||||||
void *buf = malloc(sizeof(SMsgHead) + tlen);
|
void *buf = malloc(sizeof(SMsgHead) + tlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
|
@ -128,22 +121,23 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqTopicOb
|
||||||
|
|
||||||
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
|
pMsgHead->contLen = htonl(sizeof(SMsgHead) + tlen);
|
||||||
pMsgHead->vgId = htonl(pConsumerEp->vgId);
|
pMsgHead->vgId = htonl(pConsumerEp->vgId);
|
||||||
|
|
||||||
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
void *abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
|
||||||
tEncodeSMqSetCVgReq(&abuf, &req);
|
tEncodeSMqSetCVgReq(&abuf, &req);
|
||||||
|
|
||||||
*pBuf = buf;
|
*pBuf = buf;
|
||||||
*pLen = tlen;
|
*pLen = tlen;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqTopicObj *pTopic,
|
static int32_t mndPersistRebalanceMsg(SMnode *pMnode, STrans *pTrans, const SMqConsumerEp *pConsumerEp) {
|
||||||
const SMqConsumerEp *pConsumerEp, const char *cgroup) {
|
|
||||||
int32_t vgId = pConsumerEp->vgId;
|
int32_t vgId = pConsumerEp->vgId;
|
||||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
SVgObj *pVgObj = mndAcquireVgroup(pMnode, vgId);
|
||||||
|
|
||||||
void *buf;
|
void *buf;
|
||||||
int32_t tlen;
|
int32_t tlen;
|
||||||
if (mndBuildRebalanceMsg(&buf, &tlen, pTopic, pConsumerEp, cgroup) < 0) {
|
if (mndBuildRebalanceMsg(&buf, &tlen, pConsumerEp) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,7 +220,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||||
rsp.consumerId = consumerId;
|
rsp.consumerId = consumerId;
|
||||||
rsp.epoch = pConsumer->epoch;
|
rsp.epoch = pConsumer->epoch;
|
||||||
if (pReq->epoch != rsp.epoch) {
|
if (pReq->epoch != rsp.epoch) {
|
||||||
SArray *pTopics = pConsumer->topics;
|
SArray *pTopics = pConsumer->currentTopics;
|
||||||
int sz = taosArrayGetSize(pTopics);
|
int sz = taosArrayGetSize(pTopics);
|
||||||
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
rsp.topics = taosArrayInit(sz, sizeof(SMqSubTopicEp));
|
||||||
for (int i = 0; i < sz; i++) {
|
for (int i = 0; i < sz; i++) {
|
||||||
|
@ -234,12 +228,13 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
|
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topicName);
|
||||||
ASSERT(pSub);
|
ASSERT(pSub);
|
||||||
int csz = taosArrayGetSize(pSub->consumers);
|
int csz = taosArrayGetSize(pSub->consumers);
|
||||||
//TODO: change to bsearch
|
// TODO: change to bsearch
|
||||||
for (int j = 0; j < csz; j++) {
|
for (int j = 0; j < csz; j++) {
|
||||||
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
|
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
|
||||||
if (consumerId == pSubConsumer->consumerId) {
|
if (consumerId == pSubConsumer->consumerId) {
|
||||||
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
|
int vgsz = taosArrayGetSize(pSubConsumer->vgInfo);
|
||||||
SMqSubTopicEp topicEp;
|
SMqSubTopicEp topicEp;
|
||||||
|
strcpy(topicEp.topic, topicName);
|
||||||
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
|
topicEp.vgs = taosArrayInit(vgsz, sizeof(SMqSubVgEp));
|
||||||
for (int k = 0; k < vgsz; k++) {
|
for (int k = 0; k < vgsz; k++) {
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
|
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, k);
|
||||||
|
@ -280,11 +275,27 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static SMqRebSubscribe *mndGetOrCreateRebSub(SHashObj *pHash, const char *key) {
|
||||||
|
SMqRebSubscribe *pRebSub = taosHashGet(pHash, key, strlen(key));
|
||||||
|
if (pRebSub == NULL) {
|
||||||
|
pRebSub = tNewSMqRebSubscribe(key);
|
||||||
|
if (pRebSub == NULL) {
|
||||||
|
// TODO
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
taosHashPut(pHash, key, strlen(key), pRebSub, sizeof(SMqRebSubscribe));
|
||||||
|
}
|
||||||
|
return pRebSub;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
SMqConsumerObj *pConsumer;
|
SMqConsumerObj *pConsumer;
|
||||||
void *pIter = NULL;
|
void *pIter = NULL;
|
||||||
|
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
|
||||||
|
pRebMsg->rebSubHash = taosHashInit(64, MurmurHash3_32, true, HASH_NO_LOCK);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
pIter = sdbFetch(pSdb, SDB_CONSUMER, pIter, (void **)&pConsumer);
|
||||||
if (pIter == NULL) break;
|
if (pIter == NULL) break;
|
||||||
|
@ -293,12 +304,48 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
int32_t old =
|
int32_t old =
|
||||||
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
|
atomic_val_compare_exchange_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE, MQ_CONSUMER_STATUS__LOST);
|
||||||
if (old == MQ_CONSUMER_STATUS__ACTIVE) {
|
if (old == MQ_CONSUMER_STATUS__ACTIVE) {
|
||||||
SMqDoRebalanceMsg *pRebMsg = rpcMallocCont(sizeof(SMqDoRebalanceMsg));
|
// get all topics of that topic
|
||||||
pRebMsg->consumerId = pConsumer->consumerId;
|
int sz = taosArrayGetSize(pConsumer->currentTopics);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
|
for (int i = 0; i < sz; i++) {
|
||||||
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
|
char *topic = taosArrayGetP(pConsumer->currentTopics, i);
|
||||||
|
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
|
||||||
|
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
|
taosArrayPush(pRebSub->lostConsumers, &pConsumer->consumerId);
|
||||||
|
}
|
||||||
|
/*pRebMsg->consumerId = pConsumer->consumerId;*/
|
||||||
|
/*SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen =
|
||||||
|
* sizeof(SMqDoRebalanceMsg)};*/
|
||||||
|
/*pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);*/
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
int32_t status = atomic_load_32(&pConsumer->status);
|
||||||
|
if (status == MQ_CONSUMER_STATUS__INIT || status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||||
|
SArray *rebSubs;
|
||||||
|
if (status == MQ_CONSUMER_STATUS__INIT) {
|
||||||
|
rebSubs = pConsumer->currentTopics;
|
||||||
|
} else {
|
||||||
|
rebSubs = pConsumer->recentRemovedTopics;
|
||||||
|
}
|
||||||
|
int sz = taosArrayGetSize(rebSubs);
|
||||||
|
for (int i = 0; i < sz; i++) {
|
||||||
|
char *topic = taosArrayGetP(rebSubs, i);
|
||||||
|
char *key = mndMakeSubscribeKey(pConsumer->cgroup, topic);
|
||||||
|
SMqRebSubscribe *pRebSub = mndGetOrCreateRebSub(pRebMsg->rebSubHash, key);
|
||||||
|
if (status == MQ_CONSUMER_STATUS__INIT) {
|
||||||
|
taosArrayPush(pRebSub->newConsumers, &pConsumer->consumerId);
|
||||||
|
} else if (status == MQ_CONSUMER_STATUS__MODIFY) {
|
||||||
|
taosArrayPush(pRebSub->removedConsumers, &pConsumer->consumerId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (taosHashGetSize(pRebMsg->rebSubHash) != 0) {
|
||||||
|
mInfo("mq rebalance will be triggered");
|
||||||
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_DO_REBALANCE, .pCont = pRebMsg, .contLen = sizeof(SMqDoRebalanceMsg)};
|
||||||
|
pMnode->putReqToMWriteQFp(pMnode->pDnode, &rpcMsg);
|
||||||
|
} else {
|
||||||
|
taosHashCleanup(pRebMsg->rebSubHash);
|
||||||
|
rpcFreeCont(pRebMsg);
|
||||||
}
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -306,77 +353,111 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
|
||||||
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->pMnode;
|
SMnode *pMnode = pMsg->pMnode;
|
||||||
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
|
SMqDoRebalanceMsg *pReq = (SMqDoRebalanceMsg *)pMsg->rpcMsg.pCont;
|
||||||
SMqConsumerObj *pConsumer = mndAcquireConsumer(pMnode, pReq->consumerId);
|
|
||||||
int topicSz = taosArrayGetSize(pConsumer->topics);
|
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, &pMsg->rpcMsg);
|
||||||
for (int i = 0; i < topicSz; i++) {
|
void *pIter = NULL;
|
||||||
char *topic = taosArrayGetP(pConsumer->topics, i);
|
|
||||||
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic);
|
mInfo("mq rebalance start");
|
||||||
int32_t consumerNum = taosArrayGetSize(pSub->consumers);
|
|
||||||
|
while (1) {
|
||||||
|
pIter = taosHashIterate(pReq->rebSubHash, pIter);
|
||||||
|
if (pIter == NULL) break;
|
||||||
|
SMqRebSubscribe *pRebSub = (SMqRebSubscribe *)pIter;
|
||||||
|
SMqSubscribeObj *pSub = mndAcquireSubscribeByKey(pMnode, pRebSub->key);
|
||||||
|
|
||||||
|
mInfo("mq rebalance subscription: %s", pSub->key);
|
||||||
|
|
||||||
|
// remove lost consumer
|
||||||
|
for (int i = 0; i < taosArrayGetSize(pRebSub->lostConsumers); i++) {
|
||||||
|
int64_t lostConsumerId = *(int64_t *)taosArrayGet(pRebSub->lostConsumers, i);
|
||||||
|
|
||||||
|
mInfo("mq remove lost consumer %ld", lostConsumerId);
|
||||||
|
|
||||||
|
for (int j = 0; j < taosArrayGetSize(pSub->consumers); j++) {
|
||||||
|
SMqConsumerEp *pConsumerEp = taosArrayGet(pSub->consumers, j);
|
||||||
|
if (pConsumerEp->consumerId == lostConsumerId) {
|
||||||
|
taosArrayPush(pSub->unassignedVg, pConsumerEp);
|
||||||
|
taosArrayRemove(pSub->consumers, j);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// calculate rebalance
|
||||||
|
int32_t consumerNum = taosArrayGetSize(pSub->consumers);
|
||||||
if (consumerNum != 0) {
|
if (consumerNum != 0) {
|
||||||
int32_t vgNum = pSub->vgNum;
|
int32_t vgNum = pSub->vgNum;
|
||||||
int32_t vgEachConsumer = vgNum / consumerNum;
|
int32_t vgEachConsumer = vgNum / consumerNum;
|
||||||
int32_t left = vgNum % consumerNum;
|
int32_t imbalanceVg = vgNum % consumerNum;
|
||||||
int32_t leftUsed = 0;
|
int32_t imbalanceSolved = 0;
|
||||||
SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));
|
SArray *unassignedVgStash = taosArrayInit(0, sizeof(SMqConsumerEp));
|
||||||
SArray *unassignedConsumer = taosArrayInit(0, sizeof(int32_t));
|
SArray *unassignedConsumerIdx = taosArrayInit(0, sizeof(int32_t));
|
||||||
for (int32_t j = 0; j < consumerNum; j++) {
|
|
||||||
bool changed = false;
|
// iterate all consumers, set unassignedVgStash
|
||||||
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
|
for (int i = 0; i < consumerNum; i++) {
|
||||||
int32_t vgOneConsumer = taosArrayGetSize(pSubConsumer->vgInfo);
|
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
|
||||||
bool canUseLeft = leftUsed < left;
|
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
|
||||||
if (vgOneConsumer > vgEachConsumer + canUseLeft) {
|
int vgThisConsumerAfterRb;
|
||||||
changed = true;
|
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
|
||||||
if (canUseLeft) leftUsed++;
|
else vgThisConsumerAfterRb = vgEachConsumer;
|
||||||
// put into unassigned
|
|
||||||
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgEachConsumer + canUseLeft) {
|
mInfo("mq consumer:%ld ,connectted vgroup change from %d %d", pSubConsumer->consumerId, vgThisConsumerBeforeRb, vgThisConsumerAfterRb);
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
|
|
||||||
ASSERT(pConsumerEp != NULL);
|
while(taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
|
||||||
taosArrayPush(unassignedVgStash, pConsumerEp);
|
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
|
||||||
// build msg and persist into trans
|
ASSERT(pConsumerEp != NULL);
|
||||||
}
|
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
|
||||||
} else if (vgOneConsumer < vgEachConsumer) {
|
taosArrayPush(unassignedVgStash, pConsumerEp);
|
||||||
changed = true;
|
|
||||||
// assign from unassigned
|
|
||||||
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
|
|
||||||
// if no unassgined, save j
|
|
||||||
if (taosArrayGetSize(unassignedVgStash) == 0) {
|
|
||||||
taosArrayPush(unassignedConsumer, &j);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
|
|
||||||
ASSERT(pConsumerEp != NULL);
|
|
||||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
|
||||||
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
|
||||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
|
||||||
// build msg and persist into trans
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (changed) {
|
|
||||||
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
|
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
|
||||||
pRebConsumer->epoch++;
|
int32_t status = atomic_load_32(&pRebConsumer->status);
|
||||||
SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
if (vgThisConsumerAfterRb != vgThisConsumerBeforeRb ||
|
||||||
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
(vgThisConsumerAfterRb != 0 && status != MQ_CONSUMER_STATUS__ACTIVE) ||
|
||||||
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
(vgThisConsumerAfterRb == 0 && status != MQ_CONSUMER_STATUS__LOST)
|
||||||
}
|
) {
|
||||||
|
pRebConsumer->epoch++;
|
||||||
|
if (vgThisConsumerAfterRb != 0) {
|
||||||
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
||||||
|
} else {
|
||||||
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
mInfo("mq consumer:%ld , status change from %d %d", pRebConsumer->consumerId, status, pRebConsumer->status);
|
||||||
|
|
||||||
|
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
||||||
|
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||||
|
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
||||||
|
}
|
||||||
|
mndReleaseConsumer(pMnode, pRebConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumer); j++) {
|
//assign to vgroup
|
||||||
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumer, j);
|
if (taosArrayGetSize(unassignedVgStash) != 0) {
|
||||||
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
|
for (int i = 0; i < consumerNum; i++) {
|
||||||
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer) {
|
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, i);
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
|
int vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
|
||||||
ASSERT(pConsumerEp != NULL);
|
int vgThisConsumerAfterRb;
|
||||||
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
if (i < imbalanceVg) vgThisConsumerAfterRb = vgEachConsumer + 1;
|
||||||
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
else vgThisConsumerAfterRb = vgEachConsumer;
|
||||||
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
|
||||||
// build msg and persist into trans
|
while(taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerBeforeRb) {
|
||||||
|
SMqConsumerEp* pConsumerEp = taosArrayPop(unassignedVgStash);
|
||||||
|
ASSERT(pConsumerEp != NULL);
|
||||||
|
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
|
||||||
|
|
||||||
|
|
||||||
|
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||||
|
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||||
|
|
||||||
|
mInfo("mq consumer:%ld , assign vgroup %d, previously assigned to consumer %ld", pSubConsumer->consumerId, pConsumerEp->vgId, pConsumerEp->oldConsumerId);
|
||||||
|
|
||||||
|
mndPersistRebalanceMsg(pMnode, pTrans, pConsumerEp);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ASSERT(taosArrayGetSize(unassignedVgStash) == 0);
|
ASSERT(taosArrayGetSize(unassignedVgStash) == 0);
|
||||||
|
|
||||||
// send msg to vnode
|
// TODO: log rebalance statistics
|
||||||
// log rebalance statistics
|
|
||||||
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pSubRaw = mndSubActionEncode(pSub);
|
||||||
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
|
sdbSetRawStatus(pSubRaw, SDB_STATUS_READY);
|
||||||
mndTransAppendRedolog(pTrans, pSubRaw);
|
mndTransAppendRedolog(pTrans, pSubRaw);
|
||||||
|
@ -386,15 +467,111 @@ static int32_t mndProcessDoRebalanceMsg(SMnodeMsg *pMsg) {
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||||
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
mError("mq-rebalance-trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
mndReleaseConsumer(pMnode, pConsumer);
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
for (int32_t j = 0; j < consumerNum; j++) {
|
||||||
|
bool changed = false;
|
||||||
|
bool unfished = false;
|
||||||
|
|
||||||
|
bool canUseLeft = imbalanceSolved < imbalanceVg;
|
||||||
|
bool mustUseLeft = canUseLeft && (imbalanceVg - imbalanceSolved >= consumerNum - j);
|
||||||
|
ASSERT(imbalanceVg - imbalanceSolved <= consumerNum - j);
|
||||||
|
|
||||||
|
int32_t maxVg = vgEachConsumer + canUseLeft;
|
||||||
|
int32_t minVg = vgEachConsumer + mustUseLeft;
|
||||||
|
|
||||||
|
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, j);
|
||||||
|
int32_t vgThisConsumerBeforeRb = taosArrayGetSize(pSubConsumer->vgInfo);
|
||||||
|
int32_t vgThisConsumerAfterRb;
|
||||||
|
if (vgThisConsumerBeforeRb > maxVg) {
|
||||||
|
vgThisConsumerAfterRb = maxVg;
|
||||||
|
imbalanceSolved++;
|
||||||
|
changed = true;
|
||||||
|
} else if (vgThisConsumerBeforeRb < minVg) {
|
||||||
|
vgThisConsumerAfterRb = minVg;
|
||||||
|
if (mustUseLeft) imbalanceSolved++;
|
||||||
|
changed = true;
|
||||||
|
} else {
|
||||||
|
vgThisConsumerAfterRb = vgThisConsumerBeforeRb;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (vgThisConsumerBeforeRb > vgThisConsumerAfterRb) {
|
||||||
|
while (taosArrayGetSize(pSubConsumer->vgInfo) > vgThisConsumerAfterRb) {
|
||||||
|
// put into unassigned
|
||||||
|
SMqConsumerEp *pConsumerEp = taosArrayPop(pSubConsumer->vgInfo);
|
||||||
|
ASSERT(pConsumerEp != NULL);
|
||||||
|
ASSERT(pConsumerEp->consumerId == pSubConsumer->consumerId);
|
||||||
|
taosArrayPush(unassignedVgStash, pConsumerEp);
|
||||||
|
}
|
||||||
|
|
||||||
|
} else if (vgThisConsumerBeforeRb < vgThisConsumerAfterRb) {
|
||||||
|
// assign from unassigned
|
||||||
|
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgThisConsumerAfterRb) {
|
||||||
|
// if no unassgined, save j
|
||||||
|
if (taosArrayGetSize(unassignedVgStash) == 0) {
|
||||||
|
taosArrayPush(unassignedConsumerIdx, &j);
|
||||||
|
unfished = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
// assign vg to consumer
|
||||||
|
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
|
||||||
|
ASSERT(pConsumerEp != NULL);
|
||||||
|
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||||
|
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||||
|
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||||
|
// build msg and persist into trans
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (changed && !unfished) {
|
||||||
|
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
|
||||||
|
pRebConsumer->epoch++;
|
||||||
|
if (vgThisConsumerAfterRb != 0) {
|
||||||
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
||||||
|
} else {
|
||||||
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__IDLE);
|
||||||
|
}
|
||||||
|
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
||||||
|
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||||
|
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
||||||
|
mndReleaseConsumer(pMnode, pRebConsumer);
|
||||||
|
// TODO: save history
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t j = 0; j < taosArrayGetSize(unassignedConsumerIdx); j++) {
|
||||||
|
bool canUseLeft = imbalanceSolved < imbalanceVg;
|
||||||
|
int32_t consumerIdx = *(int32_t *)taosArrayGet(unassignedConsumerIdx, j);
|
||||||
|
SMqSubConsumer *pSubConsumer = taosArrayGet(pSub->consumers, consumerIdx);
|
||||||
|
if (canUseLeft) imbalanceSolved++;
|
||||||
|
// must use
|
||||||
|
int32_t vgThisConsumerAfterRb = taosArrayGetSize(pSubConsumer->vgInfo) + canUseLeft;
|
||||||
|
while (taosArrayGetSize(pSubConsumer->vgInfo) < vgEachConsumer + canUseLeft) {
|
||||||
|
// assign vg to consumer
|
||||||
|
SMqConsumerEp *pConsumerEp = taosArrayPop(unassignedVgStash);
|
||||||
|
ASSERT(pConsumerEp != NULL);
|
||||||
|
pConsumerEp->oldConsumerId = pConsumerEp->consumerId;
|
||||||
|
pConsumerEp->consumerId = pSubConsumer->consumerId;
|
||||||
|
taosArrayPush(pSubConsumer->vgInfo, pConsumerEp);
|
||||||
|
// build msg and persist into trans
|
||||||
|
}
|
||||||
|
SMqConsumerObj *pRebConsumer = mndAcquireConsumer(pMnode, pSubConsumer->consumerId);
|
||||||
|
pRebConsumer->epoch++;
|
||||||
|
atomic_store_32(&pRebConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);
|
||||||
|
SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pRebConsumer);
|
||||||
|
sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);
|
||||||
|
mndTransAppendRedolog(pTrans, pConsumerRaw);
|
||||||
|
mndReleaseConsumer(pMnode, pRebConsumer);
|
||||||
|
// TODO: save history
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
//update consumer status for the subscribption
|
//update consumer status for the subscribption
|
||||||
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
|
for (int i = 0; i < taosArrayGetSize(pSub->assigned); i++) {
|
||||||
|
@ -518,11 +695,11 @@ static int mndInitUnassignedVg(SMnode *pMnode, const SMqTopicObj *pTopic, SMqSub
|
||||||
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
|
mError("unsupport topic: %s, sql: %s", pTopic->name, pTopic->sql);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
if (pArray && taosArrayGetSize(pArray) != 1) {
|
/*if (pArray && taosArrayGetSize(pArray) != 1) {*/
|
||||||
terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;
|
/*terrno = TSDB_CODE_MND_UNSUPPORTED_TOPIC;*/
|
||||||
mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));
|
/*mError("unsupport topic: %s, sql: %s, plan level: %ld", pTopic->name, pTopic->sql, taosArrayGetSize(pArray));*/
|
||||||
return -1;
|
/*return -1;*/
|
||||||
}
|
/*}*/
|
||||||
|
|
||||||
SMqConsumerEp consumerEp = {0};
|
SMqConsumerEp consumerEp = {0};
|
||||||
consumerEp.status = 0;
|
consumerEp.status = 0;
|
||||||
|
@ -697,7 +874,7 @@ static char *mndMakeSubscribeKey(const char *cgroup, const char *topicName) {
|
||||||
return key;
|
return key;
|
||||||
}
|
}
|
||||||
|
|
||||||
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicName) {
|
SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, const char *cgroup, const char *topicName) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
char *key = mndMakeSubscribeKey(cgroup, topicName);
|
char *key = mndMakeSubscribeKey(cgroup, topicName);
|
||||||
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||||
|
@ -708,6 +885,15 @@ SMqSubscribeObj *mndAcquireSubscribe(SMnode *pMnode, char *cgroup, char *topicNa
|
||||||
return pSub;
|
return pSub;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SMqSubscribeObj *mndAcquireSubscribeByKey(SMnode *pMnode, const char *key) {
|
||||||
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
|
SMqSubscribeObj *pSub = sdbAcquire(pSdb, SDB_SUBSCRIBE, key);
|
||||||
|
if (pSub == NULL) {
|
||||||
|
/*terrno = TSDB_CODE_MND_CONSUMER_NOT_EXIST;*/
|
||||||
|
}
|
||||||
|
return pSub;
|
||||||
|
}
|
||||||
|
|
||||||
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
|
void mndReleaseSubscribe(SMnode *pMnode, SMqSubscribeObj *pSub) {
|
||||||
SSdb *pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
sdbRelease(pSdb, pSub);
|
sdbRelease(pSdb, pSub);
|
||||||
|
@ -737,9 +923,9 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
createConsumer = true;
|
createConsumer = true;
|
||||||
} else {
|
} else {
|
||||||
pConsumer->epoch++;
|
pConsumer->epoch++;
|
||||||
oldSub = pConsumer->topics;
|
oldSub = pConsumer->currentTopics;
|
||||||
}
|
}
|
||||||
pConsumer->topics = newSub;
|
pConsumer->currentTopics = newSub;
|
||||||
|
|
||||||
if (oldSub != NULL) {
|
if (oldSub != NULL) {
|
||||||
oldTopicNum = taosArrayGetSize(oldSub);
|
oldTopicNum = taosArrayGetSize(oldSub);
|
||||||
|
@ -796,11 +982,14 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
for (int vgi = 0; vgi < vgsz; vgi++) {
|
for (int vgi = 0; vgi < vgsz; vgi++) {
|
||||||
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
|
SMqConsumerEp *pConsumerEp = taosArrayGet(pSubConsumer->vgInfo, vgi);
|
||||||
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
|
mndPersistCancelConnReq(pMnode, pTrans, pConsumerEp);
|
||||||
|
taosArrayPush(pSub->unassignedVg, pConsumerEp);
|
||||||
}
|
}
|
||||||
|
taosArrayRemove(pSub->consumers, ci);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;
|
atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__MODIFY);
|
||||||
|
/*pSub->status = MQ_SUBSCRIBE_STATUS__DELETED;*/
|
||||||
} else if (newTopicName != NULL) {
|
} else if (newTopicName != NULL) {
|
||||||
ASSERT(oldTopicName == NULL);
|
ASSERT(oldTopicName == NULL);
|
||||||
|
|
||||||
|
@ -830,6 +1019,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
|
||||||
pConsumerEp->consumerId = consumerId;
|
pConsumerEp->consumerId = consumerId;
|
||||||
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
taosArrayPush(mqSubConsumer.vgInfo, pConsumerEp);
|
||||||
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
mndPersistMqSetConnReq(pMnode, pTrans, pTopic, cgroup, pConsumerEp);
|
||||||
|
atomic_store_32(&pConsumer->hbStatus, MQ_CONSUMER_STATUS__ACTIVE);
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
SSdbRaw *pRaw = mndSubActionEncode(pSub);
|
||||||
|
|
|
@ -107,7 +107,7 @@ static SHashObj *sdbGetHash(SSdb *pSdb, int32_t type) {
|
||||||
return hash;
|
return hash;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, void *pKey) {
|
static int32_t sdbGetkeySize(SSdb *pSdb, ESdbType type, const void *pKey) {
|
||||||
int32_t keySize;
|
int32_t keySize;
|
||||||
EKeyType keyType = pSdb->keyTypes[type];
|
EKeyType keyType = pSdb->keyTypes[type];
|
||||||
|
|
||||||
|
@ -263,7 +263,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *sdbAcquire(SSdb *pSdb, ESdbType type, void *pKey) {
|
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
|
||||||
terrno = 0;
|
terrno = 0;
|
||||||
|
|
||||||
SHashObj *hash = sdbGetHash(pSdb, type);
|
SHashObj *hash = sdbGetHash(pSdb, type);
|
||||||
|
|
|
@ -77,6 +77,7 @@ void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t
|
||||||
pColList->boundedColumns[i] = pSchema[i].colId;
|
pColList->boundedColumns[i] = pSchema[i].colId;
|
||||||
}
|
}
|
||||||
pColList->allNullLen += pColList->flen;
|
pColList->allNullLen += pColList->flen;
|
||||||
|
pColList->boundNullLen = pColList->allNullLen; // default set allNullLen
|
||||||
pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
pColList->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -292,6 +292,7 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
|
||||||
int32_t nCols = pColList->numOfCols;
|
int32_t nCols = pColList->numOfCols;
|
||||||
|
|
||||||
pColList->numOfBound = 0;
|
pColList->numOfBound = 0;
|
||||||
|
pColList->boundNullLen = 0;
|
||||||
memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols);
|
memset(pColList->boundedColumns, 0, sizeof(int32_t) * nCols);
|
||||||
for (int32_t i = 0; i < nCols; ++i) {
|
for (int32_t i = 0; i < nCols; ++i) {
|
||||||
pColList->cols[i].valStat = VAL_STAT_NONE;
|
pColList->cols[i].valStat = VAL_STAT_NONE;
|
||||||
|
@ -323,6 +324,17 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo*
|
||||||
pColList->cols[index].valStat = VAL_STAT_HAS;
|
pColList->cols[index].valStat = VAL_STAT_HAS;
|
||||||
pColList->boundedColumns[pColList->numOfBound] = index;
|
pColList->boundedColumns[pColList->numOfBound] = index;
|
||||||
++pColList->numOfBound;
|
++pColList->numOfBound;
|
||||||
|
switch (pSchema[t].type) {
|
||||||
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
|
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||||
|
break;
|
||||||
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
|
pColList->boundNullLen += (sizeof(VarDataOffsetT) + VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
pColList->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
pColList->orderStatus = isOrdered ? ORDER_STATUS_ORDERED : ORDER_STATUS_DISORDERED;
|
||||||
|
@ -450,7 +462,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
*len = pBuilder->extendedRowSize;
|
// *len = pBuilder->extendedRowSize;
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,7 +492,7 @@ static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlo
|
||||||
|
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &len, tmpTokenBuf));
|
CHECK_CODE(parseOneRow(pCxt, pDataBlock, tinfo.precision, &len, tmpTokenBuf));
|
||||||
pDataBlock->size += len;
|
pDataBlock->size += extendedRowSize; //len;
|
||||||
|
|
||||||
NEXT_TOKEN(pCxt->pSql, sToken);
|
NEXT_TOKEN(pCxt->pSql, sToken);
|
||||||
if (TK_RP != sToken.type) {
|
if (TK_RP != sToken.type) {
|
||||||
|
|
Loading…
Reference in New Issue