From b8f041f5ea6ae6b9594b730cccfbac55e9fdda7e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Sep 2023 10:05:46 +0800 Subject: [PATCH 01/12] fix:use vgstatus before if rebalance --- source/client/src/clientTmq.c | 97 +++++++++++------------------------ 1 file changed, 31 insertions(+), 66 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index e861bd4b92..c6755f0bba 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -197,10 +197,7 @@ typedef struct { typedef struct { int64_t refId; - int32_t epoch; char topicName[TSDB_TOPIC_FNAME_LEN]; -// SMqClientVg* pVg; -// SMqClientTopic* pTopic; int32_t vgId; uint64_t requestId; // request id for debug purpose } SMqPollCbParam; @@ -1313,52 +1310,37 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; - - int64_t refId = pParam->refId; -// SMqClientVg* pVg = pParam->pVg; -// SMqClientTopic* pTopic = pParam->pTopic; - + int64_t refId = pParam->refId; + int32_t vgId = pParam->vgId; + uint64_t requestId = pParam->requestId; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return -1; + goto FAILED; } - int32_t epoch = pParam->epoch; - int32_t vgId = pParam->vgId; - uint64_t requestId = pParam->requestId; - if (code != 0) { - if (pMsg->pData) taosMemoryFree(pMsg->pData); - if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet); - // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { -// taosMsleep(500); atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); - tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER", + tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId); } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { - tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64, - tmq->consumerId, vgId, epoch, requestId); - goto CREATE_MSG_FAIL; + tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64, + tmq->consumerId, vgId, requestId); + goto FAILED; } pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; taosWriteQitem(tmq->mqueue, pRspWrapper); -// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert -// taosMsleep(5); } else{ - tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId, - vgId, epoch, tstrerror(code), requestId); + tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId, + vgId, tstrerror(code), requestId); } - goto CREATE_MSG_FAIL; + goto FAILED; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -1368,43 +1350,27 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - - tsem_post(&tmq->rspSem); - taosReleaseRef(tmqMgmt.rsetId, refId); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - - return 0; + goto FAILED; } - if (msgEpoch != clientEpoch) { - tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64, - tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - } + ASSERT(msgEpoch == clientEpoch); // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId, - epoch); - goto CREATE_MSG_FAIL; + tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); + goto FAILED; } pRspWrapper->tmqRspType = rspType; -// pRspWrapper->vgHandle = pVg; -// pRspWrapper->topicHandle = pTopic; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; + pMsg->pEpSet = NULL; pRspWrapper->vgId = vgId; strcpy(pRspWrapper->topicName, pParam->topicName); - pMsg->pEpSet = NULL; if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); @@ -1432,7 +1398,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } - taosMemoryFree(pMsg->pData); taosWriteQitem(tmq->mqueue, pRspWrapper); int32_t total = taosQueueItemSize(tmq->mqueue); @@ -1442,22 +1407,23 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); taosMemoryFree(pParam); + taosMemoryFreeClear(pMsg->pData); return 0; -CREATE_MSG_FAIL: - if (epoch == tmq->epoch) { - taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); - if(pVg){ - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - } - taosWUnLockLatch(&tmq->lock); +FAILED: + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); + if(pVg){ + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } + taosWUnLockLatch(&tmq->lock); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); taosMemoryFree(pParam); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); return -1; } @@ -1467,6 +1433,7 @@ typedef struct SVgroupSaveInfo { STqOffsetVal commitOffset; STqOffsetVal seekOffset; int64_t numOfRows; + int32_t vgStatus; } SVgroupSaveInfo; static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap, @@ -1475,7 +1442,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic pTopicEp->schema.nCols = 0; pTopicEp->schema.pSchema = NULL; - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs); tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN); @@ -1497,7 +1464,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic .pollCnt = 0, .vgId = pVgEp->vgId, .epSet = pVgEp->epSet, - .vgStatus = TMQ_VG_STATUS__IDLE, + .vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE, .vgSkipCnt = 0, .emptyBlockReceiveTs = 0, .numOfRows = pInfo ? pInfo->numOfRows : 0, @@ -1509,7 +1476,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic clientVg.offsetInfo.walVerBegin = -1; clientVg.offsetInfo.walVerEnd = -1; clientVg.seekUpdated = false; -// clientVg.receivedInfoFromVnode = false; taosArrayPush(pTopic->vgs, &clientVg); } @@ -1565,7 +1531,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId, vgKey, buf); - SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows}; + SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, + .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows, + .vgStatus = pVgCur->vgStatus}; taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo)); } } @@ -1766,9 +1734,6 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p } pParam->refId = pTmq->refId; - pParam->epoch = pTmq->epoch; -// pParam->pVg = pVg; // pVg may be released,fix it -// pParam->pTopic = pTopic; strcpy(pParam->topicName, pTopic->topicName); pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; From f0fbe08e1de0e214afa3306ac3adfd7b561aa1a8 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Sep 2023 16:45:46 +0800 Subject: [PATCH 02/12] fix:core dump if tmq is null --- include/client/taos.h | 23 ++++++++++------------- source/client/src/clientTmq.c | 5 ++++- source/libs/parser/src/parUtil.c | 2 ++ source/util/src/terror.c | 4 ++-- utils/test/c/varbinary_test.c | 1 + 5 files changed, 19 insertions(+), 16 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 5b7946c9ad..a640e12ac5 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -268,6 +268,13 @@ typedef enum tmq_conf_res_t { TMQ_CONF_OK = 0, } tmq_conf_res_t; +typedef enum tmq_res_t { + TMQ_RES_INVALID = -1, + TMQ_RES_DATA = 1, + TMQ_RES_TABLE_META = 2, + TMQ_RES_METADATA = 3, +} tmq_res_t; + typedef struct tmq_topic_assignment { int32_t vgId; int64_t currentOffset; @@ -302,6 +309,8 @@ DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1 DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId); +DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); +DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); @@ -309,34 +318,22 @@ DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res); DLL_EXPORT const char *tmq_err2str(int32_t code); /* ------------------------------ TAOSX -----------------------------------*/ -// note: following apis are unstable -enum tmq_res_t { - TMQ_RES_INVALID = -1, - TMQ_RES_DATA = 1, - TMQ_RES_TABLE_META = 2, - TMQ_RES_METADATA = 3, -}; - typedef struct tmq_raw_data { void *raw; uint32_t raw_len; uint16_t raw_type; } tmq_raw_data; -typedef enum tmq_res_t tmq_res_t; - -DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res); -DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw); DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw); DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname); DLL_EXPORT int taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname, TAOS_FIELD *fields, int numFields); DLL_EXPORT void tmq_free_raw(tmq_raw_data raw); + // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); DLL_EXPORT void tmq_free_json_meta(char *jsonMeta); - /* ---------------------------- TAOSX END -------------------------------- */ typedef enum { diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c6755f0bba..8eca73da3d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1315,8 +1315,11 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { uint64_t requestId = pParam->requestId; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { + taosMemoryFree(pParam); + taosMemoryFreeClear(pMsg->pData); + taosMemoryFreeClear(pMsg->pEpSet); terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto FAILED; + return -1; } if (code != 0) { diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c index b216ec5491..9a35b2f804 100644 --- a/source/libs/parser/src/parUtil.c +++ b/source/libs/parser/src/parUtil.c @@ -37,6 +37,8 @@ static char* getSyntaxErrFormat(int32_t errCode) { return "Column ambiguously defined: %s"; case TSDB_CODE_PAR_WRONG_VALUE_TYPE: return "Invalid value type: %s"; + case TSDB_CODE_PAR_INVALID_VARBINARY: + return "Invalid varbinary value: %s"; case TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION: return "There mustn't be aggregation"; case TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT: diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 278b23b5a9..5edc3e3f08 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -537,7 +537,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column/tag length") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid varbinary/binary/nchar column/tag length") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query") @@ -556,7 +556,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_ALLOWED_WIN_QUERY, "Window query not su TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DROP_COL, "No columns can be dropped") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COL_JSON, "Only tag can be json type") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_VALUE_TOO_LONG, "Value too long for column/tag") -TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalidate varbinary type") +TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalid varbinary value") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_DELETE_WHERE, "The DELETE statement must have a definite time window range") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_REDISTRIBUTE_VG, "The REDISTRIBUTE VGROUP statement only support 1 to 3 dnodes") TAOS_DEFINE_ERROR(TSDB_CODE_PAR_FILL_NOT_ALLOWED_FUNC, "Fill not allowed") diff --git a/utils/test/c/varbinary_test.c b/utils/test/c/varbinary_test.c index e29b94ad1f..389f409e1a 100644 --- a/utils/test/c/varbinary_test.c +++ b/utils/test/c/varbinary_test.c @@ -85,6 +85,7 @@ void varbinary_sql_test() { // test insert pRes = taos_query(taos, "insert into tb2 using stb tags (2, 'tb2_bin1', 093) values (now + 2s, 'nchar1', 892, 0.3)"); + printf("error:%s", taos_errstr(pRes)); ASSERT(taos_errno(pRes) != 0); pRes = taos_query(taos, "insert into tb3 using stb tags (3, 'tb3_bin1', 0x7f829) values (now + 3s, 'nchar1', 0x7f829, 0.3)"); From 02ac3eac5a634492360f5c15c6f4bae08f73471e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 1 Sep 2023 16:51:44 +0800 Subject: [PATCH 03/12] fix:logic error --- source/dnode/vnode/src/tq/tq.c | 3 ++- source/dnode/vnode/src/tq/tqUtil.c | 7 +------ 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 5b848b51bd..a4b10ac858 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1742,6 +1742,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); int32_t len = pMsg->contLen - sizeof(SMsgHead); SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS}; + bool allStopped = false; SStreamTaskNodeUpdateMsg req = {0}; @@ -1787,7 +1788,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) { pMeta->closedTask += 1; int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); - bool allStopped = (pMeta->closedTask == numOfTasks); + allStopped = (pMeta->closedTask == numOfTasks); if (allStopped) { pMeta->closedTask = 0; } else { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 60d23663d0..79a87f86e4 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -225,12 +225,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int totalRows = 0; while (1) { int32_t savedEpoch = atomic_load_32(&pHandle->epoch); - if (savedEpoch > pRequest->epoch) { - tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey:%s vgId:%d offset %" PRId64 - ", found new consumer epoch %d, discard req epoch %d", - pRequest->consumerId, pRequest->epoch, pHandle->subKey, vgId, fetchVer, savedEpoch, pRequest->epoch); - break; - } + ASSERT(savedEpoch <= pRequest->epoch); if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); From 90e938e48f8d4be240f3dc7b7f59c2b168bed20f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 4 Sep 2023 16:53:12 +0800 Subject: [PATCH 04/12] fix:set vg status idle if reveive poll callback --- source/client/src/clientTmq.c | 25 ++++++++--------------- source/dnode/mnode/impl/src/mndConsumer.c | 1 - 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 8eca73da3d..87511865e9 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1333,7 +1333,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (pRspWrapper == NULL) { tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64, tmq->consumerId, vgId, requestId); - goto FAILED; + goto END; } pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; @@ -1343,7 +1343,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { vgId, tstrerror(code), requestId); } - goto FAILED; + goto END; } int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch; @@ -1353,7 +1353,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); - goto FAILED; + code = -1; + goto END; } ASSERT(msgEpoch == clientEpoch); @@ -1364,7 +1365,8 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); if (pRspWrapper == NULL) { tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - goto FAILED; + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; } pRspWrapper->tmqRspType = rspType; @@ -1407,14 +1409,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); - tsem_post(&tmq->rspSem); - taosReleaseRef(tmqMgmt.rsetId, refId); - taosMemoryFree(pParam); - taosMemoryFreeClear(pMsg->pData); - - return 0; - -FAILED: +END: taosWLockLatch(&tmq->lock); SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); if(pVg){ @@ -1428,7 +1423,7 @@ FAILED: taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pEpSet); - return -1; + return code; } typedef struct SVgroupSaveInfo { @@ -1844,13 +1839,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } - // update the status - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerEnd = ever + 1; -// pVg->receivedInfoFromVnode = true; } static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index f25bd2cffb..7f96255b1e 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -401,7 +401,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ - ASSERT(0); continue; } taosWLockLatch(&pSub->lock); From 9a2da3adee182b707a37b724f659b8e86213365c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 4 Sep 2023 18:31:07 +0800 Subject: [PATCH 05/12] fix:set vg status idle if reveive poll callback --- source/client/src/clientTmq.c | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 87511865e9..0a64a02f20 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1308,6 +1308,15 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){ return NULL; } +static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId){ + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, topicName, vgId); + if(pVg){ + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + } + taosWUnLockLatch(&tmq->lock); +} + int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { SMqPollCbParam* pParam = (SMqPollCbParam*)param; int64_t refId = pParam->refId; @@ -1410,12 +1419,9 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tmq->consumerId, rspType, vgId, total, requestId); END: - taosWLockLatch(&tmq->lock); - SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId); - if(pVg){ - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + if(code != 0){ + setVgIdle(tmq, pParam->topicName, vgId); } - taosWUnLockLatch(&tmq->lock); tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); @@ -1839,6 +1845,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal tscDebug("consumer:0x%" PRIx64" local offset is NOT update, since seekupdate is set", consumerId); } + // update the status + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + // update the valid wal version range pVg->offsetInfo.walVerBegin = sver; pVg->offsetInfo.walVerEnd = ever + 1; @@ -1922,6 +1931,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { // todo handle the wal range and epset for each vgroup @@ -1953,6 +1963,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2010,6 +2021,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); From 53c4f4a147368c2351e63fc730b50145006471bb Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 5 Sep 2023 10:35:05 +0800 Subject: [PATCH 06/12] opti:commit logic --- source/client/src/clientTmq.c | 32 ++++++++++++-------------------- utils/test/c/tmqSim.c | 2 +- 2 files changed, 13 insertions(+), 21 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0a64a02f20..0ec47a1cb3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -442,7 +442,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) { SMqCommitCbParam* pParam = (SMqCommitCbParam*)param; SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params; -// taosMemoryFree(pParam->pOffset); taosMemoryFree(pBuf->pData); taosMemoryFree(pBuf->pEpSet); @@ -513,14 +512,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->fp = tmqCommitCb; pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; + int64_t transporterId = 0; + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + if(code != 0){ + return code; + } atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); atomic_add_fetch_32(&pParamSet->totalRspNum, 1); - - SEp* pEp = GET_ACTIVE_EP(epSet); - - - int64_t transporterId = 0; - return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); + return code; } static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { @@ -538,7 +537,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { return NULL; @@ -548,13 +547,11 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; - pParamSet->waitingRspNum = rspNum; + pParamSet->waitingRspNum = 0; return pParamSet; } - - static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){ SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName); if (pTopic == NULL) { @@ -575,11 +572,10 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient } static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) { - int32_t code = 0; tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId); taosRLockLatch(&tmq->lock); SMqClientVg* pVg = NULL; - code = getClientVg(tmq, pTopicName, vgId, &pVg); + int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg); if(code != 0){ goto end; } @@ -597,7 +593,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -661,8 +657,7 @@ end: static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; - // init as 1 to prevent concurrency issue - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -709,10 +704,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us // request is sent if (pParamSet->totalRspNum != 0) { - // count down since waiting rsp num init as 1 - commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } + code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; end: taosMemoryFree(pParamSet); @@ -2580,8 +2574,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) { } taosMemoryFree(pParamSet); -// tmq->needReportOffsetRows = true; - taosReleaseRef(tmqMgmt.rsetId, refId); return 0; } diff --git a/utils/test/c/tmqSim.c b/utils/test/c/tmqSim.c index fbfacd9eda..6b774b3eff 100644 --- a/utils/test/c/tmqSim.c +++ b/utils/test/c/tmqSim.c @@ -696,7 +696,7 @@ static int32_t g_once_commit_flag = 0; static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code); - if (0 == g_once_commit_flag) { + if (0 == g_once_commit_flag && code == 0) { g_once_commit_flag = 1; notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT); } From 4e46ce4c03bdccafc65646ac58d7744d4cf82d5c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 5 Sep 2023 16:04:44 +0800 Subject: [PATCH 07/12] fix:wait pHandle idle if vnode receives subscribe msg --- source/dnode/vnode/src/tq/tq.c | 47 ++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 17 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index b99f54642e..74b1dcec00 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { } tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey); + taosWLockLatch(&pTq->lock); + STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey); code = 0; + taosWUnLockLatch(&pTq->lock); goto end; } // 2. check consumer-vg assignment status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != req.consumerId) { tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, req.consumerId, vgId, req.subKey, pHandle->consumerId); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto end; } @@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) { // if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to // TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek. tqUnregisterPushHandle(pTq, pHandle); - taosRUnLockLatch(&pTq->lock); + taosWUnLockLatch(&pTq->lock); end: rsp.code = code; @@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); // 1. find handle + taosRLockLatch(&pTq->lock); STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); if (pHandle == NULL) { tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey); terrno = TSDB_CODE_INVALID_MSG; + taosRUnLockLatch(&pTq->lock); return -1; } // 2. check re-balance status - taosRLockLatch(&pTq->lock); if (pHandle->consumerId != consumerId) { tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64, consumerId, vgId, req.subKey, pHandle->consumerId); @@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg bool exec = tqIsHandleExec(pHandle); if(exec){ - tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId, + tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId, pHandle->subKey, pHandle); taosWUnLockLatch(&pTq->lock); taosMsleep(10); @@ -689,19 +692,29 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } ret = tqMetaSaveHandle(pTq, req.subKey, &handle); } else { - taosWLockLatch(&pTq->lock); - - if (pHandle->consumerId == req.newConsumerId) { // do nothing - tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); - } else { - tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, - req.newConsumerId); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, 0); - tqUnregisterPushHandle(pTq, pHandle); - ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + while(1){ + taosWLockLatch(&pTq->lock); + bool exec = tqIsHandleExec(pHandle); + if(exec){ + tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId, + pHandle->subKey, pHandle); + taosWUnLockLatch(&pTq->lock); + taosMsleep(10); + continue; + } + if (pHandle->consumerId == req.newConsumerId) { // do nothing + tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId); + } else { + tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, + req.newConsumerId); + atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); + tqUnregisterPushHandle(pTq, pHandle); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + } + taosWUnLockLatch(&pTq->lock); + break; } - taosWUnLockLatch(&pTq->lock); } end: From 0e9adcd57383e3d296ca48ea730bb708fde54dfc Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 5 Sep 2023 16:45:19 +0800 Subject: [PATCH 08/12] fix:add error code 0x4012 to ignore list --- tests/pytest/auto_crash_gen.py | 4 ++-- tests/pytest/auto_crash_gen_valgrind.py | 4 ++-- tests/pytest/auto_crash_gen_valgrind_cluster.py | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/tests/pytest/auto_crash_gen.py b/tests/pytest/auto_crash_gen.py index 00e1786399..343cbd72c3 100755 --- a/tests/pytest/auto_crash_gen.py +++ b/tests/pytest/auto_crash_gen.py @@ -219,11 +219,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode): if valgrind : - crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203 '%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012 '%(crash_gen_path ,arguments) else: - crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203'%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012'%(crash_gen_path ,arguments) return crash_gen_cmd diff --git a/tests/pytest/auto_crash_gen_valgrind.py b/tests/pytest/auto_crash_gen_valgrind.py index e37cda0a27..29d9d61732 100755 --- a/tests/pytest/auto_crash_gen_valgrind.py +++ b/tests/pytest/auto_crash_gen_valgrind.py @@ -220,11 +220,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode): if valgrind : - crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203 '%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012 '%(crash_gen_path ,arguments) else: - crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203'%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012'%(crash_gen_path ,arguments) return crash_gen_cmd diff --git a/tests/pytest/auto_crash_gen_valgrind_cluster.py b/tests/pytest/auto_crash_gen_valgrind_cluster.py index af19836a83..8546d436de 100755 --- a/tests/pytest/auto_crash_gen_valgrind_cluster.py +++ b/tests/pytest/auto_crash_gen_valgrind_cluster.py @@ -220,11 +220,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode): if valgrind : - crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0707,0x0203 '%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0707,0x0203,0x4012 '%(crash_gen_path ,arguments) else: - crash_gen_cmd = 'cd %s && ./crash_gen.sh -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0014,0x0707,0x0203'%(crash_gen_path ,arguments) + crash_gen_cmd = 'cd %s && ./crash_gen.sh -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0014,0x0707,0x0203,0x4012'%(crash_gen_path ,arguments) return crash_gen_cmd From 897fd5b8b5f0fe2b4ef01809b16cd179c95960d3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 10:15:41 +0800 Subject: [PATCH 09/12] fix:tmq close error because of same committed offset --- source/client/src/clientTmq.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0ec47a1cb3..3969a68fd1 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -982,7 +982,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { } if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { + if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { return rsp; } } @@ -2122,7 +2122,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { // if auto commit is set, commit before close consumer. Otherwise, do nothing. if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0) { + if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { return rsp; } } From cc62961337e692be232988eb57d6cddbfeff840e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 14:45:07 +0800 Subject: [PATCH 10/12] fix:heap use after free of pParamSet since the first commitCb is called before send another commit message --- source/client/src/clientTmq.c | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3969a68fd1..d76aa2456e 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -234,7 +234,6 @@ typedef struct { int64_t refId; int32_t epoch; int32_t waitingRspNum; - int32_t totalRspNum; int32_t code; tmq_commit_cb* callbackFn; /*SArray* successfulOffsets;*/ @@ -513,12 +512,12 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET; int64_t transporterId = 0; + atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo); if(code != 0){ + atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1); return code; } - atomic_add_fetch_32(&pParamSet->waitingRspNum, 1); - atomic_add_fetch_32(&pParamSet->totalRspNum, 1); return code; } @@ -537,7 +536,7 @@ static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) { return NULL; } -static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam){ +static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam, int32_t rspNum){ SMqCommitCbParamSet* pParamSet = taosMemoryCalloc(1, sizeof(SMqCommitCbParamSet)); if (pParamSet == NULL) { return NULL; @@ -547,7 +546,7 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p pParamSet->epoch = tmq->epoch; pParamSet->callbackFn = pCommitFp; pParamSet->userParam = userParam; - pParamSet->waitingRspNum = 0; + pParamSet->waitingRspNum = rspNum; return pParamSet; } @@ -593,7 +592,7 @@ static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STq char commitBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(commitBuf, tListLen(commitBuf), &pVg->offsetInfo.committedOffset); - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 0); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -657,7 +656,8 @@ end: static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* userParam) { int32_t code = 0; - SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam); + // init as 1 to prevent concurrency issue + SMqCommitCbParamSet* pParamSet = prepareCommitCbParamSet(tmq, pCommitFp, userParam, 1); if (pParamSet == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; goto end; @@ -703,7 +703,9 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics); // request is sent - if (pParamSet->totalRspNum != 0) { + if (pParamSet->waitingRspNum != 1) { + // count down since waiting rsp num init as 1 + commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; @@ -3005,7 +3007,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId); - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(pParam); + taosMemoryFree(msg); + goto end; + } } tsem_wait(&pCommon->rsp); From 629de12bb1cdf8c39295f2dee38c371a613e4b77 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 18:29:28 +0800 Subject: [PATCH 11/12] fix:do not return error if commit nothing --- source/client/src/clientTmq.c | 132 ++++++++---------- tests/system-test/7-tmq/subscribeDb3.py | 4 +- .../tmqConsFromTsdb1-1ctb-funcNFilter.py | 2 +- .../7-tmq/tmqConsFromTsdb1-1ctb.py | 2 +- .../tmqConsFromTsdb1-mutilVg-mutilCtb.py | 2 +- .../7-tmq/tmqConsFromTsdb1-mutilVg.py | 2 +- tests/system-test/7-tmq/tmqConsFromTsdb1.py | 2 +- 7 files changed, 63 insertions(+), 83 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index d76aa2456e..ed83e41427 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -190,7 +190,6 @@ typedef struct { typedef struct { int64_t refId; - int32_t epoch; void* pParam; __tmq_askep_fn_t pUserFn; } SMqAskEpCbParam; @@ -708,7 +707,6 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us commitRspCountDown(pParamSet, tmq->consumerId, "", 0); return; } - code = TSDB_CODE_TMQ_SAME_COMMITTED_VALUE; end: taosMemoryFree(pParamSet); @@ -743,20 +741,6 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) { taosMemoryFree(param); } -//void tmqAssignDelayedReportTask(void* param, void* tmrId) { -// int64_t refId = *(int64_t*)param; -// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); -// if (tmq != NULL) { -// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0); -// *pTaskType = TMQ_DELAYED_TASK__REPORT; -// taosWriteQitem(tmq->delayedTask, pTaskType); -// tsem_post(&tmq->rspSem); -// } -// -// taosReleaseRef(tmqMgmt.rsetId, refId); -// taosMemoryFree(param); -//} - int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { if (pMsg) { taosMemoryFree(pMsg->pData); @@ -984,7 +968,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { } if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { + if (rsp != 0) { return rsp; } } @@ -1085,7 +1069,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->status = TMQ_CONSUMER_STATUS__INIT; pTmq->pollCnt = 0; pTmq->epoch = 0; -// pTmq->needReportOffsetRows = true; // set conf strcpy(pTmq->clientId, conf->clientId); @@ -1146,7 +1129,7 @@ _failed: } int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { - if(tmq == NULL) return TSDB_CODE_INVALID_PARA; + if(tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA; const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most const SArray* container = &topic_list->container; int32_t sz = taosArrayGetSize(container); @@ -1222,7 +1205,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code != 0){ + goto FAIL; + } // avoid double free if msg is sent buf = NULL; @@ -1239,7 +1225,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { - tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt); + tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); code = TSDB_CODE_MND_CONSUMER_NOT_READY; goto FAIL; } @@ -1512,10 +1498,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) taosWLockLatch(&tmq->lock); int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics); - char vgKey[TSDB_TOPIC_FNAME_LEN + 22]; + char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0}; tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d", tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur); - // todo extract method for (int32_t i = 0; i < topicNumCur; i++) { // find old topic SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i); @@ -1566,32 +1551,17 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); - if (tmq == NULL) { - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; -// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - return terrno; + code = TSDB_CODE_TMQ_CONSUMER_CLOSED; + goto END; } if (code != TSDB_CODE_SUCCESS) { tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); - pParam->pUserFn(tmq, code, NULL, pParam->pParam); - taosReleaseRef(tmqMgmt.rsetId, pParam->refId); - - taosMemoryFree(pMsg->pData); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pParam); - return code; + goto END; } - // tmq's epoch is monotonically increase, - // so it's safe to discard any old epoch msg. - // Epoch will only increase when received newer epoch ep msg SMqRspHead* head = pMsg->pData; int32_t epoch = atomic_load_32(&tmq->epoch); if (head->epoch <= epoch) { @@ -1610,10 +1580,10 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, head->epoch, epoch); } - - pParam->pUserFn(tmq, code, pMsg, pParam->pParam); taosReleaseRef(tmqMgmt.rsetId, pParam->refId); +END: + pParam->pUserFn(tmq, code, pMsg, pParam->pParam); taosMemoryFree(pMsg->pEpSet); taosMemoryFree(pMsg->pData); taosMemoryFree(pParam); @@ -1925,9 +1895,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { // todo handle the wal range and epset for each vgroup @@ -1957,9 +1927,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -2015,9 +1985,9 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); pRspWrapper = tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pollRspWrapper); - setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); @@ -2124,7 +2094,7 @@ int32_t tmq_consumer_close(tmq_t* tmq) { // if auto commit is set, commit before close consumer. Otherwise, do nothing. if (tmq->autoCommit) { int32_t rsp = tmq_commit_sync(tmq, NULL); - if (rsp != 0 && rsp != TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) { + if (rsp != 0) { return rsp; } } @@ -2440,23 +2410,29 @@ end: } } -void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { +void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { SAskEpInfo* pInfo = param; pInfo->code = code; - if (code == TSDB_CODE_SUCCESS) { - SMqRspHead* head = pDataBuf->pData; - - SMqAskEpRsp rsp; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); - doUpdateLocalEp(pTmq, head->epoch, &rsp); - tDeleteSMqAskEpRsp(&rsp); + if (pTmq == NULL || code != TSDB_CODE_SUCCESS){ + goto END; } + SMqRspHead* head = pDataBuf->pData; + SMqAskEpRsp rsp; + tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); + doUpdateLocalEp(pTmq, head->epoch, &rsp); + tDeleteSMqAskEpRsp(&rsp); + +END: tsem_post(&pInfo->sem); } void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { + if (pTmq == NULL){ + terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; + return; + } if (code != TSDB_CODE_SUCCESS) { terrno = code; return; @@ -2482,7 +2458,7 @@ int32_t doAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); tsem_init(&pInfo->sem, 0, 0); - asyncAskEp(pTmq, updateEpCallbackFn, pInfo); + asyncAskEp(pTmq, defaultAskEpCb, pInfo); tsem_wait(&pInfo->sem); int32_t code = pInfo->code; @@ -2496,49 +2472,45 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { req.consumerId = pTmq->consumerId; req.epoch = pTmq->epoch; strcpy(req.cgroup, pTmq->groupId); + int code = 0; + SMqAskEpCbParam* pParam = NULL; + void* pReq = NULL; int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); - askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); - return; + code = TSDB_CODE_INVALID_PARA; + goto FAIL; } - void* pReq = taosMemoryCalloc(1, tlen); + pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); - taosMemoryFree(pReq); - - askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param); - return; + code = TSDB_CODE_INVALID_PARA; + goto FAIL; } - SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); + pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); - taosMemoryFree(pReq); - - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } pParam->refId = pTmq->refId; - pParam->epoch = pTmq->epoch; pParam->pUserFn = askEpFn; pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(pReq); - askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param); - return; + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; @@ -2553,7 +2525,15 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); int64_t transporterId = 0; - asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code == 0){ + return; + } + +FAIL: + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(pReq); + askEpFn(pTmq, code, NULL, param); } int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index b66334a6a6..9e8ca99930 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -94,7 +94,7 @@ class TDTestCase: resultList=[] while 1: tdSql.query("select * from %s.consumeresult"%cdbName) - #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) if tdSql.getRows() == expectRows: break else: @@ -336,7 +336,7 @@ class TDTestCase: for i in range(expectRows): totalConsumeRows += resultList[i] - if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0: + if totalConsumeRows > expectrowcnt or totalConsumeRows < 0: tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt)) tdLog.exit("tmq consume rows error!") diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py index 6a03f0f751..117c3ce637 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb-funcNFilter.py @@ -218,7 +218,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py index c11159c6e5..2864240441 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-1ctb.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py index 439845aa54..d8606efe58 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg-mutilCtb.py @@ -218,7 +218,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py index 53ff020b08..05aa82c929 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1-mutilVg.py @@ -216,7 +216,7 @@ class TDTestCase: actConsumeTotalRows = resultList[0] - if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted): + if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted): tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) tdLog.exit("%d tmq consume rows error!"%consumerId) diff --git a/tests/system-test/7-tmq/tmqConsFromTsdb1.py b/tests/system-test/7-tmq/tmqConsFromTsdb1.py index 4bb6cf463f..dcaa6ceb7c 100644 --- a/tests/system-test/7-tmq/tmqConsFromTsdb1.py +++ b/tests/system-test/7-tmq/tmqConsFromTsdb1.py @@ -217,7 +217,7 @@ class TDTestCase: tdLog.info("act consume rows: %d"%(actConsumeTotalRows)) tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted)) - if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)): + if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)): tdLog.exit("%d tmq consume rows error!"%consumerId) time.sleep(10) From f4ec83025d1cdc7e339d2421cf46df847473837a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 6 Sep 2023 19:46:17 +0800 Subject: [PATCH 12/12] opti:optimize code logic & fix python test case error --- source/client/src/clientTmq.c | 103 +++++++++++++----------- tests/system-test/7-tmq/subscribeDb3.py | 2 +- 2 files changed, 58 insertions(+), 47 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index ed83e41427..c4021e5302 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -152,7 +152,6 @@ typedef struct { int32_t vgId; int32_t vgStatus; int32_t vgSkipCnt; // here used to mark the slow vgroups -// bool receivedInfoFromVnode; // has already received info from vnode int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp. SEpSet epSet; @@ -1314,7 +1313,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { } if (code != 0) { - // in case of consumer mismatch, wait for 500ms and retry if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", @@ -1672,35 +1670,35 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie return pRspObj; } -static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&pTmq->rspSem); - return -1; -} - static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { SMqPollReq req = {0}; + char* msg = NULL; + SMqPollCbParam* pParam = NULL; + SMsgSendInfo* sendInfo = NULL; + int code = 0; tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); - if (msgSize < 0) { - return handleErrorBeforePoll(pVg, pTmq); + if (msgSize < 0){ + code = TSDB_CODE_INVALID_MSG; + goto FAIL; } - char* msg = taosMemoryCalloc(1, msgSize); + msg = taosMemoryCalloc(1, msgSize); if (NULL == msg) { - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_INVALID_MSG; + goto FAIL; } - SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); + pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); if (pParam == NULL) { - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } pParam->refId = pTmq->refId; @@ -1708,11 +1706,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->vgId = pVg->vgId; pParam->requestId = req.reqId; - SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); + sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); if (sendInfo == NULL) { - taosMemoryFree(pParam); - taosMemoryFree(msg); - return handleErrorBeforePoll(pVg, pTmq); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; } sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; @@ -1728,13 +1725,21 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); - asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + if(code != 0){ + goto FAIL; + } pVg->pollCnt++; pVg->seekUpdated = false; // reset this flag. pTmq->pollCnt++; - return TSDB_CODE_SUCCESS; + return 0; + +FAIL: + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(msg); + return code; } // broadcast the poll request to all related vnodes @@ -1771,6 +1776,8 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { atomic_store_32(&pVg->vgSkipCnt, 0); code = doTmqPollImpl(tmq, pTopic, pVg, timeout); if (code != TSDB_CODE_SUCCESS) { + atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); + tsem_post(&tmq->rspSem); goto end; } } @@ -1782,19 +1789,15 @@ end: return code; } -static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) { +static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) { if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { - /*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/ if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg); - /*tmqClearUnhandleMsg(tmq);*/ tDeleteSMqAskEpRsp(rspMsg); - *pReset = true; } else { tmqFreeRspWrapper(rspWrapper); - *pReset = false; } } else { return -1; @@ -1819,7 +1822,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal pVg->offsetInfo.walVerEnd = ever + 1; } -static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { +static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems); while (1) { @@ -1972,12 +1975,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { tmq->totalRows += numOfRows; - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); - tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 - ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, - tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, - tmq->totalRows, pollRspWrapper->reqId); + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset); + tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64 + ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, + tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, + tmq->totalRows, pollRspWrapper->reqId); taosFreeQitem(pollRspWrapper); taosWUnLockLatch(&tmq->lock); @@ -1991,14 +1994,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { } } else { tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); - - bool reset = false; - tmqHandleNoPollRsp(tmq, pRspWrapper, &reset); + tmqHandleNoPollRsp(tmq, pRspWrapper); taosFreeQitem(pRspWrapper); - if (pollIfReset && reset) { - tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId); - tmqPollImpl(tmq, timeout); - } } } } @@ -2006,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { if(tmq == NULL) return NULL; - void* rspObj; + void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, @@ -2038,7 +2035,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); } - rspObj = tmqHandleAllRsp(tmq, timeout, false); + rspObj = tmqHandleAllRsp(tmq, timeout); if (rspObj) { tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; @@ -2728,7 +2725,14 @@ int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* ep sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(buf); + taosMemoryFree(sendInfo); + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + return code; + } tsem_wait(&pParam->sem); code = pParam->code; @@ -3142,7 +3146,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_ sendInfo->msgType = TDMT_VND_TMQ_SEEK; int64_t transporterId = 0; - asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); + if(code != 0){ + taosMemoryFree(msg); + taosMemoryFree(sendInfo); + tsem_destroy(&pParam->sem); + taosMemoryFree(pParam); + return code; + } tsem_wait(&pParam->sem); code = pParam->code; diff --git a/tests/system-test/7-tmq/subscribeDb3.py b/tests/system-test/7-tmq/subscribeDb3.py index 9e8ca99930..37e3a17100 100644 --- a/tests/system-test/7-tmq/subscribeDb3.py +++ b/tests/system-test/7-tmq/subscribeDb3.py @@ -94,7 +94,7 @@ class TDTestCase: resultList=[] while 1: tdSql.query("select * from %s.consumeresult"%cdbName) - tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) + # tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))) if tdSql.getRows() == expectRows: break else: