Merge pull request #11970 from taosdata/feature/tq
refacor(tmq): extract unassigned vg out of hash
This commit is contained in:
commit
1c8910de73
|
@ -188,6 +188,7 @@ typedef struct SRequestSendRecvBody {
|
|||
|
||||
typedef struct {
|
||||
int8_t resType;
|
||||
int32_t code;
|
||||
char topic[TSDB_TOPIC_FNAME_LEN];
|
||||
int32_t vgId;
|
||||
SSchemaWrapper schema;
|
||||
|
@ -308,9 +309,8 @@ int hbAddConnInfo(SAppHbMgr* pAppHbMgr, SClientHbKey connKey, void* key, void* v
|
|||
void hbMgrInitMqHbRspHandle();
|
||||
|
||||
SRequestObj* launchQueryImpl(SRequestObj* pRequest, SQuery* pQuery, int32_t code, bool keepQuery);
|
||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||
|
||||
int32_t getQueryPlan(SRequestObj* pRequest, SQuery* pQuery, SArray** pNodeList);
|
||||
int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -110,16 +110,23 @@ int taos_errno(TAOS_RES *tres) {
|
|||
return terrno;
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(tres)) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
return ((SRequestObj *)tres)->code;
|
||||
}
|
||||
|
||||
const char *taos_errstr(TAOS_RES *res) {
|
||||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
|
||||
if (pRequest == NULL) {
|
||||
if (res == NULL) {
|
||||
return (const char *)tstrerror(terrno);
|
||||
}
|
||||
|
||||
if (TD_RES_TMQ(res)) {
|
||||
return "success";
|
||||
}
|
||||
|
||||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
if (NULL != pRequest->msgBuf && (strlen(pRequest->msgBuf) > 0 || pRequest->code == TSDB_CODE_RPC_FQDN_ERROR)) {
|
||||
return pRequest->msgBuf;
|
||||
} else {
|
||||
|
@ -131,7 +138,7 @@ void taos_free_result(TAOS_RES *res) {
|
|||
if (NULL == res) {
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
if (TD_RES_QUERY(res)) {
|
||||
SRequestObj *pRequest = (SRequestObj *)res;
|
||||
destroyRequest(pRequest);
|
||||
|
@ -632,9 +639,7 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
|
|||
return stmtSetTbName(stmt, name);
|
||||
}
|
||||
|
||||
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) {
|
||||
return taos_stmt_set_tbname(stmt, name);
|
||||
}
|
||||
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
|
||||
|
||||
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
||||
if (stmt == NULL || bind == NULL) {
|
||||
|
@ -648,7 +653,7 @@ int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
|
|||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
||||
return stmtBindBatch(stmt, bind, -1);
|
||||
}
|
||||
|
||||
|
@ -696,7 +701,7 @@ int taos_stmt_bind_single_param_batch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, in
|
|||
terrno = TSDB_CODE_INVALID_PARA;
|
||||
return terrno;
|
||||
}
|
||||
|
||||
|
||||
return stmtBindBatch(stmt, bind, colIdx);
|
||||
}
|
||||
|
||||
|
@ -750,9 +755,7 @@ TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
|
|||
return stmtUseResult(stmt);
|
||||
}
|
||||
|
||||
char *taos_stmt_errstr(TAOS_STMT *stmt) {
|
||||
return (char *)stmtErrstr(stmt);
|
||||
}
|
||||
char *taos_stmt_errstr(TAOS_STMT *stmt) { return (char *)stmtErrstr(stmt); }
|
||||
|
||||
int taos_stmt_affected_rows(TAOS_STMT *stmt) {
|
||||
if (stmt == NULL) {
|
||||
|
|
|
@ -514,12 +514,12 @@ void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp);
|
|||
typedef struct {
|
||||
int64_t consumerId; // -1 for unassigned
|
||||
SArray* vgs; // SArray<SMqVgEp*>
|
||||
} SMqConsumerEpInSub;
|
||||
} SMqConsumerEp;
|
||||
|
||||
SMqConsumerEpInSub* tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub* pEpInSub);
|
||||
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub* pEpInSub);
|
||||
int32_t tEncodeSMqConsumerEpInSub(void** buf, const SMqConsumerEpInSub* pEpInSub);
|
||||
void* tDecodeSMqConsumerEpInSub(const void* buf, SMqConsumerEpInSub* pEpInSub);
|
||||
SMqConsumerEp* tCloneSMqConsumerEp(const SMqConsumerEp* pEp);
|
||||
void tDeleteSMqConsumerEp(SMqConsumerEp* pEp);
|
||||
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
|
||||
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp);
|
||||
|
||||
typedef struct {
|
||||
char key[TSDB_SUBSCRIBE_KEY_LEN];
|
||||
|
@ -529,9 +529,8 @@ typedef struct {
|
|||
int8_t withTbName;
|
||||
int8_t withSchema;
|
||||
int8_t withTag;
|
||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEpInSub
|
||||
// TODO put -1 into unassignVgs
|
||||
// SArray* unassignedVgs;
|
||||
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
|
||||
SArray* unassignedVgs; // SArray<SMqVgEp*>
|
||||
} SMqSubscribeObj;
|
||||
|
||||
SMqSubscribeObj* tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]);
|
||||
|
@ -542,7 +541,7 @@ void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub);
|
|||
|
||||
typedef struct {
|
||||
int32_t epoch;
|
||||
SArray* consumers; // SArray<SMqConsumerEpInSub*>
|
||||
SArray* consumers; // SArray<SMqConsumerEp*>
|
||||
} SMqSubActionLogEntry;
|
||||
|
||||
SMqSubActionLogEntry* tCloneSMqSubActionLogEntry(SMqSubActionLogEntry* pEntry);
|
||||
|
|
|
@ -302,8 +302,8 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
|||
mndReleaseTopic(pMnode, pTopic);
|
||||
|
||||
// 2.2 iterate all vg assigned to the consumer of that topic
|
||||
SMqConsumerEpInSub *pEpInSub = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pEpInSub->vgs);
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
int32_t vgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
|
||||
topicEp.vgs = taosArrayInit(vgNum, sizeof(SMqSubVgEp));
|
||||
if (topicEp.vgs == NULL) {
|
||||
|
@ -313,7 +313,7 @@ static int32_t mndProcessAskEpReq(SNodeMsg *pMsg) {
|
|||
}
|
||||
|
||||
for (int32_t j = 0; j < vgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
char offsetKey[TSDB_PARTITION_KEY_LEN];
|
||||
mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId);
|
||||
// 2.2.1 build vg ep
|
||||
|
|
|
@ -211,42 +211,47 @@ void *tDecodeSMqVgEp(const void *buf, SMqVgEp *pVgEp) {
|
|||
return (void *)buf;
|
||||
}
|
||||
|
||||
SMqConsumerEpInSub *tCloneSMqConsumerEpInSub(const SMqConsumerEpInSub *pEpInSub) {
|
||||
SMqConsumerEpInSub *pEpInSubNew = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));
|
||||
if (pEpInSubNew == NULL) return NULL;
|
||||
pEpInSubNew->consumerId = pEpInSub->consumerId;
|
||||
pEpInSubNew->vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp);
|
||||
return pEpInSubNew;
|
||||
SMqConsumerEp *tCloneSMqConsumerEp(const SMqConsumerEp *pConsumerEpOld) {
|
||||
SMqConsumerEp *pConsumerEpNew = taosMemoryMalloc(sizeof(SMqConsumerEp));
|
||||
if (pConsumerEpNew == NULL) return NULL;
|
||||
pConsumerEpNew->consumerId = pConsumerEpOld->consumerId;
|
||||
pConsumerEpNew->vgs = taosArrayDeepCopy(pConsumerEpOld->vgs, (FCopy)tCloneSMqVgEp);
|
||||
return pConsumerEpNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqConsumerEpInSub(SMqConsumerEpInSub *pEpInSub) {
|
||||
taosArrayDestroyEx(pEpInSub->vgs, (FDelete)tDeleteSMqVgEp);
|
||||
void tDeleteSMqConsumerEp(SMqConsumerEp *pConsumerEp) {
|
||||
//
|
||||
taosArrayDestroyP(pConsumerEp->vgs, (FDelete)tDeleteSMqVgEp);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqConsumerEpInSub(void **buf, const SMqConsumerEpInSub *pEpInSub) {
|
||||
int32_t tEncodeSMqConsumerEp(void **buf, const SMqConsumerEp *pConsumerEp) {
|
||||
int32_t tlen = 0;
|
||||
tlen += taosEncodeFixedI64(buf, pEpInSub->consumerId);
|
||||
int32_t sz = taosArrayGetSize(pEpInSub->vgs);
|
||||
tlen += taosEncodeFixedI64(buf, pConsumerEp->consumerId);
|
||||
tlen += taosEncodeArray(buf, pConsumerEp->vgs, (FEncode)tEncodeSMqVgEp);
|
||||
#if 0
|
||||
int32_t sz = taosArrayGetSize(pConsumerEp->vgs);
|
||||
tlen += taosEncodeFixedI32(buf, sz);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, i);
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, i);
|
||||
tlen += tEncodeSMqVgEp(buf, pVgEp);
|
||||
}
|
||||
/*tlen += taosEncodeArray(buf, pEpInSub->vgs, (FEncode)tEncodeSMqVgEp);*/
|
||||
#endif
|
||||
return tlen;
|
||||
}
|
||||
|
||||
void *tDecodeSMqConsumerEpInSub(const void *buf, SMqConsumerEpInSub *pEpInSub) {
|
||||
buf = taosDecodeFixedI64(buf, &pEpInSub->consumerId);
|
||||
/*buf = taosDecodeArray(buf, &pEpInSub->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));*/
|
||||
void *tDecodeSMqConsumerEp(const void *buf, SMqConsumerEp *pConsumerEp) {
|
||||
buf = taosDecodeFixedI64(buf, &pConsumerEp->consumerId);
|
||||
buf = taosDecodeArray(buf, &pConsumerEp->vgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqSubVgEp));
|
||||
#if 0
|
||||
int32_t sz;
|
||||
buf = taosDecodeFixedI32(buf, &sz);
|
||||
pEpInSub->vgs = taosArrayInit(sz, sizeof(void *));
|
||||
pConsumerEp->vgs = taosArrayInit(sz, sizeof(void *));
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
SMqVgEp *pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
buf = tDecodeSMqVgEp(buf, pVgEp);
|
||||
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
||||
taosArrayPush(pConsumerEp->vgs, &pVgEp);
|
||||
}
|
||||
#endif
|
||||
|
||||
return (void *)buf;
|
||||
}
|
||||
|
@ -258,13 +263,11 @@ SMqSubscribeObj *tNewSubscribeObj(const char key[TSDB_SUBSCRIBE_KEY_LEN]) {
|
|||
taosInitRWLatch(&pSubNew->lock);
|
||||
pSubNew->vgNum = 0;
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
// TODO set free fp
|
||||
SMqConsumerEpInSub epInSub = {
|
||||
.consumerId = -1,
|
||||
.vgs = taosArrayInit(0, sizeof(void *)),
|
||||
};
|
||||
int64_t unexistKey = -1;
|
||||
taosHashPut(pSubNew->consumerHash, &unexistKey, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
|
||||
pSubNew->unassignedVgs = taosArrayInit(0, sizeof(void *));
|
||||
|
||||
return pSubNew;
|
||||
}
|
||||
|
||||
|
@ -281,25 +284,27 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
|
|||
|
||||
pSubNew->vgNum = pSub->vgNum;
|
||||
pSubNew->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_NO_LOCK);
|
||||
/*taosHashSetFreeFp(pSubNew->consumerHash, taosArrayDestroy);*/
|
||||
void *pIter = NULL;
|
||||
SMqConsumerEpInSub *pEpInSub = NULL;
|
||||
// TODO set hash free fp
|
||||
/*taosHashSetFreeFp(pSubNew->consumerHash, tDeleteSMqConsumerEp);*/
|
||||
void *pIter = NULL;
|
||||
SMqConsumerEp *pConsumerEp = NULL;
|
||||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
pEpInSub = (SMqConsumerEpInSub *)pIter;
|
||||
SMqConsumerEpInSub newEp = {
|
||||
.consumerId = pEpInSub->consumerId,
|
||||
.vgs = taosArrayDeepCopy(pEpInSub->vgs, (FCopy)tCloneSMqVgEp),
|
||||
pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
SMqConsumerEp newEp = {
|
||||
.consumerId = pConsumerEp->consumerId,
|
||||
.vgs = taosArrayDeepCopy(pConsumerEp->vgs, (FCopy)tCloneSMqVgEp),
|
||||
};
|
||||
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEpInSub));
|
||||
taosHashPut(pSubNew->consumerHash, &newEp.consumerId, sizeof(int64_t), &newEp, sizeof(SMqConsumerEp));
|
||||
}
|
||||
pSubNew->unassignedVgs = taosArrayDeepCopy(pSub->unassignedVgs, (FCopy)tCloneSMqVgEp);
|
||||
return pSubNew;
|
||||
}
|
||||
|
||||
void tDeleteSubscribeObj(SMqSubscribeObj *pSub) {
|
||||
/*taosArrayDestroyEx(pSub->consumerEps, (FDelete)tDeleteSMqConsumerEpInSub);*/
|
||||
taosHashCleanup(pSub->consumerHash);
|
||||
taosArrayDestroyP(pSub->unassignedVgs, (FDelete)tDeleteSMqVgEp);
|
||||
}
|
||||
|
||||
int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
||||
|
@ -319,12 +324,12 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
|
|||
while (1) {
|
||||
pIter = taosHashIterate(pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
|
||||
tlen += tEncodeSMqConsumerEpInSub(buf, pEpInSub);
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
tlen += tEncodeSMqConsumerEp(buf, pConsumerEp);
|
||||
cnt++;
|
||||
}
|
||||
ASSERT(cnt == sz);
|
||||
/*tlen += taosEncodeArray(buf, pSub->consumerEps, (FEncode)tEncodeSMqConsumerEpInSub);*/
|
||||
tlen += taosEncodeArray(buf, pSub->unassignedVgs, (FEncode)tEncodeSMqVgEp);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
|
@ -342,13 +347,12 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
|
|||
|
||||
pSub->consumerHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
|
||||
for (int32_t i = 0; i < sz; i++) {
|
||||
/*SMqConsumerEpInSub* pEpInSub = taosMemoryMalloc(sizeof(SMqConsumerEpInSub));*/
|
||||
SMqConsumerEpInSub epInSub = {0};
|
||||
buf = tDecodeSMqConsumerEpInSub(buf, &epInSub);
|
||||
taosHashPut(pSub->consumerHash, &epInSub.consumerId, sizeof(int64_t), &epInSub, sizeof(SMqConsumerEpInSub));
|
||||
SMqConsumerEp consumerEp = {0};
|
||||
buf = tDecodeSMqConsumerEp(buf, &consumerEp);
|
||||
taosHashPut(pSub->consumerHash, &consumerEp.consumerId, sizeof(int64_t), &consumerEp, sizeof(SMqConsumerEp));
|
||||
}
|
||||
|
||||
/*buf = taosDecodeArray(buf, &pSub->consumerEps, (FDecode)tDecodeSMqConsumerEpInSub, sizeof(SMqConsumerEpInSub));*/
|
||||
buf = taosDecodeArray(buf, &pSub->unassignedVgs, (FDecode)tDecodeSMqVgEp, sizeof(SMqVgEp));
|
||||
return (void *)buf;
|
||||
}
|
||||
|
||||
|
@ -356,12 +360,12 @@ SMqSubActionLogEntry *tCloneSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
|||
SMqSubActionLogEntry *pEntryNew = taosMemoryMalloc(sizeof(SMqSubActionLogEntry));
|
||||
if (pEntryNew == NULL) return NULL;
|
||||
pEntryNew->epoch = pEntry->epoch;
|
||||
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEpInSub);
|
||||
pEntryNew->consumers = taosArrayDeepCopy(pEntry->consumers, (FCopy)tCloneSMqConsumerEp);
|
||||
return pEntryNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqSubActionLogEntry(SMqSubActionLogEntry *pEntry) {
|
||||
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEpInSub);
|
||||
taosArrayDestroyEx(pEntry->consumers, (FDelete)tDeleteSMqConsumerEp);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEntry) {
|
||||
|
@ -381,12 +385,12 @@ SMqSubActionLogObj *tCloneSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
|||
SMqSubActionLogObj *pLogNew = taosMemoryMalloc(sizeof(SMqSubActionLogObj));
|
||||
if (pLogNew == NULL) return pLogNew;
|
||||
memcpy(pLogNew->key, pLog->key, TSDB_SUBSCRIBE_KEY_LEN);
|
||||
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEpInSub);
|
||||
pLogNew->logs = taosArrayDeepCopy(pLog->logs, (FCopy)tCloneSMqConsumerEp);
|
||||
return pLogNew;
|
||||
}
|
||||
|
||||
void tDeleteSMqSubActionLogObj(SMqSubActionLogObj *pLog) {
|
||||
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEpInSub);
|
||||
taosArrayDestroyEx(pLog->logs, (FDelete)tDeleteSMqConsumerEp);
|
||||
}
|
||||
|
||||
int32_t tEncodeSMqSubActionLogObj(void **buf, const SMqSubActionLogObj *pLog) {
|
||||
|
|
|
@ -504,11 +504,8 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
plan = nodesListGetNode(inner->pNodeList, 0);
|
||||
}
|
||||
|
||||
int64_t unexistKey = -1;
|
||||
SMqConsumerEpInSub* pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||
ASSERT(pEpInSub);
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||
ASSERT(pSub->unassignedVgs);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
void* pIter = NULL;
|
||||
while (1) {
|
||||
|
@ -524,7 +521,7 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
SMqVgEp* pVgEp = taosMemoryMalloc(sizeof(SMqVgEp));
|
||||
pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||
pVgEp->vgId = pVgroup->vgId;
|
||||
taosArrayPush(pEpInSub->vgs, &pVgEp);
|
||||
taosArrayPush(pSub->unassignedVgs, &pVgEp);
|
||||
|
||||
mDebug("init subscription %s, assign vg: %d", pSub->key, pVgEp->vgId);
|
||||
|
||||
|
@ -543,17 +540,11 @@ int32_t mndSchedInitSubEp(SMnode* pMnode, const SMqTopicObj* pTopic, SMqSubscrib
|
|||
} else {
|
||||
pVgEp->qmsg = strdup("");
|
||||
}
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||
|
||||
/*taosArrayPush(pSub->unassignedVg, &consumerEp);*/
|
||||
}
|
||||
|
||||
pEpInSub = taosHashGet(pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||
ASSERT(pSub->unassignedVgs->size > 0);
|
||||
|
||||
ASSERT(pEpInSub->vgs->size > 0);
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
qDestroyQueryPlan(pPlan);
|
||||
|
||||
|
|
|
@ -85,7 +85,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
|||
pSub->withSchema = pTopic->withSchema;
|
||||
pSub->withTag = pTopic->withTag;
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||
ASSERT(pSub->unassignedVgs->size == 0);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
if (mndSchedInitSubEp(pMnode, pTopic, pSub) < 0) {
|
||||
tDeleteSubscribeObj(pSub);
|
||||
|
@ -93,7 +94,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 1);
|
||||
ASSERT(pSub->unassignedVgs->size > 0);
|
||||
ASSERT(taosHashGetSize(pSub->consumerHash) == 0);
|
||||
|
||||
return pSub;
|
||||
}
|
||||
|
@ -185,7 +187,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
if (pInput->pTopic != NULL) {
|
||||
// create subscribe
|
||||
pOutput->pSub = mndCreateSub(pMnode, pInput->pTopic, pInput->pRebInfo->key);
|
||||
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 1);
|
||||
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) == 0);
|
||||
} else {
|
||||
pOutput->pSub = tCloneSubscribeObj(pInput->pOldSub);
|
||||
}
|
||||
|
@ -196,21 +198,20 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// 1. build temporary hash(vgId -> SMqRebOutputVg) to store modified vg
|
||||
SHashObj *pHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
|
||||
|
||||
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
|
||||
// 2. check and get actual removed consumers, put their vg into hash
|
||||
int32_t removedNum = taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
int32_t actualRemoved = 0;
|
||||
for (int32_t i = 0; i < removedNum; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->removedConsumers, i);
|
||||
ASSERT(consumerId > 0);
|
||||
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
ASSERT(pEpInSub);
|
||||
if (pEpInSub) {
|
||||
ASSERT(consumerId == pEpInSub->consumerId);
|
||||
SMqConsumerEp *pConsumerEp = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));
|
||||
ASSERT(pConsumerEp);
|
||||
if (pConsumerEp) {
|
||||
ASSERT(consumerId == pConsumerEp->consumerId);
|
||||
actualRemoved++;
|
||||
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
for (int32_t j = 0; j < consumerVgNum; j++) {
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pEpInSub->vgs, j);
|
||||
SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j);
|
||||
SMqRebOutputVg outputVg = {
|
||||
.oldConsumerId = consumerId,
|
||||
.newConsumerId = -1,
|
||||
|
@ -224,16 +225,12 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
}
|
||||
ASSERT(removedNum == actualRemoved);
|
||||
ASSERT(taosHashGetSize(pOutput->pSub->consumerHash) > 0);
|
||||
|
||||
// if previously no consumer, there are vgs not assigned
|
||||
{
|
||||
int64_t unexistKey = -1;
|
||||
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||
ASSERT(pEpInSub);
|
||||
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
|
||||
int32_t consumerVgNum = taosArrayGetSize(pOutput->pSub->unassignedVgs);
|
||||
for (int32_t i = 0; i < consumerVgNum; i++) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pOutput->pSub->unassignedVgs);
|
||||
SMqRebOutputVg rebOutput = {
|
||||
.oldConsumerId = -1,
|
||||
.newConsumerId = -1,
|
||||
|
@ -246,7 +243,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
// 3. calc vg number of each consumer
|
||||
int32_t oldSz = 0;
|
||||
if (pInput->pOldSub) {
|
||||
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash) - 1;
|
||||
oldSz = taosHashGetSize(pInput->pOldSub->consumerHash);
|
||||
}
|
||||
int32_t afterRebConsumerNum =
|
||||
oldSz + taosArrayGetSize(pInput->pRebInfo->newConsumers) - taosArrayGetSize(pInput->pRebInfo->removedConsumers);
|
||||
|
@ -264,23 +261,22 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
|
||||
if (pEpInSub->consumerId == -1) continue;
|
||||
ASSERT(pEpInSub->consumerId > 0);
|
||||
int32_t consumerVgNum = taosArrayGetSize(pEpInSub->vgs);
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
ASSERT(pConsumerEp->consumerId > 0);
|
||||
int32_t consumerVgNum = taosArrayGetSize(pConsumerEp->vgs);
|
||||
// all old consumers still existing are touched
|
||||
// TODO optimize: touch only consumer whose vgs changed
|
||||
taosArrayPush(pOutput->touchedConsumers, &pEpInSub->consumerId);
|
||||
taosArrayPush(pOutput->touchedConsumers, &pConsumerEp->consumerId);
|
||||
if (consumerVgNum > minVgCnt) {
|
||||
if (imbCnt < imbConsumerNum) {
|
||||
if (consumerVgNum == minVgCnt + 1) {
|
||||
continue;
|
||||
} else {
|
||||
// pop until equal minVg + 1
|
||||
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt + 1) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
||||
SMqRebOutputVg outputVg = {
|
||||
.oldConsumerId = pEpInSub->consumerId,
|
||||
.oldConsumerId = pConsumerEp->consumerId,
|
||||
.newConsumerId = -1,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
|
@ -290,10 +286,10 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
}
|
||||
} else {
|
||||
// pop until equal minVg
|
||||
while (taosArrayGetSize(pEpInSub->vgs) > minVgCnt) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pEpInSub->vgs);
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) {
|
||||
SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs);
|
||||
SMqRebOutputVg outputVg = {
|
||||
.oldConsumerId = pEpInSub->consumerId,
|
||||
.oldConsumerId = pConsumerEp->consumerId,
|
||||
.newConsumerId = -1,
|
||||
.pVgEp = pVgEp,
|
||||
};
|
||||
|
@ -309,12 +305,11 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
for (int32_t i = 0; i < consumerNum; i++) {
|
||||
int64_t consumerId = *(int64_t *)taosArrayGet(pInput->pRebInfo->newConsumers, i);
|
||||
ASSERT(consumerId > 0);
|
||||
SMqConsumerEpInSub newConsumerEp;
|
||||
SMqConsumerEp newConsumerEp;
|
||||
newConsumerEp.consumerId = consumerId;
|
||||
newConsumerEp.vgs = taosArrayInit(0, sizeof(void *));
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp,
|
||||
sizeof(SMqConsumerEpInSub));
|
||||
/*SMqConsumerEpInSub *pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
|
||||
taosHashPut(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t), &newConsumerEp, sizeof(SMqConsumerEp));
|
||||
/*SMqConsumer* pTestNew = taosHashGet(pOutput->pSub->consumerHash, &consumerId, sizeof(int64_t));*/
|
||||
/*ASSERT(pTestNew->consumerId == consumerId);*/
|
||||
/*ASSERT(pTestNew->vgs == newConsumerEp.vgs);*/
|
||||
taosArrayPush(pOutput->newConsumers, &consumerId);
|
||||
|
@ -329,25 +324,24 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
while (1) {
|
||||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
if (pIter == NULL) break;
|
||||
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
|
||||
if (pEpInSub->consumerId == -1) continue;
|
||||
ASSERT(pEpInSub->consumerId > 0);
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
ASSERT(pConsumerEp->consumerId > 0);
|
||||
|
||||
// push until equal minVg
|
||||
while (taosArrayGetSize(pEpInSub->vgs) < minVgCnt) {
|
||||
while (taosArrayGetSize(pConsumerEp->vgs) < minVgCnt) {
|
||||
// iter hash and find one vg
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
ASSERT(pRemovedIter);
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
// push
|
||||
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pEpInSub->consumerId;
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
}
|
||||
}
|
||||
|
||||
// 7. handle unassigned vg
|
||||
if (taosHashGetSize(pOutput->pSub->consumerHash) != 1) {
|
||||
if (taosHashGetSize(pOutput->pSub->consumerHash) != 0) {
|
||||
// if has consumer, assign all left vg
|
||||
while (1) {
|
||||
pRemovedIter = taosHashIterate(pHash, pRemovedIter);
|
||||
|
@ -355,20 +349,14 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
pIter = taosHashIterate(pOutput->pSub->consumerHash, pIter);
|
||||
ASSERT(pIter);
|
||||
pRebVg = (SMqRebOutputVg *)pRemovedIter;
|
||||
SMqConsumerEpInSub *pEpInSub = (SMqConsumerEpInSub *)pIter;
|
||||
if (pEpInSub->consumerId == -1) continue;
|
||||
ASSERT(pEpInSub->consumerId > 0);
|
||||
taosArrayPush(pEpInSub->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pEpInSub->consumerId;
|
||||
SMqConsumerEp *pConsumerEp = (SMqConsumerEp *)pIter;
|
||||
ASSERT(pConsumerEp->consumerId > 0);
|
||||
taosArrayPush(pConsumerEp->vgs, &pRebVg->pVgEp);
|
||||
pRebVg->newConsumerId = pConsumerEp->consumerId;
|
||||
taosArrayPush(pOutput->rebVgs, pRebVg);
|
||||
}
|
||||
} else {
|
||||
// if all consumer is removed, put all vg into unassigned
|
||||
int64_t unexistKey = -1;
|
||||
SMqConsumerEpInSub *pEpInSub = taosHashGet(pOutput->pSub->consumerHash, &unexistKey, sizeof(int64_t));
|
||||
ASSERT(pEpInSub);
|
||||
ASSERT(pEpInSub->consumerId == -1);
|
||||
|
||||
pIter = NULL;
|
||||
SMqRebOutputVg *pRebOutput = NULL;
|
||||
while (1) {
|
||||
|
@ -376,7 +364,7 @@ static int32_t mndDoRebalance(SMnode *pMnode, const SMqRebInputObj *pInput, SMqR
|
|||
if (pIter == NULL) break;
|
||||
pRebOutput = (SMqRebOutputVg *)pIter;
|
||||
ASSERT(pRebOutput->newConsumerId == -1);
|
||||
taosArrayPush(pEpInSub->vgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->pSub->unassignedVgs, &pRebOutput->pVgEp);
|
||||
taosArrayPush(pOutput->rebVgs, pRebOutput);
|
||||
}
|
||||
}
|
||||
|
@ -512,6 +500,7 @@ static int32_t mndProcessRebalanceReq(SNodeMsg *pMsg) {
|
|||
// possibly no vg is changed
|
||||
/*ASSERT(taosArrayGetSize(rebOutput.rebVgs) != 0);*/
|
||||
|
||||
// TODO replace assert with error check
|
||||
ASSERT(mndPersistRebResult(pMnode, pMsg, &rebOutput) == 0);
|
||||
|
||||
if (rebInput.pTopic) {
|
||||
|
@ -631,6 +620,10 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
|
|||
pOldSub->consumerHash = pNewSub->consumerHash;
|
||||
pNewSub->consumerHash = tmp;
|
||||
|
||||
SArray *tmp1 = pOldSub->unassignedVgs;
|
||||
pOldSub->unassignedVgs = pNewSub->unassignedVgs;
|
||||
pNewSub->unassignedVgs = tmp1;
|
||||
|
||||
taosWUnLockLatch(&pOldSub->lock);
|
||||
return 0;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue