remove ep status and cnt
This commit is contained in:
parent
a14b8dcc99
commit
f2f6fb494c
|
@ -83,9 +83,11 @@ struct tmq_t {
|
||||||
|
|
||||||
// status
|
// status
|
||||||
int8_t status;
|
int8_t status;
|
||||||
int8_t epStatus;
|
|
||||||
int32_t epoch;
|
int32_t epoch;
|
||||||
|
#if 0
|
||||||
|
int8_t epStatus;
|
||||||
int32_t epSkipCnt;
|
int32_t epSkipCnt;
|
||||||
|
#endif
|
||||||
int64_t pollCnt;
|
int64_t pollCnt;
|
||||||
|
|
||||||
// timer
|
// timer
|
||||||
|
@ -464,8 +466,8 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
pTmq->status = TMQ_CONSUMER_STATUS__INIT;
|
||||||
pTmq->pollCnt = 0;
|
pTmq->pollCnt = 0;
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
pTmq->epStatus = 0;
|
/*pTmq->epStatus = 0;*/
|
||||||
pTmq->epSkipCnt = 0;
|
/*pTmq->epSkipCnt = 0;*/
|
||||||
|
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
|
@ -975,7 +977,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
atomic_store_8(&tmq->epStatus, 0);
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
if (pParam->sync) {
|
if (pParam->sync) {
|
||||||
tsem_post(&pParam->rspSem);
|
tsem_post(&pParam->rspSem);
|
||||||
}
|
}
|
||||||
|
@ -984,6 +986,7 @@ END:
|
||||||
|
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
#if 0
|
||||||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||||
if (epStatus == 1) {
|
if (epStatus == 1) {
|
||||||
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
|
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
|
||||||
|
@ -991,11 +994,12 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
if (epSkipCnt < 5000) return 0;
|
if (epSkipCnt < 5000) return 0;
|
||||||
}
|
}
|
||||||
atomic_store_32(&tmq->epSkipCnt, 0);
|
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||||
|
#endif
|
||||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||||
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
tscError("failed to malloc get subscribe ep buf");
|
tscError("failed to malloc get subscribe ep buf");
|
||||||
atomic_store_8(&tmq->epStatus, 0);
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
req->consumerId = htobe64(tmq->consumerId);
|
req->consumerId = htobe64(tmq->consumerId);
|
||||||
|
@ -1006,7 +1010,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
tscError("failed to malloc subscribe param");
|
tscError("failed to malloc subscribe param");
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
atomic_store_8(&tmq->epStatus, 0);
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
|
@ -1018,7 +1022,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
tsem_destroy(&pParam->rspSem);
|
tsem_destroy(&pParam->rspSem);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
taosMemoryFree(req);
|
taosMemoryFree(req);
|
||||||
atomic_store_8(&tmq->epStatus, 0);
|
/*atomic_store_8(&tmq->epStatus, 0);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue