diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8c44a8bfa0..975d14f3ee 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1303,7 +1303,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != 0) { tmqFreeRspWrapper((SMqRspWrapper*)pWrapper); taosFreeQitem(pWrapper); - tscError("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code); + tqErrorC("consumer:0x%" PRIx64 " put ep res into mqueue failed, code:%d", tmq->consumerId, code); } } } @@ -1312,7 +1312,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { { int32_t ret = taosReleaseRef(tmqMgmt.rsetId, pParam->refId); if (ret != 0){ - tscError("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret); + tqErrorC("failed to release ref:%"PRId64 ", code:%d", pParam->refId, ret); } } @@ -1322,7 +1322,7 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (pInfo) { pInfo->code = code; if (tsem2_post(&pInfo->sem) != 0){ - tscError("failed to post rsp sem askep cb"); + tqErrorC("failed to post rsp sem askep cb"); } } } @@ -1409,7 +1409,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { return; } - tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); + tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); int8_t* pTaskType = NULL; while (taosGetQitem(qall, (void**)&pTaskType) != 0) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { @@ -1419,10 +1419,10 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); continue; } - tscDebug("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); + tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); - tscDebug("reset timer fo tmq ask ep:%d", ret); + tqDebugC("reset timer fo tmq ask ep:%d", ret); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); @@ -1430,7 +1430,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { pTmq->autoCommitInterval / 1000.0); bool ret = taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); - tscDebug("reset timer fo commit:%d", ret); + tqDebugC("reset timer fo commit:%d", ret); } else { tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } @@ -1471,7 +1471,7 @@ int32_t tmqSubscribeCb(void* param, SDataBuf* pMsg, int32_t code) { pParam->rspErr = code; if (tsem2_post(&pParam->rspSem) != 0){ - tscError("failed to post sem, subscribe cb"); + tqErrorC("failed to post sem, subscribe cb"); } return 0; } @@ -1520,7 +1520,7 @@ void tmqFreeImpl(void* handle) { taosFreeQall(tmq->qall); if(tsem2_destroy(&tmq->rspSem) != 0) { - tscError("failed to destroy sem in free tmq"); + tqErrorC("failed to destroy sem in free tmq"); } taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); @@ -1528,17 +1528,17 @@ void tmqFreeImpl(void* handle) { if (tmq->commitTimer) { if (!taosTmrStopA(&tmq->commitTimer)) { - tscError("failed to stop commit timer"); + tqErrorC("failed to stop commit timer"); } } if (tmq->epTimer) { if (!taosTmrStopA(&tmq->epTimer)) { - tscError("failed to stop ep timer"); + tqErrorC("failed to stop ep timer"); } } if (tmq->hbLiveTimer) { if (!taosTmrStopA(&tmq->hbLiveTimer)) { - tscError("failed to stop hb timer"); + tqErrorC("failed to stop hb timer"); } } taosMemoryFree(tmq); @@ -1723,13 +1723,13 @@ static int32_t syncAskEp(tmq_t* pTmq) { int32_t code = askEp(pTmq, pInfo, true, false); if (code == 0) { if (tsem2_wait(&pInfo->sem) != 0){ - tscError("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 ", failed to wait for sem", pTmq->consumerId); } code = pInfo->code; } if(tsem2_destroy(&pInfo->sem) != 0) { - tscError("failed to destroy sem sync ask ep"); + tqErrorC("failed to destroy sem sync ask ep"); } taosMemoryFree(pInfo); return code; @@ -1842,10 +1842,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } if (tsem2_wait(¶m.rspSem) != 0){ - tscError("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 ", failed to wait semaphore in subscribe", tmq->consumerId); } if(tsem2_destroy(¶m.rspSem) != 0) { - tscError("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 ", failed to destroy semaphore in subscribe", tmq->consumerId); } if (param.rspErr != 0) { @@ -2006,11 +2006,11 @@ END: if (tsem2_post(&tmq->rspSem) != 0){ - tscError("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); + tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId); } ret = taosReleaseRef(tmqMgmt.rsetId, refId); if (ret != 0){ - tscError("failed to release ref:%"PRId64 ", code:%d", refId, ret); + tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret); } EXIT: @@ -2178,7 +2178,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); - tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, + tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if (code != 0) { return code; @@ -2535,7 +2535,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); code = taosRemoveRef(tmqMgmt.rsetId, tmq->refId); if (code != 0){ - tscError("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); + tqErrorC("tmq close failed to remove ref:%" PRId64 ", code:%d", tmq->refId, code); } } return code; @@ -2678,7 +2678,7 @@ static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param SSyncCommitInfo* pInfo = (SSyncCommitInfo*)param; pInfo->code = code; if (tsem2_post(&pInfo->sem) != 0){ - tscError("failed to post rsp sem in commit cb"); + tqErrorC("failed to post rsp sem in commit cb"); } } @@ -2709,12 +2709,12 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { } if (tsem2_wait(&pInfo->sem) != 0){ - tscError("failed to wait sem for sync commit"); + tqErrorC("failed to wait sem for sync commit"); } code = pInfo->code; if(tsem2_destroy(&pInfo->sem) != 0) { - tscError("failed to destroy sem for sync commit"); + tqErrorC("failed to destroy sem for sync commit"); } taosMemoryFree(pInfo); @@ -2781,14 +2781,14 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, commitCallBackFn, pInfo); if (code == 0) { if (tsem2_wait(&pInfo->sem) != 0){ - tscError("failed to wait sem for sync commit offset"); + tqErrorC("failed to wait sem for sync commit offset"); } code = pInfo->code; } if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; if(tsem2_destroy(&pInfo->sem) != 0) { - tscError("failed to destroy sem for sync commit offset"); + tqErrorC("failed to destroy sem for sync commit offset"); } taosMemoryFree(pInfo); @@ -2920,7 +2920,7 @@ END: pCommon->code = code; if (total == pParam->totalReq) { if (tsem2_post(&pCommon->rsp) != 0) { - tscError("failed to post semaphore in get wal cb"); + tqErrorC("failed to post semaphore in get wal cb"); } } @@ -2938,7 +2938,7 @@ static void destroyCommonInfo(SMqVgCommon* pCommon) { } taosArrayDestroy(pCommon->pList); if(tsem2_destroy(&pCommon->rsp) != 0) { - tscError("failed to destroy semaphore for topic:%s", pCommon->pTopicName); + tqErrorC("failed to destroy semaphore for topic:%s", pCommon->pTopicName); } (void)taosThreadMutexDestroy(&pCommon->mutex); taosMemoryFree(pCommon->pTopicName); @@ -2976,7 +2976,7 @@ end: } pParam->code = code; if (tsem2_post(&pParam->sem) != 0){ - tscError("failed to post semaphore in tmCommittedCb"); + tqErrorC("failed to post semaphore in tmCommittedCb"); } return code; } @@ -3042,14 +3042,14 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, NULL, sendInfo); if (code != 0) { if(tsem2_destroy(&pParam->sem) != 0) { - tscError("failed to destroy semaphore in get committed from server1"); + tqErrorC("failed to destroy semaphore in get committed from server1"); } taosMemoryFree(pParam); return code; } if (tsem2_wait(&pParam->sem) != 0){ - tscError("failed to wait semaphore in get committed from server"); + tqErrorC("failed to wait semaphore in get committed from server"); } code = pParam->code; if (code == TSDB_CODE_SUCCESS) { @@ -3061,7 +3061,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep } } if(tsem2_destroy(&pParam->sem) != 0) { - tscError("failed to destroy semaphore in get committed from server2"); + tqErrorC("failed to destroy semaphore in get committed from server2"); } taosMemoryFree(pParam); @@ -3338,7 +3338,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } if (tsem2_wait(&pCommon->rsp) != 0){ - tscError("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 " failed to wait sem in get assignment", tmq->consumerId); } code = pCommon->code; @@ -3403,7 +3403,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { SMqSeekParam* pParam = param; pParam->code = code; if (tsem2_post(&pParam->sem) != 0){ - tscError("failed to post sem in tmqSeekCb"); + tqErrorC("failed to post sem in tmqSeekCb"); } return 0; } @@ -3502,18 +3502,18 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); if (code != 0) { if(tsem2_destroy(&pParam->sem) != 0) { - tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); } taosMemoryFree(pParam); return code; } if (tsem2_wait(&pParam->sem) != 0){ - tscError("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 "wait rsp sem failed in seek offset", tmq->consumerId); } code = pParam->code; if(tsem2_destroy(&pParam->sem) != 0) { - tscError("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 "destroy rsp sem failed in seek offset", tmq->consumerId); } taosMemoryFree(pParam);