diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 8482ba8a78..6f4f15d1e8 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -135,13 +135,11 @@ static inline int STupleKeyCmpr(const void* pKey1, int kLen1, const void* pKey2, } enum { - TMQ_MSG_TYPE__DUMMY = 0, - TMQ_MSG_TYPE__POLL_DATA_RSP, + TMQ_MSG_TYPE__POLL_DATA_RSP = 0, TMQ_MSG_TYPE__POLL_META_RSP, TMQ_MSG_TYPE__EP_RSP, TMQ_MSG_TYPE__POLL_DATA_META_RSP, TMQ_MSG_TYPE__WALINFO_RSP, - TMQ_MSG_TYPE__END_RSP, }; enum { diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 9d1a8a9189..a0af96853e 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3183,32 +3183,6 @@ int32_t tEncodeSTqCheckInfo(SEncoder* pEncoder, const STqCheckInfo* pInfo); int32_t tDecodeSTqCheckInfo(SDecoder* pDecoder, STqCheckInfo* pInfo); void tDeleteSTqCheckInfo(STqCheckInfo* pInfo); -typedef struct { - char topic[TSDB_TOPIC_FNAME_LEN]; -} STqDelCheckInfoReq; - -typedef struct { - int32_t vgId; - int64_t offset; - char topicName[TSDB_TOPIC_FNAME_LEN]; - char cgroup[TSDB_CGROUP_LEN]; -} SMqOffset; - -typedef struct { - int64_t consumerId; - int32_t num; - SMqOffset* offsets; -} SMqCMCommitOffsetReq; - -typedef struct { - int32_t reserved; -} SMqCMCommitOffsetRsp; - -int32_t tEncodeSMqOffset(SEncoder* encoder, const SMqOffset* pOffset); -int32_t tDecodeSMqOffset(SDecoder* decoder, SMqOffset* pOffset); -int32_t tEncodeSMqCMCommitOffsetReq(SEncoder* encoder, const SMqCMCommitOffsetReq* pReq); -int32_t tDecodeSMqCMCommitOffsetReq(SDecoder* decoder, SMqCMCommitOffsetReq* pReq); - // tqOffset enum { TMQ_OFFSET__RESET_NONE = -3, diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index c4021e5302..054302974c 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -29,8 +29,6 @@ #define OFFSET_IS_RESET_OFFSET(_of) ((_of) < 0) -typedef void (*__tmq_askep_fn_t)(tmq_t* pTmq, int32_t code, SDataBuf* pBuf, void* pParam); - struct SMqMgmt { int8_t inited; tmr_h timer; @@ -42,11 +40,13 @@ volatile int32_t tmqInitRes = 0; // initialize rsp code static struct SMqMgmt tmqMgmt = {0}; typedef struct { + int32_t code; int8_t tmqRspType; int32_t epoch; } SMqRspWrapper; typedef struct { + int32_t code; int8_t tmqRspType; int32_t epoch; SMqAskEpRsp msg; @@ -133,7 +133,6 @@ enum { enum { TMQ_DELAYED_TASK__ASK_EP = 1, - TMQ_DELAYED_TASK__REPORT, TMQ_DELAYED_TASK__COMMIT, }; @@ -165,6 +164,7 @@ typedef struct { } SMqClientTopic; typedef struct { + int32_t code; int8_t tmqRspType; int32_t epoch; // epoch can be used to guard the vgHandle int32_t vgId; @@ -189,8 +189,8 @@ typedef struct { typedef struct { int64_t refId; + bool sync; void* pParam; - __tmq_askep_fn_t pUserFn; } SMqAskEpCbParam; typedef struct { @@ -252,13 +252,12 @@ typedef struct SSyncCommitInfo { int32_t code; } SSyncCommitInfo; -static int32_t doAskEp(tmq_t* tmq); +static int32_t syncAskEp(tmq_t* tmq); static int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg); static int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet); static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffsetVal* offset, const char* pTopicName, SMqCommitCbParamSet* pParamSet); static void commitRspCountDown(SMqCommitCbParamSet* pParamSet, int64_t consumerId, const char* pTopic, int32_t vgId); -static void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param); -static void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param); +static void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpset); tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); @@ -848,7 +847,7 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { while (pTaskType != NULL) { if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) { - asyncAskEp(pTmq, addToQueueCallbackFn, NULL); + askEp(pTmq, NULL, false, false); int64_t* pRefId = taosMemoryMalloc(sizeof(int64_t)); *pRefId = pTmq->refId; @@ -865,7 +864,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { tscDebug("consumer:0x%" PRIx64 " next commit to vnode(s) in %.2fs", pTmq->consumerId, pTmq->autoCommitInterval / 1000.0); taosTmrReset(tmqAssignDelayedCommitTask, pTmq->autoCommitInterval, pRefId, tmqMgmt.timer, &pTmq->commitTimer); - } else if (*pTaskType == TMQ_DELAYED_TASK__REPORT) { + } else { + tscError("consumer:0x%" PRIx64 " invalid task type:%d", pTmq->consumerId, *pTaskType); } taosFreeQitem(pTaskType); @@ -876,10 +876,8 @@ int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { return 0; } -static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { - // do nothing - } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { +static void tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { + if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; tDeleteSMqAskEpRsp(&pEpRspWrapper->msg); } else if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { @@ -907,8 +905,6 @@ static void* tmqFreeRspWrapper(SMqRspWrapper* rspWrapper) { taosArrayDestroy(pRsp->taosxRsp.createTableLen); taosArrayDestroyP(pRsp->taosxRsp.createTableReq, taosMemoryFree); } - - return NULL; } void tmqClearUnhandleMsg(tmq_t* tmq) { @@ -1222,7 +1218,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { + while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) { if (retryCnt++ > MAX_RETRY_COUNT) { tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId); code = TSDB_CODE_MND_CONSUMER_NOT_READY; @@ -1305,33 +1301,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { uint64_t requestId = pParam->requestId; tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId); if (tmq == NULL) { - taosMemoryFree(pParam); - taosMemoryFreeClear(pMsg->pData); - taosMemoryFreeClear(pMsg->pEpSet); - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return -1; + code = TSDB_CODE_TMQ_CONSUMER_CLOSED; + goto FAIL; } - if (code != 0) { - if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); - tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", - tmq->consumerId); - } else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); - if (pRspWrapper == NULL) { - tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64, - tmq->consumerId, vgId, requestId); - goto END; - } - - pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP; - taosWriteQitem(tmq->mqueue, pRspWrapper); - } else{ - tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId, - vgId, tstrerror(code), requestId); - } + SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); + if (pRspWrapper == NULL) { + tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); + taosReleaseRef(tmqMgmt.rsetId, refId); + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } + if(code != 0){ goto END; } @@ -1340,30 +1322,19 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (msgEpoch < clientEpoch) { // do not write into queue since updating epoch reset tscWarn("consumer:0x%" PRIx64 - " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, + " msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); code = -1; goto END; } ASSERT(msgEpoch == clientEpoch); - // handle meta rsp int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType; - - SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0); - if (pRspWrapper == NULL) { - tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId); - code = TSDB_CODE_OUT_OF_MEMORY; - goto END; - } - pRspWrapper->tmqRspType = rspType; pRspWrapper->reqId = requestId; pRspWrapper->pEpset = pMsg->pEpSet; pMsg->pEpSet = NULL; - pRspWrapper->vgId = vgId; - strcpy(pRspWrapper->topicName, pParam->topicName); if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SDecoder decoder; @@ -1392,22 +1363,22 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType); } +END: + pRspWrapper->code = code; + pRspWrapper->vgId = vgId; + strcpy(pRspWrapper->topicName, pParam->topicName); taosWriteQitem(tmq->mqueue, pRspWrapper); int32_t total = taosQueueItemSize(tmq->mqueue); tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64, tmq->consumerId, rspType, vgId, total, requestId); - -END: - if(code != 0){ - setVgIdle(tmq, pParam->topicName, vgId); - } - - tsem_post(&tmq->rspSem); taosReleaseRef(tmqMgmt.rsetId, refId); + +FAIL: + tsem_post(&tmq->rspSem); taosMemoryFree(pParam); - taosMemoryFreeClear(pMsg->pData); - taosMemoryFreeClear(pMsg->pEpSet); + if(pMsg) taosMemoryFreeClear(pMsg->pData); + if(pMsg) taosMemoryFreeClear(pMsg->pEpSet); return code; } @@ -1478,7 +1449,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) bool set = false; int32_t topicNumGet = taosArrayGetSize(pRsp->topics); - if (epoch <= tmq->epoch) { + if (topicNumGet <= 0 && epoch <= tmq->epoch) { + tscInfo("consumer:0x%" PRIx64 " no update ep epoch from %d to epoch %d, incoming topics:%d", + tmq->consumerId, tmq->epoch, epoch, topicNumGet); return false; } @@ -1546,48 +1519,6 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) return set; } -int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) { - SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; - tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); - if (tmq == NULL) { - code = TSDB_CODE_TMQ_CONSUMER_CLOSED; - goto END; - } - - if (code != TSDB_CODE_SUCCESS) { - tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); - taosReleaseRef(tmqMgmt.rsetId, pParam->refId); - goto END; - } - - SMqRspHead* head = pMsg->pData; - int32_t epoch = atomic_load_32(&tmq->epoch); - if (head->epoch <= epoch) { - tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, no need to update local ep", - tmq->consumerId, head->epoch, epoch); - - if (tmq->status == TMQ_CONSUMER_STATUS__RECOVER) { - SMqAskEpRsp rsp; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); - int8_t flag = (taosArrayGetSize(rsp.topics) == 0) ? TMQ_CONSUMER_STATUS__NO_TOPIC : TMQ_CONSUMER_STATUS__READY; - atomic_store_8(&tmq->status, flag); - tDeleteSMqAskEpRsp(&rsp); - } - - } else { - tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId, - head->epoch, epoch); - } - taosReleaseRef(tmqMgmt.rsetId, pParam->refId); - -END: - pParam->pUserFn(tmq, code, pMsg, pParam->pParam); - taosMemoryFree(pMsg->pEpSet); - taosMemoryFree(pMsg->pData); - taosMemoryFree(pParam); - return code; -} - void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqClientTopic* pTopic, SMqClientVg* pVg) { int32_t groupLen = strlen(tmq->groupId); memcpy(pReq->subKey, tmq->groupId, groupLen); @@ -1722,10 +1653,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p int64_t transporterId = 0; char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); - - tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, - pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo); + tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId, + pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); if(code != 0){ goto FAIL; } @@ -1735,11 +1665,9 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pTmq->pollCnt++; return 0; - FAIL: - taosMemoryFreeClear(pParam); taosMemoryFreeClear(msg); - return code; + return tmqPollCb(pParam, NULL, code); } // broadcast the poll request to all related vnodes @@ -1776,8 +1704,6 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { atomic_store_32(&pVg->vgSkipCnt, 0); code = doTmqPollImpl(tmq, pTopic, pVg, timeout); if (code != TSDB_CODE_SUCCESS) { - atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); - tsem_post(&tmq->rspSem); goto end; } } @@ -1789,22 +1715,6 @@ end: return code; } -static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) { - if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { - if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) { - SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper; - SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; - doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg); - tDeleteSMqAskEpRsp(rspMsg); - } else { - tmqFreeRspWrapper(rspWrapper); - } - } else { - return -1; - } - return 0; -} - static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal* rspOffset, int64_t sver, int64_t ever, int64_t consumerId, bool hasData){ if (!pVg->seekUpdated) { tscDebug("consumer:0x%" PRIx64" local offset is update, since seekupdate not set", consumerId); @@ -1839,11 +1749,28 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); - if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__END_RSP) { + if (pRspWrapper->code != 0) { + SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; + if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER); + tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER", tmq->consumerId); + } else if (pRspWrapper->code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { + terrno = pRspWrapper->code; + tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(pRspWrapper->code)); + taosFreeQitem(pRspWrapper); + return NULL; + } else{ + if(pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID){ // for vnode transform + askEp(tmq, NULL, false, true); + } + tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); + taosWLockLatch(&tmq->lock); + SMqClientVg* pVg = getVgInfo(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); + pVg->emptyBlockReceiveTs = taosGetTimestampMs(); + taosWUnLockLatch(&tmq->lock); + } + setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); taosFreeQitem(pRspWrapper); - terrno = TSDB_CODE_TQ_NO_COMMITTED_OFFSET; - tscError("consumer:0x%" PRIx64 " unexpected rsp from poll, code:%s", tmq->consumerId, tstrerror(terrno)); - return NULL; } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -1878,9 +1805,10 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " empty block received, vgId:%d, offset:%s, vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); - pRspWrapper = tmqFreeRspWrapper(pRspWrapper); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); - taosFreeQitem(pollRspWrapper); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); + taosWUnLockLatch(&tmq->lock); } else { // build rsp int64_t numOfRows = 0; SMqRspObj* pRsp = tmqBuildRspFromWrapper(pollRspWrapper, pVg, &numOfRows); @@ -1890,20 +1818,18 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { ", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, buf, pDataRsp->blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); - taosFreeQitem(pollRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; } - taosWUnLockLatch(&tmq->lock); } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); - pRspWrapper = tmqFreeRspWrapper(pRspWrapper); - taosFreeQitem(pollRspWrapper); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) { - // todo handle the wal range and epset for each vgroup SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; int32_t consumerEpoch = atomic_load_32(&tmq->epoch); @@ -1924,15 +1850,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { updateVgInfo(pVg, &pollRspWrapper->metaRsp.rspOffset, &pollRspWrapper->metaRsp.rspOffset, pollRspWrapper->metaRsp.head.walsver, pollRspWrapper->metaRsp.head.walever, tmq->consumerId, true); // build rsp SMqMetaRspObj* pRsp = tmqBuildMetaRspFromWrapper(pollRspWrapper); - taosFreeQitem(pollRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); - pRspWrapper = tmqFreeRspWrapper(pRspWrapper); - taosFreeQitem(pollRspWrapper); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); } } else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) { SMqPollRspWrapper* pollRspWrapper = (SMqPollRspWrapper*)pRspWrapper; @@ -1956,8 +1882,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tscDebug("consumer:0x%" PRIx64 " taosx empty block received, vgId:%d, vg total:%" PRId64 ", reqId:0x%" PRIx64, tmq->consumerId, pVg->vgId, pVg->numOfRows, pollRspWrapper->reqId); pVg->emptyBlockReceiveTs = taosGetTimestampMs(); - pRspWrapper = tmqFreeRspWrapper(pRspWrapper); - taosFreeQitem(pollRspWrapper); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); continue; } else { @@ -1982,20 +1908,25 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows, tmq->totalRows, pollRspWrapper->reqId); - taosFreeQitem(pollRspWrapper); + taosFreeQitem(pRspWrapper); taosWUnLockLatch(&tmq->lock); return pRsp; } else { tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d", tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch); setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId); - pRspWrapper = tmqFreeRspWrapper(pRspWrapper); - taosFreeQitem(pollRspWrapper); + tmqFreeRspWrapper(pRspWrapper); + taosFreeQitem(pRspWrapper); } - } else { - tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId); - tmqHandleNoPollRsp(tmq, pRspWrapper); + } else if(pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP){ + tscDebug("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); + SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)pRspWrapper; + SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg; + doUpdateLocalEp(tmq, pEpRspWrapper->epoch, rspMsg); + tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); + } else { + tscError("consumer:0x%" PRIx64 " invalid msg received:%d", tmq->consumerId, pRspWrapper->tmqRspType); } } } @@ -2018,7 +1949,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { while (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__RECOVER) { int32_t retryCnt = 0; - while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) { + while (TSDB_CODE_MND_CONSUMER_NOT_READY == syncAskEp(tmq)) { if (retryCnt++ > 40) { return NULL; } @@ -2407,55 +2338,64 @@ end: } } -void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { - SAskEpInfo* pInfo = param; - pInfo->code = code; +int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { + SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param; + tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId); + if (tmq == NULL) { + code = TSDB_CODE_TMQ_CONSUMER_CLOSED; + goto FAIL; + } - if (pTmq == NULL || code != TSDB_CODE_SUCCESS){ + if (code != TSDB_CODE_SUCCESS) { + tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); goto END; } - SMqRspHead* head = pDataBuf->pData; - SMqAskEpRsp rsp; - tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp); - doUpdateLocalEp(pTmq, head->epoch, &rsp); - tDeleteSMqAskEpRsp(&rsp); + SMqRspHead* head = pMsg->pData; + int32_t epoch = atomic_load_32(&tmq->epoch); + tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d", tmq->consumerId, + head->epoch, epoch); + if(pParam->sync){ + SMqAskEpRsp rsp = {0}; + tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &rsp); + doUpdateLocalEp(tmq, head->epoch, &rsp); + tDeleteSMqAskEpRsp(&rsp); + }else{ + SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); + if (pWrapper == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto END; + } + + pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; + pWrapper->epoch = head->epoch; + memcpy(&pWrapper->msg, pMsg->pData, sizeof(SMqRspHead)); + tDecodeSMqAskEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), &pWrapper->msg); + + taosWriteQitem(tmq->mqueue, pWrapper); + } END: - tsem_post(&pInfo->sem); + taosReleaseRef(tmqMgmt.rsetId, pParam->refId); + +FAIL: + if(pParam->sync){ + SAskEpInfo* pInfo = pParam->pParam; + pInfo->code = code; + tsem_post(&pInfo->sem); + } + + taosMemoryFree(pMsg->pEpSet); + taosMemoryFree(pMsg->pData); + taosMemoryFree(pParam); + return code; } -void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) { - if (pTmq == NULL){ - terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED; - return; - } - if (code != TSDB_CODE_SUCCESS) { - terrno = code; - return; - } - - SMqAskEpRspWrapper* pWrapper = taosAllocateQitem(sizeof(SMqAskEpRspWrapper), DEF_QITEM, 0); - if (pWrapper == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return; - } - - SMqRspHead* head = pDataBuf->pData; - - pWrapper->tmqRspType = TMQ_MSG_TYPE__EP_RSP; - pWrapper->epoch = head->epoch; - memcpy(&pWrapper->msg, pDataBuf->pData, sizeof(SMqRspHead)); - tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &pWrapper->msg); - - taosWriteQitem(pTmq->mqueue, pWrapper); -} - -int32_t doAskEp(tmq_t* pTmq) { +int32_t syncAskEp(tmq_t* pTmq) { SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo)); tsem_init(&pInfo->sem, 0, 0); - asyncAskEp(pTmq, defaultAskEpCb, pInfo); + askEp(pTmq, pInfo, true, false); tsem_wait(&pInfo->sem); int32_t code = pInfo->code; @@ -2464,10 +2404,10 @@ int32_t doAskEp(tmq_t* pTmq) { return code; } -void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { +void askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; - req.epoch = pTmq->epoch; + req.epoch = updateEpSet ? -1 :pTmq->epoch; strcpy(req.cgroup, pTmq->groupId); int code = 0; SMqAskEpCbParam* pParam = NULL; @@ -2501,7 +2441,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { } pParam->refId = pTmq->refId; - pParam->pUserFn = askEpFn; + pParam->sync = sync; pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); @@ -2515,7 +2455,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { sendInfo->requestId = generateRequestId(); sendInfo->requestObjRefId = 0; sendInfo->param = pParam; - sendInfo->fp = askEpCallbackFn; + sendInfo->fp = askEpCb; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); @@ -2530,7 +2470,7 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) { FAIL: taosMemoryFreeClear(pParam); taosMemoryFreeClear(pReq); - askEpFn(pTmq, code, NULL, param); + askEpCb(pParam, NULL, code); } int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 90812a66b2..03285edcf7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -478,7 +478,7 @@ static int32_t taosAddSystemCfg(SConfig *pCfg) { if (cfgAddTimezone(pCfg, "timezone", tsTimezoneStr, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddLocale(pCfg, "locale", tsLocale, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddCharset(pCfg, "charset", tsCharset, CFG_SCOPE_BOTH) != 0) return -1; - if (cfgAddBool(pCfg, "assert", 1, CFG_SCOPE_BOTH) != 0) return -1; + if (cfgAddBool(pCfg, "assert", tsAssert, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddBool(pCfg, "enableCoreFile", 1, CFG_SCOPE_BOTH) != 0) return -1; if (cfgAddFloat(pCfg, "numOfCores", tsNumOfCores, 1, 100000, CFG_SCOPE_BOTH) != 0) return -1; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2285d0df23..b4ce55b71d 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -5384,43 +5384,7 @@ int32_t tDeserializeSServerStatusRsp(void *buf, int32_t bufLen, SServerStatusRsp tDecoderClear(&decoder); return 0; } -int32_t tEncodeSMqOffset(SEncoder *encoder, const SMqOffset *pOffset) { - if (tEncodeI32(encoder, pOffset->vgId) < 0) return -1; - if (tEncodeI64(encoder, pOffset->offset) < 0) return -1; - if (tEncodeCStr(encoder, pOffset->topicName) < 0) return -1; - if (tEncodeCStr(encoder, pOffset->cgroup) < 0) return -1; - return encoder->pos; -} -int32_t tDecodeSMqOffset(SDecoder *decoder, SMqOffset *pOffset) { - if (tDecodeI32(decoder, &pOffset->vgId) < 0) return -1; - if (tDecodeI64(decoder, &pOffset->offset) < 0) return -1; - if (tDecodeCStrTo(decoder, pOffset->topicName) < 0) return -1; - if (tDecodeCStrTo(decoder, pOffset->cgroup) < 0) return -1; - return 0; -} - -int32_t tEncodeSMqCMCommitOffsetReq(SEncoder *encoder, const SMqCMCommitOffsetReq *pReq) { - if (tStartEncode(encoder) < 0) return -1; - if (tEncodeI32(encoder, pReq->num) < 0) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tEncodeSMqOffset(encoder, &pReq->offsets[i]); - } - tEndEncode(encoder); - return encoder->pos; -} - -int32_t tDecodeSMqCMCommitOffsetReq(SDecoder *decoder, SMqCMCommitOffsetReq *pReq) { - if (tStartDecode(decoder) < 0) return -1; - if (tDecodeI32(decoder, &pReq->num) < 0) return -1; - pReq->offsets = (SMqOffset *)tDecoderMalloc(decoder, sizeof(SMqOffset) * pReq->num); - if (pReq->offsets == NULL) return -1; - for (int32_t i = 0; i < pReq->num; i++) { - tDecodeSMqOffset(decoder, &pReq->offsets[i]); - } - tEndDecode(decoder); - return 0; -} int32_t tSerializeSExplainRsp(void *buf, int32_t bufLen, SExplainRsp *pRsp) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 7f96255b1e..c1494fd0d0 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndConsumer.h" #include "mndPrivilege.h" +#include "mndVgroup.h" #include "mndShow.h" #include "mndSubscribe.h" #include "mndTopic.h" @@ -401,6 +402,9 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName); if(pSub == NULL){ +#ifdef TMQ_DEBUG + ASSERT(0); +#endif continue; } taosWLockLatch(&pSub->lock); @@ -498,7 +502,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, topic); // txn guarantees pSub is created if(pSub == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif continue; } taosRLockLatch(&pSub->lock); @@ -509,7 +515,9 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { // 2.1 fetch topic schema SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic); if(pTopic == NULL) { +#ifdef TMQ_DEBUG ASSERT(0); +#endif taosRUnLockLatch(&pSub->lock); mndReleaseSubscribe(pMnode, pSub); continue; @@ -542,6 +550,14 @@ static int32_t mndProcessAskEpReq(SRpcMsg *pMsg) { SMqVgEp *pVgEp = taosArrayGetP(pConsumerEp->vgs, j); char offsetKey[TSDB_PARTITION_KEY_LEN]; mndMakePartitionKey(offsetKey, pConsumer->cgroup, topic, pVgEp->vgId); + + if(epoch == -1){ + SVgObj *pVgroup = mndAcquireVgroup(pMnode, pVgEp->vgId); + if(pVgroup){ + pVgEp->epSet = mndGetVgroupEpset(pMnode, pVgroup); + mndReleaseVgroup(pMnode, pVgroup); + } + } // 2.2.1 build vg ep SMqSubVgEp vgEp = { .epSet = pVgEp->epSet, @@ -648,7 +664,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) { } // check topic existence - pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe"); + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe"); if (pTrans == NULL) { goto _over; } diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index c756341164..ae58eeee35 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -847,7 +847,11 @@ end: mndReleaseSubscribe(pMnode, pSub); mndTransDrop(pTrans); - return code; + if (code != 0) { + mError("cgroup %s on topic:%s, failed to drop", dropReq.cgroup, dropReq.topic); + return code; + } + return TSDB_CODE_ACTION_IN_PROGRESS; } void mndCleanupSubscribe(SMnode *pMnode) {} diff --git a/source/dnode/mnode/impl/src/mndTopic.c b/source/dnode/mnode/impl/src/mndTopic.c index a9fb5096fb..94fd6027c0 100644 --- a/source/dnode/mnode/impl/src/mndTopic.c +++ b/source/dnode/mnode/impl/src/mndTopic.c @@ -485,12 +485,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq * topicObj.astLen = strlen(pCreate->ast) + 1; } } - /*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/ - /*topicObj.ast = NULL;*/ - /*topicObj.astLen = 0;*/ - /*topicObj.physicalPlan = NULL;*/ - /*topicObj.withTbName = 1;*/ - /*topicObj.withSchema = 1;*/ SSdbRaw *pCommitRaw = mndTopicActionEncode(&topicObj); if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index c6a424666c..72310f6b19 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -88,14 +88,10 @@ typedef struct { int64_t snapshotVer; SWalReader* pWalReader; SWalRef* pRef; - // STqPushHandle pushHandle; // push STqExecHandle execHandle; // exec SRpcMsg* msg; tq_handle_status status; } STqHandle; -typedef struct { - int64_t snapshotVer; -} SStreamHandle; struct STQ { SVnode* pVnode; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 58544090e2..3aeb679eb7 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -670,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg STqHandle* pHandle = NULL; while (1) { pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey)); - if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) { + if (pHandle) { + break; + } + taosRLockLatch(&pTq->lock); + ret = tqMetaGetHandle(pTq, req.subKey); + taosRUnLockLatch(&pTq->lock); + + if (ret < 0) { break; } } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 39627a5f7b..4d470ee5b6 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { int32_t vgId = TD_VID(pTq->pVnode); // update the table list for each consumer handle + taosWLockLatch(&pTq->lock); while (1) { pIter = taosHashIterate(pTq->pHandle, pIter); if (pIter == NULL) { @@ -1116,6 +1117,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId); taosArrayDestroy(list); taosHashCancelIterate(pTq->pHandle, pIter); + taosWUnLockLatch(&pTq->lock); return ret; } tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL); @@ -1125,7 +1127,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { } } } - + taosWUnLockLatch(&pTq->lock); // update the table list handle for each stream scanner/wal reader taosWLockLatch(&pTq->pStreamMeta->lock); while (1) { diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c2c8da134c..0fc1c8bff0 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -293,7 +293,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/count.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/countAlwaysReturnValue.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/countAlwaysReturnValue.py -R -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/db.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/db.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/db.py -N 3 -n 3 -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/diff.py -R @@ -431,7 +431,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/upper.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -R -,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py +,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/case_when.py -R ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/blockSMA.py -R @@ -480,7 +480,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeModifyMeta.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeModifyMeta.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 6 -M 3 -,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 6 -M 3 -n 3 +,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopDnodeCreateStb.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 6 -M 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopMnodeCreateStb.py -N 6 -M 3 -n 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeSep1VnodeStopVnodeCreateStb.py -N 6 -M 3 @@ -878,9 +878,9 @@ ,,y,script,./test.sh -f tsim/dnode/offline_reason.sim ,,y,script,./test.sh -f tsim/dnode/redistribute_vgroup_replica1.sim ,,y,script,./test.sh -f tsim/dnode/vnode_clean.sim -,,y,script,./test.sh -f tsim/dnode/use_dropped_dnode.sim -,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica1.sim -,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica3.sim +,,y,script,./test.sh -f tsim/dnode/use_dropped_dnode.sim +,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica1.sim +,,y,script,./test.sh -f tsim/dnode/split_vgroup_replica3.sim ,,y,script,./test.sh -f tsim/import/basic.sim ,,y,script,./test.sh -f tsim/import/commit.sim ,,y,script,./test.sh -f tsim/import/large.sim @@ -1153,9 +1153,9 @@ ,,y,script,./test.sh -f tsim/catalog/alterInCurrent.sim ,,y,script,./test.sh -f tsim/scalar/in.sim ,,y,script,./test.sh -f tsim/scalar/scalar.sim -,,y,script,./test.sh -f tsim/scalar/filter.sim -,,y,script,./test.sh -f tsim/scalar/caseWhen.sim -,,y,script,./test.sh -f tsim/scalar/tsConvert.sim +,,y,script,./test.sh -f tsim/scalar/filter.sim +,,y,script,./test.sh -f tsim/scalar/caseWhen.sim +,,y,script,./test.sh -f tsim/scalar/tsConvert.sim ,,y,script,./test.sh -f tsim/alter/cached_schema_after_alter.sim ,,y,script,./test.sh -f tsim/alter/dnode.sim ,,y,script,./test.sh -f tsim/alter/table.sim @@ -1242,8 +1242,8 @@ ,,y,script,./test.sh -f tsim/tag/drop_tag.sim ,,y,script,./test.sh -f tsim/tag/tbNameIn.sim ,,y,script,./test.sh -f tmp/monitor.sim -,,y,script,./test.sh -f tsim/tagindex/add_index.sim -,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim +,,y,script,./test.sh -f tsim/tagindex/add_index.sim +,,n,script,./test.sh -f tsim/tagindex/sma_and_tag_index.sim #develop test