merge main
This commit is contained in:
commit
efa56d5352
|
@ -80,15 +80,15 @@ int32_t taosTimeCountInterval(int64_t skey, int64_t ekey, int64_t interval, char
|
||||||
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
int32_t parseAbsoluteDuration(const char* token, int32_t tokenlen, int64_t* ts, char* unit, int32_t timePrecision);
|
||||||
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
|
int32_t parseNatualDuration(const char* token, int32_t tokenLen, int64_t* duration, char* unit, int32_t timePrecision);
|
||||||
|
|
||||||
int32_t taosParseTime(const char* timestr, int64_t* time, int32_t len, int32_t timePrec, int8_t dayligth);
|
int32_t taosParseTime(const char* timestr, int64_t* pTime, int32_t len, int32_t timePrec, int8_t dayligth);
|
||||||
void deltaToUtcInitOnce();
|
void deltaToUtcInitOnce();
|
||||||
char getPrecisionUnit(int32_t precision);
|
char getPrecisionUnit(int32_t precision);
|
||||||
|
|
||||||
int64_t convertTimePrecision(int64_t time, int32_t fromPrecision, int32_t toPrecision);
|
int64_t convertTimePrecision(int64_t ts, int32_t fromPrecision, int32_t toPrecision);
|
||||||
int64_t convertTimeFromPrecisionToUnit(int64_t time, int32_t fromPrecision, char toUnit);
|
int64_t convertTimeFromPrecisionToUnit(int64_t ts, int32_t fromPrecision, char toUnit);
|
||||||
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
|
int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec, int64_t* timeVal);
|
||||||
|
|
||||||
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t time, int32_t precision);
|
void taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -68,12 +68,12 @@ static int64_t user_mktime64(const uint32_t year0, const uint32_t mon0, const ui
|
||||||
|
|
||||||
// ==== mktime() kernel code =================//
|
// ==== mktime() kernel code =================//
|
||||||
static int64_t m_deltaUtc = 0;
|
static int64_t m_deltaUtc = 0;
|
||||||
void deltaToUtcInitOnce() {
|
|
||||||
struct tm tm = {0};
|
|
||||||
|
|
||||||
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
|
void deltaToUtcInitOnce() {
|
||||||
m_deltaUtc = (int64_t)taosMktime(&tm);
|
struct tm tm = {0};
|
||||||
// printf("====delta:%lld\n\n", seconds);
|
(void)taosStrpTime("1970-01-01 00:00:00", (const char*)("%Y-%m-%d %H:%M:%S"), &tm);
|
||||||
|
m_deltaUtc = (int64_t)taosMktime(&tm);
|
||||||
|
// printf("====delta:%lld\n\n", seconds);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
static int64_t parseFraction(char* str, char** end, int32_t timePrec);
|
||||||
|
|
|
@ -559,6 +559,27 @@ int32_t mndSetConsumerCommitLogs(SMnode *pMnode, STrans *pTrans, SMqConsumerObj
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t validateTopics(const SArray* pTopicList, SMnode* pMnode, const char* pUser) {
|
||||||
|
int32_t numOfTopics = taosArrayGetSize(pTopicList);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < numOfTopics; i++) {
|
||||||
|
char *pOneTopic = taosArrayGetP(pTopicList, i);
|
||||||
|
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, pOneTopic);
|
||||||
|
if (pTopic == NULL) { // terrno has been set by callee function
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mndCheckTopicPrivilege(pMnode, pUser, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
SMnode *pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
char *msgStr = pMsg->pCont;
|
char *msgStr = pMsg->pCont;
|
||||||
|
@ -568,15 +589,15 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
uint64_t consumerId = subscribe.consumerId;
|
uint64_t consumerId = subscribe.consumerId;
|
||||||
char *cgroup = subscribe.cgroup;
|
char *cgroup = subscribe.cgroup;
|
||||||
SMqConsumerObj *pConsumerOld = NULL;
|
SMqConsumerObj *pExistedConsumer = NULL;
|
||||||
SMqConsumerObj *pConsumerNew = NULL;
|
SMqConsumerObj *pConsumerNew = NULL;
|
||||||
|
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
SArray *newSub = subscribe.topicNames;
|
SArray *pTopicList = subscribe.topicNames;
|
||||||
taosArraySort(newSub, taosArrayCompareString);
|
taosArraySort(pTopicList, taosArrayCompareString);
|
||||||
taosArrayRemoveDuplicateP(newSub, taosArrayCompareString, taosMemoryFree);
|
taosArrayRemoveDuplicateP(pTopicList, taosArrayCompareString, taosMemoryFree);
|
||||||
|
|
||||||
int32_t newTopicNum = taosArrayGetSize(newSub);
|
int32_t newTopicNum = taosArrayGetSize(pTopicList);
|
||||||
|
|
||||||
// check topic existence
|
// check topic existence
|
||||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
|
||||||
|
@ -584,38 +605,24 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
code = validateTopics(pTopicList, pMnode, pMsg->info.conn.user);
|
||||||
char *topic = taosArrayGetP(newSub, i);
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
SMqTopicObj *pTopic = mndAcquireTopic(pMnode, topic);
|
goto _over;
|
||||||
if (pTopic == NULL) { // terrno has been set by callee function
|
|
||||||
goto _over;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
goto _over;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (mndCheckTopicPrivilege(pMnode, pMsg->info.conn.user, MND_OPER_SUBSCRIBE, pTopic) != 0) {
|
|
||||||
goto _over;
|
|
||||||
}
|
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pConsumerOld = mndAcquireConsumer(pMnode, consumerId);
|
pExistedConsumer = mndAcquireConsumer(pMnode, consumerId);
|
||||||
if (pConsumerOld == NULL) {
|
if (pExistedConsumer == NULL) {
|
||||||
mInfo("receive subscribe request from new consumer:%" PRId64, consumerId);
|
mInfo("receive subscribe request from new consumer:0x%" PRIx64" cgroup:%s", consumerId, subscribe.cgroup);
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
tstrncpy(pConsumerNew->clientId, subscribe.clientId, 256);
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
taosArrayDestroy(pConsumerNew->rebNewTopics);
|
||||||
pConsumerNew->rebNewTopics = newSub;
|
pConsumerNew->rebNewTopics = pTopicList; // all subscribe topics should re-balance.
|
||||||
subscribe.topicNames = NULL;
|
subscribe.topicNames = NULL;
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||||
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i));
|
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
|
||||||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -623,12 +630,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
if (mndTransPrepare(pMnode, pTrans) != 0) goto _over;
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
/*taosRLockLatch(&pConsumerOld->lock);*/
|
/*taosRLockLatch(&pExistedConsumer->lock);*/
|
||||||
|
int32_t status = atomic_load_32(&pExistedConsumer->status);
|
||||||
|
|
||||||
int32_t status = atomic_load_32(&pConsumerOld->status);
|
mInfo("receive subscribe request from existed consumer:0x%" PRIx64 " cgroup:%s, current status:%d(%s), subscribe topic num: %d",
|
||||||
|
consumerId, subscribe.cgroup, status,mndConsumerStatusName(status), newTopicNum);
|
||||||
mInfo("receive subscribe request from existing consumer:%" PRId64 ", current status: %s, subscribe topic num: %d",
|
|
||||||
consumerId, mndConsumerStatusName(status), newTopicNum);
|
|
||||||
|
|
||||||
if (status != MQ_CONSUMER_STATUS__READY) {
|
if (status != MQ_CONSUMER_STATUS__READY) {
|
||||||
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
terrno = TSDB_CODE_MND_CONSUMER_NOT_READY;
|
||||||
|
@ -637,36 +643,36 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
pConsumerNew = tNewSMqConsumerObj(consumerId, cgroup);
|
||||||
if (pConsumerNew == NULL) {
|
if (pConsumerNew == NULL) {
|
||||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
goto _over;
|
goto _over;
|
||||||
}
|
}
|
||||||
|
|
||||||
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
pConsumerNew->updateType = CONSUMER_UPDATE__MODIFY;
|
||||||
|
|
||||||
for (int32_t i = 0; i < newTopicNum; i++) {
|
for (int32_t i = 0; i < newTopicNum; i++) {
|
||||||
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, i));
|
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, i));
|
||||||
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
taosArrayPush(pConsumerNew->assignedTopics, &newTopicCopy);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t oldTopicNum = 0;
|
int32_t oldTopicNum = 0;
|
||||||
if (pConsumerOld->currentTopics) {
|
if (pExistedConsumer->currentTopics) {
|
||||||
oldTopicNum = taosArrayGetSize(pConsumerOld->currentTopics);
|
oldTopicNum = taosArrayGetSize(pExistedConsumer->currentTopics);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t i = 0, j = 0;
|
int32_t i = 0, j = 0;
|
||||||
while (i < oldTopicNum || j < newTopicNum) {
|
while (i < oldTopicNum || j < newTopicNum) {
|
||||||
if (i >= oldTopicNum) {
|
if (i >= oldTopicNum) {
|
||||||
char *newTopicCopy = taosStrdup(taosArrayGetP(newSub, j));
|
char *newTopicCopy = taosStrdup(taosArrayGetP(pTopicList, j));
|
||||||
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
taosArrayPush(pConsumerNew->rebNewTopics, &newTopicCopy);
|
||||||
j++;
|
j++;
|
||||||
continue;
|
continue;
|
||||||
} else if (j >= newTopicNum) {
|
} else if (j >= newTopicNum) {
|
||||||
char *oldTopicCopy = taosStrdup(taosArrayGetP(pConsumerOld->currentTopics, i));
|
char *oldTopicCopy = taosStrdup(taosArrayGetP(pExistedConsumer->currentTopics, i));
|
||||||
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
taosArrayPush(pConsumerNew->rebRemovedTopics, &oldTopicCopy);
|
||||||
i++;
|
i++;
|
||||||
continue;
|
continue;
|
||||||
} else {
|
} else {
|
||||||
char *oldTopic = taosArrayGetP(pConsumerOld->currentTopics, i);
|
char *oldTopic = taosArrayGetP(pExistedConsumer->currentTopics, i);
|
||||||
char *newTopic = taosArrayGetP(newSub, j);
|
char *newTopic = taosArrayGetP(pTopicList, j);
|
||||||
int comp = strcmp(oldTopic, newTopic);
|
int comp = strcmp(oldTopic, newTopic);
|
||||||
if (comp == 0) {
|
if (comp == 0) {
|
||||||
i++;
|
i++;
|
||||||
|
@ -686,7 +692,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConsumerOld && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
|
if (pExistedConsumer && taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 &&
|
||||||
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||||
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
|
/*if (taosArrayGetSize(pConsumerNew->assignedTopics) == 0) {*/
|
||||||
/*pConsumerNew->updateType = */
|
/*pConsumerNew->updateType = */
|
||||||
|
@ -703,10 +709,11 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
_over:
|
_over:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
|
|
||||||
if (pConsumerOld) {
|
if (pExistedConsumer) {
|
||||||
/*taosRUnLockLatch(&pConsumerOld->lock);*/
|
/*taosRUnLockLatch(&pExistedConsumer->lock);*/
|
||||||
mndReleaseConsumer(pMnode, pConsumerOld);
|
mndReleaseConsumer(pMnode, pExistedConsumer);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pConsumerNew) {
|
if (pConsumerNew) {
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosMemoryFree(pConsumerNew);
|
taosMemoryFree(pConsumerNew);
|
||||||
|
|
|
@ -655,6 +655,7 @@ STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict,
|
||||||
taosArrayPush(pTrans->pRpcArray, &pReq->info);
|
taosArrayPush(pTrans->pRpcArray, &pReq->info);
|
||||||
pTrans->originRpcType = pReq->msgType;
|
pTrans->originRpcType = pReq->msgType;
|
||||||
}
|
}
|
||||||
|
|
||||||
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
|
mTrace("trans:%d, local object is created, data:%p", pTrans->id, pTrans);
|
||||||
return pTrans;
|
return pTrans;
|
||||||
}
|
}
|
||||||
|
|
|
@ -275,7 +275,7 @@ int32_t tqSendDataRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, con
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
tqDebug("vgId:%d consumer:0x%" PRIx64 " (epoch %d), block num:%d, reqOffset:%s, rspOffset:%s",
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -336,8 +336,7 @@ int32_t tqSendTaosxRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, co
|
||||||
char buf2[80] = {0};
|
char buf2[80] = {0};
|
||||||
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
tFormatOffset(buf1, 80, &pRsp->reqOffset);
|
||||||
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
tFormatOffset(buf2, 80, &pRsp->rspOffset);
|
||||||
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64
|
tqDebug("taosx rsp, vgId:%d, from consumer:0x%" PRIx64 " (epoch %d) send rsp, numOfBlks:%d, req:%s, rsp:%s",
|
||||||
" (epoch %d) send rsp, block num: %d, reqOffset:%s, rspOffset:%s",
|
|
||||||
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
TD_VID(pTq->pVnode), pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -496,14 +495,15 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// update epoch if need
|
// update epoch if need
|
||||||
int32_t consumerEpoch = atomic_load_32(&pHandle->epoch);
|
int32_t savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
while (consumerEpoch < reqEpoch) {
|
while (savedEpoch < reqEpoch) {
|
||||||
consumerEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, consumerEpoch, reqEpoch);
|
tqDebug("tmq poll: consumer:0x%"PRIx64 " epoch update from %d to %d by poll req", consumerId, savedEpoch, reqEpoch);
|
||||||
|
savedEpoch = atomic_val_compare_exchange_32(&pHandle->epoch, savedEpoch, reqEpoch);
|
||||||
}
|
}
|
||||||
|
|
||||||
char buf[80];
|
char buf[80];
|
||||||
tFormatOffset(buf, 80, &reqOffset);
|
tFormatOffset(buf, 80, &reqOffset);
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req in vg %d, req offset %s", consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, recv poll req vgId:%d, req:%s", consumerId,
|
||||||
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), buf);
|
||||||
|
|
||||||
// 2.reset offset if needed
|
// 2.reset offset if needed
|
||||||
|
@ -539,7 +539,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
tqInitDataRsp(&dataRsp, &req, pHandle->execHandle.subType);
|
||||||
|
|
||||||
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
tqOffsetResetToLog(&dataRsp.rspOffset, walGetLastVer(pTq->pVnode->pWal));
|
||||||
tqDebug("tmq poll: consumer:0x %" PRIx64 ", subkey %s, vg %d, offset reset to %" PRId64, consumerId,
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, offset reset to %" PRId64, consumerId,
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.rspOffset.version);
|
||||||
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
if (tqSendDataRsp(pTq, pMsg, &req, &dataRsp) < 0) {
|
||||||
code = -1;
|
code = -1;
|
||||||
|
@ -576,6 +576,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
|
// till now, all data has been rsp to consumer, new data needs to push client once arrived.
|
||||||
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG &&
|
||||||
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
dataRsp.reqOffset.version == dataRsp.rspOffset.version) {
|
||||||
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
STqPushEntry* pPushEntry = taosMemoryCalloc(1, sizeof(STqPushEntry));
|
||||||
|
@ -588,8 +589,9 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
pPushEntry->dataRsp.head.epoch = reqEpoch;
|
pPushEntry->dataRsp.head.epoch = reqEpoch;
|
||||||
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
pPushEntry->dataRsp.head.mqMsgType = TMQ_MSG_TYPE__POLL_RSP;
|
||||||
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey) + 1, &pPushEntry, sizeof(void*));
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vg %d save handle to push mgr", consumerId,
|
|
||||||
pHandle->subKey, TD_VID(pTq->pVnode));
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s offset:%" PRId64 ", vgId:%d save handle to push mgr",
|
||||||
|
consumerId, pHandle->subKey, dataRsp.reqOffset.version, TD_VID(pTq->pVnode));
|
||||||
// unlock
|
// unlock
|
||||||
taosWUnLockLatch(&pTq->pushLock);
|
taosWUnLockLatch(&pTq->pushLock);
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -602,8 +604,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
code = -1;
|
code = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tmq poll: consumer:0x%" PRIx64
|
tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp data block:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
||||||
", subkey %s, vg %d, send data blockNum:%d, offset type:%d, uid/version:%" PRId64 ", ts:%" PRId64 "",
|
|
||||||
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
consumerId, pHandle->subKey, TD_VID(pTq->pVnode), dataRsp.blockNum, dataRsp.rspOffset.type,
|
||||||
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
dataRsp.rspOffset.uid, dataRsp.rspOffset.ts);
|
||||||
|
|
||||||
|
@ -664,11 +665,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
walSetReaderCapacity(pHandle->pWalReader, 2048);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pHandle->epoch);
|
savedEpoch = atomic_load_32(&pHandle->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (savedEpoch > reqEpoch) {
|
||||||
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
tqWarn("tmq poll: consumer:0x%" PRIx64 " (epoch %d), subkey %s, vg %d offset %" PRId64
|
||||||
", found new consumer epoch %d, discard req epoch %d",
|
", found new consumer epoch %d, discard req epoch %d",
|
||||||
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, consumerEpoch, reqEpoch);
|
consumerId, req.epoch, pHandle->subKey, TD_VID(pTq->pVnode), fetchVer, savedEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -197,8 +197,8 @@ int32_t tqMetaSaveHandle(STQ* pTq, const char* key, const STqHandle* pHandle) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
tqDebug("tq save %s(%d) consumer %" PRId64 " vgId:%d", pHandle->subKey, (int32_t)strlen(pHandle->subKey),
|
tqDebug("tq save %s(%d) handle consumer:0x%" PRIx64 "epoch:%d vgId:%d", pHandle->subKey,
|
||||||
pHandle->consumerId, TD_VID(pTq->pVnode));
|
(int32_t)strlen(pHandle->subKey), pHandle->consumerId, pHandle->epoch, TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
void* buf = taosMemoryCalloc(1, vlen);
|
void* buf = taosMemoryCalloc(1, vlen);
|
||||||
if (buf == NULL) {
|
if (buf == NULL) {
|
||||||
|
|
|
@ -193,7 +193,7 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo));
|
||||||
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
taosWUnLockLatch(&pHandle->pushHandle.lock);
|
||||||
|
|
||||||
tqDebug("vgId:%d, offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
|
tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, reqOffset:%" PRId64 ", rspOffset:%" PRId64,
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum,
|
||||||
rsp.reqOffset, rsp.rspOffset);
|
rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
|
@ -210,14 +210,15 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg));
|
||||||
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
int32_t len = msgLen - sizeof(SSubmitReq2Msg);
|
||||||
|
|
||||||
tqDebug("vgId:%d, tq push msg ver %" PRId64 ", type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
|
tqDebug("vgId:%d tq push msg version:%" PRId64 " type: %s, p head %p, p body %p, len %d", pTq->pVnode->config.vgId, ver,
|
||||||
TMSG_INFO(msgType), msg, pReq, len);
|
TMSG_INFO(msgType), msg, pReq, len);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
// lock push mgr to avoid potential msg lost
|
// lock push mgr to avoid potential msg lost
|
||||||
taosWLockLatch(&pTq->pushLock);
|
taosWLockLatch(&pTq->pushLock);
|
||||||
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
|
||||||
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
if (taosHashGetSize(pTq->pPushMgr) != 0) {
|
||||||
|
|
||||||
|
tqDebug("vgId:%d, push handle num %d", pTq->pVnode->config.vgId, taosHashGetSize(pTq->pPushMgr));
|
||||||
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
SArray* cachedKeys = taosArrayInit(0, sizeof(void*));
|
||||||
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
SArray* cachedKeyLens = taosArrayInit(0, sizeof(size_t));
|
||||||
void* data = taosMemoryMalloc(len);
|
void* data = taosMemoryMalloc(len);
|
||||||
|
@ -241,11 +242,13 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
|
||||||
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
tqDebug("vgId:%d, cannot find handle %s", pTq->pVnode->config.vgId, pPushEntry->subKey);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
if (pPushEntry->dataRsp.reqOffset.version >= ver) {
|
||||||
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
tqDebug("vgId:%d, push entry req version %" PRId64 ", while push version %" PRId64 ", skip",
|
||||||
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
pTq->pVnode->config.vgId, pPushEntry->dataRsp.reqOffset.version, ver);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
STqExecHandle* pExec = &pHandle->execHandle;
|
STqExecHandle* pExec = &pHandle->execHandle;
|
||||||
qTaskInfo_t task = pExec->task;
|
qTaskInfo_t task = pExec->task;
|
||||||
|
|
||||||
|
|
|
@ -155,6 +155,7 @@ typedef struct SBlockInfoBuf {
|
||||||
int32_t currentIndex;
|
int32_t currentIndex;
|
||||||
SArray* pData;
|
SArray* pData;
|
||||||
int32_t numPerBucket;
|
int32_t numPerBucket;
|
||||||
|
int32_t numOfTables;
|
||||||
} SBlockInfoBuf;
|
} SBlockInfoBuf;
|
||||||
|
|
||||||
struct STsdbReader {
|
struct STsdbReader {
|
||||||
|
@ -302,6 +303,47 @@ static int32_t initBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||||
taosArrayPush(pBuf->pData, &p);
|
taosArrayPush(pBuf->pData, &p);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pBuf->numOfTables = numOfTables;
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t ensureBlockScanInfoBuf(SBlockInfoBuf* pBuf, int32_t numOfTables) {
|
||||||
|
if (numOfTables <= pBuf->numOfTables) {
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pBuf->numOfTables > 0) {
|
||||||
|
STableBlockScanInfo **p = (STableBlockScanInfo**)taosArrayPop(pBuf->pData);
|
||||||
|
taosMemoryFree(*p);
|
||||||
|
pBuf->numOfTables /= pBuf->numPerBucket;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t num = (numOfTables - pBuf->numOfTables) / pBuf->numPerBucket;
|
||||||
|
int32_t remainder = (numOfTables - pBuf->numOfTables) % pBuf->numPerBucket;
|
||||||
|
if (pBuf->pData == NULL) {
|
||||||
|
pBuf->pData = taosArrayInit(num + 1, POINTER_BYTES);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < num; ++i) {
|
||||||
|
char* p = taosMemoryCalloc(pBuf->numPerBucket, sizeof(STableBlockScanInfo));
|
||||||
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosArrayPush(pBuf->pData, &p);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (remainder > 0) {
|
||||||
|
char* p = taosMemoryCalloc(remainder, sizeof(STableBlockScanInfo));
|
||||||
|
if (p == NULL) {
|
||||||
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
taosArrayPush(pBuf->pData, &p);
|
||||||
|
}
|
||||||
|
|
||||||
|
pBuf->numOfTables = numOfTables;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3876,8 +3918,13 @@ int32_t tsdbSetTableList(STsdbReader* pReader, const void* pTableList, int32_t n
|
||||||
clearBlockScanInfo(*p);
|
clearBlockScanInfo(*p);
|
||||||
}
|
}
|
||||||
|
|
||||||
// todo handle the case where size is less than the value of num
|
if (size < num) {
|
||||||
ASSERT(size >= num);
|
int32_t code = ensureBlockScanInfoBuf(&pReader->blockInfoBuf, num);
|
||||||
|
if (code) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
pReader->status.uidList.tableUidList = (uint64_t*)taosMemoryRealloc(pReader->status.uidList.tableUidList, sizeof(uint64_t) * num);
|
||||||
|
}
|
||||||
|
|
||||||
taosHashClear(pReader->status.pTableMap);
|
taosHashClear(pReader->status.pTableMap);
|
||||||
STableUidList* pUidList = &pReader->status.uidList;
|
STableUidList* pUidList = &pReader->status.uidList;
|
||||||
|
|
|
@ -183,6 +183,7 @@
|
||||||
,,y,script,./test.sh -f tsim/query/event.sim
|
,,y,script,./test.sh -f tsim/query/event.sim
|
||||||
,,y,script,./test.sh -f tsim/query/forceFill.sim
|
,,y,script,./test.sh -f tsim/query/forceFill.sim
|
||||||
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
|
,,y,script,./test.sh -f tsim/query/emptyTsRange.sim
|
||||||
|
,,y,script,./test.sh -f tsim/query/partitionby.sim
|
||||||
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
,,y,script,./test.sh -f tsim/qnode/basic1.sim
|
||||||
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
,,y,script,./test.sh -f tsim/snode/basic1.sim
|
||||||
,,y,script,./test.sh -f tsim/mnode/basic1.sim
|
,,y,script,./test.sh -f tsim/mnode/basic1.sim
|
||||||
|
|
|
@ -0,0 +1,39 @@
|
||||||
|
system sh/stop_dnodes.sh
|
||||||
|
system sh/deploy.sh -n dnode1 -i 1
|
||||||
|
system sh/exec.sh -n dnode1 -s start
|
||||||
|
sql connect
|
||||||
|
|
||||||
|
$dbPrefix = db
|
||||||
|
$tbPrefix1 = tba
|
||||||
|
$tbPrefix2 = tbb
|
||||||
|
$mtPrefix = stb
|
||||||
|
$tbNum = 10
|
||||||
|
$rowNum = 2
|
||||||
|
|
||||||
|
print =============== step1
|
||||||
|
$i = 0
|
||||||
|
$db = $dbPrefix . $i
|
||||||
|
$mt1 = $mtPrefix . $i
|
||||||
|
$i = 1
|
||||||
|
$mt2 = $mtPrefix . $i
|
||||||
|
|
||||||
|
sql drop database $db -x step1
|
||||||
|
step1:
|
||||||
|
sql create database $db vgroups 3
|
||||||
|
sql use $db
|
||||||
|
sql create table $mt1 (ts timestamp, f1 int) TAGS(tag1 int, tag2 binary(500))
|
||||||
|
sql create table tb0 using $mt1 tags(0, 'a');
|
||||||
|
sql create table tb1 using $mt1 tags(1, 'b');
|
||||||
|
sql create table tb2 using $mt1 tags(1, 'a');
|
||||||
|
sql create table tb3 using $mt1 tags(1, 'a');
|
||||||
|
sql create table tb4 using $mt1 tags(3, 'b');
|
||||||
|
sql create table tb5 using $mt1 tags(3, 'a');
|
||||||
|
sql create table tb6 using $mt1 tags(3, 'b');
|
||||||
|
sql create table tb7 using $mt1 tags(3, 'b');
|
||||||
|
|
||||||
|
sql select * from $mt1 partition by tag1,tag2 limit 1;
|
||||||
|
if $rows != 0 then
|
||||||
|
return -1
|
||||||
|
endi
|
||||||
|
|
||||||
|
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
Loading…
Reference in New Issue