diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 010e538fb6..9db4496377 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -4158,6 +4158,7 @@ typedef struct { typedef struct { SArray* topicPrivileges; // SArray + int32_t debugFlag; } SMqHbRsp; typedef struct { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 4367ca4124..b702f79ddf 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -24,6 +24,13 @@ #include "tref.h" #include "ttimer.h" +#define tqFatalC(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqErrorC(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqWarnC(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqInfoC(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqDebugC(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqTraceC(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) + #define EMPTY_BLOCK_POLL_IDLE_DURATION 10 #define DEFAULT_AUTO_COMMIT_INTERVAL 5000 #define DEFAULT_HEARTBEAT_INTERVAL 3000 @@ -580,7 +587,7 @@ static int32_t getTopicByName(tmq_t* tmq, const char* pTopicName, SMqClientTopic return 0; } - tscError("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName); + tqErrorC("consumer:0x%" PRIx64 ", total:%d, failed to find topic:%s", tmq->consumerId, numOfTopics, pTopicName); return TSDB_CODE_TMQ_INVALID_TOPIC; } @@ -604,7 +611,7 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient SMqClientTopic* pTopic = NULL; int32_t code = getTopicByName(tmq, pTopicName, &pTopic); if (code != 0) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); return code; } @@ -638,12 +645,12 @@ static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal code = doSendCommitMsg(tmq, pVg->vgId, &pVg->epSet, offsetVal, pTopicName, pParamSet); if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", + tqErrorC("consumer:0x%" PRIx64 " topic:%s on vgId:%d end commit msg failed, send offset:%s committed:%s, code:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf, tstrerror(terrno)); return code; } - tscInfo("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", + tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); return code; @@ -651,7 +658,7 @@ static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { - tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); + tqInfoC("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); SMqCommitCbParamSet* pParamSet = NULL; int32_t code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0, &pParamSet); if (code != 0){ @@ -721,7 +728,7 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ int32_t code = 0; taosRLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); + tqDebugC("consumer:0x%" PRIx64 " start to commit offset for %d topics", tmq->consumerId, numOfTopics); for (int32_t i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -730,7 +737,7 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ goto END; } int32_t numOfVgroups = taosArrayGetSize(pTopic->vgs); - tscDebug("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); + tqDebugC("consumer:0x%" PRIx64 " commit offset for topics:%s, numOfVgs:%d", tmq->consumerId, pTopic->topicName, numOfVgroups); for (int32_t j = 0; j < numOfVgroups; j++) { SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j); if (pVg == NULL) { @@ -740,12 +747,12 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ code = innerCommit(tmq, pTopic->topicName, &pVg->offsetInfo.endOffset, pVg, pParamSet); if (code != 0){ - tscDebug("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current:%" PRId64 ", ordinal:%d/%d", + tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d, no commit, code:%s, current offset version:%" PRId64 ", ordinal:%d/%d", tmq->consumerId, pTopic->topicName, pVg->vgId, tstrerror(code), pVg->offsetInfo.endOffset.version, j + 1, numOfVgroups); } } } - tscDebug("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT, + tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT, numOfTopics); END: taosRUnLockLatch(&tmq->lock); @@ -758,7 +765,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // init waitingRspNum as DEFAULT_COMMIT_CNT to prevent concurrency issue code = prepareCommitCbParamSet(tmq, pCommitFp, userParam, DEFAULT_COMMIT_CNT, &pParamSet); if (code != 0) { - tscError("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " prepareCommitCbParamSet failed, code:%s", tmq->consumerId, tstrerror(code)); if (pCommitFp != NULL) { pCommitFp(tmq, code, userParam); } @@ -766,12 +773,12 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us } code = innerCommitAll(tmq, pParamSet); if (code != 0){ - tscError("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " innerCommitAll failed, code:%s", tmq->consumerId, tstrerror(code)); } code = commitRspCountDown(pParamSet, tmq->consumerId, "init", -1); if (code != 0) { - tscError("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " commit rsp count down failed, code:%s", tmq->consumerId, tstrerror(code)); } return; } @@ -840,7 +847,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { for (int32_t j = 0; j < topicNumCur; j++) { SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, j); if (pTopicCur && strcmp(pTopicCur->topicName, privilege->topic) == 0) { - tscInfo("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); + tqInfoC("consumer:0x%" PRIx64 ", has no privilege, topic:%s", tmq->consumerId, privilege->topic); pTopicCur->noPrivilege = 1; } } @@ -850,6 +857,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { (void)taosReleaseRef(tmqMgmt.rsetId, refId); } + tqDebugFlag = rsp.debugFlag; tDestroySMqHbRsp(&rsp); END: @@ -905,7 +913,7 @@ void tmqSendHbReq(void* param, void* tmrId) { offRows->ever = pVg->offsetInfo.walVerEnd == -1 ? 0 : pVg->offsetInfo.walVerEnd; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &offRows->offset); - tscDebug("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, + tqDebugC("consumer:0x%" PRIx64 ",report offset, group:%s vgId:%d, offset:%s/%" PRId64 ", rows:%" PRId64, tmq->consumerId, tmq->groupId, offRows->vgId, buf, offRows->ever, offRows->rows); } } @@ -913,18 +921,18 @@ void tmqSendHbReq(void* param, void* tmrId) { int32_t tlen = tSerializeSMqHbReq(NULL, 0, &req); if (tlen < 0) { - tscError("tSerializeSMqHbReq failed"); + tqErrorC("tSerializeSMqHbReq failed"); goto END; } void* pReq = taosMemoryCalloc(1, tlen); if (tlen < 0) { - tscError("failed to malloc MqHbReq msg, size:%d", tlen); + tqErrorC("failed to malloc MqHbReq msg, size:%d", tlen); goto END; } if (tSerializeSMqHbReq(pReq, tlen, &req) < 0) { - tscError("tSerializeSMqHbReq %d failed", tlen); + tqErrorC("tSerializeSMqHbReq %d failed", tlen); taosMemoryFree(pReq); goto END; } @@ -948,7 +956,7 @@ void tmqSendHbReq(void* param, void* tmrId) { int64_t transporterId = 0; int32_t code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); if (code != 0) { - tscError("tmqSendHbReq asyncSendMsgToServer failed"); + tqErrorC("tmqSendHbReq asyncSendMsgToServer failed"); } (void)atomic_val_compare_exchange_8(&pollFlag, 1, 0); @@ -962,7 +970,7 @@ END: static void defaultCommitCbFn(tmq_t* pTmq, int32_t code, void* param) { if (code != 0) { - tscError("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 ", failed to commit offset, code:%s", pTmq->consumerId, tstrerror(code)); } } @@ -972,7 +980,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { code = taosAllocateQall(&qall); if (code) { - tscError("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); return; } @@ -984,26 +992,26 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { return; } - tscDebug("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); + tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); int8_t* pTaskType = NULL; (void)taosGetQitem(qall, (void**)&pTaskType); while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { - tscDebug("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId); + tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId); code = askEp(pTmq, NULL, false, false); if (code != 0) { - tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); } (void)taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->epTimer); } else if (*pTaskType == TMQ_DELAYED_TASK__COMMIT) { tmq_commit_cb* pCallbackFn = (pTmq->commitCb != NULL) ? pTmq->commitCb : defaultCommitCbFn; asyncCommitAllOffsets(pTmq, pCallbackFn, pTmq->commitCbUserParam); - tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, + tqDebugC("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); (void)taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, (void*)(pTmq->refId), tmqMgmt.timer, &pTmq->commitTimer); } else { - tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); + tqErrorC("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } taosFreeQitem(pTaskType); @@ -1090,16 +1098,16 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { for (int i = 0; i < taosArrayGetSize(tmq->clientTopics); i++) { SMqClientTopic* topic = taosArrayGet(tmq->clientTopics, i); if (topic == NULL) { - tscError("topic is null"); + tqErrorC("topic is null"); continue; } char* tmp = strchr(topic->topicName, '.'); if (tmp == NULL) { - tscError("topic name is invalid:%s", topic->topicName); + tqErrorC("topic name is invalid:%s", topic->topicName); continue; } if (tmq_list_append(*topics, tmp + 1) != 0) { - tscError("failed to append topic:%s", tmp + 1); + tqErrorC("failed to append topic:%s", tmp + 1); continue; } } @@ -1149,7 +1157,7 @@ void tmqFreeImpl(void* handle) { } taosMemoryFree(tmq); - tscInfo("consumer:0x%" PRIx64 " closed", id); + tqInfoC("consumer:0x%" PRIx64 " closed", id); } static void tmqMgmtInit(void) { @@ -1200,7 +1208,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { - tscError("failed to create consumer, groupId:%s", conf->groupId); + tqErrorC("failed to create consumer, groupId:%s", conf->groupId); SET_ERROR_MSG_TMQ("malloc tmq failed") return NULL; } @@ -1210,13 +1218,13 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); if (pTmq->clientTopics == NULL) { - tscError("failed to create consumer, groupId:%s", conf->groupId); + tqErrorC("failed to create consumer, groupId:%s", conf->groupId); SET_ERROR_MSG_TMQ("malloc client topics failed") goto _failed; } code = taosOpenQueue(&pTmq->mqueue); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("open queue failed") goto _failed; @@ -1224,7 +1232,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { code = taosOpenQueue(&pTmq->delayedTask); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("open delayed task queue failed") goto _failed; @@ -1232,14 +1240,14 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { code = taosAllocateQall(&pTmq->qall); if (code) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("allocate qall failed") goto _failed; } if (conf->groupId[0] == 0) { - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), + tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code), pTmq->groupId); SET_ERROR_MSG_TMQ("malloc tmq element failed or group is empty") goto _failed; @@ -1280,7 +1288,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // init semaphore if (tsem2_init(&pTmq->rspSem, 0, 0) != 0) { - tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, + tqErrorC("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, tstrerror(TAOS_SYSTEM_ERROR(errno)), pTmq->groupId); SET_ERROR_MSG_TMQ("init t_sem failed") goto _failed; @@ -1290,7 +1298,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { code = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ, &pTmq->pTscObj); if (code) { terrno = code; - tscError("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); + tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, terrstr(), pTmq->groupId); SET_ERROR_MSG_TMQ("init tscObj failed") goto _failed; } @@ -1309,7 +1317,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { char buf[TSDB_OFFSET_LEN] = {0}; STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; tFormatOffset(buf, tListLen(buf), &offset); - tscInfo("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 + tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf); @@ -1330,7 +1338,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SCMSubscribeReq req = {0}; int32_t code = 0; - tscInfo("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); + tqInfoC("consumer:0x%" PRIx64 " cgroup:%s, subscribe %d topics", tmq->consumerId, tmq->groupId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, TSDB_CLIENT_ID_LEN); @@ -1362,7 +1370,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SName name = {0}; code = tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); if (code) { - tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, + tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to set topic name, code:%d", tmq->consumerId, tmq->groupId, code); goto END; } @@ -1374,7 +1382,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { code = tNameExtractFullName(&name, topicFName); if (code) { - tscError("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, + tqErrorC("consumer:0x%" PRIx64 " cgroup:%s, failed to extract topic name, code:%d", tmq->consumerId, tmq->groupId, code); taosMemoryFree(topicFName); goto END; @@ -1385,7 +1393,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { taosMemoryFree(topicFName); goto END; } - tscInfo("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); + tqInfoC("consumer:0x%" PRIx64 " subscribe topic:%s", tmq->consumerId, topicFName); } int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); @@ -1439,7 +1447,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while ((code = syncAskEp(tmq)) != 0) { if (retryCnt++ > SUBSCRIBE_RETRY_MAX_COUNT || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", + tqErrorC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes, code:%s", tmq->consumerId, tstrerror(code)); if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) { code = 0; @@ -1447,7 +1455,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { goto END; } - tscInfo("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + tqInfoC("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); taosMsleep(SUBSCRIBE_RETRY_INTERVAL); } @@ -1548,7 +1556,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } if (pMsg->pData == NULL) { - tscError("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId); + tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d, since msg is NULL", tmq->consumerId, vgId); code = TSDB_CODE_TSC_INTERNAL_ERROR; goto END; } @@ -1565,7 +1573,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } if (msgEpoch != clientEpoch) { - tscError("consumer:0x%" PRIx64 + tqErrorC("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; @@ -1591,7 +1599,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pRspWrapper->dataRsp.common.rspOffset); - tscDebug("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d,QID:0x%" PRIx64, + tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, req ver:%" PRId64 ", rsp:%s type %d,QID:0x%" PRIx64, tmq->consumerId, vgId, pRspWrapper->dataRsp.common.reqOffset.version, buf, rspType, requestId); } else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) { SDecoder decoder = {0}; @@ -1623,10 +1631,10 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } tDecoderClear(&decoder); (void)memcpy(&pRspWrapper->batchMetaRsp, pMsg->pData, sizeof(SMqRspHead)); - tscDebug("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d,QID:0x%" PRIx64, tmq->consumerId, vgId, + tqDebugC("consumer:0x%" PRIx64 " recv poll batchmeta rsp, vgId:%d,QID:0x%" PRIx64, tmq->consumerId, vgId, requestId); } else { // invalid rspType - tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); + tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } END: @@ -1636,11 +1644,11 @@ END: (void)strcpy(pRspWrapper->topicName, pParam->topicName); code = taosWriteQitem(tmq->mqueue, pRspWrapper); if (code != 0) { - tscError("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); + tqErrorC("consumer:0x%" PRIx64 " put poll res into mqueue failed, code:%d", tmq->consumerId, code); } } int32_t total = taosQueueItemSize(tmq->mqueue); - tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64, + tqDebugC("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d,QID:0x%" PRIx64, tmq ? tmq->consumerId : 0, rspType, vgId, total, requestId); if (tmq) (void)tsem2_post(&tmq->rspSem); @@ -1671,10 +1679,10 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); tstrncpy(pTopic->db, pTopicEp->db, TSDB_DB_FNAME_LEN); - tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); + tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d", tmq->consumerId, pTopic->topicName, vgNumGet); pTopic->vgs = taosArrayInit(vgNumGet, sizeof(SMqClientVg)); if (pTopic->vgs == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName); + tqErrorC("consumer:0x%" PRIx64 ", failed to init vgs for topic:%s", tmq->consumerId, pTopic->topicName); return; } for (int32_t j = 0; j < vgNumGet; j++) { @@ -1688,7 +1696,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic STqOffsetVal offsetNew = {0}; offsetNew.type = tmq->resetOffsetCfg; - tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, + tqInfoC("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps, pVgEp->epSet.eps[pVgEp->epSet.inUse].port); SMqClientVg clientVg = { @@ -1716,71 +1724,52 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.beginOffset = offsetNew; } if (taosArrayPush(pTopic->vgs, &clientVg) == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId, + tqErrorC("consumer:0x%" PRIx64 ", failed to push vg:%d into topic:%s", tmq->consumerId, pVgEp->vgId, pTopic->topicName); freeClientVg(&clientVg); } } } -static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { - int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - if (epoch <= tmq->epoch ) { - tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, - tmq->epoch, epoch, topicNumGet); - return; - } - - SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); - if (newTopics == NULL) { - tscError("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno); - return; - } - +static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){ SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { - (void)taosArrayDestroy(newTopics); - tscError("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno); + tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno); return; } - taosWLockLatch(&tmq->lock); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - - char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; - tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", - tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); if (pTopicCur && pTopicCur->vgs) { int32_t vgNumCur = taosArrayGetSize(pTopicCur->vgs); - tscInfo("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); + tqInfoC("consumer:0x%" PRIx64 ", current vg num: %d", tmq->consumerId, vgNumCur); for (int32_t j = 0; j < vgNumCur; j++) { SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, j); if (pVgCur == NULL) { continue; } + char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; (void)sprintf(vgKey, "%s:%d", pTopicCur->topicName, pVgCur->vgId); char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVgCur->offsetInfo.endOffset); - tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, - vgKey, buf); + tqInfoC("consumer:0x%" PRIx64 ", vgId:%d vgKey:%s, offset:%s", tmq->consumerId, pVgCur->vgId, vgKey, buf); SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, - .seekOffset = pVgCur->offsetInfo.beginOffset, - .commitOffset = pVgCur->offsetInfo.committedOffset, - .numOfRows = pVgCur->numOfRows, - .vgStatus = pVgCur->vgStatus}; + .seekOffset = pVgCur->offsetInfo.beginOffset, + .commitOffset = pVgCur->offsetInfo.committedOffset, + .numOfRows = pVgCur->numOfRows, + .vgStatus = pVgCur->vgStatus}; if (taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)) != 0) { - tscError("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId); + tqErrorC("consumer:0x%" PRIx64 ", failed to put vg:%d into hashmap", tmq->consumerId, pVgCur->vgId); } } } } - for (int32_t i = 0; i < topicNumGet; i++) { + for (int32_t i = 0; i < taosArrayGetSize(pRsp->topics); i++) { SMqClientTopic topic = {0}; SMqSubTopicEp* pTopicEp = taosArrayGet(pRsp->topics, i); if (pTopicEp == NULL) { @@ -1788,13 +1777,36 @@ static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) } initClientTopicFromRsp(&topic, pTopicEp, pVgOffsetHashMap, tmq); if (taosArrayPush(newTopics, &topic) == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName); + tqErrorC("consumer:0x%" PRIx64 ", failed to push topic:%s into new topics", tmq->consumerId, topic.topicName); freeClientTopic(&topic); } } taosHashCleanup(pVgOffsetHashMap); +} +static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { + int32_t topicNumGet = taosArrayGetSize(pRsp->topics); + // vnode transform (epoch == tmq->epoch && topicNumGet != 0) + // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0) + if (epoch < tmq->epoch || (epoch == tmq->epoch && topicNumGet == 0)) { + tqDebugC("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", tmq->consumerId, + tmq->epoch, epoch, topicNumGet); + return; + } + + SArray* newTopics = taosArrayInit(topicNumGet, sizeof(SMqClientTopic)); + if (newTopics == NULL) { + tqErrorC("consumer:0x%" PRIx64 " taos array init null, code:%d", tmq->consumerId, terrno); + return; + } + tqInfoC("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", + tmq->consumerId, tmq->epoch, epoch, topicNumGet, (int)taosArrayGetSize(tmq->clientTopics)); + + taosWLockLatch(&tmq->lock); + if (topicNumGet > 0){ + buildNewTopicList(tmq, newTopics, pRsp); + } // destroy current buffered existed topics info if (tmq->clientTopics) { taosArrayDestroyEx(tmq->clientTopics, freeClientTopic); @@ -1805,7 +1817,7 @@ static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__READY); atomic_store_32(&tmq->epoch, epoch); - tscInfo("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); + tqInfoC("consumer:0x%" PRIx64 " update topic info completed", tmq->consumerId); } void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { @@ -1849,7 +1861,7 @@ int32_t tmqBuildBatchMetaRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqBatchMet pRspObj->common.vgId = pWrapper->vgHandle->vgId; (void)memcpy(&pRspObj->rsp, &pWrapper->batchMetaRsp, sizeof(SMqBatchMetaRsp)); - tscDebug("build batchmeta Rsp from wrapper"); + tqDebugC("build batchmeta Rsp from wrapper"); *ppRspObj = pRspObj; return 0; } @@ -1864,7 +1876,7 @@ void changeByteEndian(char* pData) { // length | version: int32_t blockVersion = *(int32_t*)p; if (blockVersion != BLOCK_VERSION_1) { - tscError("invalid block version:%d", blockVersion); + tqErrorC("invalid block version:%d", blockVersion); return; } *(int32_t*)p = BLOCK_VERSION_2; @@ -1921,7 +1933,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg pDataRsp->withSchema = true; pDataRsp->blockSchema = taosArrayInit(pDataRsp->blockNum, sizeof(void*)); if (pDataRsp->blockSchema == NULL) { - tscError("failed to allocate memory for blockSchema"); + tqErrorC("failed to allocate memory for blockSchema"); return; } } @@ -1940,7 +1952,7 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg SSchemaWrapper* schema = tCloneSSchemaWrapper(&pWrapper->topicHandle->schema); if (schema) { if (taosArrayPush(pDataRsp->blockSchema, &schema) == NULL) { - tscError("failed to push schema into blockSchema"); + tqErrorC("failed to push schema into blockSchema"); continue; } } @@ -2031,7 +2043,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); - tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, + tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if (code != 0) { return code; @@ -2050,7 +2062,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { taosWLockLatch(&tmq->lock); int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); + tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); for (int i = 0; i < numOfTopics; i++) { SMqClientTopic* pTopic = taosArrayGet(tmq->clientTopics, i); @@ -2059,7 +2071,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } int32_t numOfVg = taosArrayGetSize(pTopic->vgs); if (pTopic->noPrivilege) { - tscDebug("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName); + tqDebugC("consumer:0x%" PRIx64 " has no privilegr for topic:%s", tmq->consumerId, pTopic->topicName); continue; } for (int j = 0; j < numOfVg; j++) { @@ -2069,14 +2081,14 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } int64_t elapsed = taosGetTimestampMs() - pVg->emptyBlockReceiveTs; if (elapsed < EMPTY_BLOCK_POLL_IDLE_DURATION && elapsed >= 0) { // less than 10ms - tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, + tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for 10ms before start next poll", tmq->consumerId, tmq->epoch, pVg->vgId); continue; } elapsed = taosGetTimestampMs() - pVg->blockReceiveTs; if (tmq->replayEnable && elapsed < pVg->blockSleepForReplay && elapsed >= 0) { - tscDebug("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", + tqDebugC("consumer:0x%" PRIx64 " epoch %d, vgId:%d idle for %" PRId64 "ms before start next poll when replay", tmq->consumerId, tmq->epoch, pVg->vgId, pVg->blockSleepForReplay); continue; } @@ -2084,7 +2096,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT); if (vgStatus == TMQ_VG_STATUS__WAIT) { int32_t vgSkipCnt = atomic_add_fetch_32(&pVg->vgSkipCnt, 1); - tscDebug("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, + tqDebugC("consumer:0x%" PRIx64 " epoch %d wait poll-rsp, skip vgId:%d skip cnt %d", tmq->consumerId, tmq->epoch, pVg->vgId, vgSkipCnt); continue; } @@ -2099,20 +2111,20 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { end: taosWUnLockLatch(&tmq->lock); - tscDebug("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code); + tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code); return code; } static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData) { if (!pVg->seekUpdated) { - tscDebug("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId); + tqDebugC("consumer:0x%" PRIx64 " local offset is update, since seekupdate not set", consumerId); if (hasData) { tOffsetCopy(&pVg->offsetInfo.beginOffset, reqOffset); } tOffsetCopy(&pVg->offsetInfo.endOffset, rspOffset); } else { - tscDebug("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId); + tqDebugC("consumer:0x%" PRIx64 " local offset is NOT update, since seekupdate is set", consumerId); } // update the status @@ -2124,7 +2136,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal } static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { - tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); + tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); while (1) { SMqRspWrapper* pRspWrapper = NULL; @@ -2138,7 +2150,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } } - tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); + tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); if (pRspWrapper->code != 0) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2146,15 +2158,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform int32_t code = askEp(tmq, NULL, false, true); if (code != 0) { - tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { int32_t code = askEp(tmq, NULL, false, false); if (code != 0) { - tscError("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } } - tscInfo("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, + tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); taosWLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; @@ -2179,7 +2191,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { - tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); @@ -2190,7 +2202,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { if (pollRspWrapper->pEpset != NULL) { SEp* pEp = GET_ACTIVE_EP(pollRspWrapper->pEpset); SEp* pOld = GET_ACTIVE_EP(&(pVg->epSet)); - tscDebug("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId, + tqDebugC("consumer:0x%" PRIx64 " update epset vgId:%d, ep:%s:%d, old ep:%s:%d", tmq->consumerId, pVg->vgId, pEp->fqdn, pEp->port, pOld->fqdn, pOld->port); pVg->epSet = *pollRspWrapper->pEpset; } @@ -2201,7 +2213,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pDataRsp->rspOffset); if (pDataRsp->blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 + tqDebugC("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); @@ -2219,12 +2231,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pVg->blockSleepForReplay = pRsp->rsp.sleepTime; if (pVg->blockSleepForReplay > 0) { if (taosTmrStart(tmqReplayTask, pVg->blockSleepForReplay, (void*)(tmq->refId), tmqMgmt.timer) == NULL) { - tscError("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64, + tqErrorC("consumer:0x%" PRIx64 " failed to start replay timer, vgId:%d, sleep:%" PRId64, tmq->consumerId, pVg->vgId, pVg->blockSleepForReplay); } } } - tscDebug("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + tqDebugC("consumer:0x%" PRIx64 " process poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); @@ -2234,7 +2246,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { return pRsp; } } else { - tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tqInfoC("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); @@ -2244,7 +2256,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + tqDebugC("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); if (pollRspWrapper->metaRsp.head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); @@ -2253,7 +2265,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { - tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); @@ -2271,7 +2283,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { taosWUnLockLatch(&tmq->lock); return pRsp; } else { - tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tqInfoC("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); @@ -2281,7 +2293,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); - tscDebug("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); + tqDebugC("consumer:0x%" PRIx64 " process meta rsp", tmq->consumerId); if (pollRspWrapper->batchMetaRsp.head.epoch == consumerEpoch) { taosWLockLatch(&tmq->lock); @@ -2290,7 +2302,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { - tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); @@ -2309,7 +2321,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { taosWUnLockLatch(&tmq->lock); return pRsp; } else { - tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tqInfoC("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->batchMetaRsp.head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); @@ -2327,7 +2339,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { pollRspWrapper->vgHandle = pVg; pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); if (pollRspWrapper->vgHandle == NULL || pollRspWrapper->topicHandle == NULL) { - tscError("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); @@ -2339,7 +2351,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmq->consumerId, pDataRsp->blockNum != 0); if (pDataRsp->blockNum == 0) { - tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ",QID:0x%" PRIx64, + tqDebugC("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); tmqFreeRspWrapper(pRspWrapper); @@ -2354,13 +2366,13 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { int64_t numOfRows = 0; SMqTaosxRspObj* pRsp = NULL; if (tmqBuildTaosxRspFromWrapper(pollRspWrapper, pVg, &numOfRows, &pRsp) != 0) { - tscError("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId); + tqErrorC("consumer:0x%" PRIx64 " build taosx rsp failed, vgId:%d", tmq->consumerId, pVg->vgId); } tmq->totalRows += numOfRows; char buf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + tqDebugC("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 ", vg total:%" PRId64 ", total:%" PRId64 ",QID:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); @@ -2370,21 +2382,21 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { taosWUnLockLatch(&tmq->lock); return pRsp; } else { - tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", + tqInfoC("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { - tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); + tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); } else { - tscError("consumer:0x%" PRIx64 " invalid msg received:%d", tmq->consumerId, pRspWrapper->tmqRspType); + tqErrorC("consumer:0x%" PRIx64 " invalid msg received:%d", tmq->consumerId, pRspWrapper->tmqRspType); } } } @@ -2395,12 +2407,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); - tscDebug("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, + tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); // in no topic status, delayed task also need to be processed if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tscInfo("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); + tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); taosMsleep(500); // sleep for a while return NULL; } @@ -2411,12 +2423,12 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tmqHandleAllDelayedTask(tmq); if (tmqPollImpl(tmq, timeout) < 0) { - tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); } rspObj = tmqHandleAllRsp(tmq, timeout); if (rspObj) { - tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); + tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } @@ -2424,7 +2436,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; if (elapsedTime > timeout || elapsedTime < 0) { - tscDebug("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, + tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, tmq->consumerId, tmq->epoch, startTime, currentTime); return NULL; } @@ -2438,33 +2450,34 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { static void displayConsumeStatistics(tmq_t* pTmq) { taosRLockLatch(&pTmq->lock); int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); - tscDebug("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", + tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); - tscDebug("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); + tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); for (int32_t i = 0; i < numOfTopics; ++i) { SMqClientTopic* pTopics = taosArrayGet(pTmq->clientTopics, i); if (pTopics == NULL) continue; - tscDebug("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); + tqInfoC("consumer:0x%" PRIx64 " topic:%d", pTmq->consumerId, i); int32_t numOfVgs = taosArrayGetSize(pTopics->vgs); for (int32_t j = 0; j < numOfVgs; ++j) { SMqClientVg* pVg = taosArrayGet(pTopics->vgs, j); if (pVg == NULL) continue; - tscDebug("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); + tqInfoC("topic:%s, %d. vgId:%d rows:%" PRId64, pTopics->topicName, j, pVg->vgId, pVg->numOfRows); } } taosRUnLockLatch(&pTmq->lock); - tscDebug("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); + tqInfoC("consumer:0x%" PRIx64 " rows dist end", pTmq->consumerId); } int32_t tmq_unsubscribe(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; int32_t code = 0; int8_t status = atomic_load_8(&tmq->status); - tscInfo("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status); + tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status); + displayConsumeStatistics(tmq); if (status != TMQ_CONSUMER_STATUS__READY) { - tscInfo("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status); + tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status); goto END; } if (tmq->autoCommit) { @@ -2492,8 +2505,7 @@ END: int32_t tmq_consumer_close(tmq_t* tmq) { if (tmq == NULL) return TSDB_CODE_INVALID_PARA; - tscInfo("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); - displayConsumeStatistics(tmq); + tqInfoC("consumer:0x%" PRIx64 " start to close consumer, status:%d", tmq->consumerId, tmq->status); int32_t code = tmq_unsubscribe(tmq); if (code == 0) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__CLOSED); @@ -2600,7 +2612,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { if (common->reqOffset.type == TMQ_OFFSET__LOG) { return common->reqOffset.version; } else { - tscError("invalid offset type:%d", common->reqOffset.type); + tqErrorC("invalid offset type:%d", common->reqOffset.type); } } else if (TD_RES_TMQ_META(res)) { SMqMetaRspObj* pRspObj = (SMqMetaRspObj*)res; @@ -2613,7 +2625,7 @@ int64_t tmq_get_vgroup_offset(TAOS_RES* res) { return pBtRspObj->rsp.rspOffset.version; } } else { - tscError("invalid tmq type:%d", *(int8_t*)res); + tqErrorC("invalid tmq type:%d", *(int8_t*)res); } // data from tsdb, no valid offset info @@ -2639,7 +2651,7 @@ const char* tmq_get_table_name(TAOS_RES* res) { void tmq_commit_async(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_cb* cb, void* param) { if (tmq == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); if (cb != NULL) { cb(tmq, TSDB_CODE_INVALID_PARA, param); } @@ -2660,7 +2672,7 @@ static void commitCallBackFn(tmq_t* UNUSED_PARAM(tmq), int32_t code, void* param int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { if (tmq == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2668,11 +2680,11 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { - tscError("failed to allocate memory for sync commit"); + tqErrorC("failed to allocate memory for sync commit"); return terrno; } if (tsem2_init(&pInfo->sem, 0, 0) != 0) { - tscError("failed to init sem for sync commit"); + tqErrorC("failed to init sem for sync commit"); taosMemoryFree(pInfo); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2690,19 +2702,19 @@ int32_t tmq_commit_sync(tmq_t* tmq, const TAOS_RES* pRes) { (void)tsem2_destroy(&pInfo->sem); taosMemoryFree(pInfo); - tscInfo("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code)); + tqInfoC("consumer:0x%" PRIx64 " sync res commit done, code:%s", tmq->consumerId, tstrerror(code)); return code; } // wal range will be ok after calling tmq_get_topic_assignment or poll interface static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) { if (offset->walVerBegin == -1 || offset->walVerEnd == -1) { - tscError("Assignment or poll interface need to be called first"); + tqErrorC("Assignment or poll interface need to be called first"); return TSDB_CODE_TMQ_NEED_INITIALIZED; } if (value != -1 && (value < offset->walVerBegin || value > offset->walVerEnd)) { - tscError("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, + tqErrorC("invalid seek params, offset:%" PRId64 ", valid range:[%" PRId64 ", %" PRId64 "]", value, offset->walVerBegin, offset->walVerEnd); return TSDB_CODE_TMQ_VERSION_OUT_OF_RANGE; } @@ -2712,7 +2724,7 @@ static int32_t checkWalRange(SVgOffsetInfo* offset, int64_t value) { int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL || pTopicName == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -2740,7 +2752,7 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, SSyncCommitInfo* pInfo = taosMemoryMalloc(sizeof(SSyncCommitInfo)); if (pInfo == NULL) { - tscError("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 " failed to prepare seek operation", tmq->consumerId); return TSDB_CODE_OUT_OF_MEMORY; } @@ -2760,7 +2772,7 @@ int32_t tmq_commit_offset_sync(tmq_t* tmq, const char* pTopicName, int32_t vgId, (void)tsem2_destroy(&pInfo->sem); taosMemoryFree(pInfo); - tscInfo("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, + tqInfoC("consumer:0x%" PRIx64 " sync send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); return code; @@ -2770,7 +2782,7 @@ void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, i void* param) { int32_t code = 0; if (tmq == NULL || pTopicName == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); code = TSDB_CODE_INVALID_PARA; goto end; } @@ -2799,7 +2811,7 @@ void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, i code = asyncCommitOffset(tmq, tname, vgId, &offsetVal, cb, param); - tscInfo("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, + tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); end: @@ -2822,7 +2834,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { } if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); goto END; } @@ -2831,7 +2843,7 @@ int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { } SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); - tscDebug("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); + tqDebugC("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch); if (pParam->sync) { SMqAskEpRsp rsp = {0}; if (tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp) != NULL) { @@ -2906,25 +2918,25 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); return TSDB_CODE_INVALID_PARA; } pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); + tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); return terrno; } if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); + tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); taosMemoryFree(pReq); return TSDB_CODE_INVALID_PARA; } pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); + tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); taosMemoryFree(pReq); return terrno; } @@ -2949,7 +2961,7 @@ int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); - tscDebug("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); + tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); } @@ -2977,11 +2989,11 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { int32_t commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId) { int32_t waitingRspNum = atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); if (waitingRspNum == 0) { - tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, + tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d all commit-rsp received, commit completed", consumerId, pTopic, vgId); return tmqCommitDone(pParamSet); } else { - tscInfo("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, + tqDebugC("consumer:0x%" PRIx64 " topic:%s vgId:%d commit-rsp received, remain:%d", consumerId, pTopic, vgId, waitingRspNum); } return 0; @@ -3033,7 +3045,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { int32_t total = atomic_add_fetch_32(&pCommon->numOfRsp, 1); if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId, + tqErrorC("consumer:0x%" PRIx64 " failed to get the wal info from vgId:%d for topic:%s", pCommon->consumerId, pParam->vgId, pCommon->pTopicName); } else { @@ -3054,7 +3066,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { (void)taosThreadMutexLock(&pCommon->mutex); if (taosArrayPush(pCommon->pList, &assignment) == NULL) { - tscError("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId, + tqErrorC("consumer:0x%" PRIx64 " failed to push the wal info from vgId:%d for topic:%s", pCommon->consumerId, pParam->vgId, pCommon->pTopicName); code = TSDB_CODE_TSC_INTERNAL_ERROR; } @@ -3204,7 +3216,7 @@ int64_t getCommittedFromServer(tmq_t* tmq, char* tname, int32_t vgId, SEpSet* ep int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) { if (tmq == NULL || pTopicName == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -3224,7 +3236,7 @@ int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) { SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); + tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, position error", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } @@ -3254,16 +3266,16 @@ int64_t tmq_position(tmq_t* tmq, const char* pTopicName, int32_t vgId) { position = code; } } else { - tscError("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); + tqErrorC("consumer:0x%" PRIx64 " offset type:%d can not be reach here", tmq->consumerId, type); } - tscInfo("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position); + tqInfoC("consumer:0x%" PRIx64 " tmq_position vgId:%d position:%" PRId64, tmq->consumerId, vgId, position); return position; } int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) { if (tmq == NULL || pTopicName == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -3282,14 +3294,14 @@ int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) { SVgOffsetInfo* pOffsetInfo = &pVg->offsetInfo; if (isInSnapshotMode(pOffsetInfo->endOffset.type, tmq->useSnapshot)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->endOffset.type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } if (isInSnapshotMode(pOffsetInfo->committedOffset.type, tmq->useSnapshot)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, committed error", tmq->consumerId, pOffsetInfo->committedOffset.type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; @@ -3307,14 +3319,14 @@ int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) { committed = getCommittedFromServer(tmq, tname, vgId, &epSet); end: - tscInfo("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); + tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); return committed; } int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_assignment** assignment, int32_t* numOfAssignment) { if (tmq == NULL || pTopicName == NULL || assignment == NULL || numOfAssignment == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } *numOfAssignment = 0; @@ -3330,7 +3342,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a SMqClientTopic* pTopic = NULL; int32_t code = getTopicByName(tmq, tname, &pTopic); if (code != 0) { - tscError("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); + tqErrorC("consumer:0x%" PRIx64 " invalid topic name:%s", tmq->consumerId, pTopicName); goto end; } @@ -3343,7 +3355,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } int32_t type = pClientVg->offsetInfo.beginOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); + tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, assignment not allowed", tmq->consumerId, type); code = TSDB_CODE_TMQ_SNAPSHOT_ERROR; goto end; } @@ -3351,7 +3363,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a *assignment = taosMemoryCalloc(*numOfAssignment, sizeof(tmq_topic_assignment)); if (*assignment == NULL) { - tscError("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId, + tqErrorC("consumer:0x%" PRIx64 " failed to malloc buffer, size:%" PRIzu, tmq->consumerId, (*numOfAssignment) * sizeof(tmq_topic_assignment)); code = terrno; goto end; @@ -3374,7 +3386,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a pAssignment->begin = pClientVg->offsetInfo.walVerBegin; pAssignment->end = pClientVg->offsetInfo.walVerEnd; pAssignment->vgId = pClientVg->vgId; - tscInfo("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId, + tqInfoC("consumer:0x%" PRIx64 " get assignment from local:%d->%" PRId64, tmq->consumerId, pAssignment->vgId, pAssignment->currentOffset); } @@ -3459,7 +3471,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pClientVg->offsetInfo.beginOffset); - tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId, + tqInfoC("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s,QID:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); if (code != 0) { @@ -3492,7 +3504,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } SVgOffsetInfo* pOffsetInfo = &pClientVg->offsetInfo; - tscInfo("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName, + tqInfoC("consumer:0x%" PRIx64 " %s vgId:%d offset is update to:%" PRId64, tmq->consumerId, pTopic->topicName, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; @@ -3538,7 +3550,7 @@ static int32_t tmqSeekCb(void* param, SDataBuf* pMsg, int32_t code) { // there is no data to poll int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_t offset) { if (tmq == NULL || pTopicName == NULL) { - tscError("invalid tmq handle, null"); + tqErrorC("invalid tmq handle, null"); return TSDB_CODE_INVALID_PARA; } @@ -3559,7 +3571,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ int32_t type = pOffsetInfo->endOffset.type; if (isInSnapshotMode(type, tmq->useSnapshot)) { - tscError("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); + tqErrorC("consumer:0x%" PRIx64 " offset type:%d not wal version, seek not allowed", tmq->consumerId, type); taosWUnLockLatch(&tmq->lock); return TSDB_CODE_TMQ_SNAPSHOT_ERROR; } @@ -3570,7 +3582,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ return code; } - tscInfo("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); + tqInfoC("consumer:0x%" PRIx64 " seek to %" PRId64 " on vgId:%d", tmq->consumerId, offset, vgId); // update the offset, and then commit to vnode pOffsetInfo->endOffset.type = TMQ_OFFSET__LOG; pOffsetInfo->endOffset.version = offset; @@ -3638,7 +3650,7 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ (void)tsem2_destroy(&pParam->sem); taosMemoryFree(pParam); - tscInfo("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); + tqInfoC("consumer:0x%" PRIx64 "send seek to vgId:%d, return code:%s", tmq->consumerId, vgId, tstrerror(code)); return code; } diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 7973aa1b46..dd10aa5155 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7050,6 +7050,7 @@ int32_t tSerializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { if (tEncodeI8(&encoder, privilege->noPrivilege) < 0) return -1; } + if (tEncodeI32(&encoder, pRsp->debugFlag) < 0) return -1; tEndEncode(&encoder); int32_t tlen = encoder.pos; @@ -7075,6 +7076,10 @@ int32_t tDeserializeSMqHbRsp(void *buf, int32_t bufLen, SMqHbRsp *pRsp) { if (tDecodeI8(&decoder, &data->noPrivilege) < 0) return -1; } } + + if (!tDecodeIsEnd(&decoder)) { + if (tDecodeI32(&decoder, &pRsp->debugFlag) < 0) return -1; + } tEndDecode(&decoder); tDecoderClear(&decoder); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7639349bac..136d3f9c6c 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -244,6 +244,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { } storeOffsetRows(pMnode, &req, pConsumer); + rsp.debugFlag = tqDebugFlag; code = buildMqHbRsp(pMsg, &rsp); END: @@ -609,7 +610,7 @@ END: tDeleteSMqConsumerObj(pConsumerNew); taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree); - return code == TSDB_CODE_TMQ_NO_NEED_REBALANCE ? 0 : code; + return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code; } SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {