temp
This commit is contained in:
parent
adf8bd5e82
commit
8e2d121b3a
|
@ -79,6 +79,7 @@ struct tmq_t {
|
||||||
tmq_commit_cb* commit_cb;
|
tmq_commit_cb* commit_cb;
|
||||||
int32_t nextTopicIdx;
|
int32_t nextTopicIdx;
|
||||||
int8_t epStatus;
|
int8_t epStatus;
|
||||||
|
int32_t epSkipCnt;
|
||||||
int32_t waitingRequest;
|
int32_t waitingRequest;
|
||||||
int32_t readyRequest;
|
int32_t readyRequest;
|
||||||
SArray* clientTopics; // SArray<SMqClientTopic>
|
SArray* clientTopics; // SArray<SMqClientTopic>
|
||||||
|
@ -107,6 +108,7 @@ typedef struct {
|
||||||
// connection info
|
// connection info
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
int32_t vgStatus;
|
int32_t vgStatus;
|
||||||
|
int64_t skipCnt;
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
} SMqClientVg;
|
} SMqClientVg;
|
||||||
|
|
||||||
|
@ -313,6 +315,7 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
|
||||||
pTmq->waitingRequest = 0;
|
pTmq->waitingRequest = 0;
|
||||||
pTmq->readyRequest = 0;
|
pTmq->readyRequest = 0;
|
||||||
pTmq->epStatus = 0;
|
pTmq->epStatus = 0;
|
||||||
|
pTmq->epSkipCnt = 0;
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
|
@ -348,6 +351,8 @@ tmq_t* tmq_consumer_new1(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
|
||||||
pTmq->epoch = 0;
|
pTmq->epoch = 0;
|
||||||
pTmq->waitingRequest = 0;
|
pTmq->waitingRequest = 0;
|
||||||
pTmq->readyRequest = 0;
|
pTmq->readyRequest = 0;
|
||||||
|
pTmq->epStatus = 0;
|
||||||
|
pTmq->epSkipCnt = 0;
|
||||||
// set conf
|
// set conf
|
||||||
strcpy(pTmq->clientId, conf->clientId);
|
strcpy(pTmq->clientId, conf->clientId);
|
||||||
strcpy(pTmq->groupId, conf->groupId);
|
strcpy(pTmq->groupId, conf->groupId);
|
||||||
|
@ -842,7 +847,7 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
|
int32_t tmqEpoch = atomic_load_32(&tmq->epoch);
|
||||||
if (msgEpoch < tmqEpoch) {
|
if (msgEpoch < tmqEpoch) {
|
||||||
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
|
/*printf("discard rsp epoch %d, current epoch %d\n", msgEpoch, tmqEpoch);*/
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
|
tscWarn("discard rsp epoch %d, current epoch %d", msgEpoch, tmqEpoch);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -892,26 +897,26 @@ int32_t tmqPollCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
tscError("tmq recv poll: vg %d, req offset %ld, rsp offset %ld", pParam->pVg->vgId, pRsp->msg.reqOffset,
|
tscDebug("consumer %ld recv poll: vg %d, req offset %ld, rsp offset %ld", tmq->consumerId, pParam->pVg->vgId, pRsp->msg.reqOffset,
|
||||||
pRsp->msg.rspOffset);
|
pRsp->msg.rspOffset);
|
||||||
|
|
||||||
pRsp->vg = pParam->pVg;
|
pRsp->vg = pParam->pVg;
|
||||||
taosWriteQitem(tmq->mqueue, pRsp);
|
taosWriteQitem(tmq->mqueue, pRsp);
|
||||||
atomic_add_fetch_32(&tmq->readyRequest, 1);
|
atomic_add_fetch_32(&tmq->readyRequest, 1);
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
CREATE_MSG_FAIL:
|
CREATE_MSG_FAIL:
|
||||||
if (pParam->epoch == tmq->epoch) {
|
if (pParam->epoch == tmq->epoch) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
}
|
}
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
/*printf("call update ep %d\n", epoch);*/
|
/*printf("call update ep %d\n", epoch);*/
|
||||||
tscDebug("tmq update ep epoch %d to epoch %d", tmq->epoch, epoch);
|
tscDebug("consumer %ld update ep epoch %d to epoch %d", tmq->consumerId, tmq->epoch, epoch);
|
||||||
bool set = false;
|
bool set = false;
|
||||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||||
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
char vgKey[TSDB_TOPIC_FNAME_LEN + 22];
|
||||||
|
@ -942,7 +947,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
for (int32_t k = 0; k < vgNumCur; k++) {
|
for (int32_t k = 0; k < vgNumCur; k++) {
|
||||||
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
|
SMqClientVg* pVgCur = taosArrayGet(pTopicCur->vgs, k);
|
||||||
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
|
sprintf(vgKey, "%s:%d", topic.topicName, pVgCur->vgId);
|
||||||
tscDebug("epoch %d vg %d build %s\n", epoch, pVgCur->vgId, vgKey);
|
tscDebug("consumer %ld epoch %d vg %d build %s\n", tmq->consumerId, epoch, pVgCur->vgId, vgKey);
|
||||||
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
taosHashPut(pHash, vgKey, strlen(vgKey), &pVgCur->currentOffset, sizeof(int64_t));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -956,18 +961,19 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
|
sprintf(vgKey, "%s:%d", topic.topicName, pVgEp->vgId);
|
||||||
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
|
int64_t* pOffset = taosHashGet(pHash, vgKey, strlen(vgKey));
|
||||||
int64_t offset = pVgEp->offset;
|
int64_t offset = pVgEp->offset;
|
||||||
tscDebug("epoch %d vg %d offset og to %ld\n", epoch, pVgEp->vgId, offset);
|
tscDebug("consumer %ld epoch %d vg %d offset og to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
||||||
if (pOffset != NULL) {
|
if (pOffset != NULL) {
|
||||||
offset = *pOffset;
|
offset = *pOffset;
|
||||||
tscDebug("epoch %d vg %d found %s\n", epoch, pVgEp->vgId, vgKey);
|
tscDebug("consumer %ld epoch %d vg %d found %s\n", tmq->consumerId, epoch, pVgEp->vgId, vgKey);
|
||||||
}
|
}
|
||||||
tscDebug("epoch %d vg %d offset set to %ld\n", epoch, pVgEp->vgId, offset);
|
tscDebug("consumer %ld epoch %d vg %d offset set to %ld\n", tmq->consumerId, epoch, pVgEp->vgId, offset);
|
||||||
SMqClientVg clientVg = {
|
SMqClientVg clientVg = {
|
||||||
.pollCnt = 0,
|
.pollCnt = 0,
|
||||||
.currentOffset = offset,
|
.currentOffset = offset,
|
||||||
.vgId = pVgEp->vgId,
|
.vgId = pVgEp->vgId,
|
||||||
.epSet = pVgEp->epSet,
|
.epSet = pVgEp->epSet,
|
||||||
.vgStatus = TMQ_VG_STATUS__IDLE,
|
.vgStatus = TMQ_VG_STATUS__IDLE,
|
||||||
|
.skipCnt = 0,
|
||||||
};
|
};
|
||||||
taosArrayPush(topic.vgs, &clientVg);
|
taosArrayPush(topic.vgs, &clientVg);
|
||||||
set = true;
|
set = true;
|
||||||
|
@ -984,9 +990,8 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
|
||||||
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
SMqAskEpCbParam* pParam = (SMqAskEpCbParam*)param;
|
||||||
tmq_t* tmq = pParam->tmq;
|
tmq_t* tmq = pParam->tmq;
|
||||||
tscDebug("consumer %ld recv ep", tmq->consumerId);
|
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
tscError("get topic endpoint error, not ready, wait:%d\n", pParam->sync);
|
tscError("consumer %ld get topic endpoint error, not ready, wait:%d", tmq->consumerId, pParam->sync);
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -995,6 +1000,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
// Epoch will only increase when received newer epoch ep msg
|
// Epoch will only increase when received newer epoch ep msg
|
||||||
SMqRspHead* head = pMsg->pData;
|
SMqRspHead* head = pMsg->pData;
|
||||||
int32_t epoch = atomic_load_32(&tmq->epoch);
|
int32_t epoch = atomic_load_32(&tmq->epoch);
|
||||||
|
tscDebug("consumer %ld recv ep, msg epoch %d, current epoch %d", tmq->consumerId, head->epoch, epoch);
|
||||||
if (head->epoch <= epoch) {
|
if (head->epoch <= epoch) {
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
@ -1019,7 +1025,7 @@ int32_t tmqAskEpCb(void* param, const SDataBuf* pMsg, int32_t code) {
|
||||||
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp);
|
tDecodeSMqCMGetSubEpRsp(POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pRsp);
|
||||||
|
|
||||||
taosWriteQitem(tmq->mqueue, pRsp);
|
taosWriteQitem(tmq->mqueue, pRsp);
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
}
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
|
@ -1033,9 +1039,11 @@ END:
|
||||||
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
int32_t tmqAskEp(tmq_t* tmq, bool sync) {
|
||||||
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
|
||||||
if (epStatus == 1) {
|
if (epStatus == 1) {
|
||||||
tscDebug("consumer %ld skip ask ep", tmq->consumerId);
|
int32_t epSkipCnt = atomic_add_fetch_32(&tmq->epSkipCnt, 1);
|
||||||
return 0;
|
tscDebug("consumer %ld skip ask ep cnt %d", tmq->consumerId, epSkipCnt);
|
||||||
|
if (epSkipCnt < 40) return 0;
|
||||||
}
|
}
|
||||||
|
atomic_store_32(&tmq->epSkipCnt, 0);
|
||||||
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
int32_t tlen = sizeof(SMqCMGetSubEpReq);
|
||||||
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
SMqCMGetSubEpReq* req = taosMemoryMalloc(tlen);
|
||||||
if (req == NULL) {
|
if (req == NULL) {
|
||||||
|
@ -1221,20 +1229,29 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
SMqClientVg* pVg = taosArrayGet(pTopic->vgs, j);
|
||||||
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
int32_t vgStatus = atomic_val_compare_exchange_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE, TMQ_VG_STATUS__WAIT);
|
||||||
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
if (vgStatus != TMQ_VG_STATUS__IDLE) {
|
||||||
tscDebug("consumer %ld skip vg %d", tmq->consumerId, pVg->vgId);
|
int64_t skipCnt = atomic_add_fetch_64(&pVg->skipCnt, 1);
|
||||||
|
tscDebug("consumer %ld skip vg %d skip cnt %ld", tmq->consumerId, pVg->vgId, skipCnt);
|
||||||
continue;
|
continue;
|
||||||
|
#if 0
|
||||||
|
if (skipCnt < 30000) {
|
||||||
|
continue;
|
||||||
|
} else {
|
||||||
|
tscDebug("consumer %ld skip vg %d skip too much reset", tmq->consumerId, pVg->vgId);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
atomic_store_64(&pVg->skipCnt, 0);
|
||||||
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
SMqPollReq* pReq = tmqBuildConsumeReqImpl(tmq, blockingTime, pTopic, pVg);
|
||||||
if (pReq == NULL) {
|
if (pReq == NULL) {
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
SMqPollCbParam* pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||||
if (pParam == NULL) {
|
if (pParam == NULL) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
pParam->tmq = tmq;
|
pParam->tmq = tmq;
|
||||||
|
@ -1247,7 +1264,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
taosMemoryFree(pParam);
|
taosMemoryFree(pParam);
|
||||||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||||
tsem_post(&tmq->rspSem);
|
/*tsem_post(&tmq->rspSem);*/
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1379,7 +1396,7 @@ tmq_message_t* tmq_consumer_poll(tmq_t* tmq, int64_t blocking_time) {
|
||||||
tmqAskEp(tmq, false);
|
tmqAskEp(tmq, false);
|
||||||
tmqPollImpl(tmq, blocking_time);
|
tmqPollImpl(tmq, blocking_time);
|
||||||
|
|
||||||
tsem_wait(&tmq->rspSem);
|
/*tsem_wait(&tmq->rspSem);*/
|
||||||
|
|
||||||
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
|
rspMsg = tmqHandleAllRsp(tmq, blocking_time, false);
|
||||||
if (rspMsg) {
|
if (rspMsg) {
|
||||||
|
|
|
@ -216,7 +216,7 @@ static int32_t mndProcessGetSubEpReq(SNodeMsg *pMsg) {
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
|
int32_t hbStatus = atomic_load_32(&pConsumer->hbStatus);
|
||||||
mTrace("try to get sub ep, old val: %d", hbStatus);
|
mTrace("consumer %ld epoch(%d) try to get sub ep, server epoch %d, old val: %d", consumerId, epoch, pConsumer->epoch, hbStatus);
|
||||||
atomic_store_32(&pConsumer->hbStatus, 0);
|
atomic_store_32(&pConsumer->hbStatus, 0);
|
||||||
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
|
/*SSdbRaw *pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
|
||||||
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
|
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
|
||||||
|
|
|
@ -214,6 +214,7 @@ static void *taosThreadToOpenNewFile(void *param) {
|
||||||
tsLogObj.logHandle->pFile = pFile;
|
tsLogObj.logHandle->pFile = pFile;
|
||||||
tsLogObj.lines = 0;
|
tsLogObj.lines = 0;
|
||||||
tsLogObj.openInProgress = 0;
|
tsLogObj.openInProgress = 0;
|
||||||
|
taosSsleep(2);
|
||||||
taosCloseLogByFd(pOldFile);
|
taosCloseLogByFd(pOldFile);
|
||||||
|
|
||||||
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
uInfo(" new log file:%d is opened", tsLogObj.flag);
|
||||||
|
@ -347,12 +348,13 @@ static int32_t taosOpenLogFile(char *fn, int32_t maxLines, int32_t maxFileNum) {
|
||||||
taosThreadMutexInit(&tsLogObj.logMutex, NULL);
|
taosThreadMutexInit(&tsLogObj.logMutex, NULL);
|
||||||
|
|
||||||
taosUmaskFile(0);
|
taosUmaskFile(0);
|
||||||
tsLogObj.logHandle->pFile = taosOpenFile(fileName, TD_FILE_CTEATE | TD_FILE_WRITE);
|
TdFilePtr pFile = taosOpenFile(fileName, TD_FILE_CTEATE | TD_FILE_WRITE);
|
||||||
|
|
||||||
if (tsLogObj.logHandle->pFile == NULL) {
|
if (pFile == NULL) {
|
||||||
printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno));
|
printf("\nfailed to open log file:%s, reason:%s\n", fileName, strerror(errno));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
tsLogObj.logHandle->pFile = pFile;
|
||||||
taosLockLogFile(tsLogObj.logHandle->pFile);
|
taosLockLogFile(tsLogObj.logHandle->pFile);
|
||||||
|
|
||||||
// only an estimate for number of lines
|
// only an estimate for number of lines
|
||||||
|
@ -746,4 +748,4 @@ void taosSetAllDebugFlag(int32_t flag) {
|
||||||
fsDebugFlag = flag;
|
fsDebugFlag = flag;
|
||||||
|
|
||||||
uInfo("all debug flag are set to %d", flag);
|
uInfo("all debug flag are set to %d", flag);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue