Merge pull request #22684 from taosdata/mark/tmq-3.0

fix:use vgstatus before if rebalance
This commit is contained in:
Haojun Liao 2023-09-07 09:20:27 +08:00 committed by GitHub
commit 0f909285c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
19 changed files with 235 additions and 262 deletions

View File

@ -268,6 +268,13 @@ typedef enum tmq_conf_res_t {
TMQ_CONF_OK = 0,
} tmq_conf_res_t;
typedef enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3,
} tmq_res_t;
typedef struct tmq_topic_assignment {
int32_t vgId;
int64_t currentOffset;
@ -302,6 +309,8 @@ DLL_EXPORT int32_t tmq_offset_seek(tmq_t *tmq, const char *pTopicName, int32_t
DLL_EXPORT int64_t tmq_position(tmq_t *tmq, const char *pTopicName, int32_t vgId); // The current offset is the offset of the last consumed message + 1
DLL_EXPORT int64_t tmq_committed(tmq_t *tmq, const char *pTopicName, int32_t vgId);
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
@ -309,34 +318,22 @@ DLL_EXPORT int64_t tmq_get_vgroup_offset(TAOS_RES* res);
DLL_EXPORT const char *tmq_err2str(int32_t code);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3,
};
typedef struct tmq_raw_data {
void *raw;
uint32_t raw_len;
uint16_t raw_type;
} tmq_raw_data;
typedef enum tmq_res_t tmq_res_t;
DLL_EXPORT const char *tmq_get_table_name(TAOS_RES *res);
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
DLL_EXPORT int32_t tmq_get_raw(TAOS_RES *res, tmq_raw_data *raw);
DLL_EXPORT int32_t tmq_write_raw(TAOS *taos, tmq_raw_data raw);
DLL_EXPORT int taos_write_raw_block(TAOS *taos, int numOfRows, char *pData, const char *tbname);
DLL_EXPORT int taos_write_raw_block_with_fields(TAOS *taos, int rows, char *pData, const char *tbname,
TAOS_FIELD *fields, int numFields);
DLL_EXPORT void tmq_free_raw(tmq_raw_data raw);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res);
DLL_EXPORT void tmq_free_json_meta(char *jsonMeta);
/* ---------------------------- TAOSX END -------------------------------- */
typedef enum {

View File

@ -152,7 +152,6 @@ typedef struct {
int32_t vgId;
int32_t vgStatus;
int32_t vgSkipCnt; // here used to mark the slow vgroups
// bool receivedInfoFromVnode; // has already received info from vnode
int64_t emptyBlockReceiveTs; // once empty block is received, idle for ignoreCnt then start to poll data
bool seekUpdated; // offset is updated by seek operator, therefore, not update by vnode rsp.
SEpSet epSet;
@ -190,17 +189,13 @@ typedef struct {
typedef struct {
int64_t refId;
int32_t epoch;
void* pParam;
__tmq_askep_fn_t pUserFn;
} SMqAskEpCbParam;
typedef struct {
int64_t refId;
int32_t epoch;
char topicName[TSDB_TOPIC_FNAME_LEN];
// SMqClientVg* pVg;
// SMqClientTopic* pTopic;
int32_t vgId;
uint64_t requestId; // request id for debug purpose
} SMqPollCbParam;
@ -237,7 +232,6 @@ typedef struct {
int64_t refId;
int32_t epoch;
int32_t waitingRspNum;
int32_t totalRspNum;
int32_t code;
tmq_commit_cb* callbackFn;
/*SArray* successfulOffsets;*/
@ -445,7 +439,6 @@ static int32_t tmqCommitCb(void* param, SDataBuf* pBuf, int32_t code) {
SMqCommitCbParam* pParam = (SMqCommitCbParam*)param;
SMqCommitCbParamSet* pParamSet = (SMqCommitCbParamSet*)pParam->params;
// taosMemoryFree(pParam->pOffset);
taosMemoryFree(pBuf->pData);
taosMemoryFree(pBuf->pEpSet);
@ -516,14 +509,14 @@ static int32_t doSendCommitMsg(tmq_t* tmq, int32_t vgId, SEpSet* epSet, STqOffse
pMsgSendInfo->fp = tmqCommitCb;
pMsgSendInfo->msgType = TDMT_VND_TMQ_COMMIT_OFFSET;
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
atomic_add_fetch_32(&pParamSet->totalRspNum, 1);
SEp* pEp = GET_ACTIVE_EP(epSet);
int64_t transporterId = 0;
return asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
atomic_add_fetch_32(&pParamSet->waitingRspNum, 1);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, pMsgSendInfo);
if(code != 0){
atomic_sub_fetch_32(&pParamSet->waitingRspNum, 1);
return code;
}
return code;
}
static SMqClientTopic* getTopicByName(tmq_t* tmq, const char* pTopicName) {
@ -556,8 +549,6 @@ static SMqCommitCbParamSet* prepareCommitCbParamSet(tmq_t* tmq, tmq_commit_cb* p
return pParamSet;
}
static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClientVg** pVg){
SMqClientTopic* pTopic = getTopicByName(tmq, pTopicName);
if (pTopic == NULL) {
@ -578,11 +569,10 @@ static int32_t getClientVg(tmq_t* tmq, char* pTopicName, int32_t vgId, SMqClient
}
static int32_t asyncCommitOffset(tmq_t* tmq, char* pTopicName, int32_t vgId, STqOffsetVal* offsetVal, tmq_commit_cb* pCommitFp, void* userParam) {
int32_t code = 0;
tscInfo("consumer:0x%" PRIx64 " do manual commit offset for %s, vgId:%d", tmq->consumerId, pTopicName, vgId);
taosRLockLatch(&tmq->lock);
SMqClientVg* pVg = NULL;
code = getClientVg(tmq, pTopicName, vgId, &pVg);
int32_t code = getClientVg(tmq, pTopicName, vgId, &pVg);
if(code != 0){
goto end;
}
@ -711,7 +701,7 @@ static void asyncCommitAllOffsets(tmq_t* tmq, tmq_commit_cb* pCommitFp, void* us
tscInfo("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - 1, numOfTopics);
// request is sent
if (pParamSet->totalRspNum != 0) {
if (pParamSet->waitingRspNum != 1) {
// count down since waiting rsp num init as 1
commitRspCountDown(pParamSet, tmq->consumerId, "", 0);
return;
@ -750,20 +740,6 @@ void tmqAssignDelayedCommitTask(void* param, void* tmrId) {
taosMemoryFree(param);
}
//void tmqAssignDelayedReportTask(void* param, void* tmrId) {
// int64_t refId = *(int64_t*)param;
// tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
// if (tmq != NULL) {
// int8_t* pTaskType = taosAllocateQitem(sizeof(int8_t), DEF_QITEM, 0);
// *pTaskType = TMQ_DELAYED_TASK__REPORT;
// taosWriteQitem(tmq->delayedTask, pTaskType);
// tsem_post(&tmq->rspSem);
// }
//
// taosReleaseRef(tmqMgmt.rsetId, refId);
// taosMemoryFree(param);
//}
int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) {
if (pMsg) {
taosMemoryFree(pMsg->pData);
@ -1092,7 +1068,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
pTmq->pollCnt = 0;
pTmq->epoch = 0;
// pTmq->needReportOffsetRows = true;
// set conf
strcpy(pTmq->clientId, conf->clientId);
@ -1153,7 +1128,7 @@ _failed:
}
int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
if(tmq == NULL) return TSDB_CODE_INVALID_PARA;
if(tmq == NULL || topic_list == NULL) return TSDB_CODE_INVALID_PARA;
const int32_t MAX_RETRY_COUNT = 120 * 2; // let's wait for 2 mins at most
const SArray* container = &topic_list->container;
int32_t sz = taosArrayGetSize(container);
@ -1229,7 +1204,10 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code != 0){
goto FAIL;
}
// avoid double free if msg is sent
buf = NULL;
@ -1246,7 +1224,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == doAskEp(tmq)) {
if (retryCnt++ > MAX_RETRY_COUNT) {
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry:%d in 500ms", tmq->consumerId, retryCnt);
tscError("consumer:0x%" PRIx64 ", mnd not ready for subscribe, retry more than 2 minutes", tmq->consumerId);
code = TSDB_CODE_MND_CONSUMER_NOT_READY;
goto FAIL;
}
@ -1311,54 +1289,50 @@ static SMqClientTopic* getTopicInfo(tmq_t* tmq, char* topicName){
return NULL;
}
static void setVgIdle(tmq_t* tmq, char* topicName, int32_t vgId){
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
}
int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
SMqPollCbParam* pParam = (SMqPollCbParam*)param;
int64_t refId = pParam->refId;
// SMqClientVg* pVg = pParam->pVg;
// SMqClientTopic* pTopic = pParam->pTopic;
int64_t refId = pParam->refId;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, refId);
if (tmq == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return -1;
}
int32_t epoch = pParam->epoch;
int32_t vgId = pParam->vgId;
uint64_t requestId = pParam->requestId;
if (code != 0) {
if (pMsg->pData) taosMemoryFree(pMsg->pData);
if (pMsg->pEpSet) taosMemoryFree(pMsg->pEpSet);
// in case of consumer mismatch, wait for 500ms and retry
if (code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
// taosMsleep(500);
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__RECOVER);
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, wait for 500ms and set status to be RECOVER",
tscDebug("consumer:0x%" PRIx64 " wait for the re-balance, set status to be RECOVER",
tmq->consumerId);
} else if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, epoch, requestId);
goto CREATE_MSG_FAIL;
tscWarn("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since out of memory, reqId:0x%" PRIx64,
tmq->consumerId, vgId, requestId);
goto END;
}
pRspWrapper->tmqRspType = TMQ_MSG_TYPE__END_RSP;
taosWriteQitem(tmq->mqueue, pRspWrapper);
// } else if (code == TSDB_CODE_WAL_LOG_NOT_EXIST) { // poll data while insert
// taosMsleep(5);
} else{
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, epoch %d, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, epoch, tstrerror(code), requestId);
tscError("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s, reqId:0x%" PRIx64, tmq->consumerId,
vgId, tstrerror(code), requestId);
}
goto CREATE_MSG_FAIL;
goto END;
}
int32_t msgEpoch = ((SMqRspHead*)pMsg->pData)->epoch;
@ -1368,43 +1342,29 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscWarn("consumer:0x%" PRIx64
" msg discard from vgId:%d since from earlier epoch, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return 0;
code = -1;
goto END;
}
if (msgEpoch != clientEpoch) {
tscWarn("consumer:0x%" PRIx64 " mismatch rsp from vgId:%d, epoch %d, current epoch %d, reqId:0x%" PRIx64,
tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId);
}
ASSERT(msgEpoch == clientEpoch);
// handle meta rsp
int8_t rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM, 0);
if (pRspWrapper == NULL) {
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, epoch %d since out of memory", tmq->consumerId, vgId,
epoch);
goto CREATE_MSG_FAIL;
tscWarn("consumer:0x%" PRIx64 " msg discard from vgId:%d, since out of memory", tmq->consumerId, vgId);
code = TSDB_CODE_OUT_OF_MEMORY;
goto END;
}
pRspWrapper->tmqRspType = rspType;
// pRspWrapper->vgHandle = pVg;
// pRspWrapper->topicHandle = pTopic;
pRspWrapper->reqId = requestId;
pRspWrapper->pEpset = pMsg->pEpSet;
pMsg->pEpSet = NULL;
pRspWrapper->vgId = vgId;
strcpy(pRspWrapper->topicName, pParam->topicName);
pMsg->pEpSet = NULL;
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
SDecoder decoder;
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead));
@ -1432,34 +1392,24 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
tscError("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
}
taosMemoryFree(pMsg->pData);
taosWriteQitem(tmq->mqueue, pRspWrapper);
int32_t total = taosQueueItemSize(tmq->mqueue);
tscDebug("consumer:0x%" PRIx64 " put poll res into mqueue, type:%d, vgId:%d, total in queue:%d, reqId:0x%" PRIx64,
tmq->consumerId, rspType, vgId, total, requestId);
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
return 0;
CREATE_MSG_FAIL:
if (epoch == tmq->epoch) {
taosWLockLatch(&tmq->lock);
SMqClientVg* pVg = getVgInfo(tmq, pParam->topicName, vgId);
if(pVg){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
END:
if(code != 0){
setVgIdle(tmq, pParam->topicName, vgId);
}
tsem_post(&tmq->rspSem);
taosReleaseRef(tmqMgmt.rsetId, refId);
taosMemoryFree(pParam);
taosMemoryFreeClear(pMsg->pData);
taosMemoryFreeClear(pMsg->pEpSet);
return -1;
return code;
}
typedef struct SVgroupSaveInfo {
@ -1467,6 +1417,7 @@ typedef struct SVgroupSaveInfo {
STqOffsetVal commitOffset;
STqOffsetVal seekOffset;
int64_t numOfRows;
int32_t vgStatus;
} SVgroupSaveInfo;
static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopicEp, SHashObj* pVgOffsetHashMap,
@ -1475,7 +1426,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
pTopicEp->schema.nCols = 0;
pTopicEp->schema.pSchema = NULL;
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
int32_t vgNumGet = taosArrayGetSize(pTopicEp->vgs);
tstrncpy(pTopic->topicName, pTopicEp->topic, TSDB_TOPIC_FNAME_LEN);
@ -1497,7 +1448,7 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
.pollCnt = 0,
.vgId = pVgEp->vgId,
.epSet = pVgEp->epSet,
.vgStatus = TMQ_VG_STATUS__IDLE,
.vgStatus = pInfo ? pInfo->vgStatus : TMQ_VG_STATUS__IDLE,
.vgSkipCnt = 0,
.emptyBlockReceiveTs = 0,
.numOfRows = pInfo ? pInfo->numOfRows : 0,
@ -1509,7 +1460,6 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
clientVg.offsetInfo.walVerBegin = -1;
clientVg.offsetInfo.walVerEnd = -1;
clientVg.seekUpdated = false;
// clientVg.receivedInfoFromVnode = false;
taosArrayPush(pTopic->vgs, &clientVg);
}
@ -1546,10 +1496,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
taosWLockLatch(&tmq->lock);
int32_t topicNumCur = taosArrayGetSize(tmq->clientTopics);
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
char vgKey[TSDB_TOPIC_FNAME_LEN + 22] = {0};
tscInfo("consumer:0x%" PRIx64 " update ep epoch from %d to epoch %d, incoming topics:%d, existed topics:%d",
tmq->consumerId, tmq->epoch, epoch, topicNumGet, topicNumCur);
// todo extract method
for (int32_t i = 0; i < topicNumCur; i++) {
// find old topic
SMqClientTopic* pTopicCur = taosArrayGet(tmq->clientTopics, i);
@ -1565,7 +1514,9 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
tscInfo("consumer:0x%" PRIx64 ", epoch:%d vgId:%d vgKey:%s, offset:%s", tmq->consumerId, epoch, pVgCur->vgId,
vgKey, buf);
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset, .commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows};
SVgroupSaveInfo info = {.currentOffset = pVgCur->offsetInfo.endOffset, .seekOffset = pVgCur->offsetInfo.beginOffset,
.commitOffset = pVgCur->offsetInfo.committedOffset, .numOfRows = pVgCur->numOfRows,
.vgStatus = pVgCur->vgStatus};
taosHashPut(pVgOffsetHashMap, vgKey, strlen(vgKey), &info, sizeof(SVgroupSaveInfo));
}
}
@ -1598,32 +1549,17 @@ static bool doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp)
int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
tmq_t* tmq = taosAcquireRef(tmqMgmt.rsetId, pParam->refId);
if (tmq == NULL) {
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
// pParam->pUserFn(tmq, terrno, NULL, pParam->pParam);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return terrno;
code = TSDB_CODE_TMQ_CONSUMER_CLOSED;
goto END;
}
if (code != TSDB_CODE_SUCCESS) {
tscError("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
pParam->pUserFn(tmq, code, NULL, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pParam);
return code;
goto END;
}
// tmq's epoch is monotonically increase,
// so it's safe to discard any old epoch msg.
// Epoch will only increase when received newer epoch ep msg
SMqRspHead* head = pMsg->pData;
int32_t epoch = atomic_load_32(&tmq->epoch);
if (head->epoch <= epoch) {
@ -1642,10 +1578,10 @@ int32_t askEpCallbackFn(void* param, SDataBuf* pMsg, int32_t code) {
tscInfo("consumer:0x%" PRIx64 ", recv ep, msg epoch %d, current epoch %d, update local ep", tmq->consumerId,
head->epoch, epoch);
}
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosReleaseRef(tmqMgmt.rsetId, pParam->refId);
END:
pParam->pUserFn(tmq, code, pMsg, pParam->pParam);
taosMemoryFree(pMsg->pEpSet);
taosMemoryFree(pMsg->pData);
taosMemoryFree(pParam);
@ -1734,50 +1670,46 @@ SMqTaosxRspObj* tmqBuildTaosxRspFromWrapper(SMqPollRspWrapper* pWrapper, SMqClie
return pRspObj;
}
static int32_t handleErrorBeforePoll(SMqClientVg* pVg, tmq_t* pTmq) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&pTmq->rspSem);
return -1;
}
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
SMqPollReq req = {0};
char* msg = NULL;
SMqPollCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int code = 0;
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
return handleErrorBeforePoll(pVg, pTmq);
if (msgSize < 0){
code = TSDB_CODE_INVALID_MSG;
goto FAIL;
}
char* msg = taosMemoryCalloc(1, msgSize);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_INVALID_MSG;
goto FAIL;
}
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
// pParam->pVg = pVg; // pVg may be released,fix it
// pParam->pTopic = pTopic;
strcpy(pParam->topicName, pTopic->topicName);
pParam->vgId = pVg->vgId;
pParam->requestId = req.reqId;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(msg);
return handleErrorBeforePoll(pVg, pTmq);
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
@ -1793,13 +1725,21 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
tscDebug("consumer:0x%" PRIx64 " send poll to %s vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, pTmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, &transporterId, sendInfo);
if(code != 0){
goto FAIL;
}
pVg->pollCnt++;
pVg->seekUpdated = false; // reset this flag.
pTmq->pollCnt++;
return TSDB_CODE_SUCCESS;
return 0;
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(msg);
return code;
}
// broadcast the poll request to all related vnodes
@ -1836,6 +1776,8 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
atomic_store_32(&pVg->vgSkipCnt, 0);
code = doTmqPollImpl(tmq, pTopic, pVg, timeout);
if (code != TSDB_CODE_SUCCESS) {
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
tsem_post(&tmq->rspSem);
goto end;
}
}
@ -1847,19 +1789,15 @@ end:
return code;
}
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper, bool* pReset) {
static int32_t tmqHandleNoPollRsp(tmq_t* tmq, SMqRspWrapper* rspWrapper) {
if (rspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
/*printf("ep %d %d\n", rspMsg->head.epoch, tmq->epoch);*/
if (rspWrapper->epoch > atomic_load_32(&tmq->epoch)) {
SMqAskEpRspWrapper* pEpRspWrapper = (SMqAskEpRspWrapper*)rspWrapper;
SMqAskEpRsp* rspMsg = &pEpRspWrapper->msg;
doUpdateLocalEp(tmq, rspWrapper->epoch, rspMsg);
/*tmqClearUnhandleMsg(tmq);*/
tDeleteSMqAskEpRsp(rspMsg);
*pReset = true;
} else {
tmqFreeRspWrapper(rspWrapper);
*pReset = false;
}
} else {
return -1;
@ -1882,10 +1820,9 @@ static void updateVgInfo(SMqClientVg* pVg, STqOffsetVal* reqOffset, STqOffsetVal
// update the valid wal version range
pVg->offsetInfo.walVerBegin = sver;
pVg->offsetInfo.walVerEnd = ever + 1;
// pVg->receivedInfoFromVnode = true;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
tscDebug("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, tmq->qall->numOfItems);
while (1) {
@ -1961,6 +1898,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pDataRsp->head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
@ -1992,6 +1930,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->metaRsp.head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
@ -2036,12 +1975,12 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
tmq->totalRows += numOfRows;
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
char buf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(buf, TSDB_OFFSET_LEN, &pVg->offsetInfo.endOffset);
tscDebug("consumer:0x%" PRIx64 " process taosx poll rsp, vgId:%d, offset:%s, blocks:%d, rows:%" PRId64
", vg total:%" PRId64 ", total:%" PRId64 ", reqId:0x%" PRIx64,
tmq->consumerId, pVg->vgId, buf, pollRspWrapper->dataRsp.blockNum, numOfRows, pVg->numOfRows,
tmq->totalRows, pollRspWrapper->reqId);
taosFreeQitem(pollRspWrapper);
taosWUnLockLatch(&tmq->lock);
@ -2049,19 +1988,14 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
} else {
tscInfo("consumer:0x%" PRIx64 " vgId:%d msg discard since epoch mismatch: msg epoch %d, consumer epoch %d",
tmq->consumerId, pollRspWrapper->vgId, pollRspWrapper->taosxRsp.head.epoch, consumerEpoch);
setVgIdle(tmq, pollRspWrapper->topicName, pollRspWrapper->vgId);
pRspWrapper = tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pollRspWrapper);
}
} else {
tscDebug("consumer:0x%" PRIx64 " not data msg received", tmq->consumerId);
bool reset = false;
tmqHandleNoPollRsp(tmq, pRspWrapper, &reset);
tmqHandleNoPollRsp(tmq, pRspWrapper);
taosFreeQitem(pRspWrapper);
if (pollIfReset && reset) {
tscDebug("consumer:0x%" PRIx64 ", reset and repoll", tmq->consumerId);
tmqPollImpl(tmq, timeout);
}
}
}
}
@ -2069,7 +2003,7 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout, bool pollIfReset) {
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if(tmq == NULL) return NULL;
void* rspObj;
void* rspObj = NULL;
int64_t startTime = taosGetTimestampMs();
tscInfo("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
@ -2101,7 +2035,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
tscError("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
}
rspObj = tmqHandleAllRsp(tmq, timeout, false);
rspObj = tmqHandleAllRsp(tmq, timeout);
if (rspObj) {
tscDebug("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
@ -2473,23 +2407,29 @@ end:
}
}
void updateEpCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
void defaultAskEpCb(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
SAskEpInfo* pInfo = param;
pInfo->code = code;
if (code == TSDB_CODE_SUCCESS) {
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
if (pTmq == NULL || code != TSDB_CODE_SUCCESS){
goto END;
}
SMqRspHead* head = pDataBuf->pData;
SMqAskEpRsp rsp;
tDecodeSMqAskEpRsp(POINTER_SHIFT(pDataBuf->pData, sizeof(SMqRspHead)), &rsp);
doUpdateLocalEp(pTmq, head->epoch, &rsp);
tDeleteSMqAskEpRsp(&rsp);
END:
tsem_post(&pInfo->sem);
}
void addToQueueCallbackFn(tmq_t* pTmq, int32_t code, SDataBuf* pDataBuf, void* param) {
if (pTmq == NULL){
terrno = TSDB_CODE_TMQ_CONSUMER_CLOSED;
return;
}
if (code != TSDB_CODE_SUCCESS) {
terrno = code;
return;
@ -2515,7 +2455,7 @@ int32_t doAskEp(tmq_t* pTmq) {
SAskEpInfo* pInfo = taosMemoryMalloc(sizeof(SAskEpInfo));
tsem_init(&pInfo->sem, 0, 0);
asyncAskEp(pTmq, updateEpCallbackFn, pInfo);
asyncAskEp(pTmq, defaultAskEpCb, pInfo);
tsem_wait(&pInfo->sem);
int32_t code = pInfo->code;
@ -2529,49 +2469,45 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
req.consumerId = pTmq->consumerId;
req.epoch = pTmq->epoch;
strcpy(req.cgroup, pTmq->groupId);
int code = 0;
SMqAskEpCbParam* pParam = NULL;
void* pReq = NULL;
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
code = TSDB_CODE_INVALID_PARA;
goto FAIL;
}
void* pReq = taosMemoryCalloc(1, tlen);
pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_INVALID_PARA, NULL, param);
return;
code = TSDB_CODE_INVALID_PARA;
goto FAIL;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
pParam->refId = pTmq->refId;
pParam->epoch = pTmq->epoch;
pParam->pUserFn = askEpFn;
pParam->pParam = param;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pParam);
taosMemoryFree(pReq);
askEpFn(pTmq, TSDB_CODE_OUT_OF_MEMORY, NULL, param);
return;
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
@ -2586,7 +2522,15 @@ void asyncAskEp(tmq_t* pTmq, __tmq_askep_fn_t askEpFn, void* param) {
tscInfo("consumer:0x%" PRIx64 " ask ep from mnode, reqId:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
int64_t transporterId = 0;
asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code == 0){
return;
}
FAIL:
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(pReq);
askEpFn(pTmq, code, NULL, param);
}
int32_t makeTopicVgroupKey(char* dst, const char* topicName, int32_t vg) {
@ -2609,8 +2553,6 @@ int32_t tmqCommitDone(SMqCommitCbParamSet* pParamSet) {
}
taosMemoryFree(pParamSet);
// tmq->needReportOffsetRows = true;
taosReleaseRef(tmqMgmt.rsetId, refId);
return 0;
}
@ -2783,7 +2725,14 @@ int64_t getCommittedFromServer(tmq_t *tmq, char* tname, int32_t vgId, SEpSet* ep
sendInfo->msgType = TDMT_VND_TMQ_VG_COMMITTEDINFO;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(buf);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
tsem_wait(&pParam->sem);
code = pParam->code;
@ -3042,7 +2991,12 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
tscInfo("consumer:0x%" PRIx64 " %s retrieve wal info vgId:%d, epoch %d, req:%s, reqId:0x%" PRIx64,
tmq->consumerId, pTopic->topicName, pClientVg->vgId, tmq->epoch, offsetFormatBuf, req.reqId);
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &pClientVg->epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(pParam);
taosMemoryFree(msg);
goto end;
}
}
tsem_wait(&pCommon->rsp);
@ -3192,7 +3146,14 @@ int32_t tmq_offset_seek(tmq_t* tmq, const char* pTopicName, int32_t vgId, int64_
sendInfo->msgType = TDMT_VND_TMQ_SEEK;
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
code = asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
if(code != 0){
taosMemoryFree(msg);
taosMemoryFree(sendInfo);
tsem_destroy(&pParam->sem);
taosMemoryFree(pParam);
return code;
}
tsem_wait(&pParam->sem);
code = pParam->code;

View File

@ -401,7 +401,6 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
SMqSubscribeObj *pSub = mndAcquireSubscribe(pMnode, pConsumer->cgroup, data->topicName);
if(pSub == NULL){
ASSERT(0);
continue;
}
taosWLockLatch(&pSub->lock);

View File

@ -269,19 +269,21 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
}
tqDebug("tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s", req.consumerId, vgId, req.subKey);
taosWLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqWarn("tmq seek: consumer:0x%" PRIx64 " vgId:%d subkey %s not found", req.consumerId, vgId, req.subKey);
code = 0;
taosWUnLockLatch(&pTq->lock);
goto end;
}
// 2. check consumer-vg assignment status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != req.consumerId) {
tqError("ERROR tmq seek: consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
req.consumerId, vgId, req.subKey, pHandle->consumerId);
taosRUnLockLatch(&pTq->lock);
taosWUnLockLatch(&pTq->lock);
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
goto end;
}
@ -289,7 +291,7 @@ int32_t tqProcessSeekReq(STQ* pTq, SRpcMsg* pMsg) {
// if consumer register to push manager, push empty to consumer to change vg status from TMQ_VG_STATUS__WAIT to
// TMQ_VG_STATUS__IDLE, otherwise poll data failed after seek.
tqUnregisterPushHandle(pTq, pHandle);
taosRUnLockLatch(&pTq->lock);
taosWUnLockLatch(&pTq->lock);
end:
rsp.code = code;
@ -496,15 +498,16 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
// 1. find handle
taosRLockLatch(&pTq->lock);
STqHandle* pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle == NULL) {
tqError("consumer:0x%" PRIx64 " vgId:%d subkey:%s not found", consumerId, vgId, req.subKey);
terrno = TSDB_CODE_INVALID_MSG;
taosRUnLockLatch(&pTq->lock);
return -1;
}
// 2. check re-balance status
taosRLockLatch(&pTq->lock);
if (pHandle->consumerId != consumerId) {
tqDebug("ERROR consumer:0x%" PRIx64 " vgId:%d, subkey %s, mismatch for saved handle consumer:0x%" PRIx64,
consumerId, vgId, req.subKey, pHandle->consumerId);
@ -580,7 +583,7 @@ int32_t tqProcessDeleteSubReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
bool exec = tqIsHandleExec(pHandle);
if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, wait for 10ms and retry, pHandle:%p", vgId,
tqInfo("vgId:%d, topic:%s, subscription is executing, delete wait for 10ms and retry, pHandle:%p", vgId,
pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
taosMsleep(10);
@ -689,19 +692,29 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
}
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
} else {
taosWLockLatch(&pTq->lock);
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
while(1){
taosWLockLatch(&pTq->lock);
bool exec = tqIsHandleExec(pHandle);
if(exec){
tqInfo("vgId:%d, topic:%s, subscription is executing, sub wait for 10ms and retry, pHandle:%p", pTq->pVnode->config.vgId,
pHandle->subKey, pHandle);
taosWUnLockLatch(&pTq->lock);
taosMsleep(10);
continue;
}
if (pHandle->consumerId == req.newConsumerId) { // do nothing
tqInfo("vgId:%d no switch consumer:0x%" PRIx64 " remains, because redo wal log", req.vgId, req.newConsumerId);
} else {
tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId,
req.newConsumerId);
atomic_store_64(&pHandle->consumerId, req.newConsumerId);
atomic_store_32(&pHandle->epoch, 0);
tqUnregisterPushHandle(pTq, pHandle);
ret = tqMetaSaveHandle(pTq, req.subKey, pHandle);
}
taosWUnLockLatch(&pTq->lock);
break;
}
taosWUnLockLatch(&pTq->lock);
}
end:
@ -1670,6 +1683,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SRpcMsg rsp = {.info = pMsg->info, .code = TSDB_CODE_SUCCESS};
bool allStopped = false;
SStreamTaskNodeUpdateMsg req = {0};
@ -1742,7 +1756,7 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
// possibly only handle the stream task.
int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
bool allStopped = (pMeta->closedTask == numOfTasks);
allStopped = (pMeta->closedTask == numOfTasks);
if (allStopped) {
pMeta->closedTask = 0;
} else {

View File

@ -366,7 +366,7 @@ int32_t extractMsgFromWal(SWalReader* pReader, void** pItem, int64_t maxVer, con
bool tqNextBlockInWal(STqReader* pReader, const char* id) {
SWalReader* pWalReader = pReader->pWalReader;
// uint64_t st = taosGetTimestampMs();
uint64_t st = taosGetTimestampMs();
while (1) {
SArray* pBlockList = pReader->submit.aSubmitTbData;
if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) {
@ -441,9 +441,9 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id) {
pReader->msg.msgStr = NULL;
// if(taosGetTimestampMs() - st > 5){
// return false;
// }
if(taosGetTimestampMs() - st > 1000){
return false;
}
}
}

View File

@ -213,11 +213,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
walReaderVerifyOffset(pHandle->pWalReader, offset);
int64_t fetchVer = offset->version;
// uint64_t st = taosGetTimestampMs();
uint64_t st = taosGetTimestampMs();
int totalRows = 0;
while (1) {
// int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
// ASSERT (savedEpoch <= pRequest->epoch);
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
ASSERT(savedEpoch <= pRequest->epoch);
if (tqFetchLog(pTq, pHandle, &fetchVer, pRequest->reqId) < 0) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer);
@ -260,8 +260,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto end;
}
// if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 5)) {
if (totalRows >= 4096 || taosxRsp.createTableNum > 0) {
if (totalRows >= 4096 || taosxRsp.createTableNum > 0 || (taosGetTimestampMs() - st > 1000)) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
goto end;

View File

@ -37,6 +37,8 @@ static char* getSyntaxErrFormat(int32_t errCode) {
return "Column ambiguously defined: %s";
case TSDB_CODE_PAR_WRONG_VALUE_TYPE:
return "Invalid value type: %s";
case TSDB_CODE_PAR_INVALID_VARBINARY:
return "Invalid varbinary value: %s";
case TSDB_CODE_PAR_ILLEGAL_USE_AGG_FUNCTION:
return "There mustn't be aggregation";
case TSDB_CODE_PAR_WRONG_NUMBER_OF_SELECT:

View File

@ -537,7 +537,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ROW_LENGTH, "Row length exceeds
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_COLUMNS_NUM, "Illegal number of columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_TOO_MANY_COLUMNS, "Too many columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FIRST_COLUMN, "First column must be timestamp")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid binary/nchar column/tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN, "Invalid varbinary/binary/nchar column/tag length")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TAGS_NUM, "Invalid number of tag columns")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_PERMISSION_DENIED, "Permission denied")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Invalid stream query")
@ -572,7 +572,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_GET_META_ERROR, "Fail to get table i
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_NOT_UNIQUE_TABLE_ALIAS, "Not unique table/alias")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED_FUNC, "System table not allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_SYSTABLE_NOT_ALLOWED, "System table not allowed")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalidate varbinary type")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_VARBINARY, "Invalidate varbinary value")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_IP_RANGE, "Invalid IPV4 address ranges")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INTERNAL_ERROR, "Parser internal error")

View File

@ -219,11 +219,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode):
if valgrind :
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203 '%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012 '%(crash_gen_path ,arguments)
else:
crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203'%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012'%(crash_gen_path ,arguments)
return crash_gen_cmd

View File

@ -220,11 +220,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode):
if valgrind :
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203 '%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012 '%(crash_gen_path ,arguments)
else:
crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203'%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0203,0x4012'%(crash_gen_path ,arguments)
return crash_gen_cmd

View File

@ -220,11 +220,11 @@ def get_auto_mix_cmds(args_list ,valgrind=valgrind_mode):
if valgrind :
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0707,0x0203 '%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh --valgrind -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0707,0x0203,0x4012 '%(crash_gen_path ,arguments)
else:
crash_gen_cmd = 'cd %s && ./crash_gen.sh -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0014,0x0707,0x0203'%(crash_gen_path ,arguments)
crash_gen_cmd = 'cd %s && ./crash_gen.sh -i 3 %s -g 0x32c,0x32d,0x3d3,0x18,0x2501,0x369,0x388,0x061a,0x2550,0x0014,0x0707,0x0203,0x4012'%(crash_gen_path ,arguments)
return crash_gen_cmd

View File

@ -94,7 +94,7 @@ class TDTestCase:
resultList=[]
while 1:
tdSql.query("select * from %s.consumeresult"%cdbName)
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
# tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)))
if tdSql.getRows() == expectRows:
break
else:
@ -336,7 +336,7 @@ class TDTestCase:
for i in range(expectRows):
totalConsumeRows += resultList[i]
if totalConsumeRows > expectrowcnt or totalConsumeRows <= 0:
if totalConsumeRows > expectrowcnt or totalConsumeRows < 0:
tdLog.info("act consume rows: %d, expect consume rows between %d and 0"%(totalConsumeRows, expectrowcnt))
tdLog.exit("tmq consume rows error!")

View File

@ -218,7 +218,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
tdLog.info("act consume rows: %d, expect rows range (0, %d)"%(actConsumeTotalRows, totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows < totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -218,7 +218,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)

View File

@ -216,7 +216,7 @@ class TDTestCase:
actConsumeTotalRows = resultList[0]
if not (actConsumeTotalRows > 0 and actConsumeTotalRows <= totalRowsInserted):
if not (actConsumeTotalRows >= 0 and actConsumeTotalRows <= totalRowsInserted):
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
tdLog.exit("%d tmq consume rows error!"%consumerId)

View File

@ -217,7 +217,7 @@ class TDTestCase:
tdLog.info("act consume rows: %d"%(actConsumeTotalRows))
tdLog.info("and second consume rows should be between 0 and %d"%(totalRowsInserted))
if not ((actConsumeTotalRows > 0) and (actConsumeTotalRows <= totalRowsInserted)):
if not ((actConsumeTotalRows >= 0) and (actConsumeTotalRows <= totalRowsInserted)):
tdLog.exit("%d tmq consume rows error!"%consumerId)
time.sleep(10)

View File

@ -696,7 +696,7 @@ static int32_t g_once_commit_flag = 0;
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
if (0 == g_once_commit_flag) {
if (0 == g_once_commit_flag && code == 0) {
g_once_commit_flag = 1;
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
}

View File

@ -85,6 +85,7 @@ void varbinary_sql_test() {
// test insert
pRes = taos_query(taos, "insert into tb2 using stb tags (2, 'tb2_bin1', 093) values (now + 2s, 'nchar1', 892, 0.3)");
printf("error:%s", taos_errstr(pRes));
ASSERT(taos_errno(pRes) != 0);
pRes = taos_query(taos, "insert into tb3 using stb tags (3, 'tb3_bin1', 0x7f829) values (now + 3s, 'nchar1', 0x7f829, 0.3)");