fix(tmq): fix race condition.
parent 05ddf68247
commit 9d680a0995
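Before the diff, here is a minimal, self-contained C sketch of the pattern this commit applies to remove the race: take tmq->lock before walking the shared clientTopics array, copy each SMqClientVg out by value, and hand downstream code the topic name rather than a pointer into the array. The types below (Tmq, MqClientTopic, MqClientVg) are simplified stand-ins for illustration only, not the real TDengine structs.

// Illustrative sketch only; reduced stand-ins for the real tmq_t / SMqClientTopic / SMqClientVg.
#include <inttypes.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>

typedef struct {
  int32_t vgId;
  int64_t currentOffset;
  int64_t committedOffset;
} MqClientVg;

typedef struct {
  char       topicName[64];
  MqClientVg vgs[4];
  int32_t    numOfVgroups;
} MqClientTopic;

typedef struct {
  pthread_mutex_t lock;            // plays the role of tmq->lock in the diff
  MqClientTopic   clientTopics[2];
  int32_t         numOfTopics;
} Tmq;

// Mirrors the new tmqSendCommitReq() contract: it receives the topic *name* and a
// vgroup the caller copied out of the array, so it never dereferences shared state.
static int sendCommitReq(Tmq* tmq, const MqClientVg* pVg, const char* topicName) {
  (void)tmq;
  printf("commit %s vgId:%d offset:%" PRId64 "\n", topicName, pVg->vgId, pVg->currentOffset);
  return 0;
}

// Mirrors tmqCommitConsumerImpl() after the fix: lock, copy by value, work on the copy,
// so a concurrent rebalance that rebuilds clientTopics cannot pull the element away mid-commit.
static void commitAll(Tmq* tmq) {
  pthread_mutex_lock(&tmq->lock);
  for (int32_t i = 0; i < tmq->numOfTopics; i++) {
    MqClientTopic* pTopic = &tmq->clientTopics[i];
    for (int32_t j = 0; j < pTopic->numOfVgroups; j++) {
      MqClientVg clientVg = pTopic->vgs[j];  // value copy, as in the diff
      if (clientVg.currentOffset != clientVg.committedOffset) {
        sendCommitReq(tmq, &clientVg, pTopic->topicName);
      }
    }
  }
  pthread_mutex_unlock(&tmq->lock);
}

int main(void) {
  Tmq tmq = {.numOfTopics = 1};
  pthread_mutex_init(&tmq.lock, NULL);
  strcpy(tmq.clientTopics[0].topicName, "db1.topic_1");
  tmq.clientTopics[0].numOfVgroups = 1;
  tmq.clientTopics[0].vgs[0] = (MqClientVg){.vgId = 2, .currentOffset = 10, .committedOffset = 5};
  commitAll(&tmq);
  pthread_mutex_destroy(&tmq.lock);
  return 0;
}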
@@ -147,7 +147,6 @@ typedef struct {
 } SMqClientVg;
 
 typedef struct {
-  // subscribe info
   char topicName[TSDB_TOPIC_FNAME_LEN];
   char db[TSDB_DB_FNAME_LEN];
   SArray* vgs;  // SArray<SMqClientVg>
@@ -381,7 +380,7 @@ char** tmq_list_to_c_array(const tmq_list_t* list) {
   return container->pData;
 }
 
-static int32_t tmqMakeTopicVgKey(char* dst, const char* topicName, int32_t vg) {
+static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
   return sprintf(dst, "%s:%d", topicName, vg);
 }
 
@@ -424,7 +423,7 @@ static void tmqCommitRspCountDown(SMqCommitCbParamSet* pParamSet) {
   }
 }
 
-int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
+static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
   SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
   SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
 
@@ -478,7 +477,7 @@ int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
   return 0;
 }
 
-static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pTopic, SMqCommitCbParamSet* pParamSet) {
+static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, const char* pTopicName, SMqCommitCbParamSet* pParamSet) {
   STqOffset* pOffset = taosMemoryCalloc(1, sizeof(STqOffset));
   if (pOffset == NULL) {
     terrno = TSDB_CODE_OUT_OF_MEMORY;
@@ -490,7 +489,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
   int32_t groupLen = strlen(tmq->groupId);
   memcpy(pOffset->subKey, tmq->groupId, groupLen);
   pOffset->subKey[groupLen] = TMQ_SEPARATOR;
-  strcpy(pOffset->subKey + groupLen + 1, pTopic->topicName);
+  strcpy(pOffset->subKey + groupLen + 1, pTopicName);
 
   int32_t len;
   int32_t code;
@@ -527,7 +526,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
   pParam->vgId = pVg->vgId;
   pParam->pTmq = tmq;
 
-  tstrncpy(pParam->topicName, pTopic->topicName, tListLen(pParam->topicName));
+  tstrncpy(pParam->topicName, pTopicName, tListLen(pParam->topicName));
 
   // build send info
   SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
@@ -544,7 +543,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
       .handle = NULL,
   };
 
-  SEp* pEp = &pVg->epSet.eps[pVg->epSet.inUse];
+  SEp* pEp = GET_ACTIVE_EP(&pVg->epSet);
   tscDebug("consumer:0x%" PRIx64 " topic:%s on vgId:%d offset:%" PRId64 " prev:%" PRId64 ", ep:%s:%d", tmq->consumerId,
            pOffset->subKey, pVg->vgId, pOffset->val.version, pVg->committedOffset.version, pEp->fqdn, pEp->port);
 
@@ -567,7 +566,7 @@ static int32_t tmqSendCommitReq(tmq_t* tmq, SMqClientVg* pVg, SMqClientTopic* pT
   return 0;
 }
 
-int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
+static int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_commit_cb* userCb, void* userParam) {
   char* topic;
   int32_t vgId;
   if (TD_RES_TMQ(msg)) {
@@ -591,6 +590,7 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
     terrno = TSDB_CODE_OUT_OF_MEMORY;
     return -1;
   }
+
   pParamSet->refId = tmq->refId;
   pParamSet->epoch = tmq->epoch;
   pParamSet->automatic = 0;
@@ -601,15 +601,18 @@ int32_t tmqCommitMsgImpl(tmq_t* tmq, const TAOS_RES* msg, int8_t async, tmq_comm
 
   int32_t code = -1;
 
+  taosThreadMutexLock(&tmq->lock);
   for (int32_t i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) {
     SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i);
     if (strcmp(pTopic->topicName, topic) != 0) continue;
     for (int32_t j = 0; j < taosArrayGetSize(pTopic->vgs); j++) {
       SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
-      if (pVg->vgId != vgId) continue;
+      if (pVg->vgId != vgId) {
+        continue;
+      }
 
       if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
-        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
+        if (tmqSendCommitReq(tmq, pVg, pTopic->topicName, pParamSet) < 0) {
           tsem_destroy(&pParamSet->rspSem);
           taosMemoryFree(pParamSet);
           goto FAIL;
@@ -623,6 +626,7 @@ HANDLE_RSP:
   if (pParamSet->totalRspNum == 0) {
     tsem_destroy(&pParamSet->rspSem);
     taosMemoryFree(pParamSet);
+    taosThreadMutexUnlock(&tmq->lock);
     return 0;
   }
 
@@ -631,15 +635,18 @@ HANDLE_RSP:
     code = pParamSet->rspErr;
     tsem_destroy(&pParamSet->rspSem);
     taosMemoryFree(pParamSet);
+    taosThreadMutexUnlock(&tmq->lock);
     return code;
   } else {
     code = 0;
   }
 
 FAIL:
+  taosThreadMutexUnlock(&tmq->lock);
   if (code != 0 && async) {
     userCb(tmq, code, userParam);
   }
 
   return 0;
 }
 
@@ -672,7 +679,9 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
   // init as 1 to prevent concurrency issue
   pParamSet->waitingRspNum = 1;
 
+  taosThreadMutexLock(&tmq->lock);
   int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
 
   tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics);
 
   for (int32_t i = 0; i < numOfTopics; i++) {
@@ -681,18 +690,20 @@ static int32_t tmqCommitConsumerImpl(tmq_t* tmq, int8_t automatic, int8_t async,
     // todo race condition: fix it
     int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs);
     for (int32_t j = 0; j < numOfVgroups; j++) {
-      SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
-      if (pVg->currentOffset.type > 0 && !tOffsetEqual(&pVg->currentOffset, &pVg->committedOffset)) {
-        if (tmqSendCommitReq(tmq, pVg, pTopic, pParamSet) < 0) {
+      SMqClientVg clientVg = *(SMqClientVg*)taosArrayGet(pTopic->vgs, j);
+      if (clientVg.currentOffset.type > 0 && !tOffsetEqual(&clientVg.currentOffset, &clientVg.committedOffset)) {
+        if (tmqSendCommitReq(tmq, &clientVg, pTopic->topicName, pParamSet) < 0) {
          continue;
        }
      } else {
        tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, not commit, current:%" PRId64 ", ordinal:%d/%d",
-                tmq->consumerId, pTopic->topicName, pVg->vgId, pVg->currentOffset.version, j + 1, numOfVgroups);
+                tmq->consumerId, pTopic->topicName, clientVg.vgId, clientVg.currentOffset.version, j + 1, numOfVgroups);
       }
     }
   }
 
+  taosThreadMutexUnlock(&tmq->lock);
 
   // no request is sent
   if (pParamSet->totalRspNum == 0) {
     tsem_destroy(&pParamSet->rspSem);
@@ -1369,8 +1380,10 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
 
   for (int32_t j = 0; j < vgNumGet; j++) {
     SMqSubVgEp* pVgEp = taosArrayGet(pTopicEp->vgs, j);
-    sprintf(vgKey, "%s:%d", pTopic->topicName, pVgEp->vgId);
+
+    makeTopicVgroupKey(vgKey, pTopic->topicName, pVgEp->vgId);
     STqOffsetVal* pOffset = taosHashGet(pVgOffsetHashMap, vgKey, strlen(vgKey));
 
     STqOffsetVal offsetNew = {.type = tmq->resetOffsetCfg};
     if (pOffset != NULL) {
       offsetNew = *pOffset;
@@ -1428,7 +1441,8 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
     tscDebug("consumer:0x%" PRIx64 ", new vg num: %d", tmq->consumerId, vgNumCur);
     for (int32_t j = 0; j < vgNumCur; j++) {
       SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j);
-      sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId);
+      makeTopicVgroupKey(vgKey, pTopicCur->topicName, pVgCur->vgId);
 
       char buf[80];
       tFormatOffset(buf, 80, &pVgCur->currentOffset);
       tscDebug("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch,
@@ -1454,7 +1468,6 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
   }
 
   tmq->clientTopics = newTopics;
-
   taosThreadMutexUnlock(&tmq->lock);
 
   int8_t flag = (topicNumGet == 0)? TMQ_CONSUMER_STATUS__NO_TOPIC:TMQ_CONSUMER_STATUS__READY;
@@ -1465,7 +1478,7 @@ static bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
   return set;
 }
 
-int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
+static int32_t tmqAskEpCb(void* param, SDataBuf* pMsg, int32_t code) {
   SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
   int8_t async = pParam->async;
   tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
@@ -1757,7 +1770,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
 }
 
 // broadcast the poll request to all related vnodes
-int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
+static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
   int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
   tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
 
@@ -1793,7 +1806,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
   return 0;
 }
 
-int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
+static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
   if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
     /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
     if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
@@ -1813,7 +1826,7 @@ int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset)
   return 0;
 }
 
-void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
+static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
   tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
 
   while (1) {
@@ -2117,11 +2130,9 @@ const char* tmq_get_table_name(TAOS_RES* res) {
 }
 
 void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
-  //
   tmqCommitInner(tmq, msg, 0, 1, cb, param);
 }
 
 int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* msg) {
-  //
   return tmqCommitInner(tmq, msg, 0, 0, NULL, NULL);
 }
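For reference, a tiny standalone sketch (not part of the commit) of the key format produced by the renamed makeTopicVgroupKey helper; its body is the same one-line sprintf shown in the diff, and the resulting "<topic>:<vgId>" strings are what the offset hash-map lookups use.

#include <stdint.h>
#include <stdio.h>

// Same one-liner as in the diff: keys take the form "<topic>:<vgId>".
static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
  return sprintf(dst, "%s:%d", topicName, vg);
}

int main(void) {
  char    vgKey[128];
  int32_t len = makeTopicVgroupKey(vgKey, "db1.topic_1", 4);
  printf("key=%s len=%d\n", vgKey, len);  // prints: key=db1.topic_1:4 len=13
  return 0;
}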