opti:optimize code logic & fix python test case error

This commit is contained in:
wangmm0220 2023-09-06 19:46:17 +08:00
parent 629de12bb1
commit f4ec83025d
2 changed files with 58 additions and 47 deletions

View File

@ -152,7 +152,6 @@ typedef struct {
int32_t vgId;
int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups
// bool receivedInfoFromVnode; // has already received info from vnode
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
SEpSet epSet;
@ -1314,7 +1313,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
if (code != 0) {
// in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
@ -1672,35 +1670,35 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie
return pRspObj;
}
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&pTmq->rspSem);
return -1;
}
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
SMqPollReq req = {0};
char* msg = NULL;
SMqPollCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int code = 0;
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
return handleErrorBeforePoll(pVg, pTmq);
if (msgSize < 0){
code = TSDB_CODE_INVALID_MSG;
goto FAIL;
}
char* msg = taosMemoryCalloc(1, msgSize);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_INVALID_MSG;
goto FAIL;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pParam->refId = pTmq->refId;
@ -1708,11 +1706,10 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
@ -1728,13 +1725,21 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
if(code != 0){
goto FAIL;
}
pVg->pollCnt++;
pVg->seekUpdated = false; // reset this flag.
pTmq->pollCnt++;
return TSDB_CODE_SUCCESS;
return 0;
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(msg);
return code;
}
// broadcast the poll request to all related vnodes
@ -1771,6 +1776,8 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
atomic_store_32(&pVg->vgSkipCnt, 0);
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
goto end;
}
}
@ -1782,19 +1789,15 @@ end:
return code;
}
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
tDeleteSMqAskEpRsp(rspMsg);
*pReset = true;
} else {
tmqFreeRspWrapper(rspWrapper);
*pReset = false;
}
} else {
return -1;
@ -1819,7 +1822,7 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
pVg->offsetInfo.walVerEnd = ever + 1;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) {
@ -1972,12 +1975,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
@ -1991,14 +1994,8 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
}
} else {
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
bool reset = false;
tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
tmqHandleNoPollRsp(tmq, pRspWrapper);
taosFreeQitem(pRspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout);
}
}
}
}
@ -2006,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if(tmq == NULL) return NULL;
void* rspObj;
void* rspObj = NULL;
int64_t startTime = taosGetTimestampMs();
tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
@ -2038,7 +2035,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
rspObj = tmqHandleAllRsp(tmq, timeout);
if (rspObj) {
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
@ -2728,7 +2725,14 @@ int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
tsem_wait(&pParam->sem);
code = pParam->code;
@ -3142,7 +3146,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
tsem_wait(&pParam->sem);
code = pParam->code;

View File

@ -94,7 +94,7 @@ class TDTestCase:
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)))
# tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)))
if tdSql.getRows() == expectRows:
break
else: