opti:commit logic

This commit is contained in:
wangmm0220 2023-09-05 10:35:05 +08:00
parent 9a2da3adee
commit 53c4f4a147
2 changed files with 13 additions and 21 deletions

View File

@ -442,7 +442,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet); taosMemoryFree(pBuf->pEpSet);
@ -513,14 +512,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
int64_t transporterId = 0;
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
if(code != 0){
return code;
}
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
return code;
SEp* pEp = GET_ACTIVE_EP(epSet);
int64_t transporterId = 0;
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
} }
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
@ -538,7 +537,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
return NULL; return NULL;
} }
static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){
SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet));
if (pParamSet == NULL) { if (pParamSet == NULL) {
return NULL; return NULL;
@ -548,13 +547,11 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p
pParamSet->epoch = tmq->epoch; pParamSet->epoch = tmq->epoch;
pParamSet->callbackFn = pCommitFp; pParamSet->callbackFn = pCommitFp;
pParamSet->userParam = userParam; pParamSet->userParam = userParam;
pParamSet->waitingRspNum = rspNum; pParamSet->waitingRspNum = 0;
return pParamSet; return pParamSet;
} }
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
if (pTopic == NULL) { if (pTopic == NULL) {
@ -575,11 +572,10 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient
} }
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
taosRLockLatch(&tmq->lock); taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL; SMqClientVg* pVg = NULL;
code = getClientVg(tmq, pTopicName, vgId, &pVg); int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg);
if(code != 0){ if(code != 0){
goto end; goto end;
} }
@ -597,7 +593,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
char commitBuf[TSDB_OFFSET_LEN] = {0}; char commitBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam);
if (pParamSet == NULL) { if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
@ -661,8 +657,7 @@ end:
static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0; int32_t code = 0;
// init as 1 to prevent concurrency issue SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam);
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1);
if (pParamSet == NULL) { if (pParamSet == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
goto end; goto end;
@ -709,10 +704,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
// request is sent // request is sent
if (pParamSet->totalRspNum != 0) { if (pParamSet->totalRspNum != 0) {
// count down since waiting rsp num init as 1
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
return; return;
} }
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
end: end:
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
@ -2580,8 +2574,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
} }
taosMemoryFree(pParamSet); taosMemoryFree(pParamSet);
// tmq->needReportOffsetRows = true;
taosReleaseRef(tmqMgmt.rsetId, refId); taosReleaseRef(tmqMgmt.rsetId, refId);
return 0; return 0;
} }

View File

@ -696,7 +696,7 @@ static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) { if (0 == g_once_commit_flag && code == 0) {
g_once_commit_flag = 1; g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
} }