fix:semaphore always wait in tmq commit logic
This commit is contained in:
parent
68eb1cfc1e
commit
5fa580960e
|
@ -780,6 +780,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
|
#define TSDB_CODE_TMQ_INVALID_TOPIC TAOS_DEF_ERROR_CODE(0, 0x4009)
|
||||||
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
|
#define TSDB_CODE_TMQ_NEED_INITIALIZED TAOS_DEF_ERROR_CODE(0, 0x4010)
|
||||||
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
|
#define TSDB_CODE_TMQ_NO_COMMITTED TAOS_DEF_ERROR_CODE(0, 0x4011)
|
||||||
|
#define TSDB_CODE_TMQ_SAME_COMMITTED_VALUE TAOS_DEF_ERROR_CODE(0, 0x4012)
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
|
|
|
@ -586,10 +586,14 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq
|
||||||
if(code != 0){
|
if(code != 0){
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
if (offsetVal->type <= 0 || tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)) {
|
if (offsetVal->type <= 0) {
|
||||||
code = TSDB_CODE_TMQ_INVALID_MSG;
|
code = TSDB_CODE_TMQ_INVALID_MSG;
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
if (tOffsetEqual(offsetVal, &pVg->offsetInfo.committedOffset)){
|
||||||
|
code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE;
|
||||||
|
goto end;
|
||||||
|
}
|
||||||
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||||
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
tFormatOffset(offsetBuf, tListLen(offsetBuf), offsetVal);
|
||||||
|
|
||||||
|
@ -653,6 +657,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c
|
||||||
|
|
||||||
end:
|
end:
|
||||||
if(code != TSDB_CODE_SUCCESS && pCommitFp != NULL){
|
if(code != TSDB_CODE_SUCCESS && pCommitFp != NULL){
|
||||||
|
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||||
pCommitFp(tmq, code, userParam);
|
pCommitFp(tmq, code, userParam);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2412,6 +2417,7 @@ int32_t tmq_commit_offset_sync(tmq_t *tmq, const char *pTopicName, int32_t vgId,
|
||||||
code = pInfo->code;
|
code = pInfo->code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||||
tsem_destroy(&pInfo->sem);
|
tsem_destroy(&pInfo->sem);
|
||||||
taosMemoryFree(pInfo);
|
taosMemoryFree(pInfo);
|
||||||
|
|
||||||
|
@ -2456,6 +2462,7 @@ void tmq_commit_offset_async(tmq_t *tmq, const char *pTopicName, int32_t vgId, i
|
||||||
|
|
||||||
end:
|
end:
|
||||||
if(code != 0 && cb != NULL){
|
if(code != 0 && cb != NULL){
|
||||||
|
if(code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
|
||||||
cb(tmq, code, param);
|
cb(tmq, code, param);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -642,6 +642,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_CLOSED, "Consumer closed")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_CONSUMER_ERROR, "Consumer error, to see log")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_TOPIC_OUT_OF_RANGE, "Topic num out of range")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_GROUP_OUT_OF_RANGE, "Group num out of range 100")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed value")
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
|
|
Loading…
Reference in New Issue