Merge pull request #22230 from taosdata/mark/tmq
fix:semaphore always wait in tmq commit logic
This commit is contained in:
commit
bae028d65c
|
@ -780,6 +780,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
|
||||
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
|
||||
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
|
||||
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
|
||||
|
||||
// stream
|
||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||
|
|
|
@ -586,30 +586,36 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
|
|||
if(code != 0){
|
||||
goto end;
|
||||
}
|
||||
if (offsetVal->type > 0 && !tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
|
||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
||||
|
||||
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
||||
|
||||
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
|
||||
if (pParamSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
|
||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
|
||||
taosMemoryFree(pParamSet);
|
||||
goto end;
|
||||
}
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
|
||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
|
||||
pVg->offsetInfo.committedOffset = *offsetVal;
|
||||
if (offsetVal->type <= 0) {
|
||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||
goto end;
|
||||
}
|
||||
if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)){
|
||||
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
|
||||
goto end;
|
||||
}
|
||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
||||
|
||||
char commitBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset);
|
||||
|
||||
SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0);
|
||||
if (pParamSet == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto end;
|
||||
}
|
||||
code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s",
|
||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno));
|
||||
taosMemoryFree(pParamSet);
|
||||
goto end;
|
||||
}
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s",
|
||||
tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf);
|
||||
pVg->offsetInfo.committedOffset = *offsetVal;
|
||||
|
||||
end:
|
||||
taosRUnLockLatch(&tmq->lock);
|
||||
|
@ -650,7 +656,8 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
|
|||
code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam);
|
||||
|
||||
end:
|
||||
if(code != TSDB_CODE_SUCCESS){
|
||||
if(code != TSDB_CODE_SUCCESS && pCommitFp != NULL){
|
||||
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||
pCommitFp(tmq, code, userParam);
|
||||
}
|
||||
}
|
||||
|
@ -2350,7 +2357,7 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
|
|||
tsem_destroy(&pInfo->sem);
|
||||
taosMemoryFree(pInfo);
|
||||
|
||||
tscDebug("consumer:0x%" PRIx64 " sync commit done, code:%s", tmq->consumerId, tstrerror(code));
|
||||
tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code));
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -2406,15 +2413,17 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
|
|||
tsem_init(&pInfo->sem, 0, 0);
|
||||
pInfo->code = 0;
|
||||
|
||||
asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
|
||||
|
||||
tsem_wait(&pInfo->sem);
|
||||
code = pInfo->code;
|
||||
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
|
||||
if(code == 0){
|
||||
tsem_wait(&pInfo->sem);
|
||||
code = pInfo->code;
|
||||
}
|
||||
|
||||
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||
tsem_destroy(&pInfo->sem);
|
||||
taosMemoryFree(pInfo);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " sync send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
|
||||
return code;
|
||||
}
|
||||
|
@ -2451,10 +2460,11 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i
|
|||
|
||||
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param);
|
||||
|
||||
tscInfo("consumer:0x%" PRIx64 " async send seek to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64" code:%s", tmq->consumerId, vgId, offset, tstrerror(code));
|
||||
|
||||
end:
|
||||
if(code != 0 && cb != NULL){
|
||||
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||
cb(tmq, code, param);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -94,7 +94,7 @@ void mndDropConsumerFromSdb(SMnode *pMnode, int64_t consumerId){
|
|||
|
||||
bool mndRebTryStart() {
|
||||
int32_t old = atomic_val_compare_exchange_32(&mqRebInExecCnt, 0, 1);
|
||||
mInfo("tq timer, rebalance counter old val:%d", old);
|
||||
mDebug("tq timer, rebalance counter old val:%d", old);
|
||||
return old == 0;
|
||||
}
|
||||
|
||||
|
@ -116,7 +116,7 @@ void mndRebCntDec() {
|
|||
int32_t newVal = val - 1;
|
||||
int32_t oldVal = atomic_val_compare_exchange_32(&mqRebInExecCnt, val, newVal);
|
||||
if (oldVal == val) {
|
||||
mInfo("rebalance trans end, rebalance counter:%d", newVal);
|
||||
mDebug("rebalance trans end, rebalance counter:%d", newVal);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -903,7 +903,7 @@ static int32_t mndConsumerActionInsert(SSdb *pSdb, SMqConsumerObj *pConsumer) {
|
|||
mInfo("consumer:0x%" PRIx64 " sub insert, cgroup:%s status:%d(%s) epoch:%d",
|
||||
pConsumer->consumerId, pConsumer->cgroup, pConsumer->status, mndConsumerStatusName(pConsumer->status),
|
||||
pConsumer->epoch);
|
||||
pConsumer->subscribeTime = taosGetTimestampMs();
|
||||
pConsumer->subscribeTime = pConsumer->createTime;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -575,10 +575,18 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
// 1. find handle
|
||||
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if (pHandle == NULL) {
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
do{
|
||||
if (tqMetaGetHandle(pTq, req.subKey) == 0){
|
||||
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
|
||||
if(pHandle != NULL){
|
||||
break;
|
||||
}
|
||||
}
|
||||
tqError("tmq poll: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", consumerId, vgId, req.subKey);
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
return -1;
|
||||
}while(0);
|
||||
}
|
||||
|
||||
// 2. check re-balance status
|
||||
|
|
|
@ -338,7 +338,7 @@ static int buildHandle(STQ* pTq, STqHandle* handle){
|
|||
taosArrayDestroy(tbUidList);
|
||||
return -1;
|
||||
}
|
||||
tqDebug("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
|
||||
tqInfo("vgId:%d, tq try to get ctb for stb subscribe, suid:%" PRId64, pVnode->config.vgId, handle->execHandle.execTb.suid);
|
||||
handle->execHandle.pTqReader = tqReaderOpen(pVnode);
|
||||
tqReaderSetTbUidList(handle->execHandle.pTqReader, tbUidList, NULL);
|
||||
taosArrayDestroy(tbUidList);
|
||||
|
@ -356,7 +356,7 @@ static int restoreHandle(STQ* pTq, void* pVal, int vLen, STqHandle* handle){
|
|||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
|
@ -384,7 +384,7 @@ int32_t tqCreateHandle(STQ* pTq, SMqRebVgReq* req, STqHandle* handle){
|
|||
if(buildHandle(pTq, handle) < 0){
|
||||
return -1;
|
||||
}
|
||||
tqDebug("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
tqInfo("tq restore %s consumer %" PRId64 " vgId:%d", handle->subKey, handle->consumerId, vgId);
|
||||
return taosHashPut(pTq->pHandle, handle->subKey, strlen(handle->subKey), handle, sizeof(STqHandle));
|
||||
}
|
||||
|
||||
|
|
|
@ -371,9 +371,9 @@ int32_t walFetchHead(SWalReader *pRead, int64_t ver, SWalCkHead *pHead) {
|
|||
pRead->pWal->vers.appliedVer);
|
||||
|
||||
// TODO: valid ver
|
||||
// if (ver > pRead->pWal->vers.appliedVer) {
|
||||
// return -1;
|
||||
// }
|
||||
if (ver > pRead->pWal->vers.commitVer) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pRead->curVersion != ver) {
|
||||
code = walReaderSeekVer(pRead, ver);
|
||||
|
|
|
@ -643,6 +643,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
|
||||
|
||||
// stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||
|
|
Loading…
Reference in New Issue