Merge branch '3.0' into fix/TD-31899
This commit is contained in:
commit
6958ad9ce0
|
@ -966,6 +966,7 @@ int32_t taosGetErrSize();
|
||||||
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
#define TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP TAOS_DEF_ERROR_CODE(0, 0x4013)
|
||||||
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
||||||
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
||||||
|
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||||
|
|
|
@ -77,7 +77,10 @@ bool chkRequestKilled(void* param) {
|
||||||
return killed;
|
return killed;
|
||||||
}
|
}
|
||||||
|
|
||||||
void cleanupAppInfo() { taosHashCleanup(appInfo.pInstMap); }
|
void cleanupAppInfo() {
|
||||||
|
taosHashCleanup(appInfo.pInstMap);
|
||||||
|
taosHashCleanup(appInfo.pInstMapByClusterId);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
static int32_t taosConnectImpl(const char* user, const char* auth, const char* db, __taos_async_fn_t fp, void* param,
|
||||||
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
|
SAppInstInfo* pAppInfo, int connType, STscObj** pTscObj);
|
||||||
|
|
|
@ -73,6 +73,8 @@ void taos_cleanup(void) {
|
||||||
tscWarn("failed to cleanup task queue");
|
tscWarn("failed to cleanup task queue");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tmqMgmtClose();
|
||||||
|
|
||||||
int32_t id = clientReqRefPool;
|
int32_t id = clientReqRefPool;
|
||||||
clientReqRefPool = -1;
|
clientReqRefPool = -1;
|
||||||
taosCloseRef(id);
|
taosCloseRef(id);
|
||||||
|
@ -87,9 +89,6 @@ void taos_cleanup(void) {
|
||||||
tscDebug("rpc cleanup");
|
tscDebug("rpc cleanup");
|
||||||
|
|
||||||
taosConvDestroy();
|
taosConvDestroy();
|
||||||
|
|
||||||
tmqMgmtClose();
|
|
||||||
|
|
||||||
DestroyRegexCache();
|
DestroyRegexCache();
|
||||||
|
|
||||||
tscInfo("all local resources released");
|
tscInfo("all local resources released");
|
||||||
|
|
|
@ -10346,7 +10346,11 @@ int32_t tDecodeSMCreateStbRsp(SDecoder *pDecoder, SMCreateStbRsp *pRsp) {
|
||||||
}
|
}
|
||||||
tEndDecode(pDecoder);
|
tEndDecode(pDecoder);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
|
||||||
_exit:
|
_exit:
|
||||||
|
tFreeSTableMetaRsp(pRsp->pMeta);
|
||||||
|
taosMemoryFreeClear(pRsp->pMeta);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -511,6 +511,11 @@ static int32_t getTopicAddDelete(SMqConsumerObj *pExistedConsumer, SMqConsumerOb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// no topics need to be rebalanced
|
||||||
|
if (taosArrayGetSize(pConsumerNew->rebNewTopics) == 0 && taosArrayGetSize(pConsumerNew->rebRemovedTopics) == 0) {
|
||||||
|
code = TSDB_CODE_TMQ_NO_NEED_REBALANCE;
|
||||||
|
}
|
||||||
|
|
||||||
END:
|
END:
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -581,6 +586,10 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
if(ubSubscribe){
|
if(ubSubscribe){
|
||||||
SMqConsumerObj *pConsumerTmp = NULL;
|
SMqConsumerObj *pConsumerTmp = NULL;
|
||||||
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, subscribe.consumerId, &pConsumerTmp));
|
||||||
|
if (taosArrayGetSize(pConsumerTmp->assignedTopics) == 0){
|
||||||
|
mndReleaseConsumer(pMnode, pConsumerTmp);
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
mndReleaseConsumer(pMnode, pConsumerTmp);
|
mndReleaseConsumer(pMnode, pConsumerTmp);
|
||||||
}
|
}
|
||||||
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
|
MND_TMQ_RETURN_CHECK(checkAndSortTopic(pMnode, subscribe.topicNames));
|
||||||
|
@ -599,7 +608,7 @@ END:
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tDeleteSMqConsumerObj(pConsumerNew);
|
tDeleteSMqConsumerObj(pConsumerNew);
|
||||||
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
taosArrayDestroyP(subscribe.topicNames, (FDelete)taosMemoryFree);
|
||||||
return code;
|
return (code == TSDB_CODE_TMQ_NO_NEED_REBALANCE || code == TSDB_CODE_MND_CONSUMER_NOT_EXIST) ? 0 : code;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
SSdbRaw *mndConsumerActionEncode(SMqConsumerObj *pConsumer) {
|
||||||
|
|
|
@ -543,14 +543,14 @@ void mndSyncCheckTimeout(SMnode *pMnode) {
|
||||||
if (delta > MNODE_TIMEOUT_SEC) {
|
if (delta > MNODE_TIMEOUT_SEC) {
|
||||||
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
mError("trans:%d, failed to propose since timeout, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||||
pMgmt->transSec, curSec, delta, pMgmt->transSeq);
|
pMgmt->transSec, curSec, delta, pMgmt->transSeq);
|
||||||
pMgmt->transId = 0;
|
// pMgmt->transId = 0;
|
||||||
pMgmt->transSec = 0;
|
// pMgmt->transSec = 0;
|
||||||
pMgmt->transSeq = 0;
|
// pMgmt->transSeq = 0;
|
||||||
terrno = TSDB_CODE_SYN_TIMEOUT;
|
// terrno = TSDB_CODE_SYN_TIMEOUT;
|
||||||
pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
// pMgmt->errCode = TSDB_CODE_SYN_TIMEOUT;
|
||||||
if (tsem_post(&pMgmt->syncSem) < 0) {
|
//if (tsem_post(&pMgmt->syncSem) < 0) {
|
||||||
mError("failed to post sem");
|
// mError("failed to post sem");
|
||||||
}
|
//}
|
||||||
} else {
|
} else {
|
||||||
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
mDebug("trans:%d, waiting for sync confirm, start:%d cur:%d delta:%d seq:%" PRId64, pMgmt->transId,
|
||||||
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
pMgmt->transSec, curSec, curSec - pMgmt->transSec, pMgmt->transSeq);
|
||||||
|
|
|
@ -667,8 +667,8 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
val = *pVal;
|
val = *pVal;
|
||||||
} else {
|
} else {
|
||||||
pCache->cacheHit += 1;
|
pCache->cacheHit += 1;
|
||||||
STableCachedVal* pVal = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
STableCachedVal* pValTmp = taosLRUCacheValue(pCache->pTableMetaEntryCache, h);
|
||||||
val = *pVal;
|
val = *pValTmp;
|
||||||
|
|
||||||
bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
|
bool bRes = taosLRUCacheRelease(pCache->pTableMetaEntryCache, h, false);
|
||||||
qTrace("release LRU cache, res %d", bRes);
|
qTrace("release LRU cache, res %d", bRes);
|
||||||
|
@ -720,12 +720,7 @@ int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int
|
||||||
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
if (IS_VAR_DATA_TYPE(((const STagVal*)p)->type)) {
|
||||||
taosMemoryFree(data);
|
taosMemoryFree(data);
|
||||||
}
|
}
|
||||||
if (code) {
|
QUERY_CHECK_CODE(code, lino, _end);
|
||||||
if (freeReader) {
|
|
||||||
pHandle->api.metaReaderFn.clearReader(&mr);
|
|
||||||
}
|
|
||||||
return code;
|
|
||||||
}
|
|
||||||
} else { // todo opt for json tag
|
} else { // todo opt for json tag
|
||||||
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
for (int32_t i = 0; i < pBlock->info.rows; ++i) {
|
||||||
code = colDataSetVal(pColInfoData, i, data, false);
|
code = colDataSetVal(pColInfoData, i, data, false);
|
||||||
|
|
|
@ -806,6 +806,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_SAME_COMMITTED_VALUE, "Same committed valu
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only one vgroup if subscribe super table")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
|
||||||
|
|
||||||
// stream
|
// stream
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||||
|
|
Loading…
Reference in New Issue