fix:conflicts from 3.0

This commit is contained in:
wangmm0220 2024-09-23 10:27:38 +08:00
parent c2fb23d841
commit c9c5c24d85
1 changed files with 36 additions and 36 deletions

View File

@ -1303,7 +1303,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != 0) { if (code != 0) {
tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); tmqFreeRspWrapper((SMqRspWrapper*)pWrapper);
taosFreeQitem(pWrapper); taosFreeQitem(pWrapper);
tscError("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code); tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code);
} }
} }
} }
@ -1312,7 +1312,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
{ {
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId); int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
if (ret != 0){ if (ret != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret); tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret);
} }
} }
@ -1322,7 +1322,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pInfo) { if (pInfo) {
pInfo->code = code; pInfo->code = code;
if (tsem2_post(&pInfo->sem) != 0){ if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem askep cb"); tqErrorC("failed to post rsp sem askep cb");
} }
} }
} }
@ -1409,7 +1409,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
return; return;
} }
tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL; int8_t* pTaskType = NULL;
while (taosGetQitem(qall, (void**)&pTaskType) != 0) { while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
@ -1419,10 +1419,10 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
continue; continue;
} }
tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->epTimer); &pTmq->epTimer);
tscDebug("reset timer fo tmq ask ep:%d", ret); tqDebugC("reset timer fo tmq ask ep:%d", ret);
} else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) {
tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn; tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn;
asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam);
@ -1430,7 +1430,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
pTmq->autoCommitInterval / 1000.0); pTmq->autoCommitInterval / 1000.0);
bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer,
&pTmq->commitTimer); &pTmq->commitTimer);
tscDebug("reset timer fo commit:%d", ret); tqDebugC("reset timer fo commit:%d", ret);
} else { } else {
tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType);
} }
@ -1471,7 +1471,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) {
pParam->rspErr = code; pParam->rspErr = code;
if (tsem2_post(&pParam->rspSem) != 0){ if (tsem2_post(&pParam->rspSem) != 0){
tscError("failed to post sem, subscribe cb"); tqErrorC("failed to post sem, subscribe cb");
} }
return 0; return 0;
} }
@ -1520,7 +1520,7 @@ void tmqFreeImpl(void* handle) {
taosFreeQall(tmq->qall); taosFreeQall(tmq->qall);
if(tsem2_destroy(&tmq->rspSem) != 0) { if(tsem2_destroy(&tmq->rspSem) != 0) {
tscError("failed to destroy sem in free tmq"); tqErrorC("failed to destroy sem in free tmq");
} }
taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); taosArrayDestroyEx(tmq->clientTopics, freeClientTopic);
@ -1528,17 +1528,17 @@ void tmqFreeImpl(void* handle) {
if (tmq->commitTimer) { if (tmq->commitTimer) {
if (!taosTmrStopA(&tmq->commitTimer)) { if (!taosTmrStopA(&tmq->commitTimer)) {
tscError("failed to stop commit timer"); tqErrorC("failed to stop commit timer");
} }
} }
if (tmq->epTimer) { if (tmq->epTimer) {
if (!taosTmrStopA(&tmq->epTimer)) { if (!taosTmrStopA(&tmq->epTimer)) {
tscError("failed to stop ep timer"); tqErrorC("failed to stop ep timer");
} }
} }
if (tmq->hbLiveTimer) { if (tmq->hbLiveTimer) {
if (!taosTmrStopA(&tmq->hbLiveTimer)) { if (!taosTmrStopA(&tmq->hbLiveTimer)) {
tscError("failed to stop hb timer"); tqErrorC("failed to stop hb timer");
} }
} }
taosMemoryFree(tmq); taosMemoryFree(tmq);
@ -1723,13 +1723,13 @@ static int32_t syncAskEp(tmq_t* pTmq) {
int32_t code = askEp(pTmq, pInfo, true, false); int32_t code = askEp(pTmq, pInfo, true, false);
if (code == 0) { if (code == 0) {
if (tsem2_wait(&pInfo->sem) != 0){ if (tsem2_wait(&pInfo->sem) != 0){
tscError("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId); tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId);
} }
code = pInfo->code; code = pInfo->code;
} }
if(tsem2_destroy(&pInfo->sem) != 0) { if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem sync ask ep"); tqErrorC("failed to destroy sem sync ask ep");
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
return code; return code;
@ -1842,10 +1842,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
} }
if (tsem2_wait(&param.rspSem) != 0){ if (tsem2_wait(&param.rspSem) != 0){
tscError("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId);
} }
if(tsem2_destroy(&param.rspSem) != 0) { if(tsem2_destroy(&param.rspSem) != 0) {
tscError("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId);
} }
if (param.rspErr != 0) { if (param.rspErr != 0) {
@ -2006,11 +2006,11 @@ END:
if (tsem2_post(&tmq->rspSem) != 0){ if (tsem2_post(&tmq->rspSem) != 0){
tscError("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
} }
ret = taosReleaseRef(tmqMgmt.rsetId, refId); ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){ if (ret != 0){
tscError("failed to release ref:%"PRId64 ", code:%d", refId, ret); tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret);
} }
EXIT: EXIT:
@ -2178,7 +2178,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) { if (code != 0) {
return code; return code;
@ -2535,7 +2535,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED);
code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId);
if (code != 0){ if (code != 0){
tscError("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code);
} }
} }
return code; return code;
@ -2678,7 +2678,7 @@ static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param
SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param; SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param;
pInfo->code = code; pInfo->code = code;
if (tsem2_post(&pInfo->sem) != 0){ if (tsem2_post(&pInfo->sem) != 0){
tscError("failed to post rsp sem in commit cb"); tqErrorC("failed to post rsp sem in commit cb");
} }
} }
@ -2709,12 +2709,12 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) {
} }
if (tsem2_wait(&pInfo->sem) != 0){ if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit"); tqErrorC("failed to wait sem for sync commit");
} }
code = pInfo->code; code = pInfo->code;
if(tsem2_destroy(&pInfo->sem) != 0) { if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit"); tqErrorC("failed to destroy sem for sync commit");
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
@ -2781,14 +2781,14 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId,
code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo);
if (code == 0) { if (code == 0) {
if (tsem2_wait(&pInfo->sem) != 0){ if (tsem2_wait(&pInfo->sem) != 0){
tscError("failed to wait sem for sync commit offset"); tqErrorC("failed to wait sem for sync commit offset");
} }
code = pInfo->code; code = pInfo->code;
} }
if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS;
if(tsem2_destroy(&pInfo->sem) != 0) { if(tsem2_destroy(&pInfo->sem) != 0) {
tscError("failed to destroy sem for sync commit offset"); tqErrorC("failed to destroy sem for sync commit offset");
} }
taosMemoryFree(pInfo); taosMemoryFree(pInfo);
@ -2920,7 +2920,7 @@ END:
pCommon->code = code; pCommon->code = code;
if (total == pParam->totalReq) { if (total == pParam->totalReq) {
if (tsem2_post(&pCommon->rsp) != 0) { if (tsem2_post(&pCommon->rsp) != 0) {
tscError("failed to post semaphore in get wal cb"); tqErrorC("failed to post semaphore in get wal cb");
} }
} }
@ -2938,7 +2938,7 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) {
} }
taosArrayDestroy(pCommon->pList); taosArrayDestroy(pCommon->pList);
if(tsem2_destroy(&pCommon->rsp) != 0) { if(tsem2_destroy(&pCommon->rsp) != 0) {
tscError("failed to destroy semaphore for topic:%s", pCommon->pTopicName); tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName);
} }
(void)taosThreadMutexDestroy(&pCommon->mutex); (void)taosThreadMutexDestroy(&pCommon->mutex);
taosMemoryFree(pCommon->pTopicName); taosMemoryFree(pCommon->pTopicName);
@ -2976,7 +2976,7 @@ end:
} }
pParam->code = code; pParam->code = code;
if (tsem2_post(&pParam->sem) != 0){ if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post semaphore in tmCommittedCb"); tqErrorC("failed to post semaphore in tmCommittedCb");
} }
return code; return code;
} }
@ -3042,14 +3042,14 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo);
if (code != 0) { if (code != 0) {
if(tsem2_destroy(&pParam->sem) != 0) { if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server1"); tqErrorC("failed to destroy semaphore in get committed from server1");
} }
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
} }
if (tsem2_wait(&pParam->sem) != 0){ if (tsem2_wait(&pParam->sem) != 0){
tscError("failed to wait semaphore in get committed from server"); tqErrorC("failed to wait semaphore in get committed from server");
} }
code = pParam->code; code = pParam->code;
if (code == TSDB_CODE_SUCCESS) { if (code == TSDB_CODE_SUCCESS) {
@ -3061,7 +3061,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep
} }
} }
if(tsem2_destroy(&pParam->sem) != 0) { if(tsem2_destroy(&pParam->sem) != 0) {
tscError("failed to destroy semaphore in get committed from server2"); tqErrorC("failed to destroy semaphore in get committed from server2");
} }
taosMemoryFree(pParam); taosMemoryFree(pParam);
@ -3338,7 +3338,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
} }
if (tsem2_wait(&pCommon->rsp) != 0){ if (tsem2_wait(&pCommon->rsp) != 0){
tscError("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId);
} }
code = pCommon->code; code = pCommon->code;
@ -3403,7 +3403,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqSeekParam* pParam = param; SMqSeekParam* pParam = param;
pParam->code = code; pParam->code = code;
if (tsem2_post(&pParam->sem) != 0){ if (tsem2_post(&pParam->sem) != 0){
tscError("failed to post sem in tmqSeekCb"); tqErrorC("failed to post sem in tmqSeekCb");
} }
return 0; return 0;
} }
@ -3502,18 +3502,18 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
if (code != 0) { if (code != 0) {
if(tsem2_destroy(&pParam->sem) != 0) { if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
} }
taosMemoryFree(pParam); taosMemoryFree(pParam);
return code; return code;
} }
if (tsem2_wait(&pParam->sem) != 0){ if (tsem2_wait(&pParam->sem) != 0){
tscError("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId);
} }
code = pParam->code; code = pParam->code;
if(tsem2_destroy(&pParam->sem) != 0) { if(tsem2_destroy(&pParam->sem) != 0) {
tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId);
} }
taosMemoryFree(pParam); taosMemoryFree(pParam);